【Linux】线程同步与互斥 - 2(线程同步/条件变量/基于阻塞/环形队列的cp模型/线程池/线程安全/读写锁)

发布时间:2026/6/26 4:33:06

【Linux】线程同步与互斥 - 2(线程同步/条件变量/基于阻塞/环形队列的cp模型/线程池/线程安全/读写锁) 目录同步的概念同步与互斥的关系条件变量生产者-消费者模型基于BlockingQueue的生产者消费者模型信号量基于环形队列的生产消费模型线程池STL,智能指针和线程安全单例模式的线程安全同步的概念同步是保证数据安全的情况下互斥的前提下让线程访问资源具有一定的顺序性。比如多个线程访问同一个资源如果该线程申请锁失败那么它只能在该资源的申请队列的末尾排队等待。既然线程都在排队等待了那为什么还需要锁呢这是因为线程不是乖乖的在队列的末尾排队等待而是直接去申请锁失败了再排队。同步与互斥的关系互斥可以用互斥锁实现但是互斥也有互斥的问题比如调度不均衡竞争不均衡。避免这些问题可以采用一些策略比如同步。即互斥是同步的基础同步是互斥的扩展。互斥可以看作是最简单的同步在纯互斥的场景下哪个线程能申请到锁是不可预见的而在互斥 同步的场景下线程是按照一定顺序申请锁执行临界区代码的线程的顺序是可以预见的。并发控制 ├── 互斥 (Mutual Exclusion) │ ├── 解决同时访问的问题 │ ├── 工具互斥锁、读写锁、信号量 │ └── 结果数据一致性 │ └── 同步 (Synchronization) ├── 解决先后顺序的问题 ├── 工具条件变量、信号量、屏障 └── 结果执行顺序可控条件变量当一个线程在访问完共享资源时在它释放锁之后要敲一下“铃铛”目的是让下一个准备访问该共享资源的线程知道可以访问了。这个“铃铛”和等待队列就叫做“条件变量”条件变量的相关函数条件变量初始化/销毁 - pthread_cond_init/destroyint pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr); 参数 cond要初始化的条件变量 attrNULL 与互斥锁相同静态全局的条件变量不需要使用 pthread_cond_init/destroy 来初始化/销毁 pthread_cond_t cond PTHREAD_COND_INITIALIZER; int pthread_cond_destroy(pthread_cond_t *cond)线程加入等待队列 - pthread_cond_waitint pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); 参数 cond要在这个条件变量上等待 mutex互斥锁线程被唤醒后会从pthread_cond_wait函数中返回并继续执行后续的代码注意在使用时我们通常是这样使用的// ... pthread_mutex_lock(mutex); // 先申请锁 // 在锁的保护下 while(资源未就绪) pthread_cond_wait(cond,mutex); // 让线程去等待队列等待 // 条件满足、被唤醒 { // ... 临界区 } pthread_mutex_unlock(mutex); // ...为什么 pthread_cond_wait 需要传递互斥锁在 if 判断中可能也会访问临界资源所以while 循环判断必须在 pthread_mutex_lock 之后如果条件不满足线程在等待之前又必须释放锁。线程在被唤醒之后在执行临界区代码之前又必须持有锁。这是该函数需要传递互斥锁的一方面。另一方面假如pthread_cond_wait 不需要传递互斥锁在while 循环中调用pthread_mutex_unlockpthread_mutex_lock(mutex); while(资源未就绪) { pthread_mutex_unlock(mutex); // 解锁 // 问题解锁和等待之间有时间窗口 // 假如此时该线程被切走了 // 另一个线程拿到锁修改共享数据使条件满足调用 pthread_cond_signal // 该线程继续执行 // 调用 pthread_cond_wait pthread_cond_wait(cond); // 然后等待 // 问题signal 已经发送过了线程A永远收不到信号 }那么整个 while 循环判断就不会是原子的会有线程安全问题。pthread_cond_wait 函数内部用某种方式将 1、把当前线程加入cond 的等待队列 2 、释放mutex 3、标记线程为等待状态 设计为原子的唤醒等待队列的线程 - pthread_cond_signal/broadcast假如所有线程都加入等待队列了如果没有人唤醒那么它们将一直阻塞等待。通常可以让主线程来控制唤醒线程的逻辑。int pthread_cond_broadcast(pthread_cond_t *cond); int pthread_cond_signal(pthread_cond_t *cond);特性pthread_cond_signalpthread_cond_broadcast唤醒数量唤醒至少一个等待线程唤醒所有等待线程具体唤醒谁由调度策略决定通常是优先级最高的对头所有线程都被唤醒适用场景资源可用只需一个消费者状态变化所有等待者都需处理竞争激烈程度低只有一个线程被唤醒高所有线程被唤醒争夺锁使用实例让 5 个线程按顺序的对一个全局变量 cnt 进行 操作。#include iostream #include unistd.h #include pthread.h int cnt 0; pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond PTHREAD_COND_INITIALIZER; void *Count(void * args) { pthread_detach(pthread_self()); uint64_t number (uint64_t)args; std::cout pthread: number create success std::endl; while(true) { pthread_mutex_lock(mutex); pthread_cond_wait(cond, mutex); //pthread_cond_wait让线程等待的时候会自动释放锁 std::cout pthread: number cnt: cnt std::endl; pthread_mutex_unlock(mutex); } } int main() { // uint64_tunsigned long long,防止等下强制转换为 64 位指针时出现警告 for(uint64_t i 0; i 5; i) { pthread_t tid; pthread_create(tid, nullptr, Count, (void*)i); usleep(1000); // 保证等待队列的顺序就是创建顺序 } sleep(3); // 确保都加入了等待队列 std::cout main thread ctrl begin: std::endl; while(true) { sleep(1); // pthread_cond_signal(cond); //唤醒在cond的等待队列中等待的一个线程默认都是第一个 pthread_cond_broadcast(cond); //唤醒所有 std::cout signal one thread... std::endl; } return 0; }生产者-消费者模型模型简介生产者-消费者模型consumer - producter model简称 cp 模型是并发编程中最经典的同步问题它描述了两类线程生产者和消费者如何共享一个固定大小的缓冲区生产者和消费者的核心约束缓冲区满生产者不能向满的缓冲区放入数据缓冲区空消费者不能从空的缓冲区取出数据互斥访问同一时刻只能有一个线程操作缓冲区缓冲区存在的意义cp 模型的优点1、支持生产和消费的速率不同(生产者不用关心消费者的消费速率只需关心缓冲区是否有空间消费者也不关心生产者的生产速率只关心缓冲区是否有数据2、实现生产和消费的解耦(生产者生产完数据之后不用等待消费者处理直接把数据放入缓冲区消费者直接找生产者要数据而是从缓冲区里取)生产者和消费者的同步与互斥因为缓冲区其实是共享资源多个生产者和消费者同时访问缓冲区必然存在并发问题。所以生产者和生产者之间是互斥关系消费者与消费者之间是互斥关系。生产者不能一直向缓冲区生产导致消费者饥饿所以生产者和消费者之间是同步关系。生产者-消费者模型为什么是高效的生产者生产的数据一定是从外部获取的比如网络、用户、其他线程而消费者在缓冲区拿到数据之后要对数据做加工处理。生产者-消费者模型之所以是高效的是因为生产者从外部获取数据时消费者可能正在从缓冲区拿数据并作加工处理消费者在作加工处理时生产者可能正在从从外部获取数据并把数据放入缓冲区。既然在某一时刻只能由一个生产者生产数据或一个消费者消费数据那为什么还要多生产者和多消费者呢该模型高效的地方不在于对缓冲区拿放的过程而在于在某一时刻可以有多个生产者同时从外部获取数据也可以有多个消费者在消费数据基于BlockingQueue的生产者消费者模型生产者-消费者模型中缓冲区可以是特定的数据结构常见的是阻塞队列(Blocking Queue)其与普通的队列区别在于当队列为空时从队列获取元素的操作将会被阻塞直到队列中被放入了元素当队列满时往队列里存放元素的操作也会被阻塞直到有元素被从队列中取出。下面采用C queue模拟阻塞队列的生产消费模型。BlockQueue.hpp 阻塞队列的实现#pragma once #include iostream #include queue #include pthread.h template class T class BlockQueue { static const int defalutnum 20; public: BlockQueue(int maxcap defalutnum) :maxcap_(maxcap) { pthread_mutex_init(mutex_, nullptr); pthread_cond_init(c_cond_, nullptr); pthread_cond_init(p_cond_, nullptr); // low_water_ maxcap_/3; // high_water_ (maxcap_*2)/3; } // 谁来唤醒呢 T pop() { pthread_mutex_lock(mutex_); while (q_.size() 0) { // 如果线程wait时被误唤醒了呢 pthread_cond_wait(c_cond_, mutex_); } T out q_.front(); q_.pop(); // if(q_.size()low_water_) pthread_cond_signal(p_cond_); pthread_cond_signal(p_cond_); // pthread_cond_broadcast pthread_mutex_unlock(mutex_); return out; } void push(const T in) { pthread_mutex_lock(mutex_); while (q_.size() maxcap_) { // 做到防止线程被伪唤醒的情况 // 伪唤醒情况 pthread_cond_wait(p_cond_, mutex_); //1. 调用的时候自动释放锁 2. } // 1. 队列没满 2.被唤醒 q_.push(in); // if(q_.size() high_water_) pthread_cond_signal(c_cond_); pthread_cond_signal(c_cond_); pthread_mutex_unlock(mutex_); } ~BlockQueue() { pthread_mutex_destroy(mutex_); pthread_cond_destroy(c_cond_); pthread_cond_destroy(p_cond_); } private: std::queueT q_; //int mincap_; int maxcap_; // 极值 pthread_mutex_t mutex_; pthread_cond_t c_cond_; pthread_cond_t p_cond_; // int low_water_; // int high_water_; };在多生产者和多消费者的情况下可能出现伪唤醒的情况情景再现假设现在阻塞队列以及满了消费者在消费一个数据之后阻塞队列只有一个空位但是由于某种原因消费者唤醒了多个生产者多个生产者开始同时竞争申请锁一个生产者竞争成功并向阻塞队列放入数据并释放锁之后另一个生产者不小心又申请到了锁又向阻塞队列放入数据这种情况就叫做伪唤醒。解决方法就是判断阻塞队列是否有空位或为空采用 while 循环判断。生产者生产的数据和消费者消费的数据用一个类来模拟一个类对象代表一个简单的运算任务。生产者随机生产数据模拟从外部获取的不同数据。tesk.hpp:#pragma once #include iostream #include string std::string opers -*/%; // 错误码 enum { DivZero 1, ModZero, Unknown }; class Task { public: Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0) { } void run() { switch (oper_) { case : result_ data1_ data2_; break; case -: result_ data1_ - data2_; break; case *: result_ data1_ * data2_; break; case /: { if (data2_ 0) exitcode_ DivZero; else result_ data1_ / data2_; } break; case %: { if (data2_ 0) exitcode_ ModZero; else result_ data1_ % data2_; } break; default: exitcode_ Unknown; break; } } // 方便消费者调用任务t() - t.run() void operator ()() { run(); } // 获取任务结果 std::string GetResult() { std::string r std::to_string(data1_); r oper_; r std::to_string(data2_); r ; r std::to_string(result_); r [code: ; r std::to_string(exitcode_); r ]; return r; } // 打印任务内容 std::string GetTask() { std::string r std::to_string(data1_); r oper_; r std::to_string(data2_); r ?; return r; } ~Task() { } private: int data1_; int data2_; char oper_; int result_; int exitcode_; };main 函数主线程用来创建多生产者和多消费者让生产者能够生产数据消费者能够消费数据main.cpp:#define _CRT_SECURE_NO_WARNINGS 1 #include BlockQueue.hpp #include Task.hpp #include unistd.h #include ctime void* Consumer(void* args) { BlockQueueTask* bq static_castBlockQueueTask *(args); while (true) { // 消费 Task t bq-pop(); // 计算 // t.run(); t(); std::cout 处理任务: t.GetTask() 运算结果是 t.GetResult() thread id: pthread_self() std::endl; // t.run(); // sleep(1); } } void* Productor(void* args) { int len opers.size(); BlockQueueTask* bq static_castBlockQueueTask *(args); while (true) { // 模拟生产者生产数据 int data1 rand() % 10 1; // [1,10] usleep(10); int data2 rand() % 10; char op opers[rand() % len]; Task t(data1, data2, op); // 生产 bq-push(t); std::cout 生产了一个任务: t.GetTask() thread id: pthread_self() std::endl; sleep(1); } } int main() { srand(time(nullptr)); BlockQueueTask* bq new BlockQueueTask(); pthread_t c[3], p[5]; for (int i 0; i 3; i) { pthread_create(c i, nullptr, Consumer, bq); } for (int i 0; i 5; i) { pthread_create(p i, nullptr, Productor, bq); } for (int i 0; i 3; i) { pthread_join(c[i], nullptr); } for (int i 0; i 5; i) { pthread_join(p[i], nullptr); } delete bq; return 0; }信号量上面基于BlockingQueue的生产者消费者模型中将阻塞队列看成一个整体在某时刻只允许最多 1 个生产者或消费者访问阻塞队列。现在将阻塞队列看成多份每个生产者/消费者都只能访问该阻塞队列的特定部分。使用信号量对阻塞队列的各部分做管理记录阻塞队列的空闲部分数量。信号量会保证 PV 操作是原子的。在 P 操作和 V 操作之间不需要像条件变量那样对资源是否就绪做判断而是直接使用资源因为 P 操作本身就在做判断如果仍有空闲部分往下执行否则在信号量的等待队列等待。相关函数初始化/销毁信号量 - sem_init/destroy#include semaphore.h int sem_init(sem_t *sem, int pshared, unsigned int value); 参数 sem_t *sem:定义的信号量的地址 pshared:0表示线程间共享非零表示进程间共享默认为 0 即可 value信号量初始值 int sem_destroy(sem_t *sem);等待/发布信号量 sem_wait/post功能等待信号量会将信号量的值减1 int sem_wait(sem_t *sem); //P() 功能发布信号量表示资源使用完毕可以归还资源了。将信号量值加1。 int sem_post(sem_t *sem);//V()基于环形队列的生产消费模型生产者-消费者模型中缓冲区可以是特定的数据结构现在采用环形队列作为缓冲区。环形队列如何判空和判满呢有两种方法1、用一个计数器记录空闲位置。2、牺牲一个位置,如果 head tail 表示空如果 (head 1)%capacity tail 表示已满。现采用第三种方法使用两个信号量一个信号量表示环形队列还有有多少空间资源生产者的信号量初始为环形队列的总容量另一个信号量表示环形队列还有有多少数据资源消费者的信号量初始为0。开始的时候只有生产者可以执行 P 操作生产者执行 V 操作之后消费者才能执行 P 操作。用两个变量分别记录生产者和消费者的当前位置。由于生产者和消费者对应的下标只有一个如果要实现多生产者和多消费者情况有点复杂所以先实现单生产者单消费者。如果要实现多生产者和多消费者由于环形队列可以支持同时有一个生产者生产数据和一个消费者消费数据所以生产者和消费者应该分别使用两把锁与阻塞队列的 cp 模型不同阻塞队列的生产者和消费者共用一把锁即某时刻只能有一个生产者或消费者在操作阻塞队列。应该先申请信号量再申请锁。先申请信号量是资源的预定对资源的使用是申请锁成功之后。从技术角度讲信号量内部的实现是原子的不需要锁保护。从逻辑角度讲这样做可以支持线程在使用资源的同时其他线程可以申请信号量提前预定提高并发度。RIngQueue.hpp:#pragma once #include iostream #include vector #include semaphore.h #include pthread.h const static int defaultcap 5; templateclass T class RingQueue{ private: void P(sem_t sem) // 一定要加引用 { sem_wait(sem); } void V(sem_t sem)// 一定要加引用 { sem_post(sem); } void Lock(pthread_mutex_t mutex) // 一定要加引用 { pthread_mutex_lock(mutex); } void Unlock(pthread_mutex_t mutex) // 一定要加引用 { pthread_mutex_unlock(mutex); } public: RingQueue(int cap defaultcap) :ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0) { sem_init(cdata_sem_, 0, 0); sem_init(pspace_sem_, 0, cap); pthread_mutex_init(c_mutex_, nullptr); pthread_mutex_init(p_mutex_, nullptr); } void Push(const T in) // 生产 { P(pspace_sem_); Lock(p_mutex_); // ? ringqueue_[p_step_] in; // 位置后移维持环形特性 p_step_; p_step_ % cap_; Unlock(p_mutex_); V(cdata_sem_); } void Pop(T *out) // 消费 { P(cdata_sem_); Lock(c_mutex_); // ? *out ringqueue_[c_step_]; // 位置后移维持环形特性 c_step_; c_step_ % cap_; Unlock(c_mutex_); V(pspace_sem_); } ~RingQueue() { sem_destroy(cdata_sem_); sem_destroy(pspace_sem_); pthread_mutex_destroy(c_mutex_); pthread_mutex_destroy(p_mutex_); } private: std::vectorT ringqueue_; int cap_; int c_step_; // 消费者下标 int p_step_; // 生产者下标 sem_t cdata_sem_; // 消费者关注的数据资源 sem_t pspace_sem_; // 生产者关注的空间资源 pthread_mutex_t c_mutex_; pthread_mutex_t p_mutex_; };Tesk.hpp、main.cpp 的大致内容与阻塞队列的相同。线程池线程池是一种线程使用模式。线程过多会带来调度开销进而影响缓存局部性和整体性能。而线程池维护着多个线程等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的时间代价。线程池不仅能够保证内核的充分利用还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。线程池的应用场景1.需要大量的线程来完成任务且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务使用线程池技术是非常合适的。因为单个任务小而任务数量巨大你可以想象一个热门网站的点击次数。 但对于长时间的任务比如一个Telnet连接请求线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。2. 对性能要求苛刻的应用比如要求服务器迅速响应客户请求。3.接受突发性的大量请求但不至于使服务器因此产生大量线程的应用。突发性大量客户请求在没有线程池情况下将产生大量线程虽然理论上大部分操作系统线程数目最大值不是问题短时间内产生大量线程可能使内存到达极限出现错误.线程池示例在类内创建固定数量线程池循环从任务队列中获取任务对象。由于线程要执行的函数是在类内创建的那么它就是类的成员函数类的成员函数第一个参数默认是 this 指针而线程要执行的函数必须参数是 void* 返回值是 void* 的所以将线程要执行的函数在类内声明为静态的静态成员函数没有 this 指针。但是该静态函数会访问类的成员比如任务队列所以应该给 pthread_create 函数传递 this 指针。获取到任务对象后执行任务对象中的任务接口#pragma once #include pthread.h #include string #include vector #include queue #include iostream const int defalutnum 5; struct Thread_info { std::string name; pthread_t tid; }; templateclass T class Thread_pool { public: void lock() { pthread_mutex_lock(_mutex); } void unlock() { pthread_mutex_unlock(_mutex); } void sleep() { pthread_cond_wait(_cond,_mutex); } void wakeup() { pthread_cond_signal(_cond); } bool is_empty() { return _tesks.empty(); } const std::string Get_Thread_Name(pthread_t _tid) { for(const auto i : threads) { if(i.tid _tid) return i.name; } return None; } static void* handler(void* args) { Thread_pool* tp static_castThread_pool*(args); const std::string name tp-Get_Thread_Name(pthread_self()); while(true) { tp-lock(); while(tp-is_empty()) {tp-sleep();} T t tp-pop(); tp-unlock(); t(); std::cout name run , result: t.GetResult() std::endl; } return nullptr; } void start() { for(int i 0; i defalutnum; i) { threads[i].name thread- std::to_string(i 1); pthread_create((threads[i].tid),nullptr,handler,this); } } void push(const T t) { lock(); _tesks.push(t); wakeup(); unlock(); } T pop() { T t _tesks.front(); _tesks.pop(); return t; } static Thread_pool* GetInstance() { if(_tp nullptr) { pthread_mutex_lock(_lock); if(_tp nullptr) {_tp new Thread_poolT();} pthread_mutex_unlock(_lock); } return _tp; } private: Thread_pool(int num defalutnum):threads(num) { pthread_mutex_init(_mutex,nullptr); pthread_cond_init(_cond,nullptr); } ~Thread_pool() { pthread_mutex_destroy(_mutex); pthread_mutex_destroy(_lock); pthread_cond_destroy(_cond); } Thread_pool(const Thread_poolT) delete; const Thread_poolT operator(const Thread_poolT) delete; std::vectorThread_info threads; std::queueT _tesks; pthread_mutex_t _mutex; pthread_cond_t _cond; static Thread_poolT* _tp; static pthread_mutex_t _lock; }; templateclass T Thread_poolT* Thread_poolT::_tp nullptr; templateclass T pthread_mutex_t Thread_poolT::_lock PTHREAD_MUTEX_INITIALIZER;STL,智能指针和线程安全STL中的容器是否是线程安全的?不是.原因是,STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).因此STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.智能指针是否是线程安全的?对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数.单例模式的线程安全懒汉方式实现单例模式template typename T class Singleton { static T* inst; public: static T* GetInstance() { if (inst NULL) { inst new T(); } return inst; } };存在一个严重的问题, 线程不安全.第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例// 懒汉模式, 线程安全 template typename T class Singleton { volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化. static std::mutex lock; public: static T* GetInstance() { if (inst NULL) { // 双重判定空指针, 降低锁冲突的概率, 提高性能. lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new. if (inst NULL) { inst new T(); } lock.unlock(); } return inst; } };注意事项:1.双重 if 判定, 避免不必要的锁竞争第一次调用 GetInstance所有线程都判断 inst NULL 为真都竞争申请锁。往后调用 GetInstance所有线程都判断 inst NULL 为假直接返回不用再重复竞争申请锁。2. 加锁解锁的位置3. volatile关键字防止过度优化

相关新闻