Streaming with gRPC in Java – 在Java中用gRPC进行流式传输

最后修改: 2021年 9月 23日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

gRPC is a platform to do inter-process Remote Procedure Calls (RPC). It follows a client-server model, is highly performant, and supports the most important computer languages. Check out our article  Introduction to gRPC for a good review.

gRPC是一个进行进程间远程过程调用(RPC)的平台。它遵循客户-服务器模型,具有很高的性能,并支持最重要的计算机语言。请查看我们的文章gRPC简介,以获得良好的评论。

In this tutorial, we’ll focus on gRPC streams. Streaming allows multiplex messages between servers and clients, creating very efficient and flexible inter-process communications.

在本教程中,我们将重点讨论gRPC流。流允许服务器和客户端之间的多路消息,创造非常高效和灵活的进程间通信

2. Basics of gRPC Streaming

2.gRPC流的基础知识

gRPC uses the HTTP/2 network protocol to do inter-service communications. One key advantage of HTTP/2 is that it supports streams. Each stream can multiplex multiple bidirectional messages sharing a single connection.

gRPC 使用HTTP/2网络协议进行服务间通信。 HTTP/2的一个关键优势是它支持流。每个流可以复用多个双向消息,共享一个连接。

In gRPC, we can have streaming with three functional call types:

在gRPC中,我们可以通过三种功能调用类型来实现流媒体。

  1. Server streaming RPC: The client sends a single request to the server and gets back several messages that it reads sequentially.
  2. Client streaming RPC: The client sends a sequence of messages to the server. The client waits for the server to process the messages and reads the returned response.
  3. Bidirectional streaming RPC: The client and server can send multiple messages back and forth. The messages are received in the same order that they were sent. However, the server or client can respond to the received messages in the order that they choose.

To demonstrate how to use these procedural calls, we’ll write a simple client-server application example that exchanges information on stock securities.

为了演示如何使用这些程序性调用,我们将编写一个简单的客户-服务器应用实例,交换股票证券的信息。

3. Service Definition

3.服务定义

We use stock_quote.proto to define the service interface and the structure of the payload messages:

我们使用stock_quote.proto来定义服务接口和有效载荷消息的结构。

service StockQuoteProvider {
  
  rpc serverSideStreamingGetListStockQuotes(Stock) returns (stream StockQuote) {}

  rpc clientSideStreamingGetStatisticsOfStocks(stream Stock) returns (StockQuote) {}
  
  rpc bidirectionalStreamingGetListsStockQuotes(stream Stock) returns (stream StockQuote) {}
}
message Stock {
   string ticker_symbol = 1;
   string company_name = 2;
   string description = 3;
}
message StockQuote {
   double price = 1;
   int32 offer_number = 2;
   string description = 3;
}

The StockQuoteProvider service has three method types that support message streaming. In the next section, we’ll cover their implementations.

StockQuoteProvider服务有三种方法类型支持消息流。在下一节中,我们将介绍它们的实现。

We see from the service’s method signatures that the client queries the server by sending Stock messages. The server sends the response back using StockQuote messages.

我们从该服务的方法签名中看到,客户端通过发送Stock消息来查询服务器。服务器使用StockQuote消息发送响应。

We use the protobuf-maven-plugin defined in the pom.xml file to generate the Java code from the stock-quote.proto IDL file.

我们使用protobuf-maven-plugin定义的pom.xml文件,从stock-quote.proto IDL文件生成Java代码

The plugin generates client-side stubs and server-side code in the target/generated-sources/protobuf/java and /grpc-java directories.

该插件在target/generated-sources/protobuf/java/grpc-java目录下生成客户端存根和服务器端代码。

We’re going to leverage the generated code to implement our server and client.

我们将利用生成的代码来实现我们的服务器和客户端

4. Server Implementation

4.服务器实施

The StockServer constructor uses the gRPC Server to listen to and dispatch incoming requests:

StockServer构造函数使用gRPC Server来监听和分配传入的请求。

public class StockServer {
    private int port;
    private io.grpc.Server server;

    public StockServer(int port) throws IOException {
        this.port = port;
        server = ServerBuilder.forPort(port)
          .addService(new StockService())
          .build();
    }
    //...
}

We add StockService to the io.grpc.Server. StockService extends StockQuoteProviderImplBase, which the protobuf plugin generated from our proto file. Therefore, StockQuoteProviderImplBase has stubs for the three streaming service methods.

我们将StockService添加到io.grpc.ServerStockService扩展了StockQuoteProviderImplBaseprotobuf插件从我们的proto文件生成。因此,StockQuoteProviderImplBase有三个流媒体服务方法的存根

StockService needs to override these stub methods to do the actual implementation of our service.

StockService需要覆盖这些存根方法来完成我们服务的实际实现

Next, we’re going to see how this is done for the three streaming cases.

接下来,我们要看看这三个流媒体案例是如何做到的。

4.1. Server-Side Streaming

4.1.服务器端流媒体

The client sends a single request for a quote and gets back several responses, each with different prices offered for the commodity:

客户发送一个报价请求,并得到几个回复,每个回复都有不同的商品报价

@Override
public void serverSideStreamingGetListStockQuotes(Stock request, StreamObserver<StockQuote> responseObserver) {
    for (int i = 1; i <= 5; i++) {
        StockQuote stockQuote = StockQuote.newBuilder()
          .setPrice(fetchStockPriceBid(request))
          .setOfferNumber(i)
          .setDescription("Price for stock:" + request.getTickerSymbol())
          .build();
        responseObserver.onNext(stockQuote);
    }
    responseObserver.onCompleted();
}

The method creates a StockQuote, fetches the prices, and marks the offer number. For each offer, it sends a message to the client invoking responseObserver::onNext. It uses reponseObserver::onCompleted to signal that it’s done with the RPC.

该方法创建一个StockQuote,获取价格,并标记报价编号。对于每个报价,它向客户端发送一条消息,调用responseObserver::onNext。它使用reponseObserver::onCompleted来表示它已经完成了RPC。

4.2. Client-Side Streaming

4.2.客户端流媒体

The client sends multiple stocks and the server returns back a single StockQuote:

客户发送多个股票,服务器返回一个单一的股票报价

@Override
public StreamObserver<Stock> clientSideStreamingGetStatisticsOfStocks(StreamObserver<StockQuote> responseObserver) {
    return new StreamObserver<Stock>() {
        int count;
        double price = 0.0;
        StringBuffer sb = new StringBuffer();

        @Override
        public void onNext(Stock stock) {
            count++;
            price = +fetchStockPriceBid(stock);
            sb.append(":")
                .append(stock.getTickerSymbol());
        }

        @Override
        public void onCompleted() {
            responseObserver.onNext(StockQuote.newBuilder()
                .setPrice(price / count)
                .setDescription("Statistics-" + sb.toString())
                .build());
            responseObserver.onCompleted();
        }

        // handle onError() ...
    };
}

The method gets a StreamObserver<StockQuote> as a parameter to respond to the client. It returns a StreamObserver<Stock>, where it processes the client request messages.

该方法得到一个StreamObserver<StockQuote>作为参数来响应客户端。它返回一个StreamObserver<Stock>,在这里它处理客户端的请求信息。

The returned StreamObserver<Stock> overrides onNext() to get notified each time the client sends a request.

返回的StreamObserver<Stock>重写了onNext(),以便在每次客户端发送请求时得到通知。

The method StreamObserver<Stock>.onCompleted() is called when the client has finished sending all the messages. With all the Stock messages that we have received, we find the average of the fetched stock prices, create a StockQuote, and invoke responseObserver::onNext to deliver the result to the client.

方法StreamObserver<Stock>.onCompleted()在客户端完成发送所有消息时被调用。通过我们收到的所有Stock消息,我们找到所获取的股票价格的平均值,创建一个StockQuote,并调用responseObserver::onNext来将结果传递给客户端。

Finally, we override StreamObserver<Stock>.onError() to handle abnormal terminations.

最后,我们覆盖StreamObserver<Stock>.onError()以处理异常的终止。

4.3. Bidirectional Streaming

4.3.双向流

The client sends several stocks and the server returns a set of prices for each request:

客户发送几个股票,服务器为每个请求返回一组价格

@Override
public StreamObserver<Stock> bidirectionalStreamingGetListsStockQuotes(StreamObserver<StockQuote> responseObserver) {
    return new StreamObserver<Stock>() {
        @Override
        public void onNext(Stock request) {
            for (int i = 1; i <= 5; i++) {
                StockQuote stockQuote = StockQuote.newBuilder()
                  .setPrice(fetchStockPriceBid(request))
                  .setOfferNumber(i)
                  .setDescription("Price for stock:" + request.getTickerSymbol())
                  .build();
                responseObserver.onNext(stockQuote);
            }
        }

        @Override
        public void onCompleted() {
            responseObserver.onCompleted();
        }

        //handle OnError() ...
    };
}

We have the same method signature as in the previous example. What changes is the implementation: We don’t wait for the client to send all the messages before we respond.

我们的方法签名与前一个例子中的相同。改变的是实现。我们不等待客户端发送所有的消息,然后再进行响应。

In this case, we invoke responseObserver::onNext immediately after receiving each incoming message, and in the same order that it was received.

在这种情况下,我们在收到每个传入的消息后立即调用responseObserver::onNext,而且是按照收到消息的相同顺序调用。

It’s important to notice that we could have easily changed the order of the responses if needed.

值得注意的是,如果需要,我们可以很容易地改变回答的顺序。

5. Client Implementation

5.客户实施

The constructor of StockClient takes a gRPC channel and instantiates the stub classes generated by the gRPC Maven plugin:

StockClient的构造函数接收一个gRPC通道,并实例化由gRPC Maven插件生成的存根类。

public class StockClient {
    private StockQuoteProviderBlockingStub blockingStub;
    private StockQuoteProviderStub nonBlockingStub;

    public StockClient(Channel channel) {
        blockingStub = StockQuoteProviderGrpc.newBlockingStub(channel);
        nonBlockingStub = StockQuoteProviderGrpc.newStub(channel);
    }
    // ...
}

StockQuoteProviderBlockingStub and StockQuoteProviderStub support making synchronous and asynchronous client method requests.

StockQuoteProviderBlockingStubStockQuoteProviderStub支持提出同步和异步的客户端方法请求

We’re going to see the client implementation for the three streaming RPCs next.

接下来,我们将看到三个流媒体RPC的客户端实现。

5.1. Client RPC with Server-Side Streaming

5.1.带有服务器端流的客户端RPC

The client makes a single call to the server requesting a stock price and gets back a list of quotes:

客户端向服务器发出一个请求股票价格的单一呼叫,并得到一个报价列表。

public void serverSideStreamingListOfStockPrices() {
    Stock request = Stock.newBuilder()
      .setTickerSymbol("AU")
      .setCompanyName("Austich")
      .setDescription("server streaming example")
      .build();
    Iterator<StockQuote> stockQuotes;
    try {
        logInfo("REQUEST - ticker symbol {0}", request.getTickerSymbol());
        stockQuotes = blockingStub.serverSideStreamingGetListStockQuotes(request);
        for (int i = 1; stockQuotes.hasNext(); i++) {
            StockQuote stockQuote = stockQuotes.next();
            logInfo("RESPONSE - Price #" + i + ": {0}", stockQuote.getPrice());
        }
    } catch (StatusRuntimeException e) {
        logInfo("RPC failed: {0}", e.getStatus());
    }
}

We use blockingStub::serverSideStreamingGetListStocks to make a synchronous request. We get back a list of StockQuotes with the Iterator.

我们使用blockingStub::serverSideStreamingGetListStocks来做一个同步请求。我们得到一个带有IteratorStockQuotes的列表。

5.2. Client RPC with Client-Side Streaming

5.2.带有客户端流的客户端RPC

The client sends a stream of Stocks to the server and gets back a StockQuote with some statistics:

客户端向服务器发送一个股票流,并得到一个股票报价和一些统计数据。

public void clientSideStreamingGetStatisticsOfStocks() throws InterruptedException {
    StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
        @Override
        public void onNext(StockQuote summary) {
            logInfo("RESPONSE, got stock statistics - Average Price: {0}, description: {1}", summary.getPrice(), summary.getDescription());
        }

        @Override
        public void onCompleted() {
            logInfo("Finished clientSideStreamingGetStatisticsOfStocks");
        }

        // Override OnError ...
    };
    
    StreamObserver<Stock> requestObserver = nonBlockingStub.clientSideStreamingGetStatisticsOfStocks(responseObserver);
    try {
        for (Stock stock : stocks) {
            logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
            requestObserver.onNext(stock);
        }
    } catch (RuntimeException e) {
        requestObserver.onError(e);
        throw e;
    }
    requestObserver.onCompleted();
}

As we did with the server example, we use StreamObservers to send and receive messages.

正如我们在服务器例子中所做的,我们使用StreamObservers来发送和接收消息。

The requestObserver uses the non-blocking stub to send the list of Stocks to the server.

requestObserver使用非阻塞存根将Stocks的列表发送到服务器。

With responseObserver, we get back the StockQuote with some statistics.

通过responseObserver,我们得到带有一些统计数据的StockQuote

5.3. Client RPC with Bidirectional Streaming

5.3.带有双向流的客户端RPC

The client sends a stream of Stocks and gets back a list of prices for each Stock.

客户端发送一个Stocks流,并得到每个Stock的价格列表。

public void bidirectionalStreamingGetListsStockQuotes() throws InterruptedException{
    StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
        @Override
        public void onNext(StockQuote stockQuote) {
            logInfo("RESPONSE price#{0} : {1}, description:{2}", stockQuote.getOfferNumber(), stockQuote.getPrice(), stockQuote.getDescription());
        }

        @Override
        public void onCompleted() {
            logInfo("Finished bidirectionalStreamingGetListsStockQuotes");
        }

        //Override onError() ...
    };
    
    StreamObserver<Stock> requestObserver = nonBlockingStub.bidirectionalStreamingGetListsStockQuotes(responseObserver);
    try {
        for (Stock stock : stocks) {
            logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
            requestObserver.onNext(stock);
            Thread.sleep(200);
        }
    } catch (RuntimeException e) {
        requestObserver.onError(e);
        throw e;
    }
    requestObserver.onCompleted();
}

The implementation is quite similar to the client-side streaming case. We send the Stocks with the requestObserver — the only difference is that now we get multiple responses with the responseObserver. The responses are decoupled from the requests — they can arrive in any order.

这个实现与客户端流媒体的情况很相似。我们用requestObserver发送Stock–唯一不同的是,现在我们用responseObserver获得多个响应。响应与请求是脱钩的–它们可以以任何顺序到达。

6. Running the Server and Client

6.运行服务器和客户端

After using Maven to compile the code, we just need to open two command windows.

使用Maven编译完代码后,我们只需要打开两个命令窗口。

To run the server:

要运行服务器。

mvn exec:java -Dexec.mainClass=com.baeldung.grpc.streaming.StockServer

To run the client:

要运行客户端。

mvn exec:java -Dexec.mainClass=com.baeldung.grpc.streaming.StockClient

7. Conclusion

7.结语

In this article, we’ve seen how to use streaming in gRPC. Streaming is a powerful feature that allows clients and servers to communicate by sending multiple messages over a single connection. Furthermore, the messages are received in the same order as they were sent, but either side can read or write the messages in any order they wish.

在这篇文章中,我们已经看到了如何在gRPC中使用流媒体。流媒体是一个强大的功能,它允许客户端和服务器通过单个连接发送多个消息来进行通信。此外,消息的接收顺序与发送顺序相同,但任何一方都可以按照他们希望的任何顺序读取或写入这些消息

The source code of the examples can be found over on GitHub.

这些例子的源代码可以在GitHub上找到