返回
Featured image of post AbstractQueuedSynchronizer源码探究

AbstractQueuedSynchronizer源码探究

透过ReentrantLock非公平锁的加锁和解锁过程,深入源码揭秘AQS的原理

AQS概念

AbstractQueuedSynchronizer是JUC中非常重要的一个同步控制工具

在JDK的并发包中,我们能看到很多很多的同步器类,比如ReentrantLock(可重入锁)、Semaphore(信号量)和CountDownLatch(倒计时闭锁)等等

在这些类中,都使用了AbstractQueuedSynchronizer(AQS)来构建

它们之间的继承关系如下图所示:

AQS的继承关系
AQS的继承关系

可以看出,在基于AQS构建的同步器类中,都继承了AbstractQueuedSynchronizer的子类,而且名字都叫做Sync

特别地,在ReentrantLock中Sync有两种实现类:NonfairSyncFairSync,分别实现了对不公平锁公平锁的控制

AQS的概念,总共有三点:

  • 状态信息:我们需要一个整数来保存当前的同步状态信息,这个信息在AbstractQueuedSynchronizer中由int属性state保存。它可以表示任意状态,比如ReentrantLock用它来表示线程已经重复获取锁的次数,Semaphore用它来表示剩余许可的数量
  • 获取(aquire):获取锁或许可,通常会阻塞,直至状态信息处于可被获取的状态
  • 释放(release):释放锁或许可,通常不会阻塞,释放操作会唤醒因获取而被阻塞的线程

AQS的内部结构

AQS在类内部维护了一个等待队列,这个队列叫作CLH队列。这个CLH队列实际上是一个双向链表,其示意图如下:

AQS的等待队列(CLH队列)
AQS的等待队列(CLH队列)

CLH队列的优点:

  • 先进先出,可以保证公平性
  • 非阻塞的队列,通过自旋锁和CAS保证节点插入和移除的原子性,实现无锁快速插入
  • 采用了自旋锁思想,所以CLH队列也是一种基于链表的可扩展、高性能、公平的自旋锁

再来看一下AQS的UML图:

AQS的UML图
AQS的UML图

我们会发现:

  • 链表的节点就是AQS内部的抽象类Node

  • AQS中的head就是指向队列的head节点,tail指向队列的尾节点

  • Node中有指向前节点的指针(prev)和指向后节点的指针(next),用waiter存放与此节点关联的线程,status则指明了线程的状态

可以看出,这个等待队列是存放着被阻塞的线程。在这个等待队列中的线程,会不断地尝试acquire,直到获取到锁或许可,改变状态信息为止

实际上,AQS内部不只是有一个等待队列,它为各个Condition对象都维护了一个条件等待队列

图中的ConditionObject就是一个Condition实现类,内部维护了firstWaiter和lastWaiter,指向队列的头和尾

ConditionObject类的结构
ConditionObject类的结构

当线程因为调用Condition类中的await()挂起时,它们会进入条件等待队列中。当它们被signal()或signalAll()唤醒时,会将他们转移到等待队列中,去尝试acquire

等待队列(同步队列)和条件等待队列
等待队列(同步队列)和条件等待队列

因为队列不一样,节点也不一样,因此Node节点派生了3个子类

  • SharedNode:共享锁队列的节点
  • ExclusiveNode:独占锁队列的节点
  • ConditionNode:条件等待队列的节点

总而言之,AQS的结构如下图所示:

AQS的总体结构
AQS的总体结构

源码分析

让我们从一个简单的非公平ReentrantLock出发,分析AQS在加锁时的表现

lock()

首先调用ReentrantLock的lock()

public void lock() {
    // 此处是调用了NonFairSync类的lock方法
	// NonFairSync就是AQS在ReentrantLock中的实现类
    sync.lock();
}

进入NonFairSync类的lock方法

@ReservedStackAccess
final void lock() {
    if (!initialTryLock())
        // 首先尝试获取锁,如果没获取到,则进入acquire(1)
        // 1为要获取的资源数,因为此处为独占锁,所以数量为1
        acquire(1);
}

// 调用initialTryLock()尝试获取锁
// 这个方法排除了锁重入的情况,接下来的情况都是非重入情况
final boolean initialTryLock() {
    Thread current = Thread.currentThread();
    if (compareAndSetState(0, 1)) {
        // 使用CAS设置state,尝试获取锁,如果成功,则设置当前线程为owner
        setExclusiveOwnerThread(current);
        return true;
    } else if (getExclusiveOwnerThread() == current) {
        // 如果是已经持有锁的线程再次获得锁,则记录重复加锁的数量
        int c = getState() + 1;
        // 如果重入次数发生溢出,则抛异常
        if (c < 0)
            throw new Error("Maximum lock count exceeded");
        setState(c);
        return true;
    } else
        return false;
}

进入AbstractQueuedSynchronizer的acquire(int arg)方法

实际上这个方法是对核心acquire方法的一层简单包装

public final void acquire(int arg) {
    if (!tryAcquire(arg))
        // 再次尝试获取锁,若失败则进入核心的线程同步方法acquire(),在这个方法中实现了对队列的管理
        acquire(null, arg, false, false, false, 0L);
}

// 调用tryAcquire方法,再次尝试获取锁
protected final boolean tryAcquire(int acquires) {
    if (getState() == 0 && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

接下来,我们将进入核心的acquire方法,这是AQS控制线程同步的核心流程,所有使用AQS的工具类最终都会调用此方法

在分析方法之前,我们先看一下节点有哪些状态(Node的status值)

// 节点处于等待状态,值为1(且必须为1)
static final int WAITING   = 1;
// 节点被取消调度(被中断或Timeout),这个值必须为负数
static final int CANCELLED = 0x80000000;
// 节点处于条件等待队列中
static final int COND      = 2;

关于acquire方法,官方文档中写道:

循环做以下几件事情:

​ 检查节点是否为头节点

​ 如果是,则保证head的稳定性,否则保证有效的前节点

​ 如果节点是头节点或仍未入队,则尝试获取

​ 如果节点还未创建,则创建节点

​ 如果节点未入队,则尝试入队一次

​ 如果线程从阻塞中被唤醒, 则再次尝试获取(直至指定的自旋时间)

​ 如果WAITING状态未设置,则设置并重试

​ 否则,阻塞当前线程,清除WAITING状态并检查是否有异常情况

我们可以对照官方文档,理清acquire方法的逻辑

final int acquire(Node node, int arg, boolean shared,
                  boolean interruptible, boolean timed, long time) {
	Thread current = Thread.currentThread();
    // 自旋的时间
    byte spins = 0, postSpins = 0;
    // 中断标记,头节点标记
    boolean interrupted = false, first = false;
    // 在入队时的前一个节点
    Node pred = null;

    // 死循环,不断尝试获取锁
    for (;;) {
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
            !(first = (head == pred))) 
        	// 节点在队列中,且不是头节点时
            if (pred.status < 0) {
                // 如果前一个节点被取消调度,则调用cleanQueue()方法
                // 此方法的作用是反复遍历CLH队列,清除被取消调度的节点,并唤醒正处于等待的节点
                cleanQueue();
                continue;
              else if (pred.prev == null) {
                // 此处的代码笔者暂时也不是很清楚,似乎是节点突然间变成了头节点
                // 所以调用onSpinWait()让出CPU使用权,使得头节点稳定下来(不了解)
                Thread.onSpinWait();
                continue;
            }
        }
        if (first || pred == null) {
            // 如果节点是头节点,或者还没入队,则尝试获取锁
            // 注意,获取锁的顺序并不是严格按照队列的FIFO顺序,未入队的线程也可以尝试获取锁
            // 此处就体现了非公平的理念
            boolean acquired;
            try {
                if (shared)
                    acquired = (tryAcquireShared(arg) >= 0);
                else
                    acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            if (acquired) {
                if (first) {
                    // 如果是头节点获取到锁,则此节点成为新的head
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (shared)
                        signalNextIfShared(node);
                    if (interrupted)
                        current.interrupt();
                }
                // 如果成功获取到锁,则返回1,所有获取到锁的线程都会从这里return回去
                return 1;
            }
        }
        if (node == null) {
            // 如果节点仍未创建,则创建节点
            if (shared)
                node = new SharedNode();
            else
                node = new ExclusiveNode();
        } else if (pred == null) {
            // 如果节点未入队,则尝试入队
            node.waiter = current;
            Node t = tail;
            // 将前一个节点和当前节点联系起来
            node.setPrevRelaxed(t);
            if (t == null)
                // 如果队列都没创建,则需要创建队列,初始化head
                tryInitializeHead();
            else if (!casTail(t, node))
                // 利用CAS操作更新tail为当前节点
                // 如果更新失败,则将当前节点的prev更新成null,在下一次循环中重新进行此操作
                node.setPrevRelaxed(null);  // back out
            else
                // 更新成功,把原tail的next设置为当前节点
                t.next = node;
        } else if (first && spins != 0) {
			// 线程被唤醒之后,在自旋时间内持续尝试获取锁
            --spins;
            // 让出CPU使用权
            Thread.onSpinWait();
        } else if (node.status == 0) {
            // 如果节点状态未设置,则设置status为WAITING(1)
            node.status = WAITING;
        } else {
            long nanos;
            // 设置自旋时间
            spins = postSpins = (byte)((postSpins << 1) | 1);
            if (!timed)
                // 如果没有设置阻塞时间,则阻塞该线程
                LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L)
                // 如果设置了阻塞时间则阻塞指定时间
                LockSupport.parkNanos(this, nanos);
            else
                break;
            // 清除状态
            node.clearStatus();
            // 如果有中断到来,则直接跳出循环
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    // 跳出循环说明等待被中断了
    return cancelAcquire(node, interrupted, interruptible);
}

阅读完源码我们可以发现,所有被锁阻塞的线程,都是阻塞在了这个核心的acquire方法中,不断地进行着一个死循环,当获得锁(即CAS改变AQS中的state)时,才能从acquire方法返回,进入到临界区中

当然,笔者只是给出了一个阅读源码的思路,其中的很多细节,笔者也没有完全搞懂,比如:

  • 调用onSpinWait()的意义何在?
  • byte类型的spins和postSpins变量,究竟有什么作用?
  • 官方文档中的head的稳定性(ensure head stable),真正的含义是什么?

Doug Lea的代码精妙至极,奈何笔者才疏学浅,疑惑之处甚多,还望各位朋友指点一二

unlock()

看完了加锁的源码,释放锁的源码就比较简单了,很轻松就能看懂

首先进入ReentrantLock的unlock方法

public void unlock() {
  // 将AQS的资源释放
  sync.release(1);
}

接着进入AbstractQueuedSynchronizer的release方法,以释放资源

public final boolean release(int arg) {
  if (tryRelease(arg)) {
      // 释放资源后,还需唤醒下一个节点
      signalNext(head);
      return true;
  }
  return false;
}

// 尝试释放锁
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    // 现获取的资源数-要释放的资源数
    int c = getState() - releases;
    // 若释放锁的线程和持有锁的线程不一样,则抛出异常
    if (getExclusiveOwnerThread() != Thread.currentThread())
        throw new IllegalMonitorStateException();
    boolean free = (c == 0);
    if (free)
        // free表示资源没有被线程占用
        setExclusiveOwnerThread(null);
    setState(c);
    return free;
}

最后,调用signalNext方法唤醒下一个线程

private static void signalNext(Node h) {
  Node s;
  if (h != null && (s = h.next) != null && s.status != 0) {
      // 取消下一个节点的WAITING状态
      s.getAndUnsetStatus(WAITING);
      // 唤醒下一个节点
      LockSupport.unpark(s.waiter);
  }
}

总结

AQS作为一个线程同步工具,其核心特点为:

  • 维护一个状态量,线程可以获取与释放此状态量,来达到获取锁和释放锁的目的

  • 维护一个等待队列(CLH队列),所有未获取到状态量的线程在其中排队,不断地尝试获取状态量

  • 对于每一个条件对象(Condition Object),也维护一个条件等待队列,条件达成后,节点将进入正常的等待队列

  • 获取锁后,节点被移出队列;释放锁后,节点会唤醒下一个节点