Multipart Upload on S3 with jclouds

Last modified: April 4, 2013

1. Goal

In the previous article on S3 uploading, we looked at how we can use the generic Blob APIs from jclouds to upload content to S3. In this article we will use the S3 specific asynchronous API from jclouds to upload content and leverage the multipart upload functionality provided by S3.

2. Preparation

2.1. Set up the Custom API

The first part of the upload process is creating the jclouds API – this is a custom API for Amazon S3:

public AWSS3AsyncClient s3AsyncClient() {
   String identity = ...
   String credentials = ...

   BlobStoreContext context = ContextBuilder.newBuilder("aws-s3").
      credentials(identity, credentials).buildView(BlobStoreContext.class);

   RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();
   return providerContext.getAsyncApi();
}

2.2. Determining the Number of Parts for the Content

Amazon S3 requires each part of a multipart upload (except the last one) to be at least 5 MB. As such, the first thing we need to do is determine the right number of parts to split our content into so that no part falls below this 5 MB minimum:

private static final int fiveMB = 5 * 1024 * 1024;

public static int getMaximumNumberOfParts(byte[] byteArray) {
   int numberOfParts = byteArray.length / fiveMB;
   if (numberOfParts == 0) {
      return 1;
   }
   return numberOfParts;
}

2.3. Breaking the Content into Parts

We're going to break the byte array into a set number of parts:

public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {
   List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);
   int fullSize = byteArray.length;
   long dimensionOfPart = fullSize / maxNumberOfParts;
   for (int i = 0; i < maxNumberOfParts; i++) {
      int previousSplitPoint = (int) (dimensionOfPart * i);
      int splitPoint = (int) (dimensionOfPart * (i + 1));
      if (i == (maxNumberOfParts - 1)) {
         splitPoint = fullSize;
      }
      byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);
      parts.add(partBytes);
   }

   return parts;
}

We’re going to test the logic of breaking the byte array into parts – we’re going to generate some bytes, split the byte array, recompose it back together using Guava and verify that we get back the original:

@Test
public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {
   byte[] byteArray = randomByteData(16);

   int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
   List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);

   assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length, 
      equalTo(byteArray.length));
   byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));
   assertThat(byteArray, equalTo(unmultiplexed));
}

To generate the data, we simply use the support from Random:

byte[] randomByteData(int mb) {
   byte[] randomBytes = new byte[mb * 1024 * 1024];
   new Random().nextBytes(randomBytes);
   return randomBytes;
}

2.4. Creating the Payloads

Now that we have determined the correct number of parts for our content and broken the content up, we need to generate the Payload objects for the jclouds API:

public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {
   List<Payload> payloads = Lists.newArrayList();
   for (byte[] filePart : fileParts) {
      byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();
      Payload partPayload = Payloads.newByteArrayPayload(filePart);
      partPayload.getContentMetadata().setContentLength((long) filePart.length);
      partPayload.getContentMetadata().setContentMD5(partMd5Bytes);
      payloads.add(partPayload);
   }
   return payloads;
}

3. Upload

The upload process is a flexible multi-step process – this means:

  • the upload can be started before having all the data – data can be uploaded as it’s coming in
  • data is uploaded in chunks – if one of these operations fails, it can simply be retried
  • chunks can be uploaded in parallel – this can greatly increase the upload speed, especially in the case of large files

3.1. Initiating the Upload Operation

The first step in the upload operation is to initiate the process. This request to S3 must contain the standard HTTP headers – the Content-MD5 header in particular needs to be computed. We're going to use the Guava hash function support here:

Hashing.md5().hashBytes(byteArray).asBytes();

This is the MD5 hash of the entire byte array, not of the individual parts.

To initiate the upload, and for all further interactions with S3, we’re going to use the AWSS3AsyncClient – the asynchronous API we created earlier:

ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();

The key is the handle assigned to the object – this needs to be a unique identifier specified by the client.

Also notice that, even though we’re using the async version of the API, we’re blocking for the result of this operation – this is because we will need the result of the initialize to be able to move forward.

The result of the operation is an upload id returned by S3 – this will identify the upload throughout its lifecycle and will be present in all subsequent upload operations.

3.2. Uploading the Parts

The next step is uploading the parts. Our goal here is to send these requests in parallel, as the upload part operations represent the bulk of the upload process:

List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {
   ListenableFuture<String> future = s3AsyncApi.uploadPart(
      container, key, partNumber + 1, uploadId, payloads.get(partNumber));
   ongoingOperations.add(future);
}

The part numbers need to be continuous, but the order in which the requests are sent is not relevant.

After all of the upload part requests have been submitted, we need to wait for their responses so that we can collect the individual ETag value of each part:

Function<ListenableFuture<String>, String> getEtagFromOp = 
  new Function<ListenableFuture<String>, String>() {
   public String apply(ListenableFuture<String> ongoingOperation) {
      try {
         return ongoingOperation.get();
      } catch (InterruptedException | ExecutionException e) {
         throw new IllegalStateException(e);
      }
   }
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);

If, for whatever reason, one of the upload part operations fails, the operation can be retried until it succeeds. The logic above does not contain the retry mechanism, but building it in should be straightforward enough.

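As a rough illustration of how that retry could be built in – this sketch is not part of the original implementation, and the MAX_RETRIES constant as well as the uploadPartWithRetries name are made up here – each failed part can simply be resubmitted until it succeeds:

private static final int MAX_RETRIES = 3; // example retry budget

String uploadPartWithRetries(AWSS3AsyncClient s3AsyncApi, String container, String key,
  int partNumber, String uploadId, Payload payload) throws ExecutionException, InterruptedException {
   ExecutionException lastFailure = null;
   for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
      try {
         // resubmit the part and block for its result; callers can still run several parts in parallel
         return s3AsyncApi.uploadPart(container, key, partNumber, uploadId, payload).get();
      } catch (ExecutionException e) {
         lastFailure = e;
      }
   }
   throw lastFailure;
}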

3.3. Completing the Upload Operation

The final step of the upload process is completing the multipart operation. The S3 API requires the responses from the previous part uploads as a Map, which we can now easily create from the list of ETags that we obtained above:

Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {
   parts.put(i + 1, etagsOfParts.get(i));
}

And finally, send the complete request:

s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();

This will return the final ETag of the finished object and will complete the entire upload process.

4. Conclusion

In this article we built a multipart enabled, fully parallel upload operation to S3, using the custom S3 jclouds API. This operation is ready to be used as is, but it can be improved in a few ways.

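To recap, the pieces above fit together roughly as follows – this is only a sketch, the uploadMultiPart method name is hypothetical, and the S3Util helpers are the ones shown earlier:

public void uploadMultiPart(AWSS3AsyncClient s3AsyncApi, String container, String key, byte[] bytes)
  throws ExecutionException, InterruptedException {
   // split the content and wrap each part into a jclouds Payload
   int numberOfParts = S3Util.getMaximumNumberOfParts(bytes);
   List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(bytes, numberOfParts);
   List<Payload> payloads = S3Util.createPayloadsOutOfParts(fileParts);

   // initiate the multipart upload with the MD5 of the full content
   ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key)
      .contentMD5(Hashing.md5().hashBytes(bytes).asBytes()).build();
   String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();

   // upload the parts in parallel, then collect the ETags in part order
   List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
   for (int partNumber = 0; partNumber < payloads.size(); partNumber++) {
      ongoingOperations.add(s3AsyncApi.uploadPart(
         container, key, partNumber + 1, uploadId, payloads.get(partNumber)));
   }
   Map<Integer, String> parts = Maps.newHashMap();
   for (int i = 0; i < ongoingOperations.size(); i++) {
      parts.put(i + 1, ongoingOperations.get(i).get());
   }

   // complete the upload
   s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();
}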

First, retry logic should be added around the upload operations to better deal with failures.

Next, for really large files, even though the mechanism sends all the multipart upload requests in parallel, a throttling mechanism should still limit the number of concurrent requests. This is both to avoid bandwidth becoming a bottleneck and to make sure Amazon itself doesn't flag the upload process as exceeding an allowed limit of requests per second – the Guava RateLimiter can potentially be very well suited for this.

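As a rough sketch of what such throttling could look like – the rate of 5 requests per second is an arbitrary example value, not a recommendation – a RateLimiter can gate the loop that starts the part uploads:

// limit how many uploadPart requests are started per second (example value)
RateLimiter rateLimiter = RateLimiter.create(5.0);

List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < payloads.size(); partNumber++) {
   rateLimiter.acquire(); // blocks until a permit is available
   ongoingOperations.add(s3AsyncApi.uploadPart(
      container, key, partNumber + 1, uploadId, payloads.get(partNumber)));
}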