Reading Flux Into a Single InputStream Using Spring Reactive WebClient – 使用Spring Reactive WebClient将Flux读入一个单一的InputStream中

最后修改: 2022年 7月 27日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll deep dive into Java reactive programming to solve an interesting problem of how to read Flux<DataBuffer> into a single InputStream.

在本教程中,我们将深入研究Java反应式编程,以解决一个有趣的问题:如何将Flux<DataBuffer>读入单个InputStream

2. Request Setup

2.要求设置

As a first step to solving the problem of reading Flux<DataBuffer> into a single InputStream, we’ll use the Spring reactive WebClient for making a GET request. Further, we can use one of the public API endpoints hosted by gorest.co.in for such testing scenarios:

作为解决将Flux<DataBuffer>读入单个InputStream问题的第一步,我们将使用Spring reactive WebClient来进行GET请求。此外,我们可以使用gorest.co.in托管的公共API端点之一来进行此类测试场景。

String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users";

Next, let’s define the getWebClient() method for getting a new instance of the WebClient class:

接下来,让我们定义getWebClient()方法,以获得WebClient类的新实例。

static WebClient getWebClient() {
    WebClient.Builder webClientBuilder = WebClient.builder();
    return webClientBuilder.build();
}

At this point, we’re ready to make a GET request to the /public/v2/users endpoint. However, we must get the response body as a Flux<DataBuffer> object. So, let’s move on to the next section about BodyExtractors to do precisely this.

在这一点上,我们已经准备好向/public/v2/users端点发出GET请求。然而,我们必须以Flux<DataBuffer>对象的形式获得响应体。所以,让我们进入下一节关于BodyExtractors的内容,以准确地完成这个任务。

3. BodyExtractors and DataBufferUtils

3.BodyExtractorsDataBufferUtils

We can use the toDataBuffers() method of the BodyExtractors class available in spring-webflux to extract the response body into Flux<DataBuffer>.

我们可以使用BodyExtractors类的toDataBuffers()方法,该类在spring-webflux中可用,将响应体提取到Flux<DataBuffer>

Let’s go ahead and create body as an instance of Flux<DataBuffer> type:

让我们继续创建body作为Flux<DataBuffer>类型的实例。

Flux<DataBuffer> body = client
  .get(
  .uri(REQUEST_ENDPOINT)
    .exchangeToFlux( clientResponse -> {
        return clientResponse.body(BodyExtractors.toDataBuffers());
    });

Next, as we require to collect these streams of DataBuffer into a single InputStream, a good strategy to achieve this is by using PipedInputStream and PipedOutputStream.

接下来,由于我们要求将这些DataBuffer流收集到一个InputStream中,实现这一目标的好策略是使用PipedInputStreamPipedOutputStream

Further, we intend to write to the PipedOutputStream and eventually read from the PipedInputStream. So, let’s see how we can create these two connected streams:

此外,我们打算写到PipedOutputStream,并最终从PipedInputStream读取。所以,让我们看看如何创建这两个相连的流。

PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024*10);
inputStream.connect(outputStream);

We must note that the default size is 1024 bytes. However, we expect that the collected result from the Flux<DataBuffer> could exceed the default value. Therefore, we need to explicitly specify a larger value for the size, which in this case is 1024*10.

我们必须注意,默认的大小是1024字节。然而,我们预计从Flux<DataBuffer>收集的结果可能会超过默认值。因此,我们需要明确指定一个更大的大小值,在这种情况下是1024*10

Finally, we use the write() utility method available in the DataBufferUtils class for writing body as a publisher to outputStream:

最后,我们使用write()类中可用的DataBufferUtils实用方法,将body作为一个发布器写入outputStream

DataBufferUtils.write(body, outputStream).subscribe();

We must note that we connected inputStream to outputStream at the time of declaration. So, we’re good to read from inputStream. Let’s move on to the next section to see this in action.

我们必须注意到,我们在声明时inputStream连接到outputStream所以,我们可以从inputStream读取。让我们转到下一节,看看这个动作。

4. Reading From the PipedInputStream

4.从PipedInputStream中读取

First, let’s defined a helper method, readContent(), to read an InputStream as a String object:

首先,让我们定义一个辅助方法,readContent(),以读取InputStream作为String对象。

String readContent(InputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    byte[] tmp = new byte[stream.available()];
    int byteCount = stream.read(tmp, 0, tmp.length);
    contentStringBuffer.append(new String(tmp));
    return String.valueOf(contentStringBuffer);
}

Next, because it’s a typical practice to read the PipedInputStream in a different thread, let’s create the readContentFromPipedInputStream() method  that internally spawns a new thread to read contents from the PipedInputStream into a String object by calling the readContent() method:

接下来,因为它是一个典型的 在不同的线程中读取PipedInputStream的做法。让我们创建readContentFromPipedInputStream()方法,在内部生成一个新的线程,通过调用PipedInputStream方法将内容从String对象中读取。

String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    try {
        Thread pipeReader = new Thread(() -> {
            try {
                contentStringBuffer.append(readContent(stream));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        pipeReader.start();
        pipeReader.join();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        stream.close();
    }

    return String.valueOf(contentStringBuffer);
}

At this stage, our code is ready to use for a simulation. Let’s see it in action:

在这个阶段,我们的代码已经可以用于模拟。让我们看看它的运行情况。

WebClient webClient = getWebClient();
InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
Thread.sleep(3000);
String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
logger.info("response content: \n{}", content.replace("}","}\n"));

As we’re dealing with an asynchronous system, we’re delaying the read by an arbitrary 3 secs before reading from the stream so that we’re able to see the complete response. Additionally, at the time of logging, we’re inserting a newline character to break the long output to multiple lines.

由于我们处理的是一个异步系统,我们在从流中读取之前,将读取的时间任意延迟3秒,这样我们就能看到完整的响应。此外,在记录的时候,我们插入一个换行符,将长的输出分成多行。

Finally, let’s verify the output generated by the code execution:

最后,让我们验证一下代码执行所产生的输出。

20:45:04.120 [main] INFO com.baeldung.databuffer.DataBufferToInputStream - response content: 
[{"id":2642,"name":"Bhupen Trivedi","email":"bhupen_trivedi@renner-pagac.name","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"patel_preity@abshire.info","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"brijesh_shah@morar.co","gender":"male","status":"inactive"}
...
,{"id":2623,"name":"Mohini Mishra","email":"mishra_mohini@hamill-ledner.info","gender":"female","status":"inactive"}
]

That’s it! It looks like we’ve got it all right.

这就是了!看起来我们已经得到了所有的权利。

5. Conclusion

5.总结

In this article, we used the concept of piped streams and the utility methods available in the BodyExtractors and DataBufferUtils classes to read Flux<DataBuffer> into a single InputStream.

在这篇文章中,我们使用了管道流的概念和BodyExtractorsDataBufferUtils类中的实用方法,将Flux<DataBuffer>读入一个InputStream

As always, the complete source code for the tutorial is available over on GitHub.

一如既往,该教程的完整源代码可在GitHub上获取