java锁之ReentrantLock

ReentrantLock 的实现三个关键技术自旋、park—unpark、CAS、AQS

它是一个默认非公平可实现公平的的锁

1
ReentrantLock lock = new ReentrantLock(true);//true表示公平锁    默认false非公平

synchronize 1.6以前即使交替执行也会调用os层面的函数依然是重量级的锁 1.8以后与ReentrantLock差别不大,但是ReentrantLock的Api更加丰富

单线程—-交替执行其实与队列无关–java级别解决同步问题

他是一个可重入锁,会去判断当前线程与正持有锁的线程是否是同一个如果是就调用内部计数器加一并将这个数赋给state。

1
2
3
4
5
6
7
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}

AQS全称:AbstractQueuedSynchronizer,抽象队列式同步器。

AQS队列:

galleries-1

整个AQS 队列里队头thread对应的那个node里面的thread不管什么情况都为空,当t2得到线程时会把自己的thread设为null并将与上一节点的引用去掉方便gc回收,队头设为自己。

1
2
setHead(node);
p.next = null;

非公平锁在存在线程竞争的情况下会去尝试加锁成功就获得锁如果失败执行的acquire(1);方法与下面公平锁执行流程一致。

以下是公平锁存在线程竞争执行的时候reentrantlock的执行情况大致如下:

galleries-2

图中根据锁的状态分为了两个分支先说右边分支,t2调用lock方法会去判断锁的状态如果锁处于被持有状态就会去排队调用addWaiter(Node.EXCLUSIVE), arg)。

1
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

先实例化一个node再根据线程实例化一个node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

如果队列没被初始化就会再实例一个thread为null的node对象并将它设置为aqs的头和尾

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

维护t2链表关系,t2入队执行acquireQueued方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//自旋第一次调用tryAcquire尝试获得锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

成功就获得锁失败则进入shouldParkAfterFailedAcquire(p, node)方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 上一节点的ws=0
if (ws == Node.SIGNAL) // -1
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //将前一个节点的ws状态设置为-1
}
return false;
}

第一次返回false继续执行上层是一个死循环继续执行tryAcquire(arg)方法成功就获得锁失败就继续执行shouldParkAfterFailedAcquire(p, node)进行第二次自旋,此时前一个节点的waitStatus已经是-1直接返回true,继续执行

1
parkAndCheckInterrupt()方法  //执行 LockSupport.park(this); t2挂起

为什么要由后一个节点来设置前一个节点的ws状态呢?

因为上一个节点已经park()了不再往后执行了,一个睡着了的人还能通知外界我睡着了吗?,当然是别人看到我睡着了帮我把门关上告诉其他人我睡着了。为什么不再park()前将自己ws状态设为-1呢?,因为如果你修改了状态说它睡着了但是它park()执行失败了呢,并没有睡着,反而它在房间里偷偷玩起了手机呢。就会出现问题啦,所以才有了这样的设定。

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

为什么这里还要自旋呢?

因为能不调用park()方法就尽量不去调用park()方法,park()方法调用的是os层面的函数,一旦调用了就成了一把重量级锁,至于为什么是两次是Doug Lea决定的这涉及到计算机科学了就不是我能考虑的范畴了)

左边分支,如果锁的状态处于自由状态就要去判断自己是否需要排队,判断自己是否需要排队时又两个情况,队列是否初始化,如果没有初始化hasQueuedPredecessors()就会返回false加上!结果就为true,就执行cas加锁。

1
2
3
4
5
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
1
2
3
4
5
6
7
8
9
10
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

如果已经被初始化又可以分为队列中元素大于一和队列中元素等于一两种情况,先说大于一的情况。队列中元素大于一 h != t 就返回true继续向下,既然队列元素大于一那(s =h.next) == null必然不成立,为什么这里还有个这个判断呢因为 h != t 不是原子操作在执行这行代码时队列中可能已经出队只剩下一个元素了。不过这种情况应该不大概率出现但是不得不说Doug Lea考虑的是真的周全。回到正题 (s =h.next) == null 等于false继续执行s.thread != Thread.currentThread() 这里是判断当前来询问的线程是否是位于队列排队第一的线程如果是就返回false整体返回false加上!得true执行cas加锁,如果不是位于第一的线程他就没有加锁的资格s.thread != Thread.currentThread()就返回true整体返回true加上!得false就不加锁,叫它继续睡还没轮到它。那什么情况下队列中只有一个元素呢,当整个队列队尾拿到锁的时候,就只有它一个元素h != t因为只有它一个所以头尾都是它,此处返回false不再继续执行整体返回false然后取反得到true执行cas加锁。

打断

1
return Thread.interrupted();要结合lockInterruptibly()方法去看

这个方法会判断在尝试加锁之前是否被打断如果被打断就做出正确响应。其他和lock方法差不多

1
2
3
4
5
6
7
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

不同之处在:

lock():

1
2
3
4
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}

lockInterruptibly():

1
2
3
4
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}

因为lock和lockInterruptibly都调用的parkAndCheckInterrupt(),所以interrupted = true;是为了整体返回true去执行selfInterrupt();方法去还原用户行为,return Thread.interrupted();对于lockInterruptibly()方法就是为了打断能做出正确响应,对于lock()方法本身而言没有其他的影响及意义。

解锁 unlock()

解锁过程大致如下:

调用unlock()方法后就会对state进行减一操作然后去判断来解锁的线程是否是当前线程如果不是则抛出异常如果是就继续判断c是否等于0也就是判断是否还有线程持有锁,如果不等于0返回false表示只是释放了一次重入锁,如果等于0就将当前持有锁的线程设置为null并将state设置为0。

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //state-1
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

继续执行会去判断队列中是否有人排队,如果没有就返回true方法正常返回,如果有会将头结点的waitStatus值设为0,线程被唤醒会继续从原本阻塞的位置继续往下执行。

1
2
3
4
5
6
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //设置waitStatus为0
return true;
}
查看评论