Java ExecutorService 指南
1. 介绍
ExecutorService
是一个 JDK API,它简化了在异步模式下运行任务。一般来说,ExecutorService
自动提供一个线程池和一个 API,用于向其分配任务。
2. 实例化 ExecutorService
2.1. Executors 类的工厂方法
创建 ExecutorService
最简单的方法是使用 Executors
类的工厂方法。
比如,下面代码用来创建一个拥有 10 个线程的线程池:
ExecutorService executor = Executors.newFixedThreadPool(10);
还有一些其他方法用来创建满足特定用例的预定义 ExecutorService
。要找到满足你需求的最好方法,请查看 Oracle 的官方文档。
2.2. 直接创建 ExecutorService
因为 ExecutorService
是接口,其任何实现的实例都可以使用。java.util.concurrent
包中有几个实现可供选择,或者你也可以创建自己的实现。
比如,ThreadPoolExecutor
类有一些构造函数,可用于配置执行器服务(Executor Service)及其内部线程池。
ExecutorService executorService =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
你可能注意到了上述代码与工厂方法 newSingleThreadExecutor()
的源码非常相似。对于大部分情况,不需要详细的手动配置。
3. 分配任务给 ExecutorService
ExecutorService
可以执行 Runnable
和 Callable
任务。为了在本文中保持简单,将使用两个基本任务。请注意,我们在这里使用 lambda 表达式,而不是匿名内部类:
Runnable runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Callable<String> callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};
List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
我们可以使用多个方法,包括 execute()
、submit()
、invokeAny()
和 invokeAll()
,来将任务分配给 ExecutorService
。这些方法继承自 Executor
接口。
execute()
的方法是 void
并且不提供任何获取任务执行结果或检查任务状态(是否在运行中)的可能性:
executorService.execute(runnableTask);
submit()
提交一个 Callable
或者 Runnable
任务给 ExecutorService
并返回一个 Future
类型的结果:
Future<String> future =
executorService.submit(callableTask);
invokeAny()
将一组任务分配给 ExecutorService
,使每个任务运行,并返回一个任务成功执行的结果(如果执行成功):
String result = executorService.invokeAny(callableTasks);
invokeAll()
将一个任务集合分配给 ExecutorService
,使每个任务运行,并以 Future
类型的对象列表的形式返回所有任务执行的结果:
List<Future<String>> futures = executorService.invokeAll(callableTasks);
进一步讨论之前,我们还需要讨论两个项目:关闭 ExecutorService
和处理 Future
返回类型。
4. 关闭 ExecutorService
一般情况下,当没有任务要处理时,ExecutorService
不会自动销毁。它保持活跃并等待新的任务。
在某些情况下,这种方式很有帮助,比如当应用需要处理不规则出现的任务或者在编译时任务数量未知时。
另一方面,一个应用可能会到达终点但不会停止,因为等待的 ExecutorService
会导致 JVM 继续运行。
为了正确关闭 ExecutorService
,我们有shutdown()
和 shutdownNow()
API。
shutdown()
不会导致 ExecutorService
立即析构。它会让 ExecutorService
停止接受新任务,并且在所有正在运行的线程完成它们当前任务后关闭服务:
executorService.shutdown();
shutdownNow()
方法尝试立即销毁 ExecutorService
,但它不保证所有正在运行的线程会同时停止:
List<Runnable> notExecutedTasks = executorService.shutDownNow();
此方法返回等待处理的任务列表。由开发人员决定如何处理这些任务。
关闭 ExecutorService
(Oracle 也建议这样做)的一个好方法是,将这两种方法与 awaitTermination()
方法结合使用:
executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
使用这种方法,ExecutiorService
将首先停止执行新任务,然后等待指定的时间段,以完成所有任务。如果该时间到期,则立即停止执行。
5. Future 接口
submit()
和 invokeAll()
方法返回 Future
类型的对象或者对象集合,允许我们获得任务执行结果或者检查任务状态(是否在运行中)。
Future
接口提供了一个特殊的阻塞方法 get()
,它返回 Callable
任务执行的实际结果,如果是 Runnable
任务返回 null
:
Future<String> future = executorService.submit(callableTask);
String result = null;
try {
result = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
当任务仍在运行是调用 get()
方法可能会导致执行阻塞直到任务正确执行并且可得到结果。
get()
方法导致的长时间阻塞,可能导致应用性能下降。如果生成的数据不是关键数据,则可以通过使用超时来避免此类问题:
String result = future.get(200, TimeUnit.MILLISECONDS);
如果执行周期长于指定的时间( 本例中为 200 毫秒),则会抛出 TimeoutException
。
我们可以使用 isDone()
方法来检查分配的任务是否已经处理。
Future
接口还提供了用 cancel()
取消任务执行和用 isCancelled()
方法检查取消的功能:
boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();
6. ScheduledExecutorService 接口
ScheduledExecutorService
在一些预定义的延迟之后和/或周期性地运行任务。
初始化 ScheduledExecutorService
的最好方法是使用 Executors
类的工厂方法。
这一章中,我们使用了带有一个线程的 ScheduledExecutorService
:
ScheduledExecutorService executorService = Executors
.newSingleThreadScheduledExecutor();
要在一些固定的延迟后调度单个任务执行,请使用 ScheduledExecutorService
的 scheduled()
方法。
有两个 scheduled()
方法允许你执行 Runnable
或 Callable
任务:
Future<String> resultFuture =
executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
scheduleAtFixedRate()
方法让我们可用在一些固定的延迟后周期性地调度以任务。上述代码延迟一秒后执行 callableTask
。
下面的代码块在初始延迟 100 毫秒后运行任务。此后,它将每 450 毫秒运行一次相同的任务:
executorService.scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);
如果处理器运行分配的任务所需的时间超过 scheduleAtFixedRate()
方法的 period
参数,则 ScheduledExecutiorService
将等到当前任务完成后再启动下一个任务。
如果任务的迭代之间需要固定长度的延迟,则应使用 scheduleWithFixedDelay()
。
例如,以下代码将保证在当前执行结束和另一个执行开始之间有 150 毫秒的暂停:
executorService.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);
根据 scheduleAtFixedRate()
和 scheduleWithFixedDelay()
方法契约,任务的周期性执行会在 ExecutorService
终止或者任务执行期间泡池异常时结束。
7. ExecutorService vs Fork/Join
在 Java 7 发布后,许多开发者绝对使用 fork/join
框架替换 ExecutorService
框架。
然而,这并不总是正确的决定。尽管 fork/join
具有简单性和频繁的性能提升,但它降低了开发人员对并发执行的控制。
ExecutorService
使开发人员能够控制生成的线程的数量和应由单独线程运行的任务的粒度。ExecutorService
的最佳用例是根据“一个线程一个任务”的方案处理独立任务,如事务或请求
相比之下,根据 Oracle 的文档,fork/join
的设计是为了加快可以递归分解为更小部分的工作。
8. 结论
尽管 ExecutorService
相对简单,但也有一些常见的陷阱。
让我们总结一下:
使得未被使用的 ExecutorService
保持活跃:请查阅本文第 4 部分:如何关闭 ExecutorService
。
使用固定长度线程池时可能导致线程池容量错误:确定应用高效运行任务所需的线程数非常重要。过大的线程池将导致不必要的开销,只为了创建大部分处于等待模式的线程。太少可能会使应用看起来没有响应,因为队列中的任务等待时间很长。
在任务取消后调用 Future 的 get()
方法:试图获取已取消任务的结果会触发 CancellationException
。
使用 Future 的 get()
方法将产生出乎意料的长阻塞:应该使用超时来避免意味的等待。
Github 代码仓库: https://github.com/eugenp/tutorials/tree/master/core-java-modules/core-java-concurrency-simple