无锁队列【C++】

发布时间:2026/5/20 2:55:33

无锁队列【C++】 文章目录概要实现一个ringbuffer--wait-free设计mpsc队列小结概要有锁队列通过互斥锁或其他同步机制保证线程安全的队列阻塞无锁队列通过原子操作来实现线程安全的队列非阻塞锁的局限①线程阻塞、线程切换导致的资源浪费②存在死锁的风险③在高并发下锁竞争激烈导致吞吐量下降lock-free和wait-free的区别①无锁至少一个线程成功其他可能重试依赖casacq_rel等原子操作有锁只有一个线程成功②无等待所有线程必成功无重试依赖exchange等原子操作volatile:①防止编译期优化②确保内存可见性(同步性拿最新的数据不经过缓存直接读写到主存)内存屏障 std::atomic_thread_fence①防止指令重排序顺序性②保证内存可见性volatile、内存屏障、原子操作之间的关系原子操作解决了操作的原子性一个原子操作中内部会含有多条代码原子操作的原子性保证了其他线程只能看到这个原子操作执行前后的状态不能看到原子操作执行的过程。同时C的原子操作可以指定内存序保证了可见性和顺序型可见性意味着变量是直接往主存进行读写其他核心能及时拿到最新值顺序性指定了CPU和编译器对代码优化的方向比如release前的操作不能优化到release后accquire后的操作不能优化到acquire前。volatile只解决可见性的问题。内存屏障保证了顺序性和可见性。纵轴: 内存序横轴: 操作可见性顺序性relaxed读/写仅保证原子性和修改顺序一致性 (在一个原子变量内部)无任何顺序保证acquire读 (Load)确保能看到由release操作释放的所有数据的最新值阻止本acquire读操作与其后续操作重排序release写 (Store)确保在此操作之前的所有写入对匹配的acquire操作都可见阻止本release写操作与其之前的操作重排序acq_rel读-改-写兼具acquire和release的可见性兼具acquire和release的顺序性seq_cst读/写所有线程看到一个全局统一的操作顺序所有seq_cst操作都遵循这个全局总顺序实现一个ringbuffer–wait-free基于数组的循环队列实现一个spsc单生产者单消费者队列。分析固定大小的数组头尾索引存储任意类型POD类型C的结构体和非POD有虚函数、含有非POD类型成员、自定义了构造函数等特殊成员函数类型模板类std::aligned_storage_tsizeof(T),alignof(T)buffer_[];不调用T的构造函数也不管T是不是POD类型将内存的分配和对象的构造/析构过程解耦。placement_newT*objnew(buffer_)T(args...);// 在 buffer_ 的地址上构造 T 类型的对象FIFO类型提供push(对应tail)判断满和pop(对应pop)判断空SPSC场景下线程安全性能优化内存对齐避免伪共享CPU通过缓存行为单位加载数据线程A和或线程B修改同一缓存行为什么拿到同一缓存行中没有数据竞争的的数据CPU也会通过缓存一致性协议将脏缓存同步到其他核心频繁的缓存行同步会导致性能下降。根据MESI缓存一致性协议core1修改变量A导致core1中该cacheline的变为Modifiedcore2中的同一个cacheline也已经过时了标记为Invalid。当core2需要访问B时由于其缓存中的cacheline已经失效所以必须暂停自己的操作向内存发出请求重新获取缓存消耗大量的时间。如果 Core 1 和 Core 2 频繁地交替写入 A 和 B就会导致这个 Cache Line 在两个核心之间来回“弹跳”伴随着大量的缓存失效和重新加载造成巨大的性能损耗。伪共享两个核心并没有共享一个变量而是在各自操作A和B但是由于A和B在内存中太近导致了不必要的缓存同步。cacheline当 CPU 核心需要读取或写入一个特定的内存地址比如 0x1005时它不会只去取这一个字节。相反它会计算出该地址属于哪个 Cache Line然后CPU 会将整个cacheline的数据作为一个整体从主内存加载到自己的 L1 缓存中。通过内存对齐将没有数据竞争的A和B放在不同的缓存行即可。alignas(64)std::size_t read_;alignas(64)std::size_t write_;内存对齐规则某个类型type的变量的地址 addr 必须是该类型大小 sizeof(type) 的整数倍。结构体的大小还必须是其自身对齐要求最大的倍数。首尾相接实现对数组长度取余将除法运算优化为位运算将capacity指定为2的n次幂next(crr1)(capacity-1);//next (crr 1) % capacity通过竞态断言的告诉编译器这种优化设置public:static_assert(Capacity!(Capacity(Capacity-1)),capacity必须是2的n次幂;完美转发先来了解什么是万能引用形如templatetypenameTvoidfunc(Tparam);// param 是一个万能引用它接受左值也接受右值常常配合std::foward实现完美转发构造时进行拷贝构造还是移动构造都接受。#pragmaonce#includeatomic#includetype_traitstemplatetypenameT,std::size_t CapacityclassRingBuffer{public:static_assert(Capacity!(Capacity(Capacity-1)),Capacity must be power of 2);RingBuffer():read_(0),write_(0){}~RingBuffer(){//只要进行了线程切换,relaxed都具备可见性std::size_t rread_.load(std::memory_order_relaxed);std::size_t wwrite_.load(std::memory_order_relaxed);while(r!w){reinterpret_castT*(buffer_[r])-~T();r(r1)(Capacity-1);}}templatetypenameUboolPush(Uvalue){//只有这一个任务一个线程会去修改writeconststd::size_t wwrite_.load(std::memory_order_relaxed);conststd::size_t next_w(w1)(Capacity-1);// 检查缓冲区是否满if(next_wread_.load(std::memory_order_acquire)){returnfalse;}// 使用 placement new 在 buffer_ 中构造对象new(buffer_w)T(std::forwardU(value));write_.store(next_w,std::memory_order_release);returntrue;}boolPop(Tvalue){conststd::size_t rread_.load(std::memory_order_acquire);// 检查缓冲区是否空if(rwrite_.load()){returnfalse;}// 取出元素并析构valuestd::move(*reinterpret_castT*(buffer_[r]));reinterpret_castT*(buffer_[r])-~T();read_.store((r1)(Capacity-1),std::memory_order_release);returntrue;}std::size_tSize()const{//const修饰this指针conststd::size_t rread_.load(std::memory_order_acquire);conststd::size_t wwrite_.load(std::memory_order_acquire);return(wr)?(w-r):(Capacity-rw);}private:// cache line 64Balignas(64)std::atomicstd::size_tread_;alignas(64)std::atomicstd::size_twrite_;alignas(64)std::aligned_storage_tsizeof(T),alignof(T)buffer_[Capacity];// 支持 pod 和 非 pod 类型};设计mpsc队列#ifndef_MARK_MPSC_QUEUE_H#define_MARK_MPSC_QUEUE_H#includeatomic#includeutilitytemplatetypenameTclassMPSCQueueNonIntrusive{public:MPSCQueueNonIntrusive():_head(newNode()),_tail(_head.load(std::memory_order_relaxed)){Node*front_head.load(std::memory_order_relaxed);front-Next.store(nullptr,std::memory_order_relaxed);}~MPSCQueueNonIntrusive(){T*output;while(Dequeue(output))deleteoutput;Node*front_head.load(std::memory_order_relaxed);deletefront;}// wait-freevoidEnqueue(T*input){Node*nodenewNode(input);Node*prevHead_head.exchange(node,std::memory_order_acq_rel);prevHead-Next.store(node,std::memory_order_release);}boolDequeue(T*result){Node*tail_tail.load(std::memory_order_relaxed);Node*nexttail-Next.load(std::memory_order_acquire);if(!next)returnfalse;resultnext-Data;_tail.store(next,std::memory_order_release);deletetail;returntrue;}private:structNode{Node()default;explicitNode(T*data):Data(data){Next.store(nullptr,std::memory_order_relaxed);}T*Data;std::atomicNode*Next;};std::atomicNode*_head;std::atomicNode*_tail;MPSCQueueNonIntrusive(MPSCQueueNonIntrusiveconst)delete;MPSCQueueNonIntrusiveoperator(MPSCQueueNonIntrusiveconst)delete;};templatetypenameT,std::atomicT*T::*IntrusiveLinkclassMPSCQueueIntrusive{public:MPSCQueueIntrusive():_dummyPtr(reinterpret_castT*(std::addressof(_dummy))),_head(_dummyPtr),_tail(_dummyPtr){// _dummy is constructed from aligned_storage and is intentionally left uninitialized (it might not be default constructible)// so we init only its IntrusiveLink herestd::atomicT**dummyNextnew((_dummyPtr-*IntrusiveLink))std::atomicT*();dummyNext-store(nullptr,std::memory_order_relaxed);}~MPSCQueueIntrusive(){T*output;while(Dequeue(output))deleteoutput;}voidEnqueue(T*input){(input-*IntrusiveLink).store(nullptr,std::memory_order_release);T*prevHead_head.exchange(input,std::memory_order_acq_rel);(prevHead-*IntrusiveLink).store(input,std::memory_order_release);}boolDequeue(T*result){T*tail_tail.load(std::memory_order_relaxed);T*next(tail-*IntrusiveLink).load(std::memory_order_acquire);if(tail_dummyPtr){if(!next)returnfalse;_tail.store(next,std::memory_order_release);tailnext;next(next-*IntrusiveLink).load(std::memory_order_acquire);}if(next){_tail.store(next,std::memory_order_release);resulttail;returntrue;}T*head_head.load(std::memory_order_acquire);if(tail!head)returnfalse;Enqueue(_dummyPtr);next(tail-*IntrusiveLink).load(std::memory_order_acquire);if(next){_tail.store(next,std::memory_order_release);resulttail;returntrue;}returnfalse;}private:std::aligned_storage_tsizeof(T),alignof(T)_dummy;T*_dummyPtr;std::atomicT*_head;std::atomicT*_tail;MPSCQueueIntrusive(MPSCQueueIntrusiveconst)delete;MPSCQueueIntrusiveoperator(MPSCQueueIntrusiveconst)delete;};templatetypenameT,std::atomicT*T::*IntrusiveLinknullptrusingMPSCQueuestd::conditional_tIntrusiveLink!nullptr,MPSCQueueIntrusiveT,IntrusiveLink,MPSCQueueNonIntrusiveT;#endif// MPSCQueue_h__小结在多生产者场景下入队列的三个操作:①判断是不是满②填数据③设置指针怎么能保证是一个原子操作做法是让head指针先移动若满直接返回不满写入数据写完后移动tailheadtail再次重合。https://github.com/0voice

相关新闻