1. Overview
1.概述
In this article, we will look at one of the most useful constructs java.util.concurrent to solve the concurrent producer-consumer problem. We’ll look at an API of the BlockingQueue interface and how methods from that interface make writing concurrent programs easier.
在这篇文章中,我们将看看最有用的结构之一java.util.concurrent,以解决并发生产者-消费者问题。我们将看看BlockingQueue接口的一个API,以及该接口的方法如何使编写并发程序变得更容易。
Later in the article, we will show an example of a simple program that has multiple producer threads and multiple consumer threads.
在文章的后面,我们将展示一个简单程序的例子,该程序有多个生产者线程和多个消费者线程。
2. BlockingQueue Types
2.阻塞队列类型
We can distinguish two types of BlockingQueue:
我们可以区分两种类型的BlockingQueue。
- unbounded queue – can grow almost indefinitely
- bounded queue – with maximal capacity defined
2.1. Unbounded Queue
2.1.无界队列
Creating unbounded queues is simple:
创建无界队列很简单。
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
The Capacity of blockingQueue will be set to Integer.MAX_VALUE. All operations that add an element to the unbounded queue will never block, thus it could grow to a very large size.
blockingQueue的容量将被设置为Integer.MAX_VALUE.所有向无界队列添加元素的操作都不会阻塞,因此它可以增长到非常大的尺寸。
The most important thing when designing a producer-consumer program using unbounded BlockingQueue is that consumers should be able to consume messages as quickly as producers are adding messages to the queue. Otherwise, the memory could fill up and we would get an OutOfMemory exception.
在设计一个使用无界BlockingQueue的生产者-消费者程序时,最重要的是消费者应该能够像生产者向队列添加消息一样快速地消费消息。否则,内存会被填满,我们会得到一个OutOfMemory异常。
2.2. Bounded Queue
2.2.有界限的队列
The second type of queues is the bounded queue. We can create such queues by passing the capacity as an argument to a constructor:
第二种类型的队列是有界队列。我们可以通过将容量作为参数传递给构造函数来创建这种队列。
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
Here we have a blockingQueue that has a capacity equal to 10. It means that when a producer tries to add an element to an already full queue, depending on a method that was used to add it (offer(), add() or put()), it will block until space for inserting object becomes available. Otherwise, the operations will fail.
这里我们有一个blockingQueue,它的容量等于10。这意味着当生产者试图向已经满了的队列添加一个元素时,根据用来添加的方法(offer(), add() 或 put()),它将被阻塞,直到插入对象的空间变得可用。否则,操作将失败。
Using bounded queue is a good way to design concurrent programs because when we insert an element to an already full queue, that operations need to wait until consumers catch up and make some space available in the queue. It gives us throttling without any effort on our part.
使用有界队列是设计并发程序的好方法,因为当我们向一个已经满了的队列插入一个元素时,该操作需要等待,直到消费者赶上并在队列中腾出一些空间。它为我们提供了节流,而不需要我们做任何努力。
3. BlockingQueue API
3.BlockingQueue API
There are two types of methods in the BlockingQueue interface – methods responsible for adding elements to a queue and methods that retrieve those elements. Each method from those two groups behaves differently in case the queue is full/empty.
在BlockingQueue接口中,有两种类型的方法—负责向队列添加元素的方法和检索这些元素的方法。在队列满/空的情况下,这两组方法的行为是不同的。
3.1. Adding Elements
3.1.添加元素
- add() – returns true if insertion was successful, otherwise throws an IllegalStateException
- put() – inserts the specified element into a queue, waiting for a free slot if necessary
- offer() – returns true if insertion was successful, otherwise false
- offer(E e, long timeout, TimeUnit unit) – tries to insert element into a queue and waits for an available slot within a specified timeout
3.2. Retrieving Elements
3.2.检索元素
- take() – waits for a head element of a queue and removes it. If the queue is empty, it blocks and waits for an element to become available
- poll(long timeout, TimeUnit unit) – retrieves and removes the head of the queue, waiting up to the specified wait time if necessary for an element to become available. Returns null after a timeout
These methods are the most important building blocks from BlockingQueue interface when building producer-consumer programs.
在构建生产者-消费者程序时,这些方法是来自BlockingQueue界面的最重要的构建块。
4. Multithreaded Producer-Consumer Example
4.多线程的生产者-消费者实例
Let’s create a program that consists of two parts – a Producer and a Consumer.
让我们创建一个由两部分组成的程序–一个生产者和一个消费者。
The Producer will be producing a random number from 0 to 100 and will put that number in a BlockingQueue. We’ll have 4 producer threads and use the put() method to block until there’s space available in the queue.
生产者将产生一个从0到100的随机数,并将该数字放入一个BlockingQueue。我们将有4个生产者线程,并使用put()方法来阻塞,直到队列中有可用空间。
The important thing to remember is that we need to stop our consumer threads from waiting for an element to appear in a queue indefinitely.
需要记住的是,我们需要阻止我们的消费者线程无限期地等待一个元素出现在队列中。
A good technique to signal from producer to the consumer that there are no more messages to process is to send a special message called a poison pill. We need to send as many poison pills as we have consumers. Then when a consumer will take that special poison pill message from a queue, it will finish execution gracefully.
从生产者到消费者发出信号说没有更多的消息要处理的一个好技术是发送一个特殊的消息,称为毒丸。我们需要发送尽可能多的毒丸,因为我们有消费者。然后,当一个消费者从队列中取出那个特殊的毒丸消息时,它将优雅地完成执行。
Let’s look at a producer class:
让我们来看看一个生产者类。
public class NumbersProducer implements Runnable {
private BlockingQueue<Integer> numbersQueue;
private final int poisonPill;
private final int poisonPillPerProducer;
public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
this.numbersQueue = numbersQueue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
}
public void run() {
try {
generateNumbers();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void generateNumbers() throws InterruptedException {
for (int i = 0; i < 100; i++) {
numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
}
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put(poisonPill);
}
}
}
Our producer constructor takes as an argument the BlockingQueue that is used to coordinate processing between the producer and the consumer. We see that method generateNumbers() will put 100 elements in a queue. It takes also poison pill message, to know what type of message must be put into a queue when the execution will be finished. That message needs to be put poisonPillPerProducer times into a queue.
我们的生产者构造函数以BlockingQueue为参数,该参数用于协调生产者和消费者之间的处理。我们看到方法generateNumbers() 将把100个元素放入队列。它还需要毒丸消息,以知道当执行结束时,什么类型的消息必须被放入队列。该消息需要被放入poisonPillPerProducer次队列中。
Each consumer will take an element from a BlockingQueue using take() method so it will block until there is an element in a queue. After taking an Integer from a queue it checks if the message is a poison pill, if yes then execution of a thread is finished. Otherwise, it will print out the result on standard output along with current thread’s name.
每个消费者将使用take()方法从BlockingQueen中获取一个元素,所以它将阻塞,直到队列中有一个元素。从队列中取出一个Integer后,它将检查该消息是否是一个毒药,如果是,那么线程的执行就结束了。否则,它将在标准输出端打印出结果和当前线程的名称。
This will give us insight into inner workings of our consumers:
这将使我们深入了解消费者的内部运作。
public class NumbersConsumer implements Runnable {
private BlockingQueue<Integer> queue;
private final int poisonPill;
public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
}
public void run() {
try {
while (true) {
Integer number = queue.take();
if (number.equals(poisonPill)) {
return;
}
System.out.println(Thread.currentThread().getName() + " result: " + number);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
The important thing to notice is the usage of a queue. Same as in the producer constructor, a queue is passed as an argument. We can do it because BlockingQueue can be shared between threads without any explicit synchronization.
需要注意的是队列的使用。和生产者构造函数一样,一个队列被作为一个参数传递。我们可以这样做,因为BlockingQueue可以在线程之间共享而不需要任何显式同步。
Now that we have our producer and consumer, we can start our program. We need to define the queue’s capacity, and we set it to 100 elements.
现在我们有了生产者和消费者,我们可以开始我们的程序。我们需要定义队列的容量,我们把它设置为100个元素。
We want to have 4 producer threads and a number of consumers threads will be equal to the number of available processors:
我们希望有4个生产者线程,消费者线程的数量将等于可用处理器的数量。
int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
for (int i = 1; i < N_PRODUCERS; i++) {
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}
for (int j = 0; j < N_CONSUMERS; j++) {
new Thread(new NumbersConsumer(queue, poisonPill)).start();
}
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
BlockingQueue is created using construct with a capacity. We’re creating 4 producers and N consumers. We specify our poison pill message to be an Integer.MAX_VALUE because such value will never be sent by our producer under normal working conditions. The most important thing to notice here is that BlockingQueue is used to coordinate work between them.
BlockingQueue是使用有容量的构造创建的。我们要创建4个生产者和N个消费者。我们指定我们的毒丸消息为Integer.MAX_VALUE,因为在正常工作条件下,这样的值永远不会被我们的生产者发送。这里要注意的最重要的事情是,BlockingQueue被用来协调它们之间的工作。
When we run the program, 4 producer threads will be putting random Integers in a BlockingQueue and consumers will be taking those elements from the queue. Each thread will print to standard output the name of the thread together with a result.
当我们运行程序时,4个生产者线程将把随机的Integers放入一个BlockingQueue,消费者将从队列中获取这些元素。每个线程将把线程的名字和结果一起打印到标准输出。
5. Conclusion
5.结论
This article shows a practical use of BlockingQueue and explains methods that are used to add and retrieve elements from it. Also, we’ve shown how to build a multithreaded producer-consumer program using BlockingQueue to coordinate work between producers and consumers.
本文展示了BlockingQueue的实际用途,并解释了用于添加和检索其中的元素的方法。此外,我们还展示了如何使用BlockingQueue构建一个多线程的生产者-消费者程序,以协调生产者和消费者之间的工作。
The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven-based project, so it should be easy to import and run as it is.
所有这些例子和代码片段的实现都可以在GitHub项目中找到–这是一个基于Maven的项目,所以应该很容易按原样导入和运行。