编程

Java Phaser 指南

702 2024-07-01 12:58:00

1. 概述

本文中,我们将研究 java.util.concurrent 包中的 Phaser 构造。它是一个与 CountDownLatch 非常相似的结构,允许我们协调线程的执行。与 CountDownLatch 相比,它有一些额外的功能。

Phaser 是一个屏障(barrier),在继续执行之前,动态数量的线程需要等待它。在 CountDownLatch 中,该数字不能动态配置,必须在创建实例时提供。

2. Phaser API

Phaser 允许我们构建逻辑,使其中的线程在进入下一步执行之前需要在屏障上进行等待。

我们可以协调多个执行阶段,为每个程序阶段重用一个 Phaser 实例。每个阶段可以有不同数量的线程等待进入另一个阶段。稍后我们将看一个使用阶段的示例。

要参与协调,线程需要向 Phaser 实例注册(register())本身。请注意,这只会增加注册方的数量,并且我们无法检查当前线程是否已注册——我们必须对实现进行子类化才能支持这一点。

线程通过调用 arriveAndAwaitAdvance() 来发出到达屏障的信号,这是一种阻塞方法。当到达方的数量等于注册方的数量时,程序将继续执行,并且 phase 数量将增加。我们可以通过调用 getPhase() 方法来获取当前 phase 的编号。

当线程完成其任务,我们应该调用 arriveAndDeregister() 方法来表示在该特定阶段(phase)中不应再占用当前线程的信号。

3.使用 Phaser API 实现逻辑

比方说,我们希望协调操作(Action)的多个阶段。三个线程将处理第一阶段,两个线程将进行第二阶段。

我们可以创建 LongRunningAction 类实现 Runnable 接口:

class LongRunningAction implements Runnable {
    private String threadName;
    private Phaser ph;

    LongRunningAction(String threadName, Phaser ph) {
        this.threadName = threadName;
        this.ph = ph;
        this.randomWait();
        ph.register();
    }

    @Override
    public void run() {
        ph.arriveAndAwaitAdvance();
        randomWait();
        ph.arriveAndDeregister();
    }

    // Simulating real work
    private void randomWait() {
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

当我们的操作类被实例化时,我们使用 register() 方法注册到 Phaser实例。这将增加使用该特定 Phaser 的线程数。

arriveAndAwaitAdvance() 的调用将导致当前线程在屏障上等待。如前所述,当到达方的数量与注册方的数量相同时,将继续执行。

处理完成后,当前线程将通过调用 arriveAndDeregister() 方法自行退出注册。

注意:我们使用 randomWait 方法在线程中引入随机延迟,以复制实时场景。

4. 测试 Phaser API

让我们创建一个测试用例,在其中我们将启动三个 LongRunningAction 线程并阻塞到屏障上。接下来,在操作完成后,我们将创建两个额外的 LongRunningAction 线程,用于执行下一阶段的处理。

从主线程创建 Phaser 实例时,我们传递 1 作为参数。这相当于从当前线程调用 register() 方法。

我们这样做是因为,当我们创建三个工作线程时,主线程是一个协调器,因此 Phaser 需要注册四个线程:

Phaser ph = new Phaser(1);
assertEquals(0, ph.getPhase());

初始化后的该阶段等于零。

Phaser 类有一个构造函数,我们可以将父实例传递给它。在有大量参与方将经历巨大同步争用成本的情况下,它很有用。在这种情况下,可以设置 Phasers 的实例,使得子 Phaser 的组共享一个共同的父类。

接下来,让我们启动三个 LongRunningAction 操作线程,它们将在屏障上等待,直到我们从主线程调用 arriveAndAwaitAdvance() 方法。

请记住,我们已经用 1 初始化了 Phaser,并再调用了 register() 三次。

new Thread(new LongRunningAction("thread-1", ph)).start();
new Thread(new LongRunningAction("thread-2", ph)).start();
new Thread(new LongRunningAction("thread-3", ph)).start();

现在,三个操作线程已经宣布他们已经到达了障碍,因此还需要一个 arriveAndAwaitAdvance() 的调用——来自主线程的调用:

ph.arriveAndAwaitAdvance();
assertEquals(1, ph.getPhase());

在该阶段完成后,getPhase() 方法将退回一个,因为程序完成了执行的第一步。

假设两个线程应该进行下一阶段的处理。我们可以利用 Phaser 来实现这一点,因为它允许我们动态配置应该在屏障上等待的线程数量。我们启动两个新线程,但在从主线程调用 arriveAndAwaitAdvance() 之前,这些线程不会继续执行(与前面的情况相同):

new Thread(new LongRunningAction("thread-4", ph)).start();
new Thread(new LongRunningAction("thread-5", ph)).start();
ph.arriveAndAwaitAdvance();
 
assertEquals(2, ph.getPhase());

ph.arriveAndDeregister();

之后,getPhase() 方法将返回一个等于 2 的阶段号。当我们想完成我们的程序时,我们需要调用 arriveAndDeregister() 方法,因为主线程仍在 Phaser 中注册。当注销导致注册方的数量变为零时,Phaser 终止。对同步方法的所有调用将不再被阻塞,并将立即返回。

运行该程序将产生以下输出(带有打印行语句的完整源代码可以在代码库中找到):

Thread thread-1 registered during phase 0
Thread thread-1 BEFORE long running action in phase 0
Thread thread-2 registered during phase 0
Thread thread-2 BEFORE long running action in phase 0
Thread thread-3 registered during phase 0
Thread main waiting for others
Thread thread-3 BEFORE long running action in phase 0
Thread main proceeding in phase 1
Thread thread-4 registered during phase 1
Thread thread-3 AFTER long running action in phase 1
Thread thread-2 AFTER long running action in phase 1
Thread thread-1 AFTER long running action in phase 1
Thread thread-5 registered during phase 1
Thread main waiting for new phase
Thread thread-4 BEFORE long running action in phase 1
Thread thread-5 BEFORE long running action in phase 1
Thread main proceeding in phase 2
Thread thread-5 AFTER long running action in phase 2
Thread thread-4 AFTER long running action in phase 2

我们看到所有线程都在等待执行,直到屏障打开。只有在前一阶段成功完成时,才能执行下一阶段的执行。

4. 结论

本教程中,我们了解了 java.util.concurrent 中的 Phaser 构造,并使用 Phaser 类实现了具有多个阶段的协调逻辑。

代码库:https://github.com/eugenp/tutorials/tree/master/core-java-modules/core-java-concurrency-advanced