在实际开发中经常使用到 ReentrantLock 来这把锁来实现线程安全,但是只知道怎么用,却不知道底层到底怎么去实现的,而 ReentrantLock 又是基于 AQS (AbstractQueuedSynchronizer)实现的,本文源码角度分析从创建一把 Lock 锁开始看看内部到底是怎么加锁解锁,以及如何处理锁竞争失败的。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock 分别有两个构造函数,一个是无参的构造函数默认创建的是一个非公平的同步队列器,而另一个有参的可以通过参数指定创建一个公平或非公平的同步队列器,fair 为 ture 的话,说明创建的是一个公平的同步队列器,那么这个 sync 成员变量是什么东西了?和我们的 AQS 有是什么关系了?在 ReentrantLock 有三个内部类,而这些同步队列器就是我们的这三个内部类。

abstract static class Sync extends AbstractQueuedSynchronizer
Sync 是一个抽象类并且继承自 AQS
static final class FairSync extends Sync
公平的同步队列器 FairSync 则是继承自 Sync
static final class NonfairSync extends Sync
公平的同步队列器 FairSync 同样也是继承自 Sync
另外 ReentrantLock 还实现了 Lock 接口,这个接口定义的就是我们通常使用锁会有那些方法,接下来就通过 Lock 接口来分析 ReentrantLock 是如何去实现这些方法的。
public interface Lock {
/**
* 加锁没有等待时间上限,被中断以后不会抛异常,继续运行尝试获取锁或者继续阻塞等待
*/
void lock();
/**
* 加锁,被中断以后会抛出异常结束
*/
void lockInterruptibly() throws InterruptedException;
/**
* 加锁,尝试获取锁一次,获取成功返回ture,失败返回false
*/
boolean tryLock();
/**
* 加锁有等待时间上限,在等待过程中被中断则抛出异常结束
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 解锁
*/
void unlock();
/**
* 条件等待队列
*/
Condition newCondition();
/**
* 实现 Lock 接口的 lock() 方法
*/
public void lock() {
sync.lock();
}
首先在 ReentrantLock 的 lock 方法内部调用了同步队列器的 lock 方法,而这里的 sync 就取决于你在创建 ReentrantLock 对象时最终赋值的是 FairSync 公平同步队列器还是 NonfairSync 非公平同步队列器,这里我们假定 sync 是一个 NonfairSync 类型的即一把非公平锁。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 该方法实现自父类 Sync
*/
final void lock() {
// 使用CAS将 AbstractQueuedSynchronizer类的 state 属性从0修改为1
// 如果CAS修改成功说明获取锁成功
if (compareAndSetState(0, 1))
// 设置当前锁的持有线程为自己
setExclusiveOwnerThread(Thread.currentThread());
else
// 获取锁失败流程
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
// 非公平方式获取锁
return nonfairTryAcquire(acquires);
}
}
在 AQS 类有一个成员变量 state,对于 ReentrantLock 这种独占锁来说,其实所谓的获取锁,就是如果能通过 CAS 将 state 这个变量从 0 改为 1 就说明获得了锁,而释放锁就更简单了就是把 1 改成 0,而剩下要解决的工作就是当多个线程竞争锁,获取锁失败的线程要如何妥当处置它们,然后锁释放了以后,又如何去选择唤醒正在阻塞等待的线程。
private volatile int state; // AbstractQueuedSynchronizer类的成员变量
接下来我们看 acquire(1) 获取锁失败时怎么处理的
// 定义在 AbstractQueuedSynchronizer 类的
public final void acquire(int arg) {
// tryAcquire(arg) 再次尝试获取锁,返回false则获取锁失败
if (!tryAcquire(arg) &&
// addWaiter(Node.EXCLUSIVE) 将当前线程放入阻塞队列,并表示该节点是以独占模式
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 当获取锁失败且被中断过时会进入到这里执行
selfInterrupt();
}
首先在进入阻塞前,还会尝试调用 tryAcquire 方法去获取锁,其实这里最终调用的是 NonfairSync 非公平同步队列器的 tryAcquire 方法,而在这个方法里面又调用了父类 Sync 的 nonfairTryAcquire 方法来尝试获取锁。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 如果当前 AQS 的 state 等于0,说明锁没有线程被持有
// 那么就尝试使用 CAS 获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前的线程与持有锁线程是同一个
else if (current == getExclusiveOwnerThread()) {
// 则将 AQS 的state 加上 acquires
// 支持锁的重入
int nextc = c + acquires;
if (nextc < 0) // overflow
// 检查锁的重入次数是否达到 int 最大值已经溢出
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果这里调用 tryAcquire 尝试获取锁失败了返回 false,因为它对这个结果取反了,所以会进入到第二个与条件流程执行:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)),首先会调用 addWaiter 方法将当前线程加入到阻塞等待队列。
private Node addWaiter(Node mode) {
// 将节点类型与当前线程引用封装成一个Node 节点对象
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { // 如果pred不等于null,说明这个队列已经有阻塞节点存在了
node.prev = pred; // 那么将新加入的节点前置指针指向当前队列的尾部节点,即尾插法
if (compareAndSetTail(pred, node)) { // 使用 CAP 将 tail 变更指向为新加入的节点
pred.next = node; // 如果变更成功,最后再把新加入的尾节点与它的前置节点关联,形成双向链表
return node;
}
}
// 走到这里的话,有两种情况:
// 1.compareAndSetTail 修改尾节点失败
// 2.当前队列还是空的
enq(node);
return node;
}
这里加入阻塞等待队列,对于新加入的节点使用的是尾插法,如果队列是空的还未初始化或者 CAS 修改尾节点指针失败的话会进入 enq 方法:
private Node enq(final Node node) {
for (;;) { // 注意这里会不断循环CAS 尾插确保每个节点都能入队完成
Node t = tail;
if (t == null) { // Must initialize
// 初始化队列,创建一个空的节点作为头节点
if (compareAndSetHead(new Node()))
// 并将尾节点也指向这个空的头节点
tail = head;
} else {
// 将新加入的节点前置指针指向尾节点
node.prev = t;
// 然后使用CAS修改尾结点指向最新加的参数node节点,来达到尾插入队的操作
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
当完成当前线程入队以后,此时还未调用 park 方法真正暂停线程,只是将创建入队完成的节点返回了,接下来会执行 acquireQueued 方法:
// 返回值记录当前线程是否被打断过
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 无限循环
// 获取当前节点的前置节点
final Node p = node.predecessor();
// 如果当前节点的前置节点是头节点的话,会再次尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取锁成功,将当前节点设置为头节点
setHead(node);
// 旧的头节点 next 指向引用取消
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire(p, node) 确保当前的前置节点 waitStatus 变更为 SIGNAL
if (shouldParkAfterFailedAcquire(p, node) &&
// 调用park阻塞当前线程,返回中断标记位并清除
parkAndCheckInterrupt())
// 表示被中断过
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
并发编程 — Dec 20, 2022