18-Callable

nobility 发布于 2021-06-05 771 次阅读


Callable

对比Runnable

相对于Runnable弥补了如下几点缺陷,之所以设计成这样是因为调用run()方法的类是Java提供的,不是我们编写,所以即使抛出异常或返回返回值,我们也进行处理

  • 没有返回值
  • 不能抛出检查异常
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;  //允许抛出异常,允许有返回值
}

Future接口

Future是一个存储器,存储了Callable接口任务的执行结果,用来操作实现了Callable接口异步任务的接口,该接口中定义的方法:可获取到Callable接口返回的执行结果、Callable任务的执行状态、任务的取消,限时获取结果等

方法名 描述
V get() 获取异步任务的执行结果
V get(long timeout, TimeUnit unit) 指定时间内获取异步任务的执行结果
boolean cancel(boolean mayInterruptIfRunning) 取消异步任务的执行,参数表示是否发送中断信号,返回是否取消成功
boolean isDone() 判断异步任务是否执行完毕
boolean isCancelled() 判断异步任务是否已经被取消

get方法注意事项

call()方法未执行完之前,调用get()方法的线程就会被阻塞,直到call()方法返回了结果后get()方法才会获取到结果,被get()方法阻塞的线程才会被唤醒

  • 异步任务已经正常完成:get()方法会立刻返回
  • 异步任务尚未完成(未开始或进行中):调用get()方法的线程就会被阻塞
  • 异步任务中抛出Exception异常:get()方法会抛出ExecutionException异常
  • 异步任务已经被取消:get()方法会抛出CancellationException异常
  • 异步任务超时:get(time)方法会抛出TimeoutException异常

cancel方法注意事项

  • 参数传入true适用于:异步任务能正确处理中断信号
  • 参数传入false适用于:异步任务不能正确处理中断信号,不清楚异步任务是否支持取消操作,以及想让为开始的异步任务进行取消
  • 异步任务还未开始执行:正常被取消,未来也不会被执行,方法返回true
  • 异步任务已经执行结束或已经取消:取消失败,方法返回false
  • 异步任务执行中:若传入参数是true会向异步任务发送中断信号,传入参数是false则不会向异步任务发送中断信号

FutureTask

FutureTast是一种包装器,可以将Callable转化成Future和Runnable,因为他同时实现了两者的接口,所以FutureTast既可以作为Future得到Callable的返回值,也可以做为Runnable被线程或线程池执行

FutureTask类类继承结构

public static void main(String[] args) {
  FutureTask<Integer> futureTask = new FutureTask<>(() -> {  //构造方法包装Callable实例
    int random = new Random().nextInt(3000);
    Thread.sleep(random);
    return random;
  });
  new Thread(futureTask).start();  //线程的启动使用FutureTask实例
  try {
    Integer result = futureTask.get();  //获取返回值使用FutureTask实例
    System.out.println("异步任务执行的时间为:" + result);
  } catch (InterruptedException e) {
    e.printStackTrace();
  } catch (ExecutionException e) {
    e.printStackTrace();
  }
}

线程池submit

线程池的submit()方法可接收Callable实例,并对应的返回一个Future实例用来操作异步任务

public static void main(String[] args) {
  ExecutorService executorService = Executors.newFixedThreadPool(2);
  ArrayList<Future<Integer>> futures = new ArrayList<>();  //将submit()方法返回的Future实例添加到该集合中
  for (int i = 0; i < 20; i++) {
    Future<Integer> future = executorService.submit(() -> {
      int random = new Random().nextInt(3000);
      Thread.sleep(random);
      return random;
    });
    futures.add(future);  //将返回的Future实例添加到集合中
  }
  for (Future<Integer> future : futures) {  //统一获取异步任务的返回值
    try {
      Integer result = future.get();
      System.out.println("异步任务执行的时间为:" + result);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
  }
  executorService.shutdown();
}

Future注意事项

  • Future的生命周期和线程池的生命周期一样不能后退,一旦完成就永久停留在已完成
  • 当使用for循环批量获取future的结果时,容易发生一部分异步任务很慢,导致后续已经出结果的异步任务只能等待前面异步任务获取到结果才能获取到结果,性能损耗较大,所以可以使用get(time)方法限时获取的方式进行获取,也可以使用completionServiceCompletableFuture
public class Main {
  public static void main(String[] args) {
    //completionService();
    completableFuture();
  }

  private static void completableFuture() {
    ExecutorService executorService = Executors.newFixedThreadPool(20);
    for (int i = 0; i < 20; i++) {
      CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        int random = new Random().nextInt(3000);
        try {
          Thread.sleep(random);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return random;
      }, executorService)  //异步的使用线程池进行执行异步任务
          .thenApply(result -> "thenApply(" + result + ")")  //对结果再次进行处理,变化了返回值类型
          .whenComplete((result, e) -> {  //对结果再次处理
            if (e == null) {  //忽略异常
              System.out.println("异步任务执行的时间为:" + result);
            }
          });
    }
    executorService.shutdown();
  }

  private static void completionService() {
    ExecutorService executorService = Executors.newFixedThreadPool(20);
    List<Future<Integer>> futureList = new ArrayList<>();
    CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);  //构造方法包装线程池
    for (int i = 0; i < 20; i++) {
      Future<Integer> future = completionService.submit(() -> {  //使用包装后的线程池
        int random = new Random().nextInt(3000);
        Thread.sleep(random);
        return random;
      });
      futureList.add(future);  //将返回的Future实例添加到集合中
    }
    for (int i = 0; i < 20; i++) {
      try {
        Future<Integer> future = completionService.take();  //内部维护阻塞队列,获取优先完成的异步任务
        Integer result = future.get();  //再获取该任务的执行结果
        System.out.println("异步任务执行的时间为:" + result);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (ExecutionException e) {
        e.printStackTrace();
      }
    }
    executorService.shutdown();
  }
}
此作者没有提供个人介绍
最后更新于 2021-06-05