
目录一、volatile(轻量级同步机制作用1. 保证可见性2. 禁止指令重排序2.1指令重排的场景3. 不保证原子性(简单赋值的话安全)适用场景:1、开销较低的读写锁策略2、 状态标志位二、无锁/CAS(Compare-And-Swap)作用1、保证原子性2、实现无锁并发无阻塞特点1、乐观锁机制2、高性能但CPU 友好度差3、依赖硬件指令4、 存在 ABA 问题应用场景1、原子类操作2、自旋锁3、并发集合三、互斥同步(加锁)1、synchronized关键字特点自动管理可重入不可中断性能进化单等待队列锁升级机制简略详细应用场景1、修饰实例方法2、修饰静态方法3、修饰代码块核心原理注意2、ReentrantLock常用方法特点手动管理尝试获取两种方法重载可中断示例:公平性耗时对比示例多条件队列对比synchronized的单队列等待机制使用ReentrantLock的多队列模型3、ReentrantReadWriteLock特点读写锁适用于读远多于写锁降级执行示例四、ThreadLocal特点绝对的线程隔离性数据跟随线程生命周期父子线程默认隔离应用场景可继承的InheritableThreadLocal原理触发时机数据的浅拷贝不支持线程池简单示例TransmittableThreadLocal支持线程池五、并发工具类1、 CountDownLatch一、volatile(轻量级同步机制作用1. 保证可见性当一个线程修改了 volatile 变量的值,新值会立即刷新到主内存中其他线程读取该变量时,会从主内存重新读取最新值,而不是使用工作内存中的缓存值确保了多线程间变量的可见性2. 禁止指令重排序volatile 通过内存屏障机制,防止编译器和处理器对指令进行重排序优化保证了代码执行的有序性2.1指令重排的场景new Singleton()不是原子操作分为三步1. 分配内存空间2. 调用构造函数初始化对象3. 将 instance 指向分配的内存地址如果发生指令重排导致线程A先执行13后执行2public class Singleton { private static Singleton instance; public static Singleton getInstance() { if (instance null) { // 第1次检查(无锁) synchronized (Singleton.class) { if (instance null) { instance new Singleton(); } } } return instance; } }new Singleton()不是原子操作分为三步1. 分配内存空间2. 调用构造函数初始化对象3. 将 instance 指向分配的内存地址如果发生指令重排导致线程A先执行13后执行2此时instance!null但还没有初始化线程B检测到instance!null就直接返回了没初始化的instance就会发生错误。要解决的话需要在instance前加volatile修饰,会在 instance new Singleton() 前后插入内存屏障让其按顺序执行。3. 不保证原子性(简单赋值的话安全)volatile如果写操作是简单赋值不依赖当前值的话也是安全的 但不能保证复合操作(如 i)的原子性。如果需要原子性,需要使用 synchronized、Lock 或 AtomicInteger 等适用场景:1、开销较低的读写锁策略结合synchronized修饰更新public class Value1 { private volatile int count 0; public int getCount() { return count; // 频繁读取,无锁 } public synchronized void increment() { count; // 偶尔写入,使用同步 } }2、 状态标志位最常见的用法,用于控制线程的执行状态加入volatile修饰后在其他线程(例如B)更改这个变量正在run()的线程A也能立即响应不加的话线程 A 可能一直使用工作内存中的旧值 true会出现死循环。public class TaskRunner { private volatile boolean running true; public void run() { while (running) { // 执行任务 } } public void stop() { running false; // 其他线程调用此方法可以停止任务 } }二、无锁/CAS(Compare-And-Swap)作用CAS 的经典应用场景主要集中在无锁并发编程中。简单来说就是多个线程同时修改一个变量时不加锁而是通过比较并交换来保证原子性。1、保证原子性2、实现无锁并发无阻塞避免了线程上下文切换的开销避免了死锁风险。特点1、乐观锁机制2、高性能但CPU 友好度差在低竞争环境下性能远高于上述两种锁因为没有线程上下文切换的开销。如果竞争极其激烈大量的自旋会消耗大量 CPU 资源。3、依赖硬件指令CAS 不是 Java 独有的它底层依赖 CPU 提供的原子指令。Java 通过 Unsafe 类调用这些底层指令所以 CAS 的执行速度非常快纳秒级。4、 存在 ABA 问题具体来说线程1 读到 owner 是 null (A)。线程2 把 owner 改成 Thread-2 (B)然后又改回 null (A)。线程1 再次检查发现还是 null (A)于是 CAS 成功。应用场景1、原子类操作////Java 的 java.util.concurrent.atomic 包下的所有类都基于 CAS import java.util.concurrent.atomic.AtomicInteger; public class Counter { private AtomicInteger count new AtomicInteger(0); public void increment() { count.incrementAndGet(); // 底层用 CAS 实现 i } public int getCount() { return count.get(); } }作用多线程下 i 不安全加 synchronized 太重CAS 性能更好。2、自旋锁import java.util.concurrent.atomic.AtomicReference; public class SpinLock { private AtomicReferenceThread owner new AtomicReference(); private int holdCount 0; // 记录重入次数 public void lock() { Thread current Thread.currentThread(); // 如果当前线程已经持有锁直接增加计数 if (owner.get() current) { holdCount; return; } // 如果 owner 是 null就改成当前线程否则一直循环重试 int cnts 0; while (!owner.compareAndSet(null, current)) { cnts; if(cnts10){ Thread.yield(); // 尝试多次后让出 CPU } } } public void unlock() { Thread current Thread.currentThread(); if (owner.get() ! current) { throw new IllegalMonitorStateException(当前线程不持有锁); } holdCount--; if (holdCount 0) { owner.set(null); // 完全释放 } } }作用避免线程挂起和唤醒的开销适合锁持有时间短的场景。上方代码实现了可重复加锁while循环就是自旋的实现。3、并发集合例如并发队列ConcurrentLinkedQueueimport java.util.concurrent.ConcurrentLinkedQueue; ConcurrentLinkedQueueString queue new ConcurrentLinkedQueue(); // 多个线程同时添加内部用 CAS 保证线程安全 queue.add(item1); queue.add(item2); String item queue.poll(); 第一步包装节点 把你传进来的元素 e 变成一个 Node 对象。此时这个节点的 next 是 null。 第二步寻找“真正的”尾节点 这是最关键的一步。因为 tail 指针可能滞后HOPS 优化所以不能直接把新节点挂在 tail 后面。 线程会从 tail 开始顺着 next 往后找。 直到找到一个节点 p它的 next 是 null。这个 p 才是当前队列里最后一名。 第三步CAS 抢位置 (p.casNext(null, newNode)) 线程会尝试执行一个原子操作 “如果 p 的 next 还是 null就把我的新节点挂上去。” 情况 1成功说明没人跟你抢你的节点正式进入队列了 情况 2失败说明在你寻找尾部和执行 CAS 的这一瞬间Thread B 已经手速更快把一个节点挂在 p 后面了。 此时p.next 不再是 null。 你的 CAS 返回 false。 动作回到 for (;;) 循环开头重新找最新的尾节点再次尝试。这就是自旋。 第四步更新 tail 指针 (casTail) 当你的节点成功挂上去后你会尝试把全局的 tail 指针移到你的新节点上。 注意这一步不是必须成功的。 如果此时又有别人插队导致你的 casTail 失败了没关系。你的数据已经在链表里了这就够了。tail 指针可以暂时不动等下次有机会再移。作用传统队列加锁性能差CAS 可以实现高并发的无锁队列。三、互斥同步(加锁)1、synchronized关键字这是 Java 最原生、最简单的锁。特点自动管理不需要手动加锁/解锁JVM 自动处理不会发生“忘了释放锁”的情况。可重入同一个线程可以多次获取同一把锁。不可中断一旦进入阻塞状态只能等待锁释放不能通过 interrupt() 强行打断。性能进化在 JDK 1.6 之后引入了偏向锁、轻量级锁、重量级锁的升级机制性能已经非常接近 ReentrantLock。单等待队列synchronized 维护了两个主要的队列EntryList入口队列那些想进房间但还没拿到锁的线程在这里排队。WaitSet等待队列那些拿到了锁但因为条件不满足调用 wait() 而释放锁的线程在这里睡觉。同一对象不同被synchronized修饰的方法中调用wait()的线程都会添加到同一个WaitSet。被notifyAll()的时候都会被唤醒可能导致额外资源占用。锁升级机制简略没有线程竞争时就使用低开销的“偏向锁”此时没有额外的 CAS 操作轻度竞争时使用“轻量级锁”采用 CAS 自旋避免线程阻塞只有在重度竞争时才使用“重量级锁”由 Monitor 机制实现需要线程阻塞。详细锁升级的四个阶段 第一阶段无锁 状态对象没有被任何线程锁定。 表现Mark Word 存储的是对象的 HashCode 和分代年龄。 触发刚创建的对象。 第二阶段偏向锁 核心思想“偏心”。大多数情况下锁不仅不存在竞争而且总是由同一个线程多次获得。 过程 当第一个线程Thread A访问同步块时JVM 会在对象头的 Mark Word 中记录 Thread A 的 ID。 以后 Thread A 再来加锁JVM 发现 Mark Word 里的 ID 就是自己直接执行无需任何 CAS 或系统调用。 优点消除所有同步原语性能最高。 缺点一旦出现第二个线程Thread B来抢锁偏向锁就失效了。 第三阶段轻量级锁 核心思想“自旋/CAS”。当偏向锁失效有另一个线程来尝试获取但竞争还不算太激烈时。 过程 偏向锁撤销升级为轻量级锁。 线程会在自己的栈帧中创建一个 Lock Record锁记录。 线程尝试通过 CAS 将对象头的 Mark Word 替换为指向自己 Lock Record 的指针。 如果成功获取锁执行代码。 如果失败说明有竞争。当前线程会通过自旋循环重试尝试获取锁。 优点避免了线程挂起和唤醒的系统调用开销。 缺点如果自旋很久还拿不到锁会白白消耗 CPU。 第四阶段重量级锁 核心思想“阻塞/排队”。当竞争非常激烈或者自旋次数超过阈值时。 过程 轻量级锁膨胀为重量级锁。 对象头的 Mark Word 指向堆中的 Monitor Object监视器锁。 没抢到锁的线程会被挂起Blocked进入操作系统的等待队列。 持有锁的线程释放后操作系统负责唤醒等待队列中的线程。 优点线程不占用 CPU适合长时间等待。 缺点涉及用户态和内核态切换性能开销最大。应用场景1、修饰实例方法同一时间只有一个线程能调用该对象的同步方法public class Counter { private int count 0; public synchronized void increment() { count; } public synchronized void decrement() { count--; } }2、修饰静态方法所有实例共享一把锁锁定整个类public class Counter { private static int count 0; public static synchronized void increment() { count; } }3、修饰代码块核心原理线程想要执行 synchronized 代码块中的内容必须先获取括号内对象的监视器锁。注意只要锁定的是同一个对象引用内容是否一样都互斥。1. this当前实例对象 public void method() { synchronized (this) { // 锁定当前实例 } } 2. 类对象Class对象 public void method() { synchronized (MyClass.class) { // 锁定整个类所有实例共享 } }//等价于 synchronized 修饰静态方法。 3. 任意对象引用 public class Counter { private final Object lock new Object(); private final String lockStr lock; public void method1() { synchronized (lock) { // 推荐专用锁对象 // 同步代码 } } public void method2() { synchronized (lockStr) { // 也可以但不推荐 // 同步代码 } } } 4. 成员变量对象 public class Bank { private final ListString accounts new ArrayList(); public void addAccount(String account) { synchronized (accounts) { /* 锁定accounts对象只有当多个线程操作的是同一个 accounts 对象时才会产生锁竞争*/ accounts.add(account); } } }2、ReentrantLock这是 Java 5 引入的基于 AQS实现的显式锁。提供比 synchronized 更灵活的锁控制。常用方法lock()获取锁。如果锁被占用则一直等待。tryLock()尝试获取锁。如果锁可用则返回 true否则立即返回 false不等待。tryLock(long time, TimeUnit unit)限时等待获取锁。在指定时间内尝试获取超时则返回 false。lockInterruptibly()可中断地获取锁。如果在等待过程中线程被中断会抛出异常并停止等待。示例见特点内的公平性unlock()释放锁。isLocked():判断锁是否被任何线程持有。isHeldByCurrentThread()判断锁是否被当前线程持有常用于断言或递归逻辑检查。getHoldCount()获取当前线程持有该锁的次数因为是可重入锁可能多次加锁。hasQueuedThreads()判断是否有线程在等待获取这把锁。getQueueLength()获取正在等待这把锁的线程数量。newCondition()创建一个与该锁绑定的 Condition 对象。可以创建多个实现精准通知示例见特点中的多条件队列。特点手动管理必须手动调用 lock() 和 unlock()通常在 finally 块中。尝试获取支持 tryLock()如果拿不到锁可以立即返回或等待指定时间避免死等。两种方法重载1、tryLock()不等待立即尝试获取锁获取成功返回 true失败返回 false不会阻塞线程2、tryLock(long time, TimeUnit unit) - 等待指定时间返回true在超时前成功获取锁false超时仍未获取到锁可能抛出的异常InterruptedException等待过程中被中断可中断支持 lockInterruptibly()等待锁的过程中可以响应中断。示例:// 线程2尝试获取锁但可以被中断 Thread thread2 new Thread(() - { try { System.out.println([线程2] 尝试获取锁...); lock.lockInterruptibly(); // 可中断地获取锁 try { System.out.println([线程2] 成功获取锁); } finally { lock.unlock(); } } catch (InterruptedException e) { System.out.println([线程2]被中断不再等待锁执行其他逻辑); } }, Thread-2); // 中断线程2 thread2.interrupt(); //等待线程结束 thread2.join();公平性可以选择创建公平锁先到先得或非公平锁默认允许插队性能更高。使用公平锁的场景需要严格的顺序。公平锁能确保执行顺序。注意线程持有锁时间较长的时候和使用非公平锁相差时间不大此时公平锁耗时的劣势不明显但线程持有锁时间较短的时候耗时会多很多。使用非公平锁的场景大多数通用场景默认选择追求高吞吐量线程持有锁的时间很短。原因非公平锁性能更好例如线程C发来请求的时候刚好线程A释放线程C直接获取锁避免了如果先唤醒B再唤醒CC会多一次唤醒操作的开销。耗时对比示例import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; public class main { public static void main(String[] args) throws InterruptedException { /** * 演示公平锁 */ // 创建公平锁构造函数传入 true ReentrantLock fairLock new ReentrantLock(true); // 创建多个线程竞争锁 Runnable task () - { for (int i 0; i 100; i) { try { fairLock.lock(); } catch (Exception e) { e.printStackTrace(); } finally { fairLock.unlock(); } } }; int threadCount20; Thread[] threads new Thread[threadCount]; for (int i 0; i threadCount; i) threads[i] new Thread(task, 线程 i); long fairStartTime System.currentTimeMillis(); for (Thread t:threads){ t.start(); } for (Thread t:threads){ t.join(); } long fairEndTime System.currentTimeMillis(); long fairCost fairEndTime - fairStartTime; /** * 演示非公平锁 */ // 创建非公平锁构造函数传入 false默认 ReentrantLock nonFairLock new ReentrantLock(false); // 创建多个线程竞争锁 Runnable task2 () - { for (int i 0; i 100; i) { try { nonFairLock.lock(); } catch (Exception e) { e.printStackTrace(); } finally { nonFairLock.unlock(); } } }; Thread[] threads2new Thread[threadCount]; for (int i 0; i threadCount; i) threads2[i] new Thread(task2, 线程 i); long nonFairStartTime System.currentTimeMillis(); for (Thread t:threads2) t.start(); for (Thread t:threads2) t.join(); long nonFairEndTime System.currentTimeMillis(); long nonFairCost nonFairEndTime - nonFairStartTime; System.out.println(公平锁耗时: fairCost ms); System.out.println(非公平锁耗时: nonFairCost ms); } } //运行结果 公平锁耗时: 23 ms 非公平锁耗时: 3 ms多条件队列可以绑定多个 Condition实现精确唤醒。对比synchronized的单队列等待机制当使用synchronized的时候JVM会为每个对象维护一个等待队列wait进入的队列是属于对象的而不是属于方法的意思就是同一对象的消费者和生产者方法中调用的wait()和notifyall()都会直接应用于对象的等待队列。例如生产者消费者模型单队列等待会浪费更多资源public class BlockingQueue { private QueueInteger queue new LinkedList(); private final int MAX_SIZE 10; public synchronized void produce(int item) throws InterruptedException { while (queue.size() MAX_SIZE) { wait(); // 队列满生产者进入 WaitSet 等待 } queue.add(item); notifyAll(); // 唤醒 WaitSet 中的所有线程 } public synchronized int consume() throws InterruptedException { while (queue.isEmpty()) { wait(); // 队列空消费者也进入同一个WaitSet 等待 } int item queue.poll(); notifyAll(); // 唤醒 WaitSet 中的所有线程 } }当队列满了的时候又有10个生产者在等待如果消费者取出了一个元素唤醒了所有线程此时一个生产者线程抢到了锁添加了元素后又会唤醒其他所有生产者线程这一步是无用浪费了CPU资源的。使用ReentrantLock的多队列模型public class ReentrantLockTest { private final QueueInteger queue new LinkedList(); private final int capacity;// 队列容量 private final ReentrantLock lock new ReentrantLock(); // 1. 创建两个条件队列 private final Condition notFull lock.newCondition(); // 队列不满的条件生产者 private final Condition notEmpty lock.newCondition(); // 队列不空的条件消费者 public ReentrantLockTest(int capacity) { this.capacity capacity; } /** * 生产者放入元素 */ public void produce(int item) throws InterruptedException { lock.lock(); try { // 如果队列满了在 notFull 队列里等待 while (queue.size() capacity) { notFull.await(); // 释放锁进入 notFull 等待队列 } queue.add(item); // 只唤醒一个正在等待的消费者 notEmpty.signal(); } finally { lock.unlock(); } } /** * 消费者取出元素 */ public int consume() throws InterruptedException { lock.lock(); try { // 如果队列空了在 notEmpty 队列里等待 while (queue.isEmpty()) { notEmpty.await(); // 释放锁进入 notEmpty 等待队列 } int item queue.poll(); // 只唤醒一个正在等待的生产者 notFull.signal(); return item; } finally { lock.unlock(); } } }这里的Condition.signal()每次都只会从指定等待队列中唤醒一个线程如果队列为空也不会报错程序会继续运行。3、ReentrantReadWriteLock特点与ReentrantLock像可重入、独占互斥、公平与非公平模式以及基于AQS底层实现这些核心机制本质上都是完全通用的。读写锁ReentrantReadWriteLock 内部维护了一对相关的锁一个用于只读操作读锁另一个用于写入操作写锁。读锁是共享的只要没有线程持有写锁多个线程可以同时获取读锁。这极大地提高了读多写少场景下的并发吞吐量。写锁是独占的当有线程持有写锁时其他任何线程无论是读还是写都必须等待。适用于读远多于写在读写比例接近如 5:5甚至写操作更多的场景下ReentrantReadWriteLock 由于内部需要维护读、写两种复杂的状态其性能反而会低于 简单直接的 ReentrantLock。 只有当读操作远远多于写操作例如 80%~90% 以上都是读时ReentrantReadWriteLock 的性能优势才会爆发出来。锁降级允许一个线程在持有写锁的情况下再去获取读锁然后释放写锁。这样做的好处是线程在更新完数据后可以立刻以读锁的身份继续持有数据保证其他线程无法在释放写锁的瞬间插入修改从而保证了数据的可见性和一致性。执行示例核心步骤 获取写锁线程首先获取独占的写锁准备修改数据。 获取读锁在持有写锁的同时再去获取共享的读锁。 释放写锁释放独占的写锁但此时线程依然持有读锁。 执行读操作线程带着读锁去读取刚刚更新的数据。 释放读锁读取完毕后释放读锁完成整个降级过程。 private final ReentrantReadWriteLock rwLock new ReentrantReadWriteLock(); // 1. 获取写锁独占开始修改数据 rwLock.writeLock().lock(); private Object data; try { // 执行业务写操作比如更新缓存数据 data 最新的数据; // 2. 在释放写锁之前先获取读锁 rwLock.readLock().lock(); } finally { // 3. 释放写锁此时其他写线程依然无法获取锁因为当前线程还拿着读锁 rwLock.writeLock().unlock(); } try { // 4. 执行读操作此时可以安全地读取刚刚写入的数据不用担心被其他线程修改 System.out.println(读取到的数据 data); } finally { // 5. 释放读锁 rwLock.readLock().unlock(); }四、ThreadLocal特点绝对的线程隔离性为每个使用该变量的线程提供独立的变量副本线程之间互不干扰。数据跟随线程生命周期ThreadLocal 的变量副本是绑定在当前线程上的。只要线程存活副本就一直存在当线程结束销毁后对应的变量副本也会被垃圾回收。父子线程默认隔离普通的 ThreadLocal 在父子线程之间是完全隔离的。子线程无法直接访问父线程的变量副本如果需要传递需要使用它的子类 InheritableThreadLocal。应用场景在过滤器中将信息存入ThreadLocal。/* 封装一个工具类来管理 */ public class UserContextHolder { // 使用 private static 修饰全局唯一 private static final ThreadLocalLong USER_ID_HOLDER new ThreadLocal(); // 封装 set 方法 public static void setUserId(Long userId) { USER_ID_HOLDER.set(userId); } // 封装 get 方法 public static Long getUserId() { return USER_ID_HOLDER.get(); } // 封装 remove 方法 public static void clear() { USER_ID_HOLDER.remove(); } } /* 在拦截器或过滤器中 */ public void doFilter(...) { try { //请求进来时把用户信息存入 ThreadLocal UserContextHolder.setUserId(i); //后续任何地方都能通过 UserContextHolder.getUserId() 拿到当前用户ID chain.doFilter(request, response); } finally { //不清楚在线程池环境下会导致数据串用和内存泄漏 UserContextHolder.clear(); } }也可以封装一个上下文对象代替这里的userId存到一个ThreadLocal中。可继承的InheritableThreadLocal原理在 Thread 类的定义中每个线程都有两个 ThreadLocalMappublic class Thread { /* 普通 ThreadLocal 变量存储的地方 */ ThreadLocal.ThreadLocalMap threadLocals null; /* InheritableThreadLocal 变量存储的地方 */ ThreadLocal.ThreadLocalMap inheritableThreadLocals null; }threadLocals用于存储普通的 ThreadLocal 变量子线程不能继承父线程的 ThreadLocal 变量inheritableThreadLocals用于存储 InheritableThreadLocal 变量子线程可以继承父线程的 InheritableThreadLocal 变量在创建子线程时JVM 会将父线程的 inheritableThreadLocals 复制到子线程中触发时机当父线程通过 new Thread() 创建一个新线程时JVM 会在子线程的初始化阶段进行检测。如果发现父线程的 inheritableThreadLocals 不为空就会触发数据复制逻辑。数据的浅拷贝这里的复制是浅拷贝。对于对象类型的数据父子线程共享同一个对象引用。如果子线程修改了对象内部的属性父线程获取到的对象也会受到影响。不支持线程池InheritableThreadLocal 的数据复制仅发生在线程被 new 出来的那一瞬间。而线程池的核心机制是“线程复用”池子里的工作线程早就被创建好了。当父线程向线程池提交新任务时工作线程并不会重新创建因此根本不会触发数据复制的逻辑。简单示例// 创建 InheritableThreadLocal InheritableThreadLocalString context new InheritableThreadLocal(); // 在父线程中设置值 context.set(父线程的数据); // 创建子线程子线程会自动继承这个值 Thread childThread new Thread(() - { // 子线程中可以获取到父线程设置的值 System.out.println(context.get()); // 输出: 父线程的数据 }); childThread.start();TransmittableThreadLocal支持线程池//先在pom.xml加入下列依赖 dependencies !-- TransmittableThreadLocal 依赖 -- dependency groupIdcom.alibaba/groupId artifactIdtransmittable-thread-local/artifactId version2.14.5/version /dependency /dependencies import java.util.concurrent.*; import com.alibaba.ttl.TransmittableThreadLocal; import com.alibaba.ttl.threadpool.TtlExecutors; public class Main { private static final TransmittableThreadLocalString context new TransmittableThreadLocal(); public static void main(String[] args) throws InterruptedException { // 设置上下文数据 context.set(主线程数据); // 创建线程池并用 TTL 包装 ExecutorService executor Executors.newFixedThreadPool(3); ExecutorService ttlExecutor TtlExecutors.getTtlExecutorService(executor); // 提交任务 - 子线程能正确获取父线程的值 ttlExecutor.submit(() - { String value context.get(); System.out.println(子线程获取到的值: value); }); // 关闭线程池 ttlExecutor.shutdown(); // 等待线程池关闭 try { if (!ttlExecutor.awaitTermination(5, TimeUnit.SECONDS)) { ttlExecutor.shutdownNow(); // 再次等待确认强制关闭 if (!ttlExecutor.awaitTermination(5, TimeUnit.SECONDS)) { System.out.println(错误线程池未能正常关闭); } } else { System.out.println(所有任务已完成); } } catch (InterruptedException e) { System.out.println(等待过程中被中断强制关闭线程池); ttlExecutor.shutdownNow(); Thread.currentThread().interrupt(); } finally { // 清理 TTL 上下文防止内存泄漏 context.remove(); } } }五、并发工具类1、 CountDownLatch作用允许一个或多个线程等待其他线程完成一组特定的操作。核心方法await()阻塞当前线程直到计数器归零。countDown()将计数器减 1通常在工作线程的 finally 块中调用。底层原理基于 AQS的共享模式实现内部维护一个 volatile int state 作为计数器通过 CAS 操作保证线程安全。特点一次性使用计数器归零后无法重置。应用场景主线程等待多个子任务如初始化资源、并行查询接口全部完成后再继续执行后续逻辑。示例CountDownLatch latch new CountDownLatch(nodes.size()); while (true) { String nodeId readyQueue.poll(); if (nodeId null) break; // 队列为空说明所有可执行任务已提交 WorkflowNode node nodes.stream().filter(n - n.getId().equals(nodeId)).findFirst().orElse(null); if (node null) continue; // 提交任务到线程池 executorService.submit(() - { try { // --- 执行业务逻辑 --- NodeExecutor executor new ExecutorFactory().getExecutor(node.getType()); // 假设工厂类已实现 // 从全局上下文收集输入 (这里简单传入所有实际可按需过滤) MapString, Object input new HashMap(globalContext); MapString, Object output executor.execute(node, input); // 结果写入全局上下文 globalContext.put(node.getId(), output); System.out.println(节点 node.getId() 执行完成); } catch (Exception e) { e.printStackTrace(); // 生产环境需处理失败逻辑如阻断下游 } finally { // --- 调度逻辑通知下游节点 --- ListString successors reverseDeps.get(node.getId()); if (successors ! null) { for (String successorId : successors) { // 入度减 1如果变为 0 则加入就绪队列 if (inDegreeMap.get(successorId).decrementAndGet() 0) { readyQueue.offer(successorId); } } } latch.countDown(); } }); } // 7. 等待所有任务执行完毕 latch.await(); System.out.println(DAG 全部执行完毕); executorService.shutdown(); } }---------------------------------------------------------------------------------------------------------------------------------本篇完。