Java Phaser 指南
1. 概述
本文中,我们将研究 java.util.concurrent
包中的 Phaser
构造。它是一个与 CountDownLatch
非常相似的结构,允许我们协调线程的执行。与 CountDownLatch
相比,它有一些额外的功能。
Phaser
是一个屏障(barrier),在继续执行之前,动态数量的线程需要等待它。在 CountDownLatch
中,该数字不能动态配置,必须在创建实例时提供。
2. Phaser API
Phaser 允许我们构建逻辑,使其中的线程在进入下一步执行之前需要在屏障上进行等待。
我们可以协调多个执行阶段,为每个程序阶段重用一个 Phaser
实例。每个阶段可以有不同数量的线程等待进入另一个阶段。稍后我们将看一个使用阶段的示例。
要参与协调,线程需要向 Phaser
实例注册(register()
)本身。请注意,这只会增加注册方的数量,并且我们无法检查当前线程是否已注册——我们必须对实现进行子类化才能支持这一点。
线程通过调用 arriveAndAwaitAdvance()
来发出到达屏障的信号,这是一种阻塞方法。当到达方的数量等于注册方的数量时,程序将继续执行,并且 phase 数量将增加。我们可以通过调用 getPhase()
方法来获取当前 phase 的编号。
当线程完成其任务,我们应该调用 arriveAndDeregister()
方法来表示在该特定阶段(phase)中不应再占用当前线程的信号。
3.使用 Phaser API 实现逻辑
比方说,我们希望协调操作(Action)的多个阶段。三个线程将处理第一阶段,两个线程将进行第二阶段。
我们可以创建 LongRunningAction
类实现 Runnable
接口:
class LongRunningAction implements Runnable {
private String threadName;
private Phaser ph;
LongRunningAction(String threadName, Phaser ph) {
this.threadName = threadName;
this.ph = ph;
this.randomWait();
ph.register();
}
@Override
public void run() {
ph.arriveAndAwaitAdvance();
randomWait();
ph.arriveAndDeregister();
}
// Simulating real work
private void randomWait() {
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
当我们的操作类被实例化时,我们使用 register()
方法注册到 Phaser
实例。这将增加使用该特定 Phaser 的线程数。
对 arriveAndAwaitAdvance()
的调用将导致当前线程在屏障上等待。如前所述,当到达方的数量与注册方的数量相同时,将继续执行。
处理完成后,当前线程将通过调用 arriveAndDeregister()
方法自行退出注册。
注意:我们使用 randomWait
方法在线程中引入随机延迟,以复制实时场景。
4. 测试 Phaser API
让我们创建一个测试用例,在其中我们将启动三个 LongRunningAction
线程并阻塞到屏障上。接下来,在操作完成后,我们将创建两个额外的 LongRunningAction
线程,用于执行下一阶段的处理。
从主线程创建 Phaser
实例时,我们传递 1
作为参数。这相当于从当前线程调用 register()
方法。
我们这样做是因为,当我们创建三个工作线程时,主线程是一个协调器,因此 Phaser
需要注册四个线程:
Phaser ph = new Phaser(1);
assertEquals(0, ph.getPhase());
初始化后的该阶段等于零。
Phaser
类有一个构造函数,我们可以将父实例传递给它。在有大量参与方将经历巨大同步争用成本的情况下,它很有用。在这种情况下,可以设置 Phasers
的实例,使得子 Phaser 的组共享一个共同的父类。
接下来,让我们启动三个 LongRunningAction
操作线程,它们将在屏障上等待,直到我们从主线程调用 arriveAndAwaitAdvance()
方法。
请记住,我们已经用 1
初始化了 Phaser
,并再调用了 register()
三次。
new Thread(new LongRunningAction("thread-1", ph)).start();
new Thread(new LongRunningAction("thread-2", ph)).start();
new Thread(new LongRunningAction("thread-3", ph)).start();
现在,三个操作线程已经宣布他们已经到达了障碍,因此还需要一个 arriveAndAwaitAdvance()
的调用——来自主线程的调用:
ph.arriveAndAwaitAdvance();
assertEquals(1, ph.getPhase());
在该阶段完成后,getPhase()
方法将退回一个,因为程序完成了执行的第一步。
假设两个线程应该进行下一阶段的处理。我们可以利用 Phaser
来实现这一点,因为它允许我们动态配置应该在屏障上等待的线程数量。我们启动两个新线程,但在从主线程调用 arriveAndAwaitAdvance()
之前,这些线程不会继续执行(与前面的情况相同):
new Thread(new LongRunningAction("thread-4", ph)).start();
new Thread(new LongRunningAction("thread-5", ph)).start();
ph.arriveAndAwaitAdvance();
assertEquals(2, ph.getPhase());
ph.arriveAndDeregister();
之后,getPhase()
方法将返回一个等于 2
的阶段号。当我们想完成我们的程序时,我们需要调用 arriveAndDeregister()
方法,因为主线程仍在 Phaser 中注册。当注销导致注册方的数量变为零时,Phaser 终止。对同步方法的所有调用将不再被阻塞,并将立即返回。
运行该程序将产生以下输出(带有打印行语句的完整源代码可以在代码库中找到):
Thread thread-1 registered during phase 0
Thread thread-1 BEFORE long running action in phase 0
Thread thread-2 registered during phase 0
Thread thread-2 BEFORE long running action in phase 0
Thread thread-3 registered during phase 0
Thread main waiting for others
Thread thread-3 BEFORE long running action in phase 0
Thread main proceeding in phase 1
Thread thread-4 registered during phase 1
Thread thread-3 AFTER long running action in phase 1
Thread thread-2 AFTER long running action in phase 1
Thread thread-1 AFTER long running action in phase 1
Thread thread-5 registered during phase 1
Thread main waiting for new phase
Thread thread-4 BEFORE long running action in phase 1
Thread thread-5 BEFORE long running action in phase 1
Thread main proceeding in phase 2
Thread thread-5 AFTER long running action in phase 2
Thread thread-4 AFTER long running action in phase 2
我们看到所有线程都在等待执行,直到屏障打开。只有在前一阶段成功完成时,才能执行下一阶段的执行。
4. 结论
本教程中,我们了解了 java.util.concurrent
中的 Phaser 构造,并使用 Phaser
类实现了具有多个阶段的协调逻辑。
代码库:https://github.com/eugenp/tutorials/tree/master/core-java-modules/core-java-concurrency-advanced