编程

JAVA 高并发之 java.util.concurrent 概述

545 2024-06-27 00:05:00

1. 概述

java.util.concurrent 包为创建高并发应用提供了工具。

本文将对此包做一个概述。

2. 主要组件

java.util.concurrent 包含了太多的功能,无法在一篇文章中进行讨论。在本文中,我们将主要关注此包中一些最有用的实用功能,如:

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • Future
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Locks
  • Phaser

2.1. Executor

Executor 是一个接口,表示执行所提供任务的对象。

取决于特定的实现(从哪里启动调用),任务应该在新的线程或当前线程上运行。因此,使用此接口,我们可以将任务执行流与实际任务执行机制解耦。

这里需要注意的一点是,Executor 并不严格要求任务执行是异步的。在最简单的情况下,Executor 可以在调用线程中立即调用提交的任务。

我们需要创建一个(Invoker)来创建 executor 实例:

public class Invoker implements Executor {
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}

现在,我们可以使用调用者来执行该任务。

public void execute() {
    Executor executor = new Invoker();
    executor.execute( () -> {
        // task to be performed
    });
}

此处需要注意的点是,如果 executor 不能接受执行的任务,它将抛出 RejectedExecutionException

2.2. ExecutorService

ExecutorService 是异步处理的完整解决方案。它管理内存中的队列,并根据线程可用性调度提交的任务。
要使用 ExecutorService,我们需要创建一个 Runnable 类。

public class Task implements Runnable {
    @Override
    public void run() {
        // task details
    }
}

现在我们可以创建 ExecutorService 实例并分配任务。创建实例时,我们需要指定线程池的大小。

ExecutorService executor = Executors.newFixedThreadPool(10);

如果我们想创建单线程 ExecutorService 实例,我们可以使用 newSingleThreadExecutor(ThreadFactory threadFactory) 来创建实例。

创建完 executor 后,我们可以用它来提交任务。

public void execute() { 
    executor.submit(new Task()); 
}

我们也可以在提交任务时,创建 Runnable 实例。

executor.submit(() -> {
    new Task();
});

它还附带了两个开箱即用的执行终止方法。第一个是 shutdown();它等待直到所有提交的任务完成执行。另一个方法是 shutdownNow(),它试图终止所有正在执行的任务,并停止处理等待的任务。

还有另一个方法 awaitTermination(long timeout, TimeUnit unit),它在触发关闭事件或执行超时发生后,强制阻止直到所有任务都完成了执行,或者执行线程本身被中断,

try {
    executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
} catch (InterruptedException e) {
    e.printStackTrace();
}

2.3. ScheduledExecutorService

ScheduledExecutorService 是一个类似于 ExecutorService 的接口,不过它周期性地执行任务。

Executor 和 ExecutorService 的方法是当场安排,不会造成任何人为延迟。零值或者负值都表示请求需要立即执行。

我们可以使用 RunnableCallable 接口来定义任务。

public void execute() {
    ScheduledExecutorService executorService
      = Executors.newSingleThreadScheduledExecutor();

    Future<String> future = executorService.schedule(() -> {
        // ...
        return "Hello world";
    }, 1, TimeUnit.SECONDS);

    ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
        // ...
    }, 1, TimeUnit.SECONDS);

    executorService.shutdown();
}

ScheduledExecutorService 也可以在一些给定的固定延迟之后安排任务:

executorService.scheduleAtFixedRate(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

executorService.scheduleWithFixedDelay(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

此处,scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit ) 方法创建并执行一个周期性操作,该操作首先在提供的初始延迟之后调用,然后在给定的周期内调用,直到服务实例关闭。 

scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit ) 方法创建并执行一个周期性动作,该动作在提供的初始延迟之后首先被调用,并在执行动作的终止和下一个动作的调用之间以给定的延迟重复调用。

2.4. Future

Future 用以表示异步操作的结果。他与检测异步是否完成、获取计算结果等方法一起。

再者,cancel(boolean mayInterruptIfRunning) API 取消了操作并释放正在执行的线程。如果 mayInterruptIfRunning 的值为 true。该线程所执行的任务会立即被终止。

否则,在进行的任务将被允许完成。我们可以使用下面的代码创建 future 实例:

public void invoke() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    Future<String> future = executorService.submit(() -> {
        // ...
        Thread.sleep(10000l);
        return "Hello world";
    });
}

我们可以使用下面代码检测该 future 结果是否准备好并且在计算完成后获取其数据。

if (future.isDone() && !future.isCancelled()) {
    try {
        str = future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

我们还可以为给定的操作指定超时。如果任务花费的时间超过此时间,则会抛出 TimeoutException

try {
    future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

2.5. CountDownLatch

CountDownLatch (JDK 5 中引入) 是一个实用类,其阻塞一组线程直到某些操作完成。

CountDownLatch 使用一个 counter(Integer type) 初始化;该 counter 在依赖线程完成执行后减少。而当该计数器(counter)达到零时,其他线程获得释放。

更多 CountDownLatch 详情,查看此处

2.6. CyclicBarrier

CyclicBarrierCountDownLatch 工作方式很接近,不同之处在于它是可复用的。不像 CountDownLatch,它允许多线程在调用最终任务之前使用 await() 方法(称之为 barrier 条件)相互等待。

我们需要创建一个 Runnable 任务实例来初始化 barrier 条件:

public class Task implements Runnable {

    private CyclicBarrier barrier;

    public Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            LOG.info(Thread.currentThread().getName() + 
              " is waiting");
            barrier.await();
            LOG.info(Thread.currentThread().getName() + 
              " is released");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

}

现在我们可以调用一些线程,来竞争 barrier 条件:

public void start() {

    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
        // ...
        LOG.info("All previous tasks are completed");
    });

    Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); 
    Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); 
    Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); 

    if (!cyclicBarrier.isBroken()) { 
        t1.start(); 
        t2.start(); 
        t3.start(); 
    }
}

此处,isBroken() 方法检测是否有任何线程在执行时间内被中断。在执行实际流程之前,我们应该始终执行此检查。

2.7. Semaphore

Semaphore 用于阻塞线程级访问物理或逻辑资源的某些部分。Semaphore 包含一组许可;每当线程试图进入关键部分时,它都需要检查 Semaphore 是否有权限访问。

如果没有可用许可(通过  tryAcquire()),线程不允许跳转到关键部分;但是,如果许可可用,则授予访问权限,并且权限计数器减少。

一旦执行中的线程释放了关键部分,权限计数器(counter)再次增加(由 release() 方法完成)。

使用 tryAcquire(long timeout, TimeUnit unit) 方法,我们可以指定获取权限的超时时间。

我们还可以检查可用许可的数量或等待获取信号量(semaphore)的线程数量。

以下代码用以实现 semaphore:

static Semaphore semaphore = new Semaphore(10);

public void execute() throws InterruptedException {

    LOG.info("Available permit : " + semaphore.availablePermits());
    LOG.info("Number of threads waiting to acquire: " + 
      semaphore.getQueueLength());

    if (semaphore.tryAcquire()) {
        try {
            // ...
        }
        finally {
            semaphore.release();
        }
    }

}

我们可以使用 Semaphore 实现像 Mutex 一样的数据结构。更多详情查看此处

2.8. ThreadFactory

顾名思义,ThreadFactory 充当一个线程(不存在)池,根据需要创建一个新线程。它消除了实现高效线程创建机制所需的大量样板编码。

我们可以这样定义一个 ThreadFactory:

public class BaeldungThreadFactory implements ThreadFactory {
    private int threadId;
    private String name;

    public BaeldungThreadFactory(String name) {
        threadId = 1;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, name + "-Thread_" + threadId);
        LOG.info("created new thread with id : " + threadId +
            " and name : " + t.getName());
        threadId++;
        return t;
    }
}

我们可以使用 newThread(Runnable r) 方法在运行时创建新的线程:

BaeldungThreadFactory factory = new BaeldungThreadFactory( 
    "BaeldungThreadFactory");
for (int i = 0; i < 10; i++) { 
    Thread t = factory.newThread(new Task());
    t.start(); 
}

2.9. BlockingQueue

在异步编程中,最常见的集成模式之一是生产者-消费者模式。java.util.concurrent 包附带一个称为 BlockingQueue 的数据结构,它在这些异步场景中非常有用。

更多信息和示例查看此处

2.10. DelayQueue

DelayQueue 是一个无限大小的元素阻塞队列,只有当元素的过期时间(称为用户定义的延迟)到达时,才能提取该队列中的元素。因此,最上面的元素(头)将具有最多的延迟量,并且它将最后被轮询。

更多信息和示例查看此处。

2.11. Locks

毫不奇怪,Lock 是一个实用程序,用于阻塞其他线程访问特定的代码段,而不是当前正在执行它的线程。

LockSynchronized 块之间的主要区别在于,Synchronized 的块完全包含在方法中;但是,我们可以在单独的方法中使用 Lock API 的 Lock()unlock()操作。

更多信息和示例查看此处。

2.12. Phaser

Phaser 是一个比 CyclicBarrierCountDownLatch 更灵活的解决方案,用于充当可重复使用的屏障(barrier),在继续执行之前,动态数量的线程需要在其上等待。我们可以协调多个执行阶段,为每个程序阶段重用一个 Phaser 实例。

更多信息和示例查看此处。

3. 结论

在这篇高级概述文章中,我们重点介绍了 java.util.concurrent 包中不同的实用程序。