编程

JAVA 阻塞队列 java.util.concurrent.BlockingQueue 指南

811 2024-07-02 17:52:00

1. 概述

本文中,我们将研究解决并发生产者-消费者问题的最有用的构造之一 java.util.concurrent。我们将研究 BlockingQueue 接口的 API,以及来自该接口的方法如何使编写并发程序变得更容易。

在本文的后面,我们将展示一个简单程序的示例,该程序具有多个生产者线程和多个消费者线程。

2. BlockingQueue 类型

我们可以区分两种类型的 BlockingQueue:

  • 无限队列(unbounded queue)–几乎可以无限增长
  • 有界队列(bounded queue)–定义了最大容量

2.1. Unbounded Queue

创建无限队列很简单:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();

blockingQueue 的容量将被设置为 Integer.MAX_VALUE所有将元素添加到无限队列的操作都不会阻塞,因此它可能会增长到非常大的大小。

当使用无限 BlockingQueue 设计生产者-消费者程序时,最重要的一点是消费者应该能够像生产者向队列添加消息一样快速地消费消息。否则,内存可能会被填满,可能会出现 OutOfMemory 异常。

2.2. Bounded Queue

第二种类型的队列是有界队列。我们可以通过将容量作为参数传递给构造函数来创建这样的队列:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);

这里我们有一个容量为 10 的 blockingQueue。这意味着,当生产者试图通过offer()add()put() 方法将一个元素添加到已经满载的队列中时,它将发生阻塞,直到插入对象的空间可用。否则,操作将失败。

使用有界队列是设计并发程序的好方法,因为当我们将元素插入到已经满的队列中时,该操作需要等待消费者赶上并在队列中腾出一些可用空间。它让我们无需任何努力的情况下节流。

3. BlockingQueue API

BlockingQueue 接口有两种类型的方法,一种是负责将元素添加到队列的方法,另一种是检索元素的方法。这两组方法中的每个方法在队列满载或者为空时的表现都不一样。

3.1. 添加元素

  • add()如果插入成功则返回 true。否则抛出 IllegalStateException
  • put()将特定的元素插入到队列中,必要时等待空闲插槽
  • offer()如果插入成功则返回 true。否则返回 false
  • offer(E e, long timeout, TimeUnit unit)尝试将元素插入队列,并在指定的超时时间内等待可用的插槽

3.2. 检索元素

  • take() – 等待队列的头元素并将其删除。如果队列为空,则阻塞并等待元素变为可用
  • poll(long timeout, TimeUnit unit)检索并删除队列的头元素,如果需要,则等待指定的时间直到元素变为可用。超时后返回 null

在构建生产者-消费者程序时,这些方法是 BlockingQueue 接口中最重要的构建块。

4. 多线程生产者-消费者示例

让我们创建一个由两部分组成的程序——生产者和消费者。

生产者将产生一个从 0 到 100 的随机数,并将该数放入 BlockingQueue。我们将有 4 个生产者线程,并使用 put() 方法进行阻塞,直到队列中有可用空间。

需要记住的重要一点是,我们需要阻止消费者线程无限期地等待元素出现在队列中。

从生产者向消费者发出信号,表明没有更多的消息需要处理的一个好方法是发送一个称为毒丸的特殊消息。我们需要发送尽可能多的毒丸给消费者。然后,当消费者从队列中获取特殊的毒丸消息时,它将优雅地完成执行。

让我们来看一个生产者(producer)类:

public class NumbersProducer implements Runnable {
    private BlockingQueue<Integer> numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;
    
    public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
        }
     }
}

我们的生产者构造函数将用于协调生产者和消费者之间的处理的 BlockingQueue 作为参数。我们看到方法 generateNumbers() 将在一个队列中放入 100 个元素。它还需要毒丸消息,以了解在执行完成时必须将什么类型的消息放入队列。该消息需要放入 poisonPillPerProducer 次到队列中。

每个消费者将使用 take() 方法从 BlockingQueue 中获取一个元素,因此它将阻塞,直到队列中有一个元素为止。从队列中获取 Integer 后,它会检查消息是否是毒丸,如果是,则线程的执行结束。否则,它将在标准输出中打印出结果以及当前线程的名称。

这将使我们深入了解消费者的内部运作:

public class NumbersConsumer implements Runnable {
    private BlockingQueue<Integer> queue;
    private final int poisonPill;
    
    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }
    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                System.out.println(Thread.currentThread().getName() + " result: " + number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

需要注意的一件重要事情是队列的使用。与生产者构造函数中相同,队列是作为参数传递的。我们之所以能做到这一点,是因为 BlockingQueue 可以在线程之间共享,而无需任何显式同步。

既然我们有了生产者和消费者,我们就可以开始我们的计划了。我们需要定义队列的容量,将其设置为 100 个元素。

我们希望有 4 个生产者线程,消费者线程的数量将等于可用处理器的数量:

int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);

for (int i = 1; i < N_PRODUCERS; i++) {
    new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}

for (int j = 0; j < N_CONSUMERS; j++) {
    new Thread(new NumbersConsumer(queue, poisonPill)).start();
}

new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();

BlockingQueue 是使用具有容量的构造创建的。我们正在创建 4 个生产者和 N 个消费者。我们将毒丸消息指定为 Integer.MAX_VALUE,因为在正常工作条件下,生产者永远不会发送这样的值。这里需要注意的最重要的一点是,BlockingQueue 用于协调它们之间的工作。

当我们运行该程序时,4 个生产者线程将把随机整数放入 BlockingQueue 中,消费者将从队列中获取这些元素。每个线程都会将线程的名称和结果一起打印到标准输出中。

5. 结论

本文展示了 BlockingQueue 的实际使用,并解释了从中添加和检索元素的方法。此外,我们还展示了如何使用BlockingQueue 构建多线程生产者-消费者程序,以协调生产者和消费者之间的工作。

所有这些示例和代码片段的实现都可以在 GitHub 项目中找到——这是一个基于 Maven 的项目,因此导入和运行应该很容易。Github 仓库: https://github.com/eugenp/tutorials/tree/master/core-java-modules/core-java-concurrency-collections