shared_future

发布时间:2026/6/30 20:27:57

shared_future 若需要使用一个线程的结果让多个线程获取呢可以使用shared_future可多次调用get()与std::future不同shared_future的get()可多次调用线程安全多个线程可同时调用get()但返回引用类型时要小心数据竞争异常传播异常会被存储每次get()都会重新抛出生命周期共享状态由所有副本共同管理最后一个副本销毁时释放资源复制廉价复制操作只增加引用计数适合传递到多个线程值语义优先尽量返回值类型而非引用类型避免悬挂引用检查有效性使用前检查valid()避免操作空的shared_future内存模型get()提供memory_order_acquire语义确保结果可见性1 #include iostream 2 #include thread 3 #include chrono 4 #include functional 5 #include future 6 7 using namespace std; 8 void computeA(promiseint prom){ 9 cout 进入A! endl; 10 this_thread::sleep_for(chrono::seconds(5));//5s 11 cout A完成! 5s endl; 12 prom.set_value(1);//设置结果值 13 } 14 15 16 void computeB(shared_futureint shared_fut){ 17 cout 进入B endl; 18 shared_fut.get(); //等待A完成 19 this_thread::sleep_for(chrono::seconds(4));//4s 20 cout B完成! 4s endl; 21 22 } 23 24 25 void computeC(shared_futureint shared_fut){ 26 cout 进入C endl; 27 shared_fut.get(); 28 cout C完成! endl; 29 } 30 31 int main(){ 32 33 promiseint prom_i; 34 futureint resultFutI prom_i.get_future(); 35 shared_futureint shared_fut resultFutI.share();//两步获取shared_future 36 // 直接从 promise 获取 shared_future 37 //share_futureint share_fut prom_i.get_future().share(); 38 39 thread threadA thread(computeA , move(prom_i)); 40 41 thread threadB thread(computeB , shared_fut); 42 43 thread threadC thread(computeC , shared_fut); 44 45 46 threadA.join(); 47 48 threadB.join(); 49 50 threadC.join(); 51 }aysnc1 #include iostream 2 #include thread 3 #include chrono 4 #include functional 5 #include future 6 7 using namespace std; 8 int computeA(){ 9 cout 进入A! endl; 10 this_thread::sleep_for(chrono::seconds(5));//5s 11 cout A完成! 5s endl; 12 return 100; 13 } 14 15 16 void computeB(shared_futureint shared_fut){ 17 cout 进入B endl; 18 int result shared_fut.get(); //等待A完成 19 this_thread::sleep_for(chrono::seconds(4));//4s 20 cout B完成! 4s result result endl; 21 22 } 23 24 int main(){ 25 coutmain fun! endl; 26 futureint fut async(launch::async,computeA); 27 shared_futureint shared_fut fut.share(); 28 thread threadA thread(computeB,shared_fut); 29 for(int i 0;i50;i){ 30 coutmain: i i endl; 31 } 32 threadA.join(); 33 return 0; 34 35 }线程同步线程同步是多线程编程中协调线程执行顺序的机制通过控制多个线程对共享资源的访问顺序防止数据竞争与不可预知的数据损坏。其核心在于保证同一时刻仅有一个线程操作关键数据段。为什么要线程同步解决竞争条件和数据不一致性。线程同步的本质就是保证数据操作原子性。线程同步的方法互斥锁、读写锁、条件变量、原子变量、线程局部存储。互斥锁 mutex和原子变量 atomicmutex:提供基本的锁定和解锁功能;recursive_mutex:递归互斥锁允许同一个线程多次锁定同一个互斥锁避免自死锁。timed_mutex:带超时功能的互斥锁可以尝试锁定一段时间避免永久阻塞。recursive_timed_mutex:结合递归和超时功能的互斥锁.锁管理器 (RAII机制)lock_guard:轻量级自动释放构造时加锁unique_lock:支持延时锁定手动锁定/解锁所有权转移延迟锁定std::unique_lockstd::mutex lock(mtx, std::defer_lock);scoped_lock - 多锁管理 C 17std::mutex mtx1, mtx2, mtx3; // 同时锁定多个互斥锁避免死锁 std::scoped_lock lock(mtx1, mtx2, mtx3);atomic内存序内存序Memory Order是因为编译器和 CPU 为了性能会进行指令重排Instruction Reordering。memory_order_relaxedrelaxed读/写无同步仅保证操作原子化常用于计数器。consume读比 acquire 更轻只同步有数据依赖的变量不推荐初学者使用。acquire读配合 release防止读操作被重排到后面。release写配合 acquire防止写操作被重排到前面。acq_rel读-改-写同时具有 acquire 和 release 的特性。seq_cst读/写最严格所有线程看到完全一致的顺序。默认。实例1 #include iostream 2 #include thread 3 #include chrono 4 #include functional 5 #include future 6 #include mutex 7 #include atomic 8 9 using namespace std; 10 //atomicint shared_data(0); atomic适用于基本类型 11 int shared_data 0; 12 mutex g_mutex; 13 14 //测试不准确 15 //不加锁 0ms 结果错误 16 //atomic 3ms 17 //mutex 加锁 12ms 18 //lock_guard 13ms 19 //unique_lock 15ms 20 21 void addValue() { 22 for(int i 0;i100000;i){ 23 // mutex加锁 24 // g_mutex.lock(); 25 // shared_data; 26 // g_mutex.unlock(); 27 28 unique_lockmutex lock(g_mutex,defer_lock); 29 lock.lock(); 30 shared_data; 31 lock.unlock(); 32 33 } 34 } 35 36 37 int main(){ 38 auto startTime chrono::high_resolution_clock::now(); 39 coutmain fun! endl; 40 thread threadA thread(addValue); 41 thread threadB thread(addValue); 42 threadA.join(); 43 threadB.join(); 44 auto stopTime chrono::high_resolution_clock::now(); 45 auto duration chrono::duration_castchrono::milliseconds(stopTime-startTime).count(); 46 coutsharedData: shared_data 时间duration msendl; 47 return 0; 48 49 }读写锁允许多个读线程同时访问共享资源但只允许一个写线程独占访问读锁共享锁多个线程可以同时持有读锁写锁独占锁同一时间只能有一个线程持有写锁且持有写锁时不能有读锁// 读写锁的状态转换 // 无锁状态 - 可以加读锁或写锁 // 有读锁时 - 可以再加读锁不能加写锁 // 有写锁时 - 不能加读锁也不能加写锁 // 写者优先Writer-preference 或防止写饥饿Write starvation prevention 的策略。1 #include iostream 2 #include thread 3 #include shared_mutex 4 #include vector 5 #include chrono 6 7 class ThreadCounter { 8 private: 9 mutable std::shared_mutex mutex_; 10 int value_ 0; 11 12 public: 13 // 读取操作使用共享锁 14 int read(int i) const { 15 std::cout读线程调用std::endl; 16 std::shared_lockstd::shared_mutex lock(mutex_); // 共享锁 17 std::cout (线程ID: std::this_thread::get_id() ) 读操作 顺序号 i std::end l; 18 std::this_thread::sleep_for(std::chrono::milliseconds(1000)); 19 return value_; 20 } 21 22 // 写入操作使用独占锁 23 void write(int i) { 24 std::cout 写线程调用std::endl; 25 std::unique_lockstd::shared_mutex lock(mutex_); // 独占锁 26 std::cout (线程ID: std::this_thread::get_id() ) 写操作 顺序号 i std::en dl; 27 std::this_thread::sleep_for(std::chrono::milliseconds(5000)); 28 value_; 29 } 30 31 // 写入操作重置值 32 void reset() { 33 std::unique_lockstd::shared_mutex lock(mutex_); // 独占锁 34 std::cout (线程ID: std::this_thread::get_id() ) 重置操作 std::endl; 35 std::this_thread::sleep_for(std::chrono::milliseconds(10)); 36 value_ 0; 37 } 38 }; 39 40 int main() { 41 std::cout 基本读写锁示例 std::endl; 42 43 ThreadCounter counter; 44 std::vectorstd::thread threads; 45 46 // 启动多个读线程 47 for (int i 0; i 5; i) { 48 threads.emplace_back([counter, i]() { 49 for (int j 0; j 3; j) { 50 counter.read(i); 51 } 52 }); 53 } 54 55 // 启动写线程 56 threads.emplace_back([counter]() { 57 for (int i 0; i 2; i) { 58 counter.write(i); 59 } 60 }); 61 62 for (auto t : threads) { 63 t.join(); 64 } 65 return 0; 66 }条件变量 condition_variable条件变量实现多个线程间的同步操作当条件不满足时相关线程被一直阻塞直到某种条件出现这些线程才会被唤醒典型流程mutex 条件变量运行状态切换时的同步condition_variable 等待/唤醒共享数据条件1 #include iostream 2 #include queue 3 #include thread 4 #include mutex 5 #include condition_variable 6 #include vector 7 8 class ProducerConsumer { 9 private: 10 std::queueint queue; // 共享资源缓冲区队列 11 std::mutex mtx; // 互斥锁保护队列 12 std::condition_variable cv_prod; // 条件变量控制生产者当队列满时等待 13 std::condition_variable cv_cons; // 条件变量控制消费者当队列空时等待 14 size_t capacity; // 缓冲区最大容量 15 16 public: 17 explicit ProducerConsumer(size_t capacity) : capacity(capacity) {} 18 19 // 生产者调用的入队函数 20 void prod(int value) { 21 // 获取锁保护共享资源 queue 22 std::unique_lockstd::mutex lock(mtx); 23 24 // 等待判断如果队列满了生产者阻塞并释放锁直到消费者消费后唤醒 25 // 使用 lambda 表达式防止虚假唤醒 26 //wait()的谓词返回true时继续等待返回false时才退出等待 27 cv_prod.wait(lock, [this]() { return queue.size() capacity; }); 28 29 // 执行生产 30 queue.push(value); 31 std::cout Produced: value | Queue size: queue.size() std::endl; 32 33 // 唤醒告诉正在等待的消费者现在有货了 34 cv_cons.notify_one(); 35 36 // 作用域结束lock 自动析构并释放锁 37 } 38 39 // 消费者调用的出队函数 40 int cons() { 41 std::unique_lockstd::mutex lock(mtx); 42 43 // 等待判断如果队列空了消费者阻塞并释放锁直到生产者生产后唤醒 44 cv_cons.wait(lock, [this]() { return !queue.empty(); }); 45 46 // 执行消费 47 int value queue.front(); 48 queue.pop(); 49 std::cout Consumed: value | Queue size: queue.size() std::endl; 50 51 // 通知告诉正在等待的生产者现在有空位了 52 cv_prod.notify_one(); 53 54 return value; 55 } 56 }; 57 58 // --- 测试代码 --- 59 void producer_task(ProducerConsumer q, int id) { 60 for (int i 0; i 5; i) { 61 q.prod(id * 100 i); // 生产数据 62 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产耗时 63 } 64 } 65 66 void consumer_task(ProducerConsumer q) { 67 for (int i 0; i 10; i) { 68 q.cons(); // 消费数据 69 std::this_thread::sleep_for(std::chrono::milliseconds(150)); // 模拟消费耗时 70 } 71 } 72 73 int main() { 74 ProducerConsumer q(5); // 缓冲区容量为 3 75 76 // 开启 2 个生产者线程和 1 个消费者线程 77 std::thread p1(producer_task, std::ref(q), 1); 78 std::thread p2(producer_task, std::ref(q), 2); 79 std::thread c1(consumer_task, std::ref(q)); 80 81 p1.join(); 82 p2.join(); 83 c1.join(); 84 85 return 0; 86 }

相关新闻