AQS的基本数据结构
AQS的背景
AbstractQueuedSynchronizer(简称AQS)是整个JUC的基础,大部分的同步器都是基于AQS的。 Doug Lea在设计之初,再设计之初,希望建立一个小框架,AQS类,来为构造这些同步器提供一种通用的机制。
我们平常所熟悉的,Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock都是基于AQS构建的。
AQS的功能介绍
Synchronizers possess two kinds of methods : at least one
acquire operation that blocks the calling thread unless/until the
synchronization state allows it to proceed, and at least one
release operation that changes synchronization state in a way that
may allow one or more blocked threads to unblock.
同步器一般包含两种方法,acquire、release(名称可能不同)。acquire负责阻塞调用线程,直到同步状态允许其继续执行。 release负责改变同步状态,使一个(独占)或多个(共享)被阻塞的线程继续执行。
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
AQS提供了两种锁机制:独占锁和共享锁。
AQS的数据结构
AQS的基本数据结构
+------+ prev +-----+ +-----+
head | Node | <---- | Node| <---- | Node| tail
+------+ +-----+ +-----+
AQS的tail指向由Node组成的双向链表的尾,head指向链表的头部。state表示同步状态,如果为0,表示当前锁可用,如果不为0,表示不可用。(CountDownLatch中设置的count数就是state的初始值。)
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
AQS提供了独占锁模式下的获取锁和释放锁,共享模式下的获取锁和释放锁。
/**
* 独占锁模式获取锁
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 独占锁模式释放锁
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 共享锁模式获取锁
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* 共享锁模式释放锁
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
Node数据结构
Node是组成双向链表的基本单元,每一个结点都对应着一个线程。其中的waitStatus字段,标识当前结点的状态。下面是Node的源码。
static final class Node {
/** 共享模式 */
static final Node SHARED = new Node();
/** 独占模式 */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: -1,成功拿到锁,但是当前节点的后继节点必须被唤醒(unpark)
* CANCELLED: 1,节点因为超时或中断等原因被取消了。一旦处于该状态,其状态就
* 不能被改变,特别是不能被阻塞。
* CONDITION: -2,当前结点在条件队列中。在结点状态设置为0之前,当前结点
* 都不会作为同步队列中的结点被使用。
* PROPAGATE: -3,共享模式下,当前结点的状态需被无条件的传播下去
* 0: 以上状态都不是
*
* 普通的同步模式,初始化状态为0。需被原子更新(CAS)。
*/
volatile int waitStatus;
/** 前继结点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 在队列上的当前结点对应的线程 */
volatile Thread thread;
/**
* condition队列中下一个等待结点(独占模式)
* 或者为static final Node SHARED(共享模式)
*/
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS中acquire和release实现的一般形式
acquire操作的主循环次数依赖于具体实现类中tryAcquire的实现方式。另一方面,在没有“取消”操作的情况下,每一个组件的acquire和release都是一个O(1)的操作,忽略park中发生的所有操作系统线程调度。
支持“取消”操作主要是要在acquire循环里的park返回时检查中断或超时。由超时或中断而被取消等待的线程会设置其节点状态,然后unpark其后继节点。在有“取消”的情况下,判断其前驱节点和后继节点以及重置状态可能需要O(n)的遍历(n是队列的长度)。由于“取消”操作,该线程再也不会被阻塞,节点的链接和状态字段可以被快速重建。
acquire操作
if(!tryAcquire(arg)) {
node = create and enqueue new node;
pred = node's effective predecessor;
while (pred is not head node || !tryAcquire(arg)) {
if (pred's signal bit is set)
pard()
else
compareAndSet pred's signal bit to true;
pred = node's effective predecessor;
}
head = node;
}
release操作
if(tryRelease(arg) && head node's signal bit is set) {
compareAndSet head's bit to false;
unpark head's successor, if one exist
}