1. Overview
1.概述
In this tutorial, we’ll look into java.util.concurrent.Exchanger<T>. This works as a common point for two threads in Java to exchange objects between them.
在本教程中,我们将研究java.util.concurrent.Exchanger<T>.这作为Java中两个线程的共同点,在它们之间交换对象。
2. Introduction to Exchanger
2.交换器简介
The Exchanger class in Java can be used to share objects between two threads of type T. The class provides only a single overloaded method exchange(T t).
Java中的Exchanger类可用于在两个类型为T的线程之间共享对象。该类只提供了一个重载方法exchange(T t)。
When invoked exchange waits for the other thread in the pair to call it as well. At this point, the second thread finds the first thread is waiting with its object. The thread exchanges the objects they are holding and signals the exchange, and now they can return.
当调用exchange时,会等待这对线程中的另一个线程也来调用它。这时,第二个线程发现第一个线程正带着它的对象在等待。线程交换它们所持有的对象并发出交换信号,现在它们可以返回了。
Let’s look into an example to understand message exchange between two threads with Exchanger:
让我们看一个例子,了解两个线程之间用Exchanger进行的消息交换。
@Test
public void givenThreads_whenMessageExchanged_thenCorrect() {
Exchanger<String> exchanger = new Exchanger<>();
Runnable taskA = () -> {
try {
String message = exchanger.exchange("from A");
assertEquals("from B", message);
} catch (InterruptedException e) {
Thread.currentThread.interrupt();
throw new RuntimeException(e);
}
};
Runnable taskB = () -> {
try {
String message = exchanger.exchange("from B");
assertEquals("from A", message);
} catch (InterruptedException e) {
Thread.currentThread.interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture.allOf(
runAsync(taskA), runAsync(taskB)).join();
}
Here, we have the two threads exchanging messages between each other using the common exchanger. Let’s see an example where we exchange an object from the main thread with a new thread:
在这里,我们有两个线程使用公共交换器在彼此之间交换消息。让我们看一个例子,我们从主线程与一个新线程交换一个对象。
@Test
public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
Runnable runner = () -> {
try {
String message = exchanger.exchange("from runner");
assertEquals("to runner", message);
} catch (InterruptedException e) {
Thread.currentThread.interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture<Void> result
= CompletableFuture.runAsync(runner);
String msg = exchanger.exchange("to runner");
assertEquals("from runner", msg);
result.join();
}
Note that, we need to start the runner thread first and later call exchange() in the main thread.
注意,我们需要先启动运行器线程,然后在主线程中调用exchange()。
Also, note that the first thread’s call may timeout if the second thread does not reach the exchange point in time. How long the first thread should wait can be controlled using the overloaded exchange(T t, long timeout, TimeUnit timeUnit).
另外,请注意,如果第二个线程没有及时到达交换点,第一个线程的调用可能会超时。第一个线程应该等待多长时间,可以使用重载的exchange(T t, long timeout, TimeUnit timeUnit)来控制。
3. No GC Data Exchange
3.没有GC数据交换
Exchanger could be used to create pipeline kind of patterns with passing data from one thread to the other. In this section, we’ll create a simple stack of threads continuously pass data between each other as a pipeline.
Exchanger可以用来创建流水线式的模式,将数据从一个线程传递到另一个。在本节中,我们将创建一个简单的线程堆栈,作为管道在彼此之间不断传递数据。
@Test
public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException {
Exchanger<Queue<String>> readerExchanger = new Exchanger<>();
Exchanger<Queue<String>> writerExchanger = new Exchanger<>();
Runnable reader = () -> {
Queue<String> readerBuffer = new ConcurrentLinkedQueue<>();
while (true) {
readerBuffer.add(UUID.randomUUID().toString());
if (readerBuffer.size() >= BUFFER_SIZE) {
readerBuffer = readerExchanger.exchange(readerBuffer);
}
}
};
Runnable processor = () -> {
Queue<String> processorBuffer = new ConcurrentLinkedQueue<>();
Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
processorBuffer = readerExchanger.exchange(processorBuffer);
while (true) {
writerBuffer.add(processorBuffer.poll());
if (processorBuffer.isEmpty()) {
processorBuffer = readerExchanger.exchange(processorBuffer);
writerBuffer = writerExchanger.exchange(writerBuffer);
}
}
};
Runnable writer = () -> {
Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
writerBuffer = writerExchanger.exchange(writerBuffer);
while (true) {
System.out.println(writerBuffer.poll());
if (writerBuffer.isEmpty()) {
writerBuffer = writerExchanger.exchange(writerBuffer);
}
}
};
CompletableFuture.allOf(
runAsync(reader),
runAsync(processor),
runAsync(writer)).join();
}
Here, we have three threads: reader, processor, and writer. Together, they work as a single pipeline exchanging data between them.
这里,我们有三个线程。读取器、处理器和写入器。它们一起作为一个单一的管道工作,在它们之间交换数据。
The readerExchanger is shared between the reader and the processor thread, while the writerExchanger is shared between the processor and the writer thread.
readerExchanger在reader和processor线程之间共享,而writerExchanger则在processor和writer线程之间共享。
Note that the example here is only for demonstration. We must be careful while creating infinite loops with while(true). Also to keep the code readable, we’ve omitted some exceptions handling.
请注意,这里的例子只是为了演示。在用while(true)创建无限循环的时候,我们必须小心。同时为了保持代码的可读性,我们省略了一些异常处理。
This pattern of exchanging data while reusing the buffer allows having less garbage collection. The exchange method returns the same queue instances and thus there would be no GC for these objects. Unlike any blocking queue, the exchanger does not create any nodes or objects to hold and share data.
这种在重用缓冲区的同时交换数据的模式允许有更少的垃圾收集。交换方法返回相同的队列实例,因此这些对象将不会有GC。与任何阻塞队列不同,交换器不创建任何节点或对象来保存和共享数据。
Creating such a pipeline is similar to the Disrupter pattern, with a key difference, the Disrupter pattern supports multiple producers and consumers, while an exchanger could be used between a pair of consumers and producers.
创建这样的管道与Disrupter模式类似,但有一个关键的区别,Disrupter模式支持多个生产者和消费者,而交换器可用于一对消费者和生产者之间。
4. Conclusion
4. Conclusion
So, we have learned what Exchanger<T> is in Java, how it works, and we have seen how to use the Exchanger class. Also, we created a pipeline and demonstrated GC-less data exchange between threads.
因此,我们已经了解了Exchanger<T>在Java中是什么,它是如何工作的,并且我们已经看到了如何使用Exchanger类。此外,我们还创建了一个管道,并演示了线程之间无GC的数据交换。
As always, the code is available over on GitHub.
像往常一样,代码可在GitHub上获得。