编程

Java ExecutorService 指南

1262 2024-07-08 01:15:00

1. 介绍

ExecutorService 是一个 JDK API,它简化了在异步模式下运行任务。一般来说,ExecutorService 自动提供一个线程池和一个 API,用于向其分配任务。

2. 实例化 ExecutorService

2.1. Executors 类的工厂方法

创建 ExecutorService 最简单的方法是使用 Executors 类的工厂方法。

比如,下面代码用来创建一个拥有 10 个线程的线程池:

ExecutorService executor = Executors.newFixedThreadPool(10);

还有一些其他方法用来创建满足特定用例的预定义 ExecutorService。要找到满足你需求的最好方法,请查看 Oracle 的官方文档

2.2. 直接创建 ExecutorService

因为 ExecutorService 是接口,其任何实现的实例都可以使用。java.util.concurrent 包中有几个实现可供选择,或者你也可以创建自己的实现。

比如,ThreadPoolExecutor 类有一些构造函数,可用于配置执行器服务(Executor Service)及其内部线程池。

ExecutorService executorService = 
  new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,   
  new LinkedBlockingQueue<Runnable>());

你可能注意到了上述代码与工厂方法 newSingleThreadExecutor() 源码非常相似。对于大部分情况,不需要详细的手动配置。

3. 分配任务给 ExecutorService

ExecutorService 可以执行 RunnableCallable 任务。为了在本文中保持简单,将使用两个基本任务。请注意,我们在这里使用 lambda 表达式,而不是匿名内部类:

Runnable runnableTask = () -> {
    try {
        TimeUnit.MILLISECONDS.sleep(300);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Callable<String> callableTask = () -> {
    TimeUnit.MILLISECONDS.sleep(300);
    return "Task's execution";
};

List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);

我们可以使用多个方法,包括 execute()submit()invokeAny()invokeAll(),来将任务分配给 ExecutorService这些方法继承自 Executor 接口。

execute() 的方法是 void 并且不提供任何获取任务执行结果或检查任务状态(是否在运行中)的可能性:

executorService.execute(runnableTask);

submit() 提交一个 Callable 或者 Runnable 任务给 ExecutorService 并返回一个 Future 类型的结果:

Future<String> future = 
  executorService.submit(callableTask);

invokeAny() 将一组任务分配给 ExecutorService,使每个任务运行,并返回一个任务成功执行的结果(如果执行成功):

String result = executorService.invokeAny(callableTasks);

invokeAll() 将一个任务集合分配给 ExecutorService,使每个任务运行,并以 Future 类型的对象列表的形式返回所有任务执行的结果:

List<Future<String>> futures = executorService.invokeAll(callableTasks);

进一步讨论之前,我们还需要讨论两个项目:关闭 ExecutorService 和处理 Future 返回类型。

4. 关闭 ExecutorService

一般情况下,当没有任务要处理时,ExecutorService 不会自动销毁。它保持活跃并等待新的任务。

在某些情况下,这种方式很有帮助,比如当应用需要处理不规则出现的任务或者在编译时任务数量未知时。

另一方面,一个应用可能会到达终点但不会停止,因为等待的 ExecutorService 会导致 JVM 继续运行。

为了正确关闭 ExecutorService,我们有shutdown()shutdownNow() API。

shutdown() 不会导致 ExecutorService 立即析构。它会让 ExecutorService 停止接受新任务,并且在所有正在运行的线程完成它们当前任务后关闭服务:

executorService.shutdown();

shutdownNow() 方法尝试立即销毁 ExecutorService,但它不保证所有正在运行的线程会同时停止:

List<Runnable> notExecutedTasks = executorService.shutDownNow();

此方法返回等待处理的任务列表。由开发人员决定如何处理这些任务。

关闭 ExecutorService(Oracle 也建议这样做)的一个好方法是,将这两种方法与 awaitTermination() 方法结合使用:

executorService.shutdown();
try {
    if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
        executorService.shutdownNow();
    } 
} catch (InterruptedException e) {
    executorService.shutdownNow();
}

使用这种方法,ExecutiorService 将首先停止执行新任务,然后等待指定的时间段,以完成所有任务。如果该时间到期,则立即停止执行。

5. Future 接口

submit() invokeAll() 方法返回 Future 类型的对象或者对象集合,允许我们获得任务执行结果或者检查任务状态(是否在运行中)。

Future 接口提供了一个特殊的阻塞方法 get(),它返回 Callable 任务执行的实际结果,如果是 Runnable 任务返回 null:

Future<String> future = executorService.submit(callableTask);
String result = null;
try {
    result = future.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

当任务仍在运行是调用 get() 方法可能会导致执行阻塞直到任务正确执行并且可得到结果。

get() 方法导致的长时间阻塞,可能导致应用性能下降。如果生成的数据不是关键数据,则可以通过使用超时来避免此类问题:

String result = future.get(200, TimeUnit.MILLISECONDS);

如果执行周期长于指定的时间( 本例中为 200 毫秒),则会抛出 TimeoutException

我们可以使用 isDone() 方法来检查分配的任务是否已经处理。

Future 接口还提供了用 cancel() 取消任务执行和用 isCancelled() 方法检查取消的功能:

boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();

6. ScheduledExecutorService 接口

ScheduledExecutorService 在一些预定义的延迟之后和/或周期性地运行任务。

初始化 ScheduledExecutorService 的最好方法是使用 Executors 类的工厂方法。

这一章中,我们使用了带有一个线程的 ScheduledExecutorService

ScheduledExecutorService executorService = Executors
  .newSingleThreadScheduledExecutor();

要在一些固定的延迟后调度单个任务执行,请使用 ScheduledExecutorServicescheduled() 方法。

有两个 scheduled() 方法允许你执行 RunnableCallable 任务:

Future<String> resultFuture = 
  executorService.schedule(callableTask, 1, TimeUnit.SECONDS);

scheduleAtFixedRate() 方法让我们可用在一些固定的延迟后周期性地调度以任务。上述代码延迟一秒后执行 callableTask

下面的代码块在初始延迟 100 毫秒后运行任务。此后,它将每 450 毫秒运行一次相同的任务:

executorService.scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);

如果处理器运行分配的任务所需的时间超过 scheduleAtFixedRate() 方法的 period 参数,则 ScheduledExecutiorService 将等到当前任务完成后再启动下一个任务。

如果任务的迭代之间需要固定长度的延迟,则应使用 scheduleWithFixedDelay()

例如,以下代码将保证在当前执行结束和另一个执行开始之间有 150 毫秒的暂停:

executorService.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);

根据 scheduleAtFixedRate()scheduleWithFixedDelay() 方法契约,任务的周期性执行会在 ExecutorService 终止或者任务执行期间泡池异常时结束。

7. ExecutorService vs Fork/Join

在 Java 7 发布后,许多开发者绝对使用 fork/join 框架替换 ExecutorService 框架。

然而,这并不总是正确的决定。尽管 fork/join 具有简单性和频繁的性能提升,但它降低了开发人员对并发执行的控制。

ExecutorService 使开发人员能够控制生成的线程的数量和应由单独线程运行的任务的粒度。ExecutorService 的最佳用例是根据“一个线程一个任务”的方案处理独立任务,如事务或请求

相比之下,根据 Oracle 的文档,fork/join 的设计是为了加快可以递归分解为更小部分的工作。

8. 结论

尽管 ExecutorService 相对简单,但也有一些常见的陷阱。

让我们总结一下:

使得未被使用的 ExecutorService 保持活跃:请查阅本文第 4 部分:如何关闭 ExecutorService

使用固定长度线程池时可能导致线程池容量错误:确定应用高效运行任务所需的线程数非常重要。过大的线程池将导致不必要的开销,只为了创建大部分处于等待模式的线程。太少可能会使应用看起来没有响应,因为队列中的任务等待时间很长。

在任务取消后调用 Futureget() 方法:试图获取已取消任务的结果会触发 CancellationException

使用 Future 的 get() 方法将产生出乎意料的长阻塞:应该使用超时来避免意味的等待。

Github 代码仓库: https://github.com/eugenp/tutorials/tree/master/core-java-modules/core-java-concurrency-simple