深入并发包-lock

仅供学习交流,如有错误请指出,如要转载请加上出处,谢谢

前言

在并发访问一些共享资源的时候,锁是一种实现安全并发访问的手段,在深入并发关键字-synchronized一文中对JVM内置的锁机制进行了深入的分析,而里程碑意义的JDK1.5版本,JAVA大神Doug lea在java的代码层面上开发了一套锁机制,如下图所示:

Lock

在最顶层是一个Lock的接口,他实现了比使用Synchronized的方法和语句更加广泛的锁定操作,代码如下所示(以下代码基于JDK1.8)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Lock {
//获取锁
void lock();
//获取未中断线程的锁
void lockInterruptibly() throws InterruptedException;
//尝试在锁为空闲状态获取锁
boolean tryLock();
//尝试在一定的时间段内,并且锁为空闲状态和为中断时获取锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁
void unlock();
//获取一个绑定该锁的condition
Condition newCondition();
}

如上代码所示,Lock接口定义了锁的基本语义和生命周期,而在获取锁的方式上,Lock提供了三种不同方式的获取锁操作:(1)可中断,(2)不可中断,(3)定时,适应不同的生产环境中,在其性能特征,排序保证或者其他的实现质量上会有所不同

ReentrantLock

在上一篇文章深入并发包-AQS中,以ReentrantLock为例介绍了AQS在并发同步上的控制,这里就不在赘述了

ReadWriteLock

在ReentrantLock中,每一次获取锁都是独占锁,每一次只能是一个线程进行访问,但是在一些只是读取一些共享变量的场景下,并不需要独占锁来访问,这样并不利于并发性能,于是乎Doug lea将锁分割成一对相关的读锁和写锁更小的两个粒度,这和MySQL中的共享锁和排它锁类似(共享读锁,独享写锁),读锁允许多个线程同时保持运行,写锁是独占锁,只能一个线程保持运行,他们保持着一致性的内存同步,读锁的线程会看见写锁之前的版本所做的更新,ReadWriteLock接口提供了这样的语义,代码如下所示

1
2
3
4
5
6
public interface ReadWriteLock {
//获取读锁
Lock readLock();
//获取写锁
Lock writeLock();
}

ReentrantReadWriteLock

ReentrantReadWriteLock是ReentrantLock接口的实现,在读写锁的基础上加上了重入性,来避免死锁,现在通过一个例子来进入ReentrantReadWriteLock的源码世界

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class RWLock {
ReadWriteLock readWriteLock ;
private int i;
public RWLock(){
readWriteLock= new ReentrantReadWriteLock();
i = 0;
}
public void setI(int i) {
Lock lock = readWriteLock.writeLock();
lock.lock();
this.i = i;
lock.unlock();
}
public int getI() {
Lock lock = readWriteLock.readLock();
lock.lock();
int r = i;
lock.unlock();
return r;
}
}

对于共享变量i的读与写,通过读写锁来控制,利用语句new ReentrantReadWriteLock()来创建对象

1
2
3
4
5
6
7
8
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

和ReentrantLock一样,锁的机制有两种,默认的是非公平锁,另一种是公平锁,sync实现了AQS,用于并发锁控制(这里不再赘述了,详情可以查看深入并发包-AQS),读写锁共享一个同步器sync,ReadLock和WriteLock是内部类,是Lock的实现类

1
2
3
4
public static class WriteLock implements Lock, java.io.Serializable
{......}
public static class ReadLock implements Lock, java.io.Serializable
{......}

我们先来看写锁的获取与释放

WriteLock

获取锁

在上述的例子,i的setter中,会创建一个写锁来控制共享变量i的写入,通过readWriteLock.writeLock();来创建一个写锁

1
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }

然后再通过lock.lock();获取写锁

1
2
3
public void lock() {
sync.acquire(1);
}

这里会调用前面实现的同步器sync的acquire方法

1
2
3
4
5
6
7
8
9
public final void acquire(int arg) {
// 这里通过tryAcquire(1)会尝试获取锁,如果获取成功,直接返回true
// 因为有可能直接就成功了呢,也就不需要进队列排队了,
// 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待(又是挂起,又是等待被唤醒的)
if (!tryAcquire(arg) &&
// tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

当前线程会调用tryAcquire(arg)方法,尝试去获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();//获取当前线程
int c = getState();//c是锁状态为:高位16位表示共享锁的数量,低位16位表示独占锁的数量
int w = exclusiveCount(c);//取低16位的值,也就是写锁状态位:不等于0表示写锁被占用
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//这里c != 0 and w == 0表示当前没有写锁,只有读锁被占用或者当前线程不是当前独占锁的线程(重入锁的特性),就直接失败,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) //锁的最大值限制最多支持65535个写锁和读锁
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);//运行到这里,表示是重入锁,并修改锁的状态
return true;
}
//c==0表示如果当前没有线程获取锁,该线程会获取锁并且CAS设置状态
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current); //成功就设置当前线程获取独占锁
return true;
}
//运行完tryAcquire方法我们回头看看上一级的语句,如下
if (!tryAcquire(arg) &&
// tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt()

如果当前尝试获取失败,会调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)放入阻塞队列中,再中断挂起,接下来看看addWaiter方法(注意:加入阻塞队列的方法和ReentrantLock是一样的,如果在深入并发包-AQS已经了解了,可以忽略)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
// 此方法的作用是把线程包装成node,同时进入到队列中
// 参数mode此时是Node.EXCLUSIVE,代表独占模式
private Node addWaiter(Node mode) {
//mode值下一个等待的线程节点,默认为null
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后
Node pred = tail;
// tail!=null => 队列不为空(tail==head的时候,其实队列是空的,不过不管这个吧)
if (pred != null) {
// 设置自己的前驱 为当前的队尾节点
node.prev = pred;
// 用CAS把自己设置为队尾, 如果成功后,tail == node了
if (compareAndSetTail(pred, node)) {
// 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连,
// 上面已经有 node.prev = pred
// 加上下面这句,也就实现了和之前的尾节点双向连接了
pred.next = node;
// 线程入队了,可以返回了
return node;
}
}
// 仔细看看上面的代码,如果会到这里,
// 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队)
enq(node);
return node;
}

如果队列是空的,或者有竞争,就调用enq方法采用自旋的方式来加入阻塞队列,如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
// 采用自旋的方式入队
// 之前说过,到这个方法只有两种可能:等待队列为空,或者有线程竞争入队,
// 自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 之前说过,队列为空也会进来这里
if (t == null) { // Must initialize
// 初始化head节点
// 细心的读者会知道原来head和tail初始化的时候都是null,反正我不细心
// 还是一步CAS,你懂的,现在可能是很多线程同时进来呢
if (compareAndSetHead(new Node()))
// 给后面用:这个时候head节点的waitStatus==0, 看new Node()构造方法就知道了
// 这个时候有了head,但是tail还是null,设置一下,
// 把tail指向head,放心,马上就有线程要来了,到时候tail就要被抢了
// 注意:这里只是设置了tail=head,这里可没return哦,没有return,没有return
// 所以,设置完了以后,继续for循环,下次就到下面的else分支了
tail = head;
} else {
// 下面几行,和上一个方法 addWaiter 是一样的,
// 只是这个套在无限循环里,反正就是将当前线程排到队尾,有线程竞争的话排不上重复排
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 然后再次回到这段代码了
// if (!tryAcquire(arg)
// && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt();

我们再来看一看acquireQueued方法,在进入阻塞队列中,他会再次尝试获取锁,如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 下面这个方法,参数node,经过addWaiter(Node.EXCLUSIVE),此时已经进入阻塞队列
// 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话,
// 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false
// 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//获取当前node的前一个node
// p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
// 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列
// 所以当前节点可以去试抢一下锁
// 这里我们说一下,为什么可以去试试:
// 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node,
// enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程
// 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试,
// tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试操作一下state
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头,
// 要么就是tryAcquire(arg)没有抢赢别人,继续往下看
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

当前线程获取锁失败会调用shouldParkAfterFailedAcquire方法,如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 刚刚说过,会到这里就是没有抢到锁呗,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?"
// 第一个参数是前驱节点,第二个参数才是代表当前线程的节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//下面的三个判断符合下列三种规则
// 规则1:如果前继的节点状态为SIGNAL,表明当前节点需要unpark(唤醒),则返回成功,此时 acquireQueued方法的第12行(parkAndCheckInterrupt)将导致线程阻塞
// 规则2:如果前继节点状态为CANCELLED(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回false,acquireQueued方法的无限循环将递归调用该方法,直至规则1返回true,导致线程阻塞
// 规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,返回false后进入acquireQueued的无限循环,与规则2同
// 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。这里需要知道这点:
// 进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
// 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 仔细想想,如果进入到这个分支意味着什么
// 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
// 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
// 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 回到下面的代码
// if (shouldParkAfterFailedAcquire(p, node) &&
// parkAndCheckInterrupt())

如果此线程取消了竞争(也就是waitStatus>0),就会进入阻塞中断,这个时候需要一个契机将他唤醒,所有要不断循环前一个节点作为需要作为唤醒的契机(这在后面的释放锁操作会有说明),返回false,如果返回true的话,接下来会调用parkAndCheckInterrupt方法来检查中断操作,如下所示

1
2
3
4
5
6
// 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的
// 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒=======
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

释放锁

上述就是获取写锁的过程,在i的setter方法中,将i写入之后会调用lock.unLock()方法去释放锁(这里与深入并发包-AQS中独占锁的解锁过程一样,如有了解,可以忽略)

1
2
3
4
5
6
7
8
9
10
11
12
13
public void unlock() {
sync.release(1);
}
他会调用release(1)方法,如下所示
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

上述会尝试执行释放锁的操作,调用FairSync实现的tryRelease方法,和获取锁的路子类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否完全释放锁
boolean free = false;
// 其实就是重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

如果释放锁成功,且head不为空就会调用unparkSuccessor方法来唤醒后继的线程,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 唤醒后继节点
// 从上面调用处知道,参数node是head头结点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 如果head节点当前waitStatus<0, 将其修改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)
// 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}

唤醒的线程会在如下代码继续进行

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 刚刚线程被挂起在这里了
return Thread.interrupted();
}

注意:LockSupport是并发包中针对线程阻塞操作的工具类

ReadLock

分析完例子中i的setter方法中写锁的获取锁与释放锁,现在来看看例子中i的getter方法中读锁的获取锁与释放锁

获取锁

在例子中i的getter方法,通过readWriteLock.readLock()获取读锁

1
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }

下面再通过lock.lock()获取读锁

1
2
3
public void lock() {
sync.acquireShared(1);
}

这里会调用同步器sync的acquireShared方法

1
2
3
4
5
//这里会调用tryAcquireShared方法来尝试以共享的模式获取锁,忽略中断
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);//阻塞排队
}

和独占锁的套路一样,都会先去通过tryAcquireShared尝试获取共享锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
//c是锁状态为:高位16位表示共享锁的数量,低位16位表示独占锁的数量
int c = getState();
//exclusiveCount(c) 取低16位的值,也就是写锁状态位:不等于0表示写锁被占用
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
//有写锁被其他线程占用,获取读锁失败
return -1;
//取高16位的值,读锁状态位
int r = sharedCount(c);
//readerShouldBlock()根据读锁获取策略,返回是否阻塞当前读锁获取操作。后面会详细说明此方法
if (!readerShouldBlock() &&
r < MAX_COUNT &&
//cas修改高16位的读锁状态,即获取读锁
compareAndSetState(c, c + SHARED_UNIT)) {
//首次获取读锁
if (r == 0) {
//缓存首次获取读锁的线程,及其读锁重入次数
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { //如果获取锁(并不是第一个),直接读锁的数量++
firstReaderHoldCount++;
} else {//如果是重入锁
//cachedHoldCounter是最后获取锁的线程的读锁重入次数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
//readHolds是缓存了当前线程的读锁重入次数的ThreadLocal
//当前线程自然是最后获取锁的线程,故将当前线程的holdCounter赋给cachedHoldCounter
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
//缓存当前线程的holdCounter
//fullTryAcquireShared()方法中,
//获取读锁失败的线程会执行:readHolds.remove(),故此时需要重新设置
readHolds.set(rh);
rh.count++;
}
return 1;
}
//首次获取读锁失败后,重试获取
return fullTryAcquireShared(current);
}

获取读锁失败的话,会调用fullTryAcquireShared方法以自旋的方式去重新获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
//rh表示当前线程的锁计数器
HoldCounter rh = null;
for (;;) {
int c = getState();
//写锁被占用
if (exclusiveCount(c) != 0) {
//如果其他线程占用,读锁获取失败。如果当前线程占用,表示“锁降级”。
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
//重入锁不需要阻塞。
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
//当前线程就是第一个获取读锁的线程,那么此时当然是重入锁。
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId()) {
rh = readHolds.get();
if (rh.count == 0)
//线程阻塞之前,清空readHolds
readHolds.remove();
}
}
if (rh.count == 0)
//当前线程的锁计数器为0,非重入锁,需要阻塞。
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//cas设置读锁状态位
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
//缓存首次获取读锁的线程,以及锁计数器
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
else if (rh.count == 0)
//锁计数器放入ThreadLocal
readHolds.set(rh);
rh.count++;
//此时rh就是最后获取读锁的线程的锁计数器
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
//回头再看看上一级的方法
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);//阻塞排队

注意:这里涉及到锁降级的情况,如果在当前线程内,已经拥有了写锁了,再去获取读锁的话,就会将当前线程的写锁降级为读锁,在代码 if (exclusiveCount(c) != 0)if (getExclusiveOwnerThread() != current)两个去判断

如果获取读锁失败的话,调用doAcquireShared方法排队阻塞,和独占锁的acquireQueued方法类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//加入阻塞等待队列
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//获取当前node的前一个node
// p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
// 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列
// 所以当前节点可以去试抢一下锁
if (p == head) {
int r = tryAcquireShared(arg);//尝试获取锁
if (r >= 0) { //如果获取成功
setHeadAndPropagate(node, r);//设置当前节点为head,并且唤醒后续的节点
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

我们来看看setHeadAndPropagate方法来唤醒后续的节点

1
2
3
4
5
6
7
8
9
10
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//设置当前节点为头结点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())//后继节点为共享节点,即读锁
doReleaseShared();//唤醒后续的节点
}
}

我们接下来看看唤醒后续节点的doReleaseShared方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 判断同步等待队列是否为空
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
continue; // loop to recheck cases
}
// 唤醒 h 的后继节点(同时将 h 等待状态置为 CANCELED)
unparkSuccessor(h);
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){
continue; // loop on failed CAS
}
}
if (h == head){
break;// loop if head changed
}
}
}

因为读锁和写锁不同,他是共享锁,他会在队列中唤醒共享锁的所有线程

释放锁

在读完共享变量,就会调用lock.unlock()去释放锁

1
2
3
public void unlock() {
sync.releaseShared(1);
}

在这里会调用releaseShared方法

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

调用tryReleaseShared方法尝试去释放读锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//如果想要释放锁的线程为第一个获取锁的线程
if (firstReader == current) {
//仅获取了一次,则需要将firstReader 设置null,否则 firstReaderHoldCount - 1
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
}
//获取rh对象,并更新“当前线程获取锁的信息”
//HoldCounter是一个绑定共享锁的数量和当前线程ID,相当于锁的计数器
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//CAS更新同步状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;// state读锁状态减一
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

如果状态更新成功的话就会直接调用doReleaseShared方法去唤醒后续的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;//取头结点
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//如果头结点状态为signal
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//唤醒线程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//更新状态为PROPAGATE
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

总结

锁在并发场景中是比较重量级的,但是随着JDK版本的发展,以及对锁的一步步改进,在很多情况下已经显得并不是那么重,这里通常会与JVM内置锁synchronized来相比,这里借用JDK的描述:Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的 Condition 对象。

参考

http://www.cnblogs.com/haolong/p/6268550.html
http://blog.csdn.net/chenssy/article/details/68059443