25-AQS

nobility 发布于 2021-06-27 1724 次阅读


AQS

AQS(AbstarctQueuedSynchronizer)是一个用于构建锁、同步工具的工具类(框架),AQS解决了在实现同步容器时大量的细节问题,比如ReentrantLockReentrantReadWriteLockSemaphoreCountDownLatch等底层都用到了AQS

AQS内部原理

  • state状态:在AQS中state是使用volatile修饰的变量,为了保证线程安全需要使用AQS中提供的getState()setState()compareAndSetState()方法来操作,这些方法都依赖与原子类,根据用户的使用情况state的意义也不同,比如
    • Semaphore中就表示剩余信号量的数量
    • CountDownLatch中就表示倒数到几了
    • ReentrantLock中就表示锁的重入次数
  • 控制线程抢锁和配合的先进先出双向队列:AQS维护的一个用来存放等待线程的队列,是一个双向链表形式,头节点是已经拿到锁的线程
    • 当多个线程竞争同一把锁时,竞争失败的线程会被AQS放入该队列,同时阻塞该线程
    • 当锁释放时,AQS会从队列中唤醒线程来占有刚刚释放的锁
  • 期望我们去实现的获取和释放等方法:这些方法由我们去实现,AQS内部会用到我们实现的方法,根据实现情况意义也不相同
    • 获取方法:依赖state变量,在获取不到锁时会被阻塞;独占锁重写tryAcquire()方法/共享锁重写
    • 释放方法:不会造成阻塞;独占锁重写tryRelease()方法/共享锁重写``

AQS使用方式

根据阅读ReentrantLockReentrantReadWriteLockSemaphoreCountDownLatch等底层源码发现AQS的使用流程大致如下

  1. 内部写一个私有的静态内部类Sync继承自AbstarctQueuedSynchronizer

  2. 从使用层面来说,AQS的锁功能分独占锁和共享锁,所以要重写不同的方法

    • 独占锁:
      • boolean tryAcquire(int arg):配合调用acquire()acquireInterruptibly()控制线程方法,该方法返回false线程会被阻塞
      • boolean tryRelease(int arg):配合调用release()控制线程方法,该方法返回true会释放锁
    • 共享锁:
      • int tryAcquireShared(int arg):配合调用acquireShared()acquireInterruptibly()控制线程方法,该方法返回小于0的值线程会被阻塞
      • boolean tryReleaseShared(int arg):配合调用releaseShared()控制线程方法,该方法返回true会唤醒等待线程
  3. 使用内部类Sync创建对象,构造方法中传入state的初始值

  4. 控制线程方法中调用Sync对象的方法

    • 独占锁:

      • void acquire(int arg):会调用到我们重写的tryAcquire()方法
      • void acquireInterruptibly(int arg):会调用到我们重写的tryAcquire()方法,该方法可被中断
      • boolean release(int arg):会调用到我们重写的tryRelease()方法
    • 共享锁:

      • void acquireShared(int arg):会调用到我们重写的tryAcquireShared()方法
      • void acquireSharedInterruptibly(int arg):会调用到我们重写的tryAcquireShared()方法,该方法可被中断
      • boolean releaseShared(int arg):会调用到我们重写的tryReleaseShared()方法

AQS使用示例

/**
 * 只有一个信号量的Semaphore工具类
 */
public class MySemaphore {
  private static final class Sync extends AbstractQueuedSynchronizer {
    public Sync() {
      setState(1);
    }

    @Override
    protected int tryAcquireShared(int acquires) {
      if (getState() == 0) {  //若已经没有信号量了,就被阻塞
        return -1;
      }
      do {  //否则,就是有信号量,就自旋将信号量设置为0,代表被线程拿走了
        int state = getState();
        if (compareAndSetState(state, 0)) {
          return 1;  //设置成功后该线程继续执行
        }
      } while (true);
    }

    @Override
    protected boolean tryReleaseShared(int releases) {
      setState(1);  //归还信号量,将信号量设置为1
      return true;  //信号量归还后,唤醒其他线程来使用信号量
    }
  }

  private final Sync sync;

  public MySemaphore() {
    this.sync = new Sync();
  }

  public void acquire() {
    sync.acquireShared(0);
  }

  public void release() {
    sync.releaseShared(0);
  }

  public static void main(String[] args) throws InterruptedException {
    MySemaphore mySemaphore = new MySemaphore();
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 50; i++) {  //开启50个线程执行
      executorService.submit(() -> {
        mySemaphore.acquire();  //获取信号量
        System.out.println(Thread.currentThread().getName() + "拿到许可证正在执行...");
        try {
          Thread.sleep(new Random().nextInt(3000));
          System.out.println(Thread.currentThread().getName() + "执行结束,归还许可证");
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {  //有可能会抛出异常的地方要在finally块儿中,以防发生异常而未执行归还操作,导致程序无法结束
          mySemaphore.release();  //任务结束就归还信号量
        }
      });
    }
    executorService.shutdown();
  }
}
此作者没有提供个人介绍
最后更新于 2021-06-27