读写锁ReentrantReadWriteLock
读写锁ReentrantReadWriteLock
解决线程安全问题使用ReentrantLock就可以,但是ReentrantLock是独占锁,某时只有一个线程可以获取该锁,而实际中会有写少读多的场景,显然ReentrantLock满足不了这个需求,所以ReentrantReadWriteLock应运而生。ReentrantReadWriteLock采用读写分离的策略,允许多个线程同时获取读锁。
读写锁的内部维护了一个ReadLock和一个WriteLock。它们依赖Sync实现具体功能。而Sync继承自AQS,并且也提供了公平和非公平的实现。AQS中只维护了一个state状态,而ReentrantReadWriteLock则需要维护读状态和写状态,ReentrantReadWriteLock巧妙地使用state的高16位表示读状态,也就是获取到读锁的次数;使用低16位表示获取到写锁的线程的可重入次数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
static final int SHARED_SHIFT = 16;
// 共享锁(读锁)状态单位值65536
static final int SHARED_UNIT = (1 << SAHRED_SHIFT);
// 共享锁线程最大个数65535
static final int MAX_COUNT = (1 << SHARED_SHIFT)-1;
// 排它锁(写锁)掩码,二进制,15个1
static final int EXCLUSIVE_MASK = (1<<SHARED_SHIFT)-1;
// 返回读锁线程数
static int sharedCount(int c){
return c >>> SHARED_SHIFT;
}
// 返回写锁可重入次数
static int exclusiveCount(int c){
return c & EXCLUSIVE_MASK;
}
|
其中firstReader用来记录第一个获取到读锁的线程,firstReaderHoldCount则记录第一个获取到读锁的线程获取读锁的可重入次数。cachedHolderCounter用来记录最后一个获取读锁的线程获取读锁的可重入次数。
1
2
3
4
5
|
static final class HolderCounter{
int count = 0;
// 线程id
final long tid = getThreadId(Thread.currentThread());
}
|
readHolds是ThreadLocal变量,用来存放除去第一个获取读锁线程外的其他线程获取读锁的可重入次数。ThreadLocalHoldCounter继承了ThreadLocal,因而initialValue方法返回一个HoldCounter对象。
1
2
3
4
5
|
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter>{
public HoldCounter initialValue(){
return new HoldCounter();
}
}
|
写锁的获取与释放
在ReentrantReadWriteLock中写锁使用WriteLock来实现。
1、void lock()
写锁是个独占锁,某时只有一个线程可以获取该锁。如果当前没有线程获取到读锁和写锁,则当前线程可以获取到写锁然后返回。如果已经有线程获取到读锁和写锁,则当前请求写锁的线程会被阻塞挂起。另外,写锁是可重入锁,如果当前线程已经获取了该锁,再次获取只是简单地把可重入次数加一后直接返回。
1
2
3
4
5
6
7
8
9
10
|
public void lock(){
sync.acquire(1);
}
public final void acquire(int arg){
// sync重写的tryAcquire方法
if(!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
|
如以上代码所示,在lock()内部调用了AQS的acquire方法,其中tryAcquire是ReentrantReadWriteLock内部的sync类重写的,代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
protected final boolean tryAcquire(int acquires){
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
// 1、c!=0说明读锁或者写锁已经被某线程获取
if(c!=0){
// 2、w=0说明已经有线程获取了读锁,w!=0并且当前线程不是写锁拥有者,则返回false
if(w==0 || current != getExclusiveOwnerThread())
return false;
// 3、说明当前线程获取了写锁,判断可重入次数
if(w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 4、设置可重入次数
setState(c+acquires);
return true;
}
// 5、第一个写线程获取写锁
if(writerShouldBlock() || !compareAndSetState(c,c+acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
|
在代码1中,如果当前AQS状态值不为0则说明当前已经有线程获取到了读锁或者写锁。在代码2中,如果w==0说明状态值的低16位为0,而AQS状态值不为0,则说明高16位不为0,这表示已经有线程获取了读锁,所以直接返回false。
而如果w!=0则说明当前已经有线程获取了该写锁,再看当前线程是不是该锁的持有者,如果不是则返回false。
执行到代码3说明当前线程之前已经获取到了该锁,所以判断该线程的可重入次数是不是超过了最大值,是则抛出异常,否则执行代码4增加当前线程的可重入次数,然后返回true。
如果AQS的状态值等于0则说明目前没有线程获取到读锁和写锁,所以执行代码5,其中,对于writerShouldBlock方法,非公平锁的实现为:
1
2
3
|
final boolean writerShouldBlock(){
return false;
}
|
如果代码对于非公平锁来说总是返回false,则说明代码5抢占式执行CAS尝试获取写锁,获取成功则设置当前锁的持有者为当前线程并返回true,否则返回false。
公平锁的实现为:
1
2
3
|
final boolean writerShouldBlock(){
return hasQueuedPredcessors();
}
|
这里还是使用hasQueuedPredcessors来判断当前线程节点是否有前驱节点,如果有则当前线程放弃获取写锁的权限,直接返回false。
2、void lockInterruptibly()
类似于lock()方法,它的不同之处在于,它会对中断进行响应,也就是当其他线程调用了该线程的interrupt()方法中断了当前线程时,当前线程会抛出异常InterruptedException异常。
1
2
3
|
public void lockInterruptibly() throws InterruptedException{
sync.acquireInterruptibly(1);
}
|
3、boolean tryLock()
尝试获取写锁,如果当前没有其他线程持有写锁或者读锁,则当前线程获取写锁会成功,然后返回true。如果当前已经有其他线程持有写锁或者读锁则该方法直接返回false,且当前线程并不会被阻塞。如果当前线程已经持有了该写锁则简单增加AQS的状态值后直接返回true。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public boolean tryLock(){
return sync.tryWriteLock();
}
final boolean tryWriteLock(){
Thread current = Thread.currentThread();
int c = getState();
if(c != 0){
int w = exclusiveCount(c);
if(w == 0 || current != getExclusiveOwnerThread())
return false;
if(w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if(!compareAndSetState(c,c+1))
return false;
setExclusiveOwnerThread(current);
return true;
}
|
与tryAcquire方法类似,不同在于这里使用的是非公平策略。
4、boolean tryLock(long timeout,TimeUnit unit)
与tryAcquire()的不同之处在于,多了超时时间参数,如果尝试获取写锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果还是没有获取到写锁则返回false。另外,该方法会对中断进行响应,也就是当其他线程调用了该线程的interrupt()方法中断了当前线程时,当前线程会抛出InterruptedException异常。
1
2
3
|
public boolean tryLock(long timeout,TimeUnit unit) throws InterruptedException{
return sync.tryAcquireNanos(1,unit.toNanos(timeout));
}
|
5、void unlock()
尝试释放锁,如果当前线程持有该锁,调用该方法会让该线程对该线程持有的AQS状态值减1,如果减去1后当前状态值为0则当前线程会释放该锁,否则仅仅减1而已。如果当前线程没有持有该锁而调用了该方法则会抛出IllegalMonitorStateException异常。代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public void unlock(){
sync.release(1);
}
public final boolean release(int arg){
// 调用ReentrantReadWriteLock中sync实现的tryRelease方法
if(tryRelease(arg)){
// 激活阻塞队列里面的一个线程
Node h = head;
if(h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases){
// 6、看是否是写锁拥有者调用的unlock
if(!isHeldExclusively())
throw new IllegalMonitorStateException();
// 7、获取可重入值,这里没有考虑高16位,因为获取写锁时读锁状态值肯定为0
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
// 8、如果写锁可重入值为0则释放锁,否则只是简单地更新状态值
if(free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
|
在如上代码中,tryRelease首先通过isHeldExclusively判断是否当前线程是该写锁的持有者,如果不是则抛出异常,否则执行代码7,这说明当前线程持有写锁,持有写锁说明状态值的高16位为0,所以这里nextc值就是当前线程写锁的剩余可重入次数,代码8判断当前可重入次数是否为0,如果free为true则说明可重入次数为0,所以当前线程会释放写锁,将当前锁的持有者设置为null。如果free为false则简单地更新可重入次数。
读锁的获取与释放
ReentrantReadWriteLock中的读锁是使用ReadLock来实现的。
1、void lock()
获取读锁,如果当前没有其他线程持有写锁,则当前线程可以获取读锁,AQS的状态值的高16位的值会增加1,然后方法返回。否则如果其他一个线程持有写锁,则当前线程会被阻塞。
1
2
3
4
5
6
7
8
9
10
11
|
public void lock(){
sync.acquireShared(1);
}
public final void acquireShared(int arg){
// 调用ReentrantReadWriteLock中的sync的tryAcquireShared方法
if(tryAcquireShared(arg) < 0){
// 调用AQS的doAcquireShared方法
doAcquireShared(arg);
}
}
|
在如上代码中,读锁的lock方法调用了AQS的acquireShared方法,在其内部调用了ReentrantReadWriteLock中的sync重写的tryAcquireShared方法,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
protected final int tryAcquireShared(int unused){
// 1、获取当前状态值
Thread current = Thread.currentThread();
int c = getState();
// 2、判断是否写锁被占用
if(exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
// 3、获取读锁计数
int r = sharedCount(c);
// 4、尝试获取锁,多个读线程只有一个会成功,不成功的进入fullTryAcquireShared进行重试
if(!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c,c+ SHARED_UNIT)){
// 5、第一个线程获取读锁
if(r == 0){
firstReader = current;
firstReaderHoldCount = 1;
}
// 6、如果当前线程是第一个获取读锁的线程
else if(firstReader == current){
firstReaderHoldCount++;
}else{
// 7、记录最后一个获取读锁的线程或记录其他线程读锁的的可重入数
HolderCounter rh = cachedHoldCounter;
if(rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if(rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
// 8、类似tryAcquireShared,但是是自旋获取
return fullTryAcquireShared(current);
}
|
如上代码首先获取了当前AQS的状态值,然后代码2查看是否已经有其他线程获取到了写锁,如果是则直接返回-1,而后调用AQS的doAcquireShared方法把当前线程放入AQS阻塞队列。如果当前要获取读锁的线程已经持有了写锁,则也可以获取读锁。但是需要注意,当一个线程先获取了写锁,然后获取了读锁处理事情完毕后,要记得把读锁和写锁都释放掉,不能只释放写锁。
否则执行代码3得到获取到的读锁的个数,到这里说明目前没有线程获取到写锁,但是可能有线程持有读锁,然后执行代码4。其中非公平锁的readerShouldBlock实现代码:
1
2
3
4
5
6
7
8
9
10
11
|
final boolean readerShouldBlock(){
return apparentlyFirstQueuedIsExclusive();
}
final boolean apprentlyFirstQueuedIsExclusive(){
Node h,s;
return (h = head) != null &&
(s= h.next) != null &&
!s.isShared() &&
s.thread != null;
}
|
如上代码的作用是,如果队列里面存在一个元素,则判断第一个元素是不是正在尝试获取写锁,如果不是,则当前线程判断当前获取读锁的线程是否达到了最大值。最后执行CAS操作将AQS状态值的高16位值增加1。
代码5、6记录第一个获取读锁的线程并统计该线程获取读锁的可重入次数。代码7使用cachedHoldCounter记录最后一个获取到读锁的线程和该线程获取读锁的可重入次数。readHolds记录了当前线程获取读锁的可重入数。
如果readerShouldBlock返回true则说明有其他线程正在获取写锁,所以执行代码8,fullTryAcquireShared的代码与tryAcquiredShared类似,它们的不同在于,前者通过循环自旋获取。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
final int fullTryAcquireShared(Thread current){
HolderCounter rh = null;
for(;;){
int c = getState();
if(exclusiveCount(c) != 0){
if(exclusive(c) != 0){
if(getExclusiveOwnerThread() != current)
return -1;
}else if(readerShouldBlock()){
if(firstReader == current){
}else {
if(rh == null){
rh = cachedHolderCounter;
if(rh == null || rh.tid != getThreadId(current)){
rh = readHolds.get();
if(rh.count == 0)
readHolds.remove();
}
}
if(rh.count ==0 )
return -1;
}
}
if(sharedCount(c) == MAX_COUNT)
thorw new Error("Maximum lock count exceeded);
if(compareAndSetState(c,c+SHARED_UNIT)){
if(sharedCount(c) == 0){
firstReader = current;
firstReaderHolderCount = 1;
}else if(firstReader == current){
firstReaderHoldCount++;
}else{
if(rh == null)
rh = cachedHolderCounter;
if(rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
}
return 1;
}
}
}
}
|
2、void lockInterruptibly()
类似于lock()方法,不同之处在于,该方法会对中断进行响应,也就是当其他线程调用了该线程的interrupt()方法中断了当前线程时,当前线程会抛出InterruptedException异常。
3、boolean tryLock()
尝试获取读锁,如果当前没有其他线程持有写锁,则当前线程获取读锁会成功,然后返回true。如果当前已经有其他线程持有写锁则该方法直接返回false,但当前线程并不会被阻塞。如果当前线程已经持有了该读锁则简单增加AQS的状态值高16位后直接返回true。其代码类似tryLock的代码。
4、boolean tryLock(long timeout,TimeUnit unit)
与tryLock()的不同之处在于,多了超时时间参数,如果尝试获取读锁失败会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果此时还没有获取到读锁则返回false。另外,该方法对中断响应,也就是当其他线程调用了该线程的interrupt()方法中断了当前线程时,当前线程会抛出Interrupted异常。
5、void unlock()
1
2
3
|
public void unlock(){
sync.releaseShared(1);
}
|
如上代码具体释放锁的操作是委托给Sync来做的,sync.releaseShared方法的代码如下:
1
2
3
4
5
6
7
|
public final boolean releaseShared(int arg){
if(tryReleaseShared(arg)){
doReleaseShared();
return true;
}
return false;
}
|
其中tryReleaseShared的代码如下:
1
2
3
4
5
6
7
8
9
10
11
|
protected final boolean tryReleaseShared(int unused){
Thread current = Thread.currentThread();
// 循环直到自己的读计数-1,CAS更新成功
for(;;){
int c = getState();
int nextc = c -SHARED_UNIT;
if(compareAndSetState(c,nextc))
return nextc == 0;
}
}
|
如以上代码所示,在无限循环里边,首先获取当前AQS状态值并将其保存到变量c,然后变量c被减去一个读计数单位后使用CAS操作更新AQS状态值,如果更新成功则查看当前AQS状态值是否为0,为0则说明当前已经没有读线程占用读锁,则tryReleaseShared返回true。然后会调用doReleaseShared方法释放一个由于获取写锁而被阻塞的线程,如果当前AQS状态值不为0,则说明当前还有其他线程持有了读锁,所以tryReleaseShared返回false。如果tryReleaseShared中的CAS更新AQS状态值失败,则自旋重试直到成功。
使用ReentrantReadWriteLock
之前用ReentrantLock实现过线程安全的list,但是由于ReentrantLock是独占锁,所以在读锁写少的情况下性能很差。下面使用ReentrantReadWriteLock改造:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
public static class ReentrantLockList{
// 线程不安全的list
private ArrayList<String> array = new ArrayList<String>();
// 独占锁
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Locl writeLock = lock.writeLock();
// 添加元素
public void add(String e){
writeLock.lock();
try{
array.lock();
}finally{
writeLock.unlock();
}
}
// 删除元素
public void remove(String e){
writeLock.lock();
try{
array.remove(e);
}finally{
writeLock.unlock();
}
}
// 获取数据
public String get(int index){
readLock.lock();
try{
return arrays.get(index);
}finally{
readLock.unlock();
}
}
}
|
以上代码调用get方法时使用的是读锁,这样运行多个读线程来同时访问list的元素,这在读多写少的情况下性能会更好。