24-同步工具

nobility 发布于 2021-06-18 1784 次阅读


同步工具

同步工具的作用就是为了更容易的让线程之间进行相互合作来满足业务需求

CountDownLatch倒数门闩

一部分任务执行倒数操作,一部分任务等待倒数结束,倒数结束后等待线程才继续执行,同一个任务中可多次倒数,该类是不能够回滚重置的 ,一旦倒数值结束该对象就废掉了,要想在使用就需要重新new一个

方法名 描述
CountDownLatch(int count) 构造方法,传入要倒数的值,count要大于0
void await() 调用该方法的线程会等待其他线程的倒数,直到倒数结束
void countDown() 调用该方法一次会让倒数值减一,倒数值减到0后就会唤醒等待倒数结束的线程
public class Main {
  public static void main(String[] args) throws InterruptedException {
    //oneAwaitMany();  //一等多,多次倒数,一个等待
    manyAwaitOne();  //多等一,一次倒数,多个等待
    //也可以多等多
  }

  private static void oneAwaitMany() throws InterruptedException {  //一个线程等待多个任务都执行完毕再执行
    CountDownLatch latch = new CountDownLatch(3);  //多个任务进行倒数,一个线程进行等待
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 3; i++) {
      int finalI = i;
      executorService.execute(() -> {
        System.out.println("任务" + finalI + "准备执行");
        try {
          Thread.sleep(new Random().nextInt(3000));
          System.out.println("任务" + finalI + "执行结束");
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {  //有可能会抛出异常的地方要在finally块儿中,以防发生异常而未执行倒数,导致程序无法结束
          latch.countDown();  //任务结束就执行倒数
        }
      });
    }
    System.out.println("等待所有任务结束");
    latch.await();  //主线程等待
    System.out.println("所有任务执行结束");
    executorService.shutdown();
  }

  private static void manyAwaitOne() throws InterruptedException {  //多个线程等待一个任务执行完再执行
    CountDownLatch latch = new CountDownLatch(1);  //多个线程进行等待,一个任务进行倒数
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 3; i++) {
      int finalI = i;
      executorService.execute(() -> {
        System.out.println(Thread.currentThread().getName() + "正在等待倒数");
        try {
          latch.await();
          System.out.println(Thread.currentThread().getName() + "执行结束");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
    }
    Thread.sleep(5000);  //主线程等待一会在进行倒数
    latch.countDown();  //主线程倒数
    System.out.println(Thread.currentThread().getName() + "执行结束");
    executorService.shutdown();
  }
}

Semaphore信号量

一般情况下,信号量数量少于线程数量,只有拿到信号量的线程才能执行,未拿到信号量的线程会陷入阻塞状态,所以信号量的获取与归还数量要一致,以防程序无法终止,但是信号量的获取和归还操作并非必须在同一个线程

方法名 描述
Semaphore(int permits, boolean fair) 构造方法,传入信号量的个数,和是否公平,默认不公平(公平指的是等待队列中谁等待时间最长谁优先执行,不公平指的是等待队列中允许插队)
void acquire(int permits) 获取指定数量的信号量,默认获取一个,需要手动处理中断异常
void acquireUninterruptibly(int permits) 获取指定数量的信号量,默认获取一个,无需手动处理中断异常
boolean tryAcquire(int permits) 尝试获取指定数量的信号量,默认获取一个,立刻返回,获取到返回true,获取不到返回false
boolean tryAcquire(int permits, long timeout, TimeUnit unit) 尝试获取指定数量的信号量,默认获取一个,指定时间内获取到信号量返回true,超时获取不到返回false
void release(int permits) 归还指定数量的信号量,默认归还一个

最好设置为公平的情况,由于线程数量过多,若允许插队,可能造成线程饥饿的情况

public static void main(String[] args) throws InterruptedException {
  Semaphore semaphore = new Semaphore(3, true);  //初始化3个信号量,并且是公平的
  ExecutorService executorService = Executors.newCachedThreadPool();
  for (int i = 0; i < 50; i++) {  //开启50个线程执行
    executorService.submit(() -> {
      semaphore.acquireUninterruptibly();
      System.out.println(Thread.currentThread().getName() + "拿到许可证正在执行...");
      try {
        Thread.sleep(new Random().nextInt(3000));
        System.out.println(Thread.currentThread().getName() + "执行结束,归还许可证");
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {  //有可能会抛出异常的地方要在finally块儿中,以防发生异常而未执行归还操作,导致程序无法结束
        semaphore.release();  //任务结束就归还信号量
      }
    });
  }
  executorService.shutdown();
}

Condition接口条件对象

Condition接口是配合Lock接口使用的,Condition接口也提供了类似Object的线程等待与唤醒机制,与Lock配合可以实现;和Object的线程等待与唤醒机制一样,必须先持有锁,否则会抛出IllegalMonitorStateException非法monitor锁状态异常;相对于Object的线程等待与唤醒机制只能使用synchronized绑定的单个锁对象来控制等待与唤醒,Condition接口可使用多个对象来控制等待与唤醒机制,所以相对性能也高一些

  • await():调用该方法的线程会处于等待状态,同时释放调用该方法对象的Lock锁,需要手动处理中断异常
  • awaitUninterruptibly():调用该方法的线程会处于等待状态,同时释放调用该方法对象的Lock锁,无需手动处理中断异常
  • await(time):调用该方法的线程会处于等待状态,同时释放调用该方法对象的monitor锁,若超时自动唤醒,需手动处理中断异常
  • signal():唤醒一个等待线程,该方法是公平的,只会唤醒等待时间最长的那个线程
  • notifyAll():唤醒全部等待线程

下面是未使用阻塞队列,且使用多个Condition实例来控制线程等待和唤醒,的生产者消费者模式

public class Main {
  private static Queue<Integer> queue = new LinkedList<>();  //存储仓库
  private static int capacity = 5;  //仓库容量
  private static ReentrantLock lock = new ReentrantLock();
  private static Condition produc = lock.newCondition();
  private static Condition consum = lock.newCondition();

  public static void main(String[] args) {
    Runnable producer = () -> {
      for (int i = 0; i < 50; i++) {
        lock.lock();
        try {
          while (queue.size() == capacity) {
            System.out.println("仓库已满,等待消费");
            produc.await();  //生产者等待
          }
          queue.add(i);  //仓库未慢,直接生产
          System.out.println("Producer生产了一个" + i);
          consum.signalAll();  //通知消费者消费
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          lock.unlock();
        }
      }
    };
    Runnable consumer = () -> {
      for (int i = 0; i < 50; i++) {
        lock.lock();
        try {
          while (queue.size() == 0) {
            System.out.println("仓库已空,等待生产");
            consum.await();  //消费者等待
          }
          Integer poll = queue.poll();  //仓库未空,直接消费
          System.out.println("Consumer消费了一个" + poll);
          produc.signalAll();  //通知生产者者生产
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          lock.unlock();
        }
      }
    };
    new Thread(producer).start();
    new Thread(consumer).start();
  }
}

CyclicBarrier循环栅栏

让多个线程聚集阻塞后,当聚集数到达设置的数量时,这些阻塞线程就会统一执行

方法名 描述
CyclicBarrier(int parties, Runnable barrierAction) 构造方法,传入要聚集线程的个数,parties要大于0,barrierAction是当聚集好后统一执行前的Runnable对象(由最后到达的线程调用该run()方法),默认为null
int await() 聚集阻塞线程,返回已经聚集了多少个阻塞线程了
public static void main(String[] args) throws InterruptedException {
  CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
    System.out.println(Thread.currentThread().getName() + "聚集线程完毕,准备统一执行");
  });
  for (int i = 0; i < 10; i++) {
    new Thread(() -> {
      try {
        System.out.println(Thread.currentThread().getName() + "正在前往聚集地");
        Thread.sleep(new Random().nextInt(3000));
        System.out.println(Thread.currentThread().getName() + "到达聚集地,等待其他线程到达");
        cyclicBarrier.await();
        System.out.println(Thread.currentThread().getName() + "再次执行");
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (BrokenBarrierException e) {
        e.printStackTrace();
      }

    }).start();
  }
}
此作者没有提供个人介绍
最后更新于 2021-06-18