AQS
AQS(AbstarctQueuedSynchronizer)是一个用于构建锁、同步工具的工具类(框架),AQS解决了在实现同步容器时大量的细节问题,比如ReentrantLock
,ReentrantReadWriteLock
、Semaphore
、CountDownLatch
等底层都用到了AQS
AQS内部原理
- state状态:在AQS中state是使用
volatile
修饰的变量,为了保证线程安全需要使用AQS中提供的getState()
、setState()
和compareAndSetState()
方法来操作,这些方法都依赖与原子类,根据用户的使用情况state的意义也不同,比如- 在
Semaphore
中就表示剩余信号量的数量 - 在
CountDownLatch
中就表示倒数到几了 - 在
ReentrantLock
中就表示锁的重入次数
- 在
- 控制线程抢锁和配合的先进先出双向队列:AQS维护的一个用来存放等待线程的队列,是一个双向链表形式,头节点是已经拿到锁的线程
- 当多个线程竞争同一把锁时,竞争失败的线程会被AQS放入该队列,同时阻塞该线程
- 当锁释放时,AQS会从队列中唤醒线程来占有刚刚释放的锁
- 期望我们去实现的获取和释放等方法:这些方法由我们去实现,AQS内部会用到我们实现的方法,根据实现情况意义也不相同
- 获取方法:依赖state变量,在获取不到锁时会被阻塞;独占锁重写
tryAcquire()
方法/共享锁重写 - 释放方法:不会造成阻塞;独占锁重写
tryRelease()
方法/共享锁重写``
- 获取方法:依赖state变量,在获取不到锁时会被阻塞;独占锁重写
AQS使用方式
根据阅读ReentrantLock
,ReentrantReadWriteLock
、Semaphore
、CountDownLatch
等底层源码发现AQS的使用流程大致如下
-
内部写一个私有的静态内部类
Sync
继承自AbstarctQueuedSynchronizer
-
从使用层面来说,AQS的锁功能分独占锁和共享锁,所以要重写不同的方法
- 独占锁:
boolean tryAcquire(int arg)
:配合调用acquire()
和acquireInterruptibly()
控制线程方法,该方法返回false线程会被阻塞boolean tryRelease(int arg)
:配合调用release()
控制线程方法,该方法返回true会释放锁
- 共享锁:
int tryAcquireShared(int arg)
:配合调用acquireShared()
和acquireInterruptibly()
控制线程方法,该方法返回小于0的值线程会被阻塞boolean tryReleaseShared(int arg)
:配合调用releaseShared()
控制线程方法,该方法返回true会唤醒等待线程
- 独占锁:
-
使用内部类
Sync
创建对象,构造方法中传入state
的初始值 -
控制线程方法中调用
Sync
对象的方法-
独占锁:
void acquire(int arg)
:会调用到我们重写的tryAcquire()
方法void acquireInterruptibly(int arg)
:会调用到我们重写的tryAcquire()
方法,该方法可被中断boolean release(int arg)
:会调用到我们重写的tryRelease()
方法
-
共享锁:
void acquireShared(int arg)
:会调用到我们重写的tryAcquireShared()
方法void acquireSharedInterruptibly(int arg)
:会调用到我们重写的tryAcquireShared()
方法,该方法可被中断boolean releaseShared(int arg)
:会调用到我们重写的tryReleaseShared()
方法
-
AQS使用示例
/**
* 只有一个信号量的Semaphore工具类
*/
public class MySemaphore {
private static final class Sync extends AbstractQueuedSynchronizer {
public Sync() {
setState(1);
}
@Override
protected int tryAcquireShared(int acquires) {
if (getState() == 0) { //若已经没有信号量了,就被阻塞
return -1;
}
do { //否则,就是有信号量,就自旋将信号量设置为0,代表被线程拿走了
int state = getState();
if (compareAndSetState(state, 0)) {
return 1; //设置成功后该线程继续执行
}
} while (true);
}
@Override
protected boolean tryReleaseShared(int releases) {
setState(1); //归还信号量,将信号量设置为1
return true; //信号量归还后,唤醒其他线程来使用信号量
}
}
private final Sync sync;
public MySemaphore() {
this.sync = new Sync();
}
public void acquire() {
sync.acquireShared(0);
}
public void release() {
sync.releaseShared(0);
}
public static void main(String[] args) throws InterruptedException {
MySemaphore mySemaphore = new MySemaphore();
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 50; i++) { //开启50个线程执行
executorService.submit(() -> {
mySemaphore.acquire(); //获取信号量
System.out.println(Thread.currentThread().getName() + "拿到许可证正在执行...");
try {
Thread.sleep(new Random().nextInt(3000));
System.out.println(Thread.currentThread().getName() + "执行结束,归还许可证");
} catch (InterruptedException e) {
e.printStackTrace();
} finally { //有可能会抛出异常的地方要在finally块儿中,以防发生异常而未执行归还操作,导致程序无法结束
mySemaphore.release(); //任务结束就归还信号量
}
});
}
executorService.shutdown();
}
}
Comments NOTHING