三、AQS中的独占模式
本文以ReentrantLock的非公平锁为例。上半部分主要讲述了获取锁的过程,下半部分主要讲述释放锁的过程。
ReentrantLock中的加锁操作
ReentrantLock是一个可重入锁!compareAndSetState(0, 1)可以理解为获取锁的操作,如果返回true,则获取锁成功。
下面以ReentrantLock.NonfairSync为例,分析ReentrantLock非公平锁的加锁解锁过程。
final void lock() {
//直接尝试获取锁,如果获取锁成功,设置锁的占用线程为当前线程。
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
//如果获取锁失败,调用acquire(args)方法,参数为1
else
acquire(1);
}
/**
* acquire由AbstractQueuedSynchronizer提供默认实现
* 1.重新尝试获取锁,若成功,标记锁被占用的线程为当前线程,返回
* 2.失败,将自己添加到队列尾部。
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其中final boolean compareAndSetState(int expect, int update)
方法,调用的是Unsafe的compareAndSwapInt,保证原子性的。
acquire方法可以分为三步:
- 尝试获取锁,如果成功,直接返回–>tryAcquire(arg)
- 获取锁失败,将自己添加到队列尾部,addWaiter(Node.EXCLUSIVE)
- 如果当前结点的前置节点就是head节点,尝试获取锁,获取失败,执行下一步,成功return
- 挂起当前结点
非公平锁的tryAcquire实现
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* 1.重新尝试获取锁,如果成功,设置state,标记锁被占用的线程
* 2.如果锁被当前线程占用,增加重入次数(重入锁,state = currentState + acquires)
* 3.return false
* 4.tryAcquire返回false,继续执行acquireQueued方法。
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
acquireQueued方法的实现
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)):
- addWaiter(Node.EXCLUSIVE), arg):创建一个独占模式的入队节点,入队
- 如果当前结点的前置节点就是head节点,尝试获取锁。
- acquireQueued(final Node node, int arg):判断是否需要挂起,不需要则重新尝试获取锁,需要的话,挂起等待被唤醒。
private Node addWaiter(Node mode) {
//基于当前线程创建新结点
Node node = new Node(Thread.currentThread(), mode);
// 直接在队尾添加元素(快速入队)
Node pred = tail;
if (pred != null) {
node.prev = pred;
//并发情况下,compareAndSetTail可能会失败,则快速入队失败
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//enqueue,入队操作
enq(node);
return node;
}
//死循环,支持多线程并发,实际入队操作使用了CAS,提升效率
//只有入队成功,才会退出当前死循环。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果队列没有初始化,初始化队列,重新入队。
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//新创建的结点指向队列的尾部。并发下如果CAS操作失败,不断重试
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//Node是刚刚入队成功的结点
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//如果当前结点的的前置节点就是head,尝试获取锁
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//获取锁成功,设置当前结点为头结点
setHead(node);
//前置节点的next设置为null(此时,前置节点p就没有引用,可以被gc掉了)
p.next = null; // help GC
failed = false;
return interrupted;
}
//获取锁失败,判断是否需要挂起,如果需要挂起,则挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//中断标记位设置为true
interrupted = true;
}
} finally {
//分析获取锁是否失败,如果失败了,取消获取锁。
if (failed)
cancelAcquire(node);
}
}
前面我们分析过Node中waitStatus的几个状态码的含义:
- SIGNAL: -1,当前结点的后置结点被挂起。所以当前结点释放锁或取消时,当前节点的后继节点必须被唤醒(unpark)
- CANCELLED: 1,节点因为超时或中断等原因被取消了。一旦处于该状态,其状态就不能被改变,特别是不能被挂起。
- CONDITION: -2,当前结点在条件队列中。在结点状态设置为0之前,当前结点都不会作为同步队列中的结点被使用。
- PROPAGATE: -3,共享模式下,当前结点的状态需被无条件的传播下去
- 0: 以上状态都不是
结合上面的状态码,继续分析判断是否需要挂起和线程挂起部分的源码
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前置节点的waitStatus状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前置结点被挂起,返回true,调用parkAndCheckInterrupt()挂起线程
return true;
if (ws > 0) {
//前置节点被取消了,跳过被取消的结点,知道不是被取消的结点(CANCELLED=1>0,唯一一个大于0的状态)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//此时,剩余的状态只有0和PROPAGATE(-3)。
//设置前置节点为Node.SIGNAL,重新进入acquireQueued的for循环
//(其实就是重新进入shouldParkAfterFailedAcquire方法)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//调用LockSupport.park(this)方法,挂起当前结点
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
当前结点的前置节点的waitStatus为SIGNAL时,当前结点(对应的线程)应当挂起。 因为其前置结点释放锁时,会通知其随后的结点。(参见SIGNAL状态的含义)。
acquire过程总结
ReentrantLock是一个可重入的互斥锁。互斥锁意味着只能由某一个线程占有锁;可重入意味着可被占有锁的线程重复获取锁。
在ReentrantLock的非公平锁中,在锁时可获取状态时(前一个占有锁的线程完全释放锁),所有的结点都会尝试获取锁。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果获取锁失败,则进行入队操作。 如果当前结点的前置节点就是头结点,重新尝试获取锁。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//入队
enq(node);
return node;
}
//竞争失败,挂起当前线程,等待其被前置结点唤醒。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
ReentrantLock中的释放锁操作
释放锁的操作,相较于加锁,还是简单很多的。ReentrantLock中释放锁的主要分为以下几个步骤:
- 释放当前的锁(如果当前线程发生重入,一次只释放一个锁)
- 如果当前线程完全释放了锁,判断是否有挂起的后继节点,如果有唤醒后继节点
//unlock调用release方法
public void unlock() {
sync.release(1);
}
//release方法做两部分的工作:1.释放锁,2.唤醒挂起的后继节点
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
释放锁的逻辑如下:
protected final boolean tryRelease(int releases) {
//释放当前的锁,如果被重入了,这里也只释放这一次的锁
int c = getState() - releases;
//如果当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果释放了当前锁后,当前线程依然拥有锁(重入),设置state为c,返回false
//如果当前线程不在拥有锁,是指锁的持有线程为null,返回true
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
如果当前线程完全释放了锁,如果其后继节点挂起了,唤醒后继节点。
private void unparkSuccessor(Node node) {
//如果当前结点状态小于0,设置当前结点为0,
//如果失败了的话,等待线程在获取锁的时候,会将自身结点设置为head
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//如果后置结点被取消了或者为null,从队列的尾部向前遍历,直到找到最近的那个需要被唤醒的结点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//执行唤醒操作
if (s != null)
LockSupport.unpark(s.thread);
}
ReentrantLock中的公平锁
ReentrantLock的公平锁与非公平锁相比,只是在获取锁的处理上有不同。
//直接调用acquire方法
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//hasQueuedPredecessors看名称可知,当前是否存在等待的前置节点
//如果不存在,cas获取锁。成功后设置锁被当前线程持有,reture true
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前线程就是占有锁的线程,重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//获取锁失败
return false;
}
如果获取锁失败,当前线程会被加入到队列中。当持有锁的线程释放锁时,队列中前面的结点会获取锁(FIFO)。
- 公平锁(Fair):加锁前检查是否有排队等待的线程,优先排队等待的线程,先来先得
- 非公平锁(Nonfair):加锁时不考虑排队等待问题,直接尝试获取锁,获取不到自动到队尾等待
ReentrantLock默认的lock()方法采用的是非公平锁;非公平锁的效率大概是共公平锁的5-10倍。