一文让你彻底搞懂AQS(通俗易懂的AQS)

发布时间:2026/5/19 12:53:31

一文让你彻底搞懂AQS(通俗易懂的AQS) 一、什么是AQSAQSAbstract Queued Synchronizer (简称AQS)‌是一个用来构建锁和同步器的框架它通过一个int类型的同步状态 (state)和一个先进先出 (FIFO) 的线程等待队列来管理资源访问并支持独占和共享两种同步模式 。比如我们提到的ReentrantLockSemaphore其他的诸如ReentrantReadWriteLockSynchronousQueueFutureTask等等皆是基于AQS的。当然我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。二、前置知识学习AQS需要大家对同步锁有一定的概念。同时大家要知道LockSupport的使用可以参考我这篇文章。LockSupport从入门到深入理解三、AQS 的核心思想AQS核心思想是如果被请求的共享资源空闲则将当前请求资源的线程设置为有效的工作线程并且将共享资源设置为锁定状态。如果被请求的共享资源被占用那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制这个机制AQS是用CLH队列锁实现的即将暂时获取不到锁的线程加入到队列中。 CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。 AQS使用一个int成员变量来表示同步状态通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。 (图一为节点关系图private volatile int state;//共享变量使用volatile修饰保证线程可见性四、AQS 案例分析public class AQSDemo { private static int num; public static void main(String[] args) { ReentrantLock lock new ReentrantLock(); new Thread(new Runnable() { Override public void run() { lock.lock(); try { Thread.sleep(1000); num 1000; System.out.println(A 线程执行了1秒,num num); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } },A).start(); new Thread(new Runnable() { Override public void run() { lock.lock(); try { Thread.sleep(500); num 500; System.out.println(B 线程执行了0.5秒,num num); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } },B).start(); new Thread(new Runnable() { Override public void run() { lock.lock(); try { Thread.sleep(100); num 100; System.out.println(C 线程执行了0.1秒,num num); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } },C).start(); } }执行的某一种结果! 这个代码超级简单但是执行结果却是可能不一样大家可以自行实验。对比一下三种结果大家会发现无论什么样的结果num最终的值总是1600这说明我们加锁是成功的。五、AQS 源码分析使用方法很简单线程操纵资源类就行。主要方法有两个lock()和unlock.我们深入代码去理解。我在源码的基础上加注释希望大家也跟着调试源码。其实非常简单5.1 AQS 的数据结构AQS 主要有三大属性分别是 head ,tail, state,其中state 表示同步状态head为等待队列的头结点tail 指向队列的尾节点。/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail; /** * The synchronization state. */ private volatile int state;还需要再去了解 Node的数据结构class Node{ //节点等待状态 volatile int waitStatus; // 双向链表当前节点前节点 volatile Node prev; // 下一个节点 volatile Node next; // 当前节点存放的线程 volatile Thread thread; // condition条件等待的下一个节点 Node nextWaiter; }waitStatus 只有特定的几个常量相应的值解释如下本次源码讲解我们以ReentranLock的非公平锁为例。我们主要关注的方法是lock(),和unlock()。5.2 lock源码分析首先我们看一下lock()方法源代码直接进入非公平锁的lock方法final void lock() { //1、判断当前state 状态, 没有锁则当前线程抢占锁 if (compareAndSetState(0, 1)) // 独占锁 setExclusiveOwnerThread(Thread.currentThread()); else // 2、锁被人占了尝试获取锁关键方法了 acquire(1); }进入 AQS的acquire()方法public final void acquire(int arg) { if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }总-分-总lock方法主要由tryAquire()尝试获取锁addWaiter(Node.EXCLUSIVE) 加入等待队列acquireQueued(node,arg)等待队列尝试获取锁。示意图如下5.2.1 tryAquire 方法源码既然是非公平锁那么我们一进来就想着去抢锁不管三七二一直接试试能不能抢到抢不到再进队列。final boolean nonfairTryAcquire(int acquires) { //1、获取当前线程 final Thread current Thread.currentThread(); // 2、获取当前锁的状态0 表示没有被线程占有0 表示锁被别的线程占有 int c getState(); // 3、如果锁没有被线程占有 if (c 0) { // 3.1、 使用CAS去获取锁 为什么用case呢防止在获取c之后 c的状态被修改了保证原子性 if (compareAndSetState(0, acquires)) { // 3.2、设置独占锁 setExclusiveOwnerThread(current); // 3.3、当前线程获取到锁后直接发挥true return true; } } // 4、判断当前占有锁的线程是不是自己 else if (current getExclusiveOwnerThread()) { // 4.1 可重入锁加1 int nextc c acquires; if (nextc 0) // overflow throw new Error(Maximum lock count exceeded); // 4.2 设置锁的状态 setState(nextc); return true; } return false; }5.2.2 addWaiter 方法的解析private NodeaddWaiter(Node mode)当前线程没有货得锁的情况下进入CLH队列private Node addWaiter(Node mode) { // 1、初始化当前线程节点虚拟节点 Node node new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 2、获取尾节点初始进入节点是null Node pred tail; // 3、如果尾节点不为null,怎将当前线程节点放到队列尾部并返回当前节点 if (pred ! null) { node.prev pred; if (compareAndSetTail(pred, node)) { pred.next node; return node; } } // 如果尾节点为null其实是链表没有初始化,怎进入enq方法 enq(node); return node; } // 这个方法可以认为是初始化链表 private Node enq(final Node node) { // 1、入队 为什么要用循环呢 for (;;) { // 获取尾节点 Node t tail; // 2、尾节点为null if (t null) { // Must initialize // 2.1 初始话头结点和尾节点 if (compareAndSetHead(new Node())) tail head; } // 3、将当前节点加入链表尾部 else { node.prev t; if (compareAndSetTail(t, node)) { t.next node; return t; } } } }有人想明白为什么enq要用for(;;)吗 咋一看最多只要循环2次啊 答疑来了这是对于单线程来说确实是这样的但是对于多线程来说有可能在第2部完成之后就被别的线程先执行入链表了这时候第3步cas之后发现不成功了怎么办只能再一次循环去尝试加入链表直到成功为止5.2.3 acquireQueued()方法详解addWaiter 方法我们已经将没有获取锁的线程放在了等待链表中但是这些线程并没有处于等待状态。acquireQueued的作用就是将线程设置为等待状态final boolean acquireQueued(final Node node, int arg) { // 失败标识 boolean failed true; try { // 中断标识 boolean interrupted false; for (;;) { // 获取当前节点的前一个节点 final Node p node.predecessor(); // 1、如果前节点是头结点那么去尝试获取锁 if (p head tryAcquire(arg)) { // 重置头结点 setHead(node); p.next null; // help GC // 获得锁 failed false; // 返回false,节点获得锁然后现在只有自己一个线程了这个时候就会自己唤醒自己 // 使用的是acquire中的selfInterrupt(); return interrupted; } // 2、如果线程没有获得锁且节点waitStatus0shouldParkAfterFailedAcquire并将节点的waitStatus赋值为-1 //parkAndCheckInterrupt将线程park进入等待模式 if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt()) interrupted true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws pred.waitStatus; if (ws Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev pred pred.prev; } while (pred.waitStatus 0); pred.next node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but dont park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }好了这个源码的解释就结束了大家是不是还是云里雾里不得不承认这个代码太优雅了。不愧大神我用白话给大家串起来讲一下吧 我们以reentrantLock的非公平锁结合我们案例4来讲解。当线程A 到lock()方法时通过compareAndSetState0,1获得锁并且获得独占锁。当B,C线程去争抢锁时运行到acquire(1),C线程运行tryAcquire(1),接着运行nonfairTryAcquire1方法未获取锁最后返回false,运行addWaiter(),运行enqnode,初始化head节点同时C进入队列再进入acquireQueuednode,1方法初始化waitStatus -1,自旋并park进入等待。接着B线程开始去抢锁B线程运行tryAcquire(1),运行nonfairTryAcquire1方法未获得锁最后返回false,运行addWaiter()直接添加到队尾同时B进入队列在进入acquireQueuednode,1方法初始化waitStatus -1,自旋并park进入等待。5.3 unlock源码分析unlock释放锁。主要利用的是LockSupportpublic final boolean release(int arg) { // 如果成功释放独占锁 if (tryRelease(arg)) { Node h head; // 如果头结点不为null且后续有入队结点 if (h ! null h.waitStatus ! 0) //释放当前线程并激活等待队里的第一个有效节点 unparkSuccessor(h); return true; } return false; } // 如果释放锁着返回true,否者返回false // 并且将sate 设置为0 protected final boolean tryRelease(int releases) { int c getState() - releases; if (Thread.currentThread() ! getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free false; if (c 0) { free true; setExclusiveOwnerThread(null); } setState(c); return free; } private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws node.waitStatus; if (ws 0) // 重置头结点的状态waitStatus compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 获取头结点的下一个节点 Node s node.next; // s.waitStatus 0 为取消状态 结点为空且被取消 if (s null || s.waitStatus 0) { s null; // 获取队列里没有cancel的最前面的节点 for (Node t tail; t ! null t ! node; t t.prev) if (t.waitStatus 0) s t; } // 如果节点s不为null则获得锁 if (s ! null) LockSupport.unpark(s.thread); }锁的释放这个还是很简单。参考https://blog.csdn.net/u010445301/article/details/125590758?spm1001.2014.3001.5502

相关新闻