调度算法
前面我们看了可重入锁ReentrantLock,其实这个锁只适用于写多读少的情况,就是多个线程去修改一个数据的时候,适合用这个锁,但是如果多个线程都去读一个数据,还用这个锁的话会降低效率,因为同一时刻只能是一个线程去读取!
本次我们看看读写锁ReentantReadWriteLock,这个锁采用了读写分离的策略,分成了读锁和写锁,多个线程可以同时获取读锁;
一.简单使用读写锁
啥也别问,问就是先会用了再说,还记得前面用ReentrantLock实现了一个线程安全的List吗?我们可以使用读写锁稍微改造一下就好了;
package com.example.demo.study; import java.util.ArrayList; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Study0204 { // 线程不安全的List private ArrayList<String> list = new ArrayList<String>(); // 读写锁 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // 获取读锁 private final Lock readLock = lock.readLock(); // 获取写锁 private final Lock writeLock = lock.writeLock(); // 往集合中添加元素,写锁 public void add(String str) { writeLock.lock(); try { list.add(str); } finally { writeLock.unlock(); } } // 删除集合中的元素,写锁 public void remove(String str) { writeLock.lock(); try { list.remove(str); } finally { writeLock.unlock(); } } // 根据索引获取集合中某个元素,读锁 public String get(int index) { readLock.lock(); try { return list.get(index); } finally { readLock.unlock(); } } }
二.读写锁的结构
这里最核心的还是用了AQS,可以看到里面还是有一个Sync这个内部工具类,然后还有两个内部工具类,一个是读锁ReadLock,一个是写锁WriteLock
我们还能看到Sync这个类就是继承AQS,然后有NonfairSync和FairSync这两个类去继承Sync,到这里结构还是和ReentrantLock是一样的;
我们再看看读锁和写锁,可以看出来就是实现了Lock这个接口,然后通过传进去的sync对象去实现Lock中的所有方法
大概的结构就是这样的,我们可以使用下面这个图显示出来,其实ReentrantReadWriteLock最重要的就是三个类:
一个是Sync工具类用于操作AQS阻塞队列和state的值,而且有基于Sync实现的公平策略和非公平策略;
一个是写锁,实现了Lock接口,内部有个Sync字段,在Lock的实现方法中就是调用Sync对象的方法去实现的
另外一个是读锁,和写锁一样,实现了Lock接口,内部有个Sync字段,在Lock的实现方法也都是调用Sync对象的方法实现
二.分析Sync
上篇博客中我们知道了在ReentrantLock中的state表示的是锁的可重入次数,而且state是AQS中定义的int类型,那么在读写锁这里有两个状态是怎么表示呢?
总有一些人会想到一些花里胡哨的东西,还别说,挺管用的,由于state是int类型,共有32位,我们可以一分为二,前面的16位叫做高16位,表示获取读锁的次数,后面的叫做的低16位,表示写锁的可重入次数,具体的,我们可以看看Sync类的属性,主要是涉及到基本的二进制运算,有兴趣的可以研究一下;
abstract static class Sync extends AbstractQueuedSynchronizer { //这个可以说是读锁(共享锁)移动的位数 static final int SHARED_SHIFT = 16; //读锁状态单位值,这里就是将1有符号左移16位,1用二进制表示为:00000000 00000000 00000000 00000001 //左移16位之后:00000000 00000001 00000000 00000000,也就是2的16次方,就是65536 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //读锁的线程最大数65535个 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //写锁(排它锁)掩码,这里用二进制表示 00000000 00000000 11111111 11111111 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //返回读锁的个数,这里也就是将state无符号右移16位,那么有效数字肯定就是高16位,转成十进制后就是获取读锁的次数 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //返回写锁的个数,这里就是将state和上面的写锁掩码做按位与运算,高16位被置为0,有效数字位第16位,转成十进制就是写锁的可重入次数 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //省略很多代码 }
其中,Sync中还有几个比较重要的属性如下,不懂不要紧,后面用到了再回头看看就好;
//记录第一个获取读锁的线程 private transient Thread firstReader = null; //记录第一个获取读锁的线程继续获取读锁的可重入次数 private transient int firstReaderHoldCount; //记录最后一个获取读锁的线程获取读锁的可重入次数,HoldCounter类如下 private transient HoldCounter cachedHoldCounter; static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); } //记录除去第一个获取读锁的线程外,获取的读锁的可重入次数,ThreadLocalHoldCounter类如下 private transient ThreadLocalHoldCounter readHolds; static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
三.写锁的获取和释放
写锁在获取的时候有一个前提:没有其他线程持有写锁或者读锁,当前线程才能获取写锁,否则就把当前线程丢到阻塞队列里去了,记住,不能一边读还一边写!
1.lock方法:
从头学pytorch(二十一):全连接网络dense net
写锁主要是ReentantReadWriteLock中的内部类WriteLock类实现的,这是一个独占锁,同一时刻只能有一个线程可以获取该锁,时刻重入的;
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); //这个方法的是实现static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; },就是将state的低十六位转为十进制, //也就是写锁的可重入次数 int w = exclusiveCount(c); //state不为0,说明读锁或者写锁被占用了 if (c != 0) { //如果w==0,而c!=0,说明c的高16为不为0,即有线程获取了读锁,此时写锁是不能获取的,注意,别人在读的时候是不能写入的呀!返回false //如果w!=0表示有线程获取了写锁,但是占用锁的线程不是当前线程,那么线程获取写锁失败,返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //到这里说明写锁可以获取成功,那么就要判断写锁的可重入次数是否大于65535 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //将state加一 setState(c + acquires); return true; } //能到这里来,说明c==0,也就是说读锁和写锁都在空闲着,下面我们要看看公平策略下和非公平下的writerShouldBlock实现 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
我们要看看最后的那里的if语句,其中writerShouldBlock的实现,能到这里说明读锁和写锁都在空闲这,可以随时去获取;
非公平策略下始终返回的是false,于是会走到compareAndSetState(c, c + acquires),这里是用CAS尝试获取写锁,获取失败的话就返回发了;获取成功的话就走到setExclusiveOwnerThread(current);设置占用读锁的线程是当前线程;
公平策略下的话,这个方法前面好像说过,就是判断当前线程节点前面有没有前驱节点,如果有的话那就肯定获取失败啊,要让前驱节点先获取,于是在上面最后的if那里直接返回false;如果这里判断没有前驱节点,这里返回true,那么上面就会走到最后setExclusiveOwnerThread(current)设置当前线程占有写锁
2.tryLock方法
这个方法和上面的lock方法一样,注意,这里最后那里默认使用的是非公平模式;
public boolean tryLock( ) { return sync.tryWriteLock(); } final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } //这里默认使用的是非公平模式 if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true; }
3.unlock方法
public void unlock() { sync.release(1); } //这个是AQS中的方法,说过tryRelease留给具体子类去实现的,重点看看怎么实现的 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { //isHeldExclusively方法在下面,因为是当前线程调用的release方法,要判断当前线程是不是持有写锁的线程,不是的话就抛错了 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //state减一 int nextc = getState() - releases; //获取低十六位,看是不是等于0,如果等于0,说明此时没有线程占用写锁,于是就调用setExclusiveOwnerThread(null) //将占用写锁的线程设置为null,最后更新state就行了 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); }
四.读锁的获取和释放
结合前面的写锁一起说一下:
(1).没有其他线程持有写锁或者读锁,当前线程才能获取写锁,否则就把当前线程丢到阻塞队列里去了;当前线程获取了写锁之后,其他线程不能获取写锁和读锁;
(2)没有其他线程获取写锁时,当前线程才能获取读锁,否则就丢到阻塞队列里去了,不能 一边读一边写;当前线程获取了读锁之后,其他线程只能获取读锁,不能获取写锁;
(3)当前线程获取的写锁之后,还能继续获取写锁,这叫做可重入;也可以继续获取读锁,这叫做锁降级;
(4)当前线程获取的读锁之后,还能继续获取读锁;
1.lock方法
public void lock() { //acquireShared方法在AQS中 sync.acquireShared(1); } public final void acquireShared(int arg) { //tryAcquireShared实现在ReentrantReadWriteLock中的Sync中 if (tryAcquireShared(arg) < 0) //这个方法在AQS中,主要是将当前线程放到阻塞队列中 doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //这里就是判断:如果其他线程获取了写锁,那么就返回-1 //先判断写锁的可重入次数不为0,表示有线程占用写锁,而且还不是当前线程,那么直接返回-1 //这里注意一下:一个线程在获取写锁之后,还可以再获取读锁,释放的时候两个所都要释放啊!!! if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //这个方法获取读锁的次数,读锁的次数最多只能是65535个 int r = sharedCount(c); //readerShouldBlock方法分为公平策略和非公平策略,这个方法的意思就是:当前线程获取已经获取读锁,再读锁被阻塞了,那么说明还有其他线程正在获取写锁 //如果返回false,说明此时没有线程获取写锁,而且这个方法分为公平策略和非公平策略 //公平策略的话,如果当前线程节点有前驱节点就返回true,没有前驱节点返回false; //非公平策略的话,判断阻塞队列中哨兵节点后面的那个节点是不是正在获取写锁,是的话返回true,不是的话返回false //compareAndSetState(c, c + SHARED_UNIT)方法中,SHARED_UNIT表示65536,这个CAS表示对高16为增加1,对于整个32位来说,就是加2的16次方 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //r==0表示读锁空闲,于是记录第一个读锁,和第一个获取读锁的线程获取读锁的可重入次数 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; //如果当前线程就是第一个获取读锁的线程,再获取读锁,这里就将可重入次数加一即可 } else if (firstReader == current) { firstReaderHoldCount++; } else { //能到这里就说明读锁已经被其他线程占用,当前线程是最后一个是最后获取读锁的线程,我们更新一下cacheHoldCounter和readHolds就行了 //cacheHoldCounter表示最后一个获取读锁的线程获取读锁的可重入次数 //readHolds记录了除了第一个获取读锁的线程外,其他线程获取读锁的可重入次数 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //能到这里说明readerShouldBlock方法返回的是true,而且当前线程在之前已经获取了写锁,再获取读锁,就是锁降级!!! return fullTryAcquireShared(current); } //锁降级操作 final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); //写锁被其他线程占用,就返回-1 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; //获取读锁被阻塞,此时还有其他线程在获取写锁, } else if (readerShouldBlock()) { if (firstReader == current) { } else { //有其他线程在尝试获取写锁,结束当前线程获取读锁,就更新一下readHolds就行了 //就从readHolds中移除当前线程的持有数,然后返回-1,结束尝试获取锁步骤 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } //读锁数量达到了最大数量就抛错 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //CAS更新读锁的数量,然后更新一些变量 if (compareAndSetState(c, c + SHARED_UNIT)) { //读锁数量为0,就让当前线程作为第一个获取读锁的线程 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; //当前线程已经获取过读锁了,就把第一个获取读锁的可重入次数加一 } else if (firstReader == current) { firstReaderHoldCount++; } else { //这里前面说过了 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
2.tryLock方法
public boolean tryLock() { return sync.tryReadLock(); } final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); //如果当前写锁已经被占用,获取读锁失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); //读锁不能超过最大数量 if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //更新state,将高16位加一,更新成功,如果读锁没有线程占有,就把当前线程更新为第一个获取读锁的线程和更新第一个获取读锁的可重入次数 if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; //当前线程就是第一个获取读锁的线程,就将可重入次数加一 } else if (firstReader == current) { firstReaderHoldCount++; } else { //到这里说明当前线程获取读锁成功,虽然不是第一个获取读锁的线程,于是更新一下cachedHoldCounter和readHolds //cachedHoldCounter:最后一个线程获取读锁的可重入次数 //readHolds:除去第一个线程,其他线程获取读锁的可重入次数 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } }
3.unlock方法
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { //实现在下面,就是尝试释放读锁,并判断还有没有线程占用读锁,没有线程占用读锁,就会进入到if里面doReleaseShared方法 if (tryReleaseShared(arg)) { //前面可能有一些线程在获取写锁的时候,由于当前线程读锁没有释放,所以那些线程就被阻塞了 //当前方法就是把那些线程释放一个 doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //如果当前线程是第一个获取读锁的线程 if (firstReader == current) { //第一个线程获取读锁的可重入次数为1,就释放,否则,可重入次数减一 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; //当前线程不是第一个获取读锁的线程就更新cachedHoldCounter和readHolds } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //这里一个无限循环,获取state,将高十六位减一,用CAS更新,更新成功的话,就判断读锁有没有被占用 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
五.总结
我们用下面这个图来总结一下ReentrantReadWriteLock,这个锁就是利用state是32位的,高16位用于表示读锁的个数,低16位表示写锁的可重入次数,通过CAS对其进行了读写分离,适用于读多写少的场景;
微信高并发抢红包秒杀实战案例