1. Introduction
AWS offers many services through its APIs, which we can access from Java using the official SDK. Until recently, though, this SDK didn’t offer support for reactive operations and had only limited support for asynchronous access.
With the release of the AWS SDK for Java 2.0, we can now use those APIs in fully non-blocking I/O mode, thanks to its adoption of the Reactive Streams standard.
In this tutorial, we’ll explore those new features by implementing a simple blob store REST API in Spring Boot that uses the well-known S3 service as its storage backend.
2. Overview of AWS S3 Operations
Before diving into the implementation, let’s do a quick overview of what we want to achieve here. A typical blob store service exposes CRUD operations that a front-end application consumes to allow an end-user to upload, list, download and delete several types of content, such as audio, pictures, and documents.
A common issue that traditional implementations must deal with is how to efficiently handle large files or slow connections. In early versions (pre-Servlet 3.0), all the JavaEE spec had to offer was a blocking API, so we needed a thread for each concurrent blob store client. This model has the drawback of requiring more server resources (ergo, bigger machines) and making them more vulnerable to DoS-type attacks.
By using a reactive stack, we can make our service much less resource-intensive for the same number of clients. The reactor implementation uses a small number of threads that are dispatched in response to I/O completion events, such as the availability of new data to read or the completion of a previous write.
This means that the same thread keeps handling those events – which can originate from any of the active client connections – until there is no more available work to do. This approach greatly reduces the number of context switches – a quite expensive operation – and allows for very efficient use of the available resources.
3. Project Setup
Our demo project is a standard Spring Boot WebFlux application which includes the usual support dependencies, such as Lombok and JUnit.
In addition to those libraries, we need to bring in the AWS SDK for Java V2 dependencies:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>bom</artifactId>
            <version>2.10.1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>s3</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>netty-nio-client</artifactId>
        <scope>compile</scope>
    </dependency>
</dependencies>
The AWS SDK provides a BOM defining the required versions for all dependencies, so we don’t need to specify them in the dependencies section of our POM file.
We’ve added the S3 client library, which will bring along other core dependencies from the SDK. We also need the Netty client library, required since we’ll be using asynchronous APIs to interact with AWS.
The official AWS documentation contains more details on the available transports.
4. AWS S3 Client Creation
The entry point for S3 operations is the S3AsyncClient class, which we’ll use to start new API calls.
As we only need a single instance of this class, let’s create a @Configuration class with a @Bean method that builds it, so we can inject it wherever we need it:
@Configuration
@EnableConfigurationProperties(S3ClientConfigurarionProperties.class)
public class S3ClientConfiguration {

    @Bean
    public S3AsyncClient s3client(S3ClientConfigurarionProperties s3props,
            AwsCredentialsProvider credentialsProvider) {

        SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
            .writeTimeout(Duration.ZERO)
            .maxConcurrency(64)
            .build();

        S3Configuration serviceConfiguration = S3Configuration.builder()
            .checksumValidationEnabled(false)
            .chunkedEncodingEnabled(true)
            .build();

        S3AsyncClientBuilder b = S3AsyncClient.builder()
            .httpClient(httpClient)
            .region(s3props.getRegion())
            .credentialsProvider(credentialsProvider)
            .serviceConfiguration(serviceConfiguration);

        if (s3props.getEndpoint() != null) {
            b = b.endpointOverride(s3props.getEndpoint());
        }
        return b.build();
    }
}
For this demo, we’re using a minimal @ConfigurationProperties class (available at our repository) that holds the following pieces of information required to access S3 services:
- region: A valid AWS region identifier, such as us-east-1
- accessKeyId/secretAccessKey: Our AWS API key and identifier
- endpoint: An optional URI that we can use to override S3’s default service endpoint. The main use case is to use the demo code with alternative storage solutions that offer an S3-compatible API (minio and localstack are examples)
- bucket: Name of the bucket where we’ll store uploaded files
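We won’t list that class in full here, but the following minimal sketch shows roughly what it might look like. The aws.s3 property prefix, the defaults, and the multipartMinPartSize field (used later by the multipart upload logic in section 7.2) are illustrative assumptions; the version in the repository may differ:

// Hypothetical sketch of S3ClientConfigurarionProperties; the real class may differ.
// imports: java.net.URI, software.amazon.awssdk.regions.Region,
//          org.springframework.boot.context.properties.ConfigurationProperties
@ConfigurationProperties(prefix = "aws.s3")  // prefix is an assumption
public class S3ClientConfigurarionProperties {

    private Region region = Region.US_EAST_1;  // AWS region identifier
    private URI endpoint = null;               // optional endpoint override (minio, localstack, ...)
    private String accessKeyId;                // credentials; optional, see the provider bean below
    private String secretAccessKey;
    private String bucket;                     // bucket that receives uploaded files

    // Minimum part size for multipart uploads; S3 currently requires at least 5 MB per part
    private int multipartMinPartSize = 5 * 1024 * 1024;

    // ... getters and setters omitted
}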
There are a few points worth mentioning about the client’s initialization code. First, we’re disabling write timeouts and increasing the maximum concurrency, so uploads can complete even under low-bandwidth situations.
Second, we’re disabling checksum validation and enabling chunked encoding. We’re doing this because we want to start uploading data to the bucket as soon as the data arrives at our service in a streaming fashion.
Finally, we’re not addressing the bucket creation itself, as we’re assuming it’s already been created and configured by an administrator.
As for the credentials, we supply a customized AwsCredentialsProvider that can retrieve the credentials from Spring properties. This opens the possibility of injecting those values through Spring’s Environment abstraction and all its supported PropertySource implementations, such as Vault or Config Server:
@Bean
public AwsCredentialsProvider awsCredentialsProvider(S3ClientConfigurarionProperties s3props) {
    if (StringUtils.isBlank(s3props.getAccessKeyId())) {
        return DefaultCredentialsProvider.create();
    } else {
        return () -> {
            return AwsBasicCredentials.create(
                s3props.getAccessKeyId(),
                s3props.getSecretAccessKey());
        };
    }
}
5. Upload Service Overview
We’ll now implement an upload service, which will be reachable at the /inbox path.
A POST to this resource path will store the file at our S3 bucket under a randomly generated key. We’ll store the original filename as a metadata key, so we can use it to generate the appropriate HTTP download headers for browsers.
We need to handle two distinct scenarios: simple and multi-part uploads. Let’s go ahead and create a @RestController and add methods to handle those scenarios:
@RestController
@RequestMapping("/inbox")
@Slf4j
public class UploadResource {

    private final S3AsyncClient s3client;
    private final S3ClientConfigurarionProperties s3config;

    public UploadResource(S3AsyncClient s3client, S3ClientConfigurarionProperties s3config) {
        this.s3client = s3client;
        this.s3config = s3config;
    }

    @PostMapping
    public Mono<ResponseEntity<UploadResult>> uploadHandler(
            @RequestHeader HttpHeaders headers,
            @RequestBody Flux<ByteBuffer> body) {
        // ... see section 6
    }

    @RequestMapping(
        consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
        method = {RequestMethod.POST, RequestMethod.PUT})
    public Mono<ResponseEntity<UploadResult>> multipartUploadHandler(
            @RequestHeader HttpHeaders headers,
            @RequestBody Flux<Part> parts) {
        // ... see section 7
    }
}
Handler signatures reflect the main difference between both cases: In the simple case, the body contains the file content itself, whereas in the multipart case it can have multiple “parts”, each corresponding to a file or form data.
As a convenience, we’ll support multipart uploads using POST or PUT methods. The reason for this is that some tools (cURL, notably) use the latter by default when uploading files with the -F option.
In both cases, we’ll return an UploadResult containing the result of the operation and the generated file keys that a client should use to recover the original files – more on this later!
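We won’t spend much time on UploadResult itself: it’s just a value holder for the status and the generated keys. A minimal sketch could look like the one below; the List-based convenience constructor is an assumption, added to match how the multipart handler in section 7.1 collects its keys:

// Hypothetical minimal UploadResult; the repository version may expose additional fields.
// imports: java.util.List, org.springframework.http.HttpStatus
public class UploadResult {

    private HttpStatus status;
    private String[] keys;

    public UploadResult(HttpStatus status, String[] keys) {
        this.status = status;
        this.keys = keys;
    }

    // Convenience constructor for handlers that collect their keys into a List
    public UploadResult(HttpStatus status, List<String> keys) {
        this(status, keys == null ? new String[] {} : keys.toArray(new String[] {}));
    }

    public HttpStatus getStatus() {
        return status;
    }

    public String[] getKeys() {
        return keys;
    }
}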
6. Single File Upload
In this case, clients send content in a simple POST operation with the request body containing raw data. To receive this content in a Reactive Web application, all we have to do is to declare a @PostMapping method that takes a Flux<ByteBuffer> argument.
Streaming this flux to a new S3 file is straightforward in this case.
All we need to do is build a PutObjectRequest with a generated key, the file length, and the MIME content type, and pass it to the putObject() method of our S3 client:
@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers,
        @RequestBody Flux<ByteBuffer> body) {

    // ... some validation code omitted: among other things, it derives 'length' from the
    // Content-Length header and builds the 'metadata' map used below

    String fileKey = UUID.randomUUID().toString();
    MediaType mediaType = headers.getContentType();
    if (mediaType == null) {
        mediaType = MediaType.APPLICATION_OCTET_STREAM;
    }

    CompletableFuture<PutObjectResponse> future = s3client
        .putObject(PutObjectRequest.builder()
            .bucket(s3config.getBucket())
            .contentLength(length)
            .key(fileKey)
            .contentType(mediaType.toString())
            .metadata(metadata)
            .build(),
            AsyncRequestBody.fromPublisher(body));

    return Mono.fromFuture(future)
        .map((response) -> {
            checkResult(response);
            return ResponseEntity
                .status(HttpStatus.CREATED)
                .body(new UploadResult(HttpStatus.CREATED, new String[] {fileKey}));
        });
}
The key point here is how we’re passing the incoming Flux to the putObject() method.
This method expects an AsyncRequestBody object that provides content on demand. Basically, it’s a regular Publisher with some extra convenience methods. In our case, we’ll take advantage of the fromPublisher() method to convert our Flux into the required type.
Also, we assume that the client will send the Content-Length HTTP header with the correct value. Without this information, the call will fail since this is a required field.
Asynchronous methods in the SDK V2 always return a CompletableFuture object. We take it and adapt it to a Mono using its fromFuture() factory method. This gets mapped to the final UploadResult object.
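The checkResult() helper that appears in these handlers is a small utility. A minimal sketch, assuming all we want is to abort the pipeline whenever S3 reports a non-successful HTTP status, could look like this (the actual code might throw a dedicated exception type instead of Spring’s ResponseStatusException):

// Hypothetical helper: fails the reactive pipeline when the SDK response isn't a 2xx.
// imports: software.amazon.awssdk.core.SdkResponse, org.springframework.http.HttpStatus,
//          org.springframework.web.server.ResponseStatusException
private static void checkResult(SdkResponse result) {
    if (result.sdkHttpResponse() == null || !result.sdkHttpResponse().isSuccessful()) {
        throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR,
            "unexpected response from S3: " + result.sdkHttpResponse());
    }
}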
7. Uploading Multiple Files
Handling a multipart/form-data upload may seem easy, especially when using libraries that handle all details for us. So, can we simply use the previous method for each uploaded file? Well, yes, but this comes with a price: Buffering.
To use the previous method, we need the part’s length, but chunked file transfers do not always include this information. One approach is to store the part in a temporary file and then send it to AWS, but this will slow down the total upload time. It also means extra storage for our servers.
As an alternative, here we’ll use an AWS multipart upload. This feature allows the upload of a single file to be split into multiple chunks that we can send in parallel and out of order.
The steps are as follows; we need to send:
- the createMultipartUpload request – AWS responds with an uploadId that we’ll use in the next calls
- file chunks containing the uploadId, sequence number and content – AWS responds with an ETag identifier for each part
- a completeUpload request containing the uploadId and all ETags received
Please note: We’ll repeat those steps for each received FilePart!
7.1. Top-Level Pipeline
The multipartUploadHandler in our @Controller class is responsible for handling, not surprisingly, multipart file uploads. In this context, each part can have any kind of data, identified by its MIME-type. The Reactive Web framework delivers those parts to our handler as a Flux of objects that implement the Part interface, which we’ll process in turn:
return parts
    .ofType(FilePart.class)
    .flatMap((part) -> saveFile(headers, s3config.getBucket(), part))
    .collect(Collectors.toList())
    .map((keys) -> ResponseEntity
        .status(HttpStatus.CREATED)
        .body(new UploadResult(HttpStatus.CREATED, keys)));
This pipeline starts by filtering parts that correspond to an actual uploaded file, which will always be an object that implements the FilePart interface. Each part is then passed to the saveFile method, which handles the actual upload for a single file and returns the generated file key.
We collect all keys in a List and, finally, build the final UploadResult. We’re always creating a new resource, so we’ll return a more descriptive CREATED status (201) instead of a regular OK.
7.2. Handling a Single File Upload
We’ve already outlined the steps required to upload a file using AWS’s multipart method. There’s a catch, though: the S3 service requires that each part, except the last one, have a given minimum size – currently, 5 MB.
This means that we can’t just take the received chunks and send them right away. Instead, we need to buffer them locally until we reach the minimum size or end of data. Since we also need a place to track how many parts we’ve sent and the resulting CompletedPart results, we’ll create a simple UploadState inner class to hold this state:
class UploadState {
    String bucket;
    String filekey;
    String uploadId;
    int partCounter;
    Map<Integer, CompletedPart> completedParts = new HashMap<>();
    int buffered = 0;
    // ... getters/setters omitted

    UploadState(String bucket, String filekey) {
        this.bucket = bucket;
        this.filekey = filekey;
    }
}
Given the required steps and buffering, we end up with an implementation that may look a bit intimidating at first glance:
Mono<String> saveFile(HttpHeaders headers, String bucket, FilePart part) {

    String filekey = UUID.randomUUID().toString();

    Map<String, String> metadata = new HashMap<String, String>();
    String filename = part.filename();
    if (filename == null) {
        filename = filekey;
    }
    metadata.put("filename", filename);

    MediaType mt = part.headers().getContentType();
    if (mt == null) {
        mt = MediaType.APPLICATION_OCTET_STREAM;
    }

    UploadState uploadState = new UploadState(bucket, filekey);

    CompletableFuture<CreateMultipartUploadResponse> uploadRequest = s3client
        .createMultipartUpload(CreateMultipartUploadRequest.builder()
            .contentType(mt.toString())
            .key(filekey)
            .metadata(metadata)
            .bucket(bucket)
            .build());

    return Mono
        .fromFuture(uploadRequest)
        .flatMapMany((response) -> {
            checkResult(response);
            uploadState.uploadId = response.uploadId();
            return part.content();
        })
        .bufferUntil((buffer) -> {
            uploadState.buffered += buffer.readableByteCount();
            if (uploadState.buffered >= s3config.getMultipartMinPartSize()) {
                uploadState.buffered = 0;
                return true;
            } else {
                return false;
            }
        })
        .map((buffers) -> concatBuffers(buffers))
        .flatMap((buffer) -> uploadPart(uploadState, buffer))
        .reduce(uploadState, (state, completedPart) -> {
            state.completedParts.put(completedPart.partNumber(), completedPart);
            return state;
        })
        .flatMap((state) -> completeUpload(state))
        .map((response) -> {
            checkResult(response);
            return uploadState.filekey;
        });
}
We start by collecting some file metadata and using it to create a request object required by the createMultipartUpload() API call. This call returns a CompletableFuture, which is the starting point for our streaming pipeline.
Let’s review what each step of this pipeline does:
- After receiving the initial result, which contains the S3’s generated uploadId, we save it in the upload state object and start streaming the file’s body. Notice the use of flatMapMany here, which turns the Mono into a Flux
- We use bufferUntil() to accumulate the required number of bytes. The pipeline at this point changes from a Flux of DataBuffer objects into a Flux of List<DataBuffer> objects
- Convert each List<DataBuffer> to a ByteBuffer with the concatBuffers() helper (see the sketch after this list)
- Send the ByteBuffer to S3 (see next section) and return the resulting CompletedPart value downstream
- Reduce the resulting CompletedPart values into the uploadState
- Signal S3 that we’ve completed the upload (more on this later)
- Return the generated file key
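The concatBuffers() helper mentioned in the third step is equally small. A minimal sketch, assuming it just copies the buffered DataBuffer instances into a single ByteBuffer sized to hold them all, could be:

// Hypothetical helper: merges a list of DataBuffers into one ByteBuffer for uploadPart().
// imports: java.nio.ByteBuffer, java.util.List, org.springframework.core.io.buffer.DataBuffer
private static ByteBuffer concatBuffers(List<DataBuffer> buffers) {
    int partSize = 0;
    for (DataBuffer buffer : buffers) {
        partSize += buffer.readableByteCount();
    }
    ByteBuffer partData = ByteBuffer.allocate(partSize);
    buffers.forEach(buffer -> partData.put(buffer.asByteBuffer()));
    // reset the position so the buffer can be read from the beginning
    partData.rewind();
    return partData;
}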
7.3. Uploading File Parts
Once again, let’s make it clear that a “file part” here means a piece of a single file (for example, the first 5MB of a 100MB file), not a part of a message that happens to be a file, as it is in the top-level stream!
The file upload pipeline calls the uploadPart() method with two arguments: the upload state and a ByteBuffer. From there, we build an UploadPartRequest instance and use the uploadPart() method available in our S3AsyncClient to send the data:
private Mono<CompletedPart> uploadPart(UploadState uploadState, ByteBuffer buffer) {
    final int partNumber = ++uploadState.partCounter;

    CompletableFuture<UploadPartResponse> request = s3client.uploadPart(UploadPartRequest.builder()
        .bucket(uploadState.bucket)
        .key(uploadState.filekey)
        .partNumber(partNumber)
        .uploadId(uploadState.uploadId)
        .contentLength((long) buffer.capacity())
        .build(),
        AsyncRequestBody.fromPublisher(Mono.just(buffer)));

    return Mono
        .fromFuture(request)
        .map((uploadPartResult) -> {
            checkResult(uploadPartResult);
            return CompletedPart.builder()
                .eTag(uploadPartResult.eTag())
                .partNumber(partNumber)
                .build();
        });
}
Here, we use the return value from the uploadPart() request to build a CompletedPart instance. This is an AWS SDK type that we’ll need later when building the final request that closes the upload.
7.4. Completing the Upload
Last, but not least, we need to finish the multi-part file upload by sending a completeMultipartUpload() request to S3. This is quite easy given that the upload pipeline passes all the information we need as arguments:
private Mono<CompleteMultipartUploadResponse> completeUpload(UploadState state) {
    CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder()
        .parts(state.completedParts.values())
        .build();

    return Mono.fromFuture(s3client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
        .bucket(state.bucket)
        .uploadId(state.uploadId)
        .multipartUpload(multipartUpload)
        .key(state.filekey)
        .build()));
}
8. Downloading Files from AWS
Compared to multi-part uploads, downloading objects from an S3 bucket is a much simpler task. In this case, we don’t have to worry about chunks or anything like that. The SDK API provides the getObject() method that takes two arguments:
- A GetObjectRequest object containing the requested bucket and file key
- An AsyncResponseTransformer, which allows us to map an incoming streaming response to something else
The SDK provides a couple of implementations of the latter. The simpler ones, such as toBytes(), buffer the whole object in memory before handing it back, which would result in poor response times for a client of our demo service. Instead, we’ll use the toPublisher() transformer, which exposes the incoming data as a Publisher of ByteBuffers that we can wrap in a Flux and stream to the client as it arrives.
8.1. Download Controller
Our download controller is a standard Spring Reactive @RestController, with a single @GetMapping method that handles download requests. We expect the file key as a @PathVariable argument, and we’ll return an asynchronous ResponseEntity with the file’s content:
@GetMapping(path = "/{filekey}")
Mono<ResponseEntity<Flux<ByteBuffer>>> downloadFile(@PathVariable("filekey") String filekey) {

    GetObjectRequest request = GetObjectRequest.builder()
        .bucket(s3config.getBucket())
        .key(filekey)
        .build();

    return Mono.fromFuture(s3client.getObject(request, AsyncResponseTransformer.toPublisher()))
        .map(response -> {
            checkResult(response.response());
            String filename = getMetadataItem(response.response(), "filename", filekey);
            return ResponseEntity.ok()
                .header(HttpHeaders.CONTENT_TYPE, response.response().contentType())
                .header(HttpHeaders.CONTENT_LENGTH, Long.toString(response.response().contentLength()))
                .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
                .body(Flux.from(response));
        });
}
Here, getMetadataItem() is just a helper method that looks up a given metadata key in the response in a case-insensitive way.
This is an important detail: S3 returns metadata information using special HTTP headers, but those headers are case-insensitive (see RFC 7230, section 3.2). This means that implementations may change the case for a given item at will – and this actually happens when using MinIO.
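A possible implementation of getMetadataItem() is sketched below, assuming it only needs to scan the response’s metadata map while ignoring key case and fall back to a default value (the file key, in our controller) when the entry is missing:

// Hypothetical helper: case-insensitive metadata lookup with a default value.
// imports: java.util.Map, software.amazon.awssdk.services.s3.model.GetObjectResponse
private String getMetadataItem(GetObjectResponse sdkResponse, String key, String defaultValue) {
    for (Map.Entry<String, String> entry : sdkResponse.metadata().entrySet()) {
        if (entry.getKey().equalsIgnoreCase(key)) {
            return entry.getValue();
        }
    }
    return defaultValue;
}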
9. Conclusion
In this tutorial, we’ve covered the basics of using the reactive extensions available in the AWS SDK V2 library. Our focus here was the AWS S3 service, but we can extend the same techniques to other reactive-enabled services, such as DynamoDB.
As usual, all code is available over on GitHub.