Paging and Async Calls with the Kubernetes API – 用Kubernetes API进行分页和异步调用

最后修改: 2021年 3月 22日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.绪论

In this tutorial, we continue to explore the Kubernetes API for Java. This time, we’ll focus on two of its features: paging and asynchronous calls.

在本教程中,我们将继续探索Kubernetes API for Java。这一次,我们将重点讨论其中的两个功能。分页和异步调用

2. Paging

2.寻呼

In a nutshell, paging allows us to iterate over a large result set in chunks, a.k.a pages – hence the name of this method. In the context of the Kubernetes Java API, this feature is available in all methods that return a list of resources. Those methods always include two optional parameters that we can use to iterate over the results:

简而言之,分页允许我们在大块的结果集上进行迭代,又称页面,因此该方法的名称为分页。在Kubernetes Java API的上下文中,所有返回资源列表的方法都可以使用这一功能。这些方法总是包括两个可选的参数,我们可以用它们来遍历结果。

  • limit: maximum number of items returned in a single API call
  • continue: A continuation token that tells the server the starting point for the returned result set

Using those parameters, we can iterate over an arbitrary number of items without putting too much pressure on the server. Even better, the amount of memory required on the client-side to hold the results is also bounded.

使用这些参数,我们可以对任意数量的项目进行迭代,而不会给服务器带来太大的压力。甚至更好的是,客户端为保存结果所需的内存量也是有限度的。

Now, let’s see how to use those parameters to get a list of all available pods in a cluster using this method:

现在,让我们看看如何使用这些参数来获得一个集群中所有可用的pod的列表,使用这个方法。

ApiClient client = Config.defaultClient();
CoreV1Api api = new CoreV1Api(client);
String continuationToken = null;
do {
    V1PodList items = api.listPodForAllNamespaces(
      null,
      continuationToken, 
      null,
      null, 
      2, 
      null, 
      null,
      null,
      10,
      false);
    continuationToken = items.getMetadata().getContinue();
    items.getItems()
      .stream()
      .forEach((node) -> System.out.println(node.getMetadata()));
} while (continuationToken != null);

Here, the second parameter to the listPodForAllNamespaces() API call contains the continuation token, and the fifth is the limit parameter. While the limit is usually just a fixed value, continue requires a little extra effort.

在这里,listPodForAllNamespaces() API 调用的第二个参数包含延续令牌,第五个是limit参数。虽然limit通常只是一个固定值,但continue需要一点额外的努力。

For the first call, we send a null value, signaling the server that this is the first call of paged request sequence. Upon receiving the response, we get the new value for the next to continue value to use from the corresponding list metadata field.

对于第一次调用,我们发送一个null值,向服务器发出信号,这是分页请求序列的第一次调用。收到响应后,我们从相应的列表元数据字段中获得下一个continue值的新值。

This value will be null when there are no more results available, so we use this fact to define the exit condition for the iteration loop.

当没有更多的结果可用时,这个值将是null,所以我们用这个事实来定义迭代循环的退出条件。

2.1. Pagination Gotchas

2.1.分页的问题

The paging mechanism is quite straightforward, but there are a few details we must keep in mind:

分页机制是非常直接的,但有几个细节我们必须记住。

  • Currently, the API does not support server-side sorting. Given the current lack of storage-level support for sorting, this is unlikely to change anytime soon
  • All call parameters, except for continue, must be the same between calls
  • The continue value must be treated as an opaque handle. We should never make any assumptions about its value
  • Iteration is one-way. We cannot go back in the result set using a previously received continue token
  • Even though the returned list metadata contains a remainingItemCount field, its value is neither reliable nor supported by all implementations

2.2. List Data Consistency

2.2.列表数据的连贯性

Since a Kubernetes cluster is a very dynamic environment, there’s a possibility that the result set associated with a paginated call sequence gets modified while being read by the client. How does the Kubernetes API behave in this case?

由于Kubernetes集群是一个非常动态的环境,有可能与分页调用序列相关的结果集在被客户端读取时被修改。在这种情况下,Kubernetes的API是如何表现的?

As explained in Kubernetes documentation, list APIs support a resourceVersion parameter which, together with resourceVersionMatch, define how a particular version is selected for inclusion. However, for the paged result set case, the behavior is always the same: “Continue Token, Exact”.

正如Kubernetes文档中所解释的那样,列表API支持一个resourceVersion参数,该参数与resourceVersionMatch一起,定义了如何选择特定的版本进行收录。然而,对于分页结果集的情况,其行为始终是相同的:”继续标记,精确”。

This means that the returned resource versions corresponded to those available when the paginated list call started. While this approach provides consistency, it will not include results modified afterward. For instance, by the time we finish iterating over all pods in a large cluster, some of them may already have terminated.

这意味着返回的资源版本对应于分页列表调用开始时的可用版本。虽然这种方法提供了一致性,但它不会包括事后修改的结果。例如,当我们完成对一个大型集群中的所有pod的迭代时,其中一些可能已经终止了。

3. Async Calls

3.异步调用

So far, we’ve used the Kubernetes API in a synchronous way, which is fine for simple programs but not very efficient from a resource usage viewpoint, as it blocks the calling thread until we receive a response from the cluster and process it. This behavior will hurt the responsiveness of an application badly if, for instance, we start to make those calls in a GUI thread.

到目前为止,我们以同步的方式使用Kubernetes API,这对简单的程序来说没有问题,但从资源使用的角度来看,效率并不高,因为它阻塞了调用线程,直到我们收到来自集群的响应并处理它。例如,如果我们开始在GUI线程中进行这些调用,这种行为将严重损害应用程序的响应性。

Fortunately, the library supports an asynchronous mode based on callbacks, which returns the control to the caller immediately.

幸运的是,该库支持基于回调的异步模式,可以立即将控制权返回给调用者

Inspecting the CoreV1Api class, we’ll notice that, for each synchronous xxx() method, there’s also a xxxAsync() variant. For example, the async method for listPodForAllNamespaces() is listPodForAllNamespacesAsync(). The arguments are the same, with the addition of an extra parameter for the callback implementation.

检查CoreV1Api类,我们会注意到,对于每个同步的xxx()方法,也有一个xxxAsync() 变量。例如,listPodForAllNamespaces() 的异步方法是listPodForAllNamespacesAsync()。参数是一样的,只是增加了一个用于回调实现的额外参数。

3.1. Callback Details

3.1.回调细节

The callback parameter object must implement the generic interface ApiCallback<T>, which contains just four methods:

回调参数对象必须实现通用接口ApiCallback<T>,其中只包含四个方法。

  • onSuccess: Called if and only if the call succeeded. The first argument type is the same that would be returned by the synchronous version
  • onFailure: Called there was an error calling the server or the reply contains an error code
  • onUploadProgress: Called during an upload. We can use this callback to provide feedback to a user during a lengthy operation
  • onDownloadProgress: Same as onUploadProgress, but for downloads

Async calls also don’t return a regular result. Instead, they return an OkHttp’s (the underlying REST client used by the Kubernetes API) Call instance, which works as a handle to the undergoing call. We can use this object to poll the completion state or, if we want, cancel it before completion.

异步调用也不会返回一个常规的结果。相反,它们会返回一个OkHttp的(Kubernetes API使用的底层REST客户端)Call实例,该实例可作为进行中调用的句柄。我们可以使用这个对象来轮询完成状态,或者,如果我们愿意,在完成之前取消它。

3.2. Async Call Example

3.2.异步调用实例

As we can imagine, implementing callbacks everywhere requires a lot of boilerplate code. To avoid this, we’ll use an invocation helper that simplifies this task a bit:

我们可以想象,到处实现回调需要大量的模板代码。为了避免这一点,我们将使用一个invocation helper,将这项任务简化一些。

// Start async call
CompletableFuture<V1NodeList> p = AsyncHelper.doAsync(api,(capi,cb) ->
  capi.listNodeAsync(null, null, null, null, null, null, null, null, 10, false, cb)
);
p.thenAcceptAsync((nodeList) -> {
    nodeList.getItems()
      .stream()
      .forEach((node) -> System.out.println(node.getMetadata()));
});
// ... do something useful while we wait for results

Here, the helper wraps the asynchronous call invocation and adapts it to a more standard CompletableFuture. This allows us to use it with other libraries, such as those from the Reactor Project. In this example, we’ve added a completion stage that prints all metadata to the standard output.

在这里,帮助器包装了异步调用调用,并将其调整为更标准的CompletableFuture。这使我们能够与其他库一起使用它,例如来自Reactor项目的库。在这个例子中,我们添加了一个完成阶段,将所有元数据打印到标准输出。

As usual, when dealing with futures, we must be aware of concurrency issues that may arise. The online version of this code contains some debugging logs that clearly show that, even for this simple code, at least three threads were used:

像往常一样,在处理期货时,我们必须注意可能出现的并发性问题。这段代码的在线版本包含一些调试日志,清楚地表明,即使是这段简单的代码,至少也使用了三个线程。

  • The main thread, which kicks the async call
  • OkHttp’s threads used to make the actual HTTP call
  • The completion thread, where the results are processed

4. Conclusion

4.总结

In this article, we have seen how to use paging and asynchronous calls with the Kubernetes Java API.

在这篇文章中,我们已经看到如何使用Kubernetes Java API的分页和异步调用。

As usual, the full source code of the examples can be found over on GitHub.

像往常一样,这些例子的完整源代码可以在GitHub上找到