1. Overview
1.概述
In this article, we’ll be looking at the DelayQueue construct from the java.util.concurrent package. This is a blocking queue that could be used in producer-consumer programs.
在这篇文章中,我们将研究来自java.util.concurrent包的DelayQueue构造。这是一个阻塞队列,可用于生产者-消费者程序中。
It has a very useful characteristic – when the consumer wants to take an element from the queue, they can take it only when the delay for that particular element has expired.
它有一个非常有用的特性–当消费者想要从队列中取走一个元素时,他们只有在该特定元素的延迟过期后才能取走它。
2. Implementing Delayed for Elements in the DelayQueue
2.为DelayQueue中的元素实现Delayed
Each element we want to put into the DelayQueue needs to implement the Delayed interface. Let’s say that we want to create a DelayObject class. Instances of that class will be put into the DelayQueue.
我们想要放入DelayQueue的每个元素都需要实现Delayed接口。比方说,我们想创建一个DelayObject类。该类的实例将被放入DelayQueue.。
We’ll pass the String data and delayInMilliseconds as and arguments to its constructor:
我们将把String数据和delayInMilliseconds作为参数传递给其构造函数。
public class DelayObject implements Delayed {
private String data;
private long startTime;
public DelayObject(String data, long delayInMilliseconds) {
this.data = data;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}
We are defining a startTime – this is a time when the element should be consumed from the queue. Next, we need to implement the getDelay() method – it should return the remaining delay associated with this object in the given time unit.
我们正在定义一个startTime – 这是一个元素应该从队列中被消耗的时间。接下来,我们需要实现getDelay() 方法–它应该返回在给定时间单位内与该对象相关的剩余延迟。
Therefore, we need to use the TimeUnit.convert() method to return the remaining delay in the proper TimeUnit:
因此,我们需要使用TimeUnit.convert()方法来返回适当TimeUnit的剩余延迟:。
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
When the consumer tries to take an element from the queue, the DelayQueue will execute getDelay() to find out if that element is allowed to be returned from the queue. If the getDelay() method will return zero or a negative number, it means that it could be retrieved from the queue.
当消费者试图从队列中获取一个元素时,DelayQueue将执行getDelay()以找出该元素是否被允许从队列中返回。如果getDelay()方法将返回0或一个负数,这意味着可以从队列中取回。
We also need to implement the compareTo() method, because the elements in the DelayQueue will be sorted according to the expiration time. The item that will expire first is kept at the head of the queue and the element with the highest expiration time is kept at the tail of the queue:
我们还需要实现compareTo() 方法,因为DelayQueue 中的元素将根据过期时间进行排序。将首先过期的项目被保存在队列的头部,过期时间最长的元素被保存在队列的尾部。
@Override
public int compareTo(Delayed o) {
return Ints.saturatedCast(
this.startTime - ((DelayObject) o).startTime);
}
3. DelayQueue Consumer and Producer
3.延迟队列C消费者和生产者
To be able to test our DelayQueue we need to implement producer and consumer logic. The producer class takes the queue, the number of elements to produce, and the delay of each message in milliseconds as arguments.
为了能够测试我们的DelayQueue,我们需要实现生产者和消费者逻辑。生产者类将队列、要生产的元素数量以及每个消息的延迟(以毫秒为单位)作为参数。
Then when the run() method is invoked, it puts elements into the queue, and sleeps for 500 milliseconds after each put:
然后当run()方法被调用时,它将元素放入队列中,并在每次放入后睡眠500毫秒。
public class DelayQueueProducer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToProduce;
private Integer delayOfEachProducedMessageMilliseconds;
// standard constructor
@Override
public void run() {
for (int i = 0; i < numberOfElementsToProduce; i++) {
DelayObject object
= new DelayObject(
UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
System.out.println("Put object: " + object);
try {
queue.put(object);
Thread.sleep(500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
The consumer implementation is very similar, but it also keeps track of the number of messages that were consumed:
消费者的实现非常相似,但它也记录了被消费的消息的数量。
public class DelayQueueConsumer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToTake;
public AtomicInteger numberOfConsumedElements = new AtomicInteger();
// standard constructors
@Override
public void run() {
for (int i = 0; i < numberOfElementsToTake; i++) {
try {
DelayObject object = queue.take();
numberOfConsumedElements.incrementAndGet();
System.out.println("Consumer take: " + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
4. DelayQueue Usage Test
4.DelayQueue使用测试
To test the behavior of the DelayQueue, we’ll create one producer thread and one consumer thread.
为了测试DelayQueue的行为,我们将创建一个生产者线程和一个消费者线程。
The producer will put() two objects onto the queue with 500 milliseconds delay. The test asserts that the consumer consumed two messages:
生产者将put() 两个对象放到队列中,延迟500毫秒。该测试断言消费者消费了两条消息。
@Test
public void givenDelayQueue_whenProduceElement
_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
// given
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<DelayObject> queue = new DelayQueue<>();
int numberOfElementsToProduce = 2;
int delayOfEachProducedMessageMilliseconds = 500;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
// when
executor.submit(producer);
executor.submit(consumer);
// then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}
We can observe that running this program will produce the following output:
我们可以观察到,运行这个程序将产生以下输出。
Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
The producer puts the object, and after awhile the first object for which the delay expired is consumed.
生产者放置对象,一段时间后,第一个延迟过期的对象被消耗掉。
The same situation occurred for the second element.
第二个要素也出现了同样的情况。
5. Consumer Not Able to Consume in the Given Time
5.消费者无法在规定时间内消费
Let’s say that we have a producer that is producing an element that will expire in 10 seconds:
假设我们有一个生产者,正在生产一个将在10秒内失效的元素。
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
We’ll start our test, but it will terminate after 5 seconds. Due to the characteristics of the DelayQueue, the consumer will not be able to consume the message from the queue because the element hasn’t expired yet:
我们将开始我们的测试,但它将在5秒后终止。由于DelayQueue的特性,消费者将无法从队列中消费消息,因为该元素还没有过期。
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);
Note, that the consumer’s numberOfConsumedElements has a value equal to zero.
注意,消费者的numberOfConsumedElements的值等于零。
6. Producing an Element With Immediate Expiration
6.产生一个立即失效的元素
When the implementations of the Delayed message getDelay() method return a negative number, that means the given element has already expired. In this situation, the producer will consume that element immediately.
当DelayedmessagegetDelay()方法的实现返回一个负数,这意味着给定的元素已经过期。在这种情况下,生产者将立即消耗该元素。
We can test the situation of producing an element with negative delay:
我们可以测试一下生产一个具有负延迟的元素的情况。
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
When we start the test case, the consumer will consume the element immediately because it has already expired:
当我们开始测试案例时,消费者将立即消费该元素,因为它已经过期了。
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);
7. Conclusion
7.结论
In this article, we were looking at the DelayQueue construct from the java.util.concurrent package.
在这篇文章中,我们正在研究来自java.util.concurrent包的DelayQueue结构。
We implemented a Delayed element that was produced and consumed from the queue.
我们实现了一个延迟元素,从队列中生产和消费。
We leveraged our implementation of the DelayQueue to consume elements that had expired.
我们利用我们对DelayQueue的实现来消耗已经过期的元素。
The implementation of all these examples and code snippets can be found in the GitHub project – which is a Maven project, so it should be easy to import and run as it is.
所有这些例子和代码片段的实现都可以在GitHub项目中找到–它是一个Maven项目,所以应该很容易导入并按原样运行。