1. Overview
1.概述
In this tutorial, we’ll learn how to implement the Producer-Consumer problem in Java. This problem is also known as the bounded-buffer problem.
在本教程中,我们将学习如何在Java中实现生产者-消费者问题。这个问题也被称为有界缓冲区问题。
For more details on the problem, we can refer to the Producer-Consumer Problem wiki page. For Java threading/concurrency basics, make sure to visit our Java Concurrency article.
关于这个问题的更多细节,我们可以参考生产者-消费者问题维基页面。关于Java线程/并发的基础知识,请务必访问我们的Java并发文章。
2. Producer-Consumer Problem
2.生产者-消费者问题
Producer and Consumer are two separate processes. Both processes share a common buffer or queue. The producer continuously produces certain data and pushes it onto the buffer, whereas the consumer consumes those data from the buffer.
生产者和消费者是两个独立的进程。两个进程共享一个共同的缓冲区或队列。生产者不断产生某些数据并将其推送到缓冲区,而消费者则从缓冲区消费这些数据。
Let’s review a diagram showing this simple scenario:
让我们回顾一下显示这一简单场景的图表。
Inherently, this problem has certain complexities to deal with:
从本质上讲,这个问题有某些复杂的问题需要处理。
- Both producer and consumer may try to update the queue at the same time. This could lead to data loss or inconsistencies.
- Producers might be slower than consumers. In such cases, the consumer would process elements fast and wait.
- In some cases, the consumer can be slower than a producer. This situation leads to a queue overflow issue.
- In real scenarios, we may have multiple producers, multiple consumers, or both. This may cause the same message to be processed by different consumers.
The diagram below depicts a case with multiple producers and multiple consumers:
下图描述了一个有多个生产者和多个消费者的案例。
We need to handle resource sharing and synchronization to solve a few complexities:
我们需要处理资源共享和同步,以解决一些复杂的问题。
- Synchronization on queue while adding and removing data
- On queue empty, the consumer has to wait until the producer adds new data to the queue
- When the queue is full, the producer has to wait until the consumer consumes data and the queue has some empty buffer
3. Java Example Using Threads
3.使用线程的Java实例
We have defined a separate class for each entity of the problem.
我们为问题的每个实体定义了一个单独的类。
3.1. Message Class
3.1 消息类
The Message class holds the produced data:
Message类持有产生的数据。
public class Message {
private int id;
private double data;
// constructors and getter/setters
}
The data could be of any type. It may be a JSON String, a complex object, or just a number. Also, it’s not mandatory to wrap data into a Message class.
数据可以是任何类型的。它可以是一个JSON字符串,一个复杂的对象,或者只是一个数字。另外,并不强制要求将数据包入Message类。
3.2. DataQueue Class
3.2 DataQueue类
The shared queue and related objects are wrapped into the DataQueue class:
共享队列和相关对象被包装成DataQueue类。
public class DataQueue {
private final Queue<Message> queue = new LinkedList<>();
private final int maxSize;
private final Object FULL_QUEUE = new Object();
private final Object EMPTY_QUEUE = new Object();
DataQueue(int maxSize) {
this.maxSize = maxSize;
}
// other methods
}
To make the bounded buffer, a queue and its maxSize are taken.
为了制作有界缓冲区,需要一个队列及其最大尺寸。
In Java, the synchronized block uses an object to achieve thread synchronization. Each object has an intrinsic lock. Only the thread that acquires the lock first is allowed to execute the synchronized block.
在Java中,synchronized块使用一个对象来实现线程同步。每个对象都有一个内在的锁。只有首先获得锁的线程才允许执行synchronized块。
Here, we created two references, FULL_QUEUE and EMPTY_QUEUE, to use for synchronization. As there is no other purpose for these handles, we initialized them using the Object class.
在这里,我们创建了两个引用,FULL_QUEUE和EMPTY_QUEUE,以用于同步。由于这些句柄没有其他用途,我们使用Object类来初始化它们。
When the queue is full, the producer waits on the FULL_QUEUE object. And, the consumer notifies as soon as it consumes a message.
当队列满了,生产者在FULL_QUEUE对象上等待。而且,消费者一旦消费了一个消息,就会通知。
The producer process calls the waitOnFull method:
生产者进程调用waitOnFull方法。
public void waitOnFull() throws InterruptedException {
synchronized (FULL_QUEUE) {
FULL_QUEUE.wait();
}
}
And the consumer process notifies the producer through the notifyAllForFull method:
而消费者进程通过notifyAllForFull方法通知生产者。
public void notifyAllForFull() {
synchronized (FULL_QUEUE) {
FULL_QUEUE.notifyAll();
}
}
If the queue is empty, the consumer waits on the EMPTY_QUEUE object. And, the producer notifies it as soon as a message is added to the queue.
如果队列是空的,消费者在EMPTY_QUEUE对象上等待。而且,一旦有消息被添加到队列中,生产者就会通知它。
The consumer process waits using the waitOnEmpty method:
消费者进程使用waitOnEmpty方法进行等待。
public void waitOnEmpty() throws InterruptedException {
synchronized (EMPTY_QUEUE) {
EMPTY_QUEUE.wait();
}
}
The producer notifies the consumer using the notifyAllForEmpty method:
生产者使用notifyAllForEmpty方法通知消费者。
public void notifyAllForEmpty() {
synchronized (EMPTY_QUEUE) {
EMPTY_QUEUE.notify();
}
}
And the producer uses the add() method to add a message to the queue:
而生产者使用add()方法向队列中添加一条消息。
public void add(Message message) {
synchronized (queue) {
queue.add(message);
}
}
The consumer calls the remove method to retrieve a message from the queue:
消费者调用remove方法来从队列中取回一个消息。
public Message remove() {
synchronized (queue) {
return queue.poll();
}
}
3.3. Producer Class
3.3.生产者类
The Producer class implements the Runnable interface to enable thread creation:
Producer类实现了Runnable接口以实现线程创建。
public class Producer implements Runnable {
private final DataQueue dataQueue;
private volatile boolean runFlag;
public Producer(DataQueue dataQueue) {
this.dataQueue = dataQueue;
runFlag = true;
}
@Override
public void run() {
produce();
}
// Other methods
}
The constructor uses the shared dataQueue parameter. Member variable runFlag helps in stopping the producer process gracefully. It is initialized to true.
构造函数使用共享的dataQueue参数。成员变量runFlag有助于优雅地停止生产者进程。它被初始化为true。
Thread start calls the produce() method:
线程启动调用produce()方法。
public void produce() {
while (runFlag) {
Message message = generateMessage();
while (dataQueue.isFull()) {
try {
dataQueue.waitOnFull();
} catch (InterruptedException e) {
break;
}
}
if (!runFlag) {
break;
}
dataQueue.add(message);
dataQueue.notifyAllForEmpty();
}
}
The producer runs steps continuously in a while loop. This loop breaks when runFlag is false.
生产者在一个while循环中连续运行步骤。当runFlag为false时,这个循环会中断。
In each iteration, it generates a message. Then, it checks to see if the queue is full and waits as needed. Instead of an if block, a while loop is used to check whether the queue is full. This is to avoid a spurious wake-up from the wait state.
在每个迭代中,它都会生成一条消息。然后,它检查队列是否已满,并根据需要进行等待。我们不使用if块,而使用while循环来检查队列是否已满。这是为了避免在等待状态下出现虚假的唤醒。
When the producer wakes up from wait, it checks whether it still needs to continue or break out from the process. It adds a message to the queue and notifies a consumer waiting on an empty queue.
当生产者从等待中醒来时,它检查它是否仍然需要继续或脱离这个过程。它向队列中添加一条消息,并通知在空队列中等待的消费者。
The stop() method terminates the process gracefully:
stop()方法会优雅地终止进程。
public void stop() {
runFlag = false;
dataQueue.notifyAllForFull();
}
After changing runFlag to false, all the producers that are waiting in a “queue full” state are notified. This ensures that all producer threads terminate.
在将runFlag改为false后,所有在 “队列满 “状态下等待的生产者都会被通知。这确保了所有生产者线程的终止。
3.4. Consumer Class
3.4.消费类
The Consumer class implements Runnable to enable thread creation:
Consumer类实现了Runnable以实现线程创建。
public class Consumer implements Runnable {
private final DataQueue dataQueue;
private volatile boolean runFlag;
public Consumer(DataQueue dataQueue) {
this.dataQueue = dataQueue;
runFlag = true;
}
@Override
public void run() {
consume();
}
// Other methods
}
Its constructor has a shared dataQueue as a parameter. The runFlag is initialized to true. This flag stops the consumer process when needed.
它的构造函数有一个共享的dataQueen作为参数。runFlag被初始化为true。这个标志在需要的时候会停止消费者进程。
When the thread starts, it runs the consume method:
当线程开始时,它运行consume方法。
public void consume() {
while (runFlag) {
Message message;
if (dataQueue.isEmpty()) {
try {
dataQueue.waitOnEmpty();
} catch (InterruptedException e) {
break;
}
}
if (!runFlag) {
break;
}
message = dataQueue.remove();
dataQueue.notifyAllForFull();
useMessage(message);
}
}
It has a continuously running while loop. And, this process stops gracefully when the runFlag is false.
它有一个持续运行的while循环。而且,当runFlag为false时,这个过程优雅地停止。
Each iteration checks if the queue is empty. If the queue is empty, the consumer waits for a message to be produced. This wait is also used by the while loop to avoid spurious wakeups.
每次迭代都会检查队列是否为空。如果队列是空的,消费者等待消息的产生。这种等待也被while循环所使用,以避免虚假的唤醒。
When the consumer wakes up from wait, it checks the runFlag. If the flag is false, then it breaks out of the loop. Otherwise, it reads a message from the queue and notifies the producer that it’s waiting in the “full queue” state. Finally, it consumes the message.
当消费者从等待中醒来时,它检查runFlag。如果该标志是false,那么它就会跳出循环。否则,它从队列中读取一条消息,并通知生产者它正在 “满队 “状态下等待。最后,它消费该消息。
To stop the process gracefully, it uses the stop() method:
为了优雅地停止进程,它使用stop()方法。
public void stop() {
runFlag = false;
dataQueue.notifyAllForEmpty();
}
After runFlag is set to false, all the consumers that are waiting in an empty queue state are notified. This ensures that all consumer threads terminate.
在runFlag被设置为false之后,所有在空队列状态下等待的消费者都被通知。这确保了所有消费者线程的终止。
3.5. Running Producer and Consumer Threads
3.5.运行生产者和消费者线程
Let’s create a dataQueue object with max required capacity:
让我们创建一个具有最大要求容量的dataQueue对象。
DataQueue dataQueue = new DataQueue(MAX_QUEUE_CAPACITY);
Now, let’s create producer object and a thread:
现在,让我们创建producer对象和一个线程。
Producer producer = new Producer(dataQueue);
Thread producerThread = new Thread(producer);
Then, we’ll initialize a consumer object and a thread:
然后,我们将初始化一个consumer对象和一个线程。
Consumer consumer = new Consumer(dataQueue);
Thread consumerThread = new Thread(consumer);
Finally, we start the threads to initiate the process:
最后,我们启动线程来启动这个过程。
producerThread.start();
consumerThread.start();
It runs continuously until we want to stop those threads. Stopping them is simple:
它连续运行,直到我们想停止这些线程。停止它们很简单。
producer.stop();
consumer.stop();
3.6. Running Multiple Producers and Consumers
3.6.运行多个生产者和消费者
Running multiple producers and consumers is similar to the single producer and consumer case. We just need to create the required number of threads and start them.
运行多个生产者和消费者与单个生产者和消费者的情况类似。我们只需要创建所需数量的线程并启动它们。
Let’s create multiple producers and threads and start them:
让我们创建多个生产者和线程,并启动它们。
Producer producer = new Producer(dataQueue);
for(int i = 0; i < producerCount; i++) {
Thread producerThread = new Thread(producer);
producerThread.start();
}
Next, let’s create the required number of consumer objects and threads:
接下来,让我们创建所需数量的消费者对象和线程。
Consumer consumer = new Consumer(dataQueue);
for(int i = 0; i < consumerCount; i++) {
Thread consumerThread = new Thread(consumer);
consumerThread.start();
}
We can stop the process gracefully by calling the stop() method on producer and consumer objects:
我们可以通过调用生产者和消费者对象的stop()方法来优雅地停止这个过程。
producer.stop();
consumer.stop();
4. Simplified Example Using BlockingQueue
4.使用BlockingQueue的简化示例
Java provides a BlockingQueue interface that is thread-safe. In other words, multiple threads can add and remove from this queue without any concurrency issues.
Java提供了一个BlockingQueue接口,是线程安全的。换句话说,多个线程可以从这个队列中添加和删除,而没有任何并发问题。
Its put() method blocks the calling thread if the queue is full. Similarly, if the queue is empty, its take() method blocks the calling thread.
如果队列是满的,它的put()方法会阻塞调用线程。同样地,如果队列是空的,它的take()方法也会阻塞调用线程。
4.1. Create Bounded BlockingQueue
4.1.创建有边界的阻塞队列
We can create a bounded BlockingQueue using a capacity value in the constructor:
我们可以在构造函数中使用一个容量值来创建一个有边界的BlockingQueue。
BlockingQueue<Double> blockingQueue = new LinkedBlockingDeque<>(5);
4.2. Simplified produce Method
4.2.简化的生产方法
In the produce() method, we can avoid explicit synchronization for our queue:
在produce()方法中,我们可以避免为我们的队列进行显式同步。
private void produce() {
while (true) {
double value = generateValue();
try {
blockingQueue.put(value);
} catch (InterruptedException e) {
break;
}
}
}
This method continuously produces objects and just adds them to the queue.
这个方法不断地产生对象,只是把它们添加到队列中。
4.3. Simplified consume Method
4.3.简化的消费方法
The consume() method uses no synchronization explicitly:
consume()方法没有明确使用同步。
private void consume() {
while (true) {
Double value;
try {
value = blockingQueue.take();
} catch (InterruptedException e) {
break;
}
// Consume value
}
}
It just takes a value from the queue and consumes it, continuously.
它只是从队列中获取一个值,并不断地消耗它。
4.4. Run Producer and Consumer Threads
4.4.运行生产者和消费者线程
We can create as many producers and consumer threads as required:
我们可以根据需要创建尽可能多的生产者和消费者线程。
for (int i = 0; i < 2; i++) {
Thread producerThread = new Thread(this::produce);
producerThread.start();
}
for (int i = 0; i < 3; i++) {
Thread consumerThread = new Thread(this::consume);
consumerThread.start();
}
5. Conclusion
5.总结
In this article, we’ve learned how to implement the Producer-Consumer problem using Java Threads. Also, we learned how to run scenarios with multiple producers and consumers.
在这篇文章中,我们已经学会了如何使用Java线程实现生产者-消费者问题。此外,我们还学习了如何运行具有多个生产者和消费者的场景。
A complete code sample can be found over on GitHub.
完整的代码样本可以在GitHub上找到over。