1. Overview
RSocket is an application protocol providing Reactive Streams semantics – it functions, for example, as an alternative to HTTP.
RSocket是一个提供Reactive Streams语义的应用协议–它的功能是,例如,作为HTTP的替代。
In this tutorial, we’re going to look at RSocket using Spring Boot, and specifically how it helps abstract away the lower-level RSocket API.
在本教程中,我们将使用RSocket看一看Spring Boot,特别是它如何帮助抽象出低级别的 RSocket API。
2. Dependencies
Let’s start with adding the spring-boot-starter-rsocket dependency:
This will transitively pull in RSocket related dependencies such as rsocket-core and rsocket-transport-netty.
3. Sample Application
Now we’ll continue with our sample application. To highlight the interaction models RSocket provides, we’re going to create a trader application. Our trader application will consist of a client and a server.
3.1. Server Setup
First, let’s set up the server, which will be a Spring Boot application bootstrapping an RSocket server.
首先,我们来设置服务器,它将是一个引导RSocket服务器的Spring Boot应用程序。
Since we have the spring-boot-starter-rsocket dependency, Spring Boot autoconfigures an RSocket server for us. As usual with Spring Boot, we can change default configuration values for the RSocket server in a property-driven fashion.
由于我们有spring-boot-starter-rsocket依赖,Spring Boot为我们自动配置了一个RSocket服务器。与Spring Boot一样,我们可以以属性驱动的方式改变RSocket服务器的默认配置值。
For example, let’s change the port of our RSocket server by adding the following line to our application.properties file:
We can also change other properties to further modify our server according to our needs.
3.2. Client Setup
3.2 客户端设置
Next, let’s set up the client which will also be a Spring Boot application.
接下来,我们来设置客户端,它也将是一个Spring Boot应用程序。
Although Spring Boot auto-configures most of the RSocket related components, we should also define some beans to complete the setup:
虽然Spring Boot自动配置了大部分与RSocket相关的组件,但我们还应该定义一些Bean来完成设置。
public class ClientConfiguration {
public RSocketRequester getRSocketRequester(){
RSocketRequester.Builder builder = RSocketRequester.builder();
return builder
rSocketConnector ->
rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2)))
.tcp("localhost", 7000);
Here we’re creating the RSocket client and configuring it to use TCP transport on port 7000. Note that this is the server port we’ve configured previously.
After defining this bean configuration, we have a bare-bones structure.
Next, we’ll explore different interaction models and see how Spring Boot helps us there.
接下来,我们将探索不同的交互模型,看看Spring Boot如何帮助我们实现。
4. Request/Response with RSocket and Spring Boot
4.使用RSocket和Spring Boot的请求/响应
Let’s start with Request/Response. This is probably the most common and familiar interaction model since HTTP also employs this type of communication.
In this interaction model, the client initiates the communication and sends a request. Afterward, the server performs the operation and returns a response to the client – thus the communication completes.
In our trader application, a client will ask for the current market data of a given stock. In return, the server will pass the requested data.
4.1. Server
On the server side, we should first create a controller to hold our handler methods. But instead of @RequestMapping or @GetMapping annotations like in Spring MVC, we will use the @MessageMapping annotation:
在服务器端,我们应该首先创建一个控制器来保存我们的处理方法。但不是像Spring MVC中的@RequestMapping或@GetMapping注释,我们将使用@MessageMapping注释。
public class MarketDataRSocketController {
private final MarketDataRepository marketDataRepository;
public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
this.marketDataRepository = marketDataRepository;
public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
return marketDataRepository.getOne(marketDataRequest.getStock());
So let’s investigate our controller.
We’re using the @Controller annotation to define a handler which should process incoming RSocket requests. Additionally, the @MessageMapping annotation lets us define which route we’re interested in and how to react upon a request.
In this case, the server listens for the currentMarketData route, which returns a single result to the client as a Mono<MarketData>.
4.2. Client
4.2 客户
Next, our RSocket client should ask for the current price of a stock and get a single response.
To initiate the request, we should use the RSocketRequester class:
public class MarketDataRestController {
private final RSocketRequester rSocketRequester;
public MarketDataRestController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
@GetMapping(value = "/current/{stock}")
public Publisher<MarketData> current(@PathVariable("stock") String stock) {
return rSocketRequester
.data(new MarketDataRequest(stock))
Note that in our case, the RSocket client is also a REST controller from which we call our RSocket server. So, we’re using @RestController and @GetMapping to define our request/response endpoint.
In the endpoint method, we’re using RSocketRequester and specifying the route. In fact, this is the route which the RSocket server expects. Then we’re passing the request data. And lastly, when we call the retrieveMono() method, Spring Boot initiates a request/response interaction.
在端点方法中,我们使用RSocketRequester并指定了路由。事实上,这就是RSocket服务器所期望的路由。然后,我们要传递请求数据。最后,当我们调用retrieveMono()方法时,Spring Boot启动了一个请求/响应互动。
5. Fire and Forget With RSocket and Spring Boot
5.使用RSocket和Spring Boot的火灾和遗忘
Next, we’ll look at the fire-and-forget interaction model. As the name implies, the client sends a request to the server but doesn’t expect a response back.
接下来,我们将看一下 “发射-遗忘 “交互模型。顾名思义,客户端向服务器发送一个请求,但并不期望得到回应。
In our trader application, some clients will serve as a data source and will push market data to the server.
5.1. Server
Let’s create another endpoint in our server application:
public Mono<Void> collectMarketData(MarketData marketData) {
return Mono.empty();
Again, we’re defining a new @MessageMapping with the route value of collectMarketData. Furthermore, Spring Boot automatically converts the incoming payload to a MarketData instance.
同样,我们定义了一个新的@MessageMapping,路由值为collectMarketData。此外,Spring Boot自动将传入的有效载荷转换为MarketData实例。
The big difference here, though, is that we return a Mono<Void> since the client doesn’t need a response from us.
不过,这里最大的区别是,我们返回一个Mono<Void> ,因为客户端不需要我们的响应。。
5.2. Client
Let’s see how we can initiate our fire-and-forget request.
让我们看看如何启动我们的 “防火 “请求。
We’ll create another REST endpoint:
@GetMapping(value = "/collect")
public Publisher<Void> collect() {
return rSocketRequester
Here we’re specifying our route and our payload will be a MarketData instance. Since we’re using the send() method to initiate the request instead of retrieveMono(), the interaction model becomes fire-and-forget.
6. Request Stream with RSocket and Spring Boot
6.使用RSocket和Spring Boot的请求流
Request streaming is a more involved interaction model, where the client sends a request but gets multiple responses over the course of time from the server.
To simulate this interaction model, a client will ask for all market data of a given stock.
6.1. Server
Let’s start with our server. We’ll add another message mapping method:
public Flux<MarketData> feedMarketData(MarketDataRequest marketDataRequest) {
return marketDataRepository.getAll(marketDataRequest.getStock());
As we can see, this handler method is very similar to the other ones. The different part is that we returning a Flux<MarketData> instead of a Mono<MarketData>. In the end, our RSocket server will send multiple responses to the client.
6.2. Client
On the client side, we should create an endpoint to initiate our request/stream communication:
@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<MarketData> feed(@PathVariable("stock") String stock) {
return rSocketRequester
.data(new MarketDataRequest(stock))
Let’s investigate our RSocket request.
First, we’re defining the route and request payload. Then, we’re defining our response expectation with the retrieveFlux() method call. This is the part which determines the interaction model.
Also note that, since our client is also a REST server, it defines response media type as MediaType.TEXT_EVENT_STREAM_VALUE.
7. Exception Handling
Now let’s see how we can handle exceptions in our server application in a declarative way.
When doing request/response, we can simply use the @MessageExceptionHandler annotation:
public Mono<MarketData> handleException(Exception e) {
return Mono.just(MarketData.fromException(e));
Here we’ve annotated our exception handler method with @MessageExceptionHandler. As a result, it will handle all types of exceptions since the Exception class is the superclass of all others.
We can be more specific and create different exception handler methods for different exception types.
This is of course for the request/response model, and so we’re returning a Mono<MarketData>. We want our return type here to match the return type of our interaction model.
8. Summary
In this tutorial, we’ve covered Spring Boot’s RSocket support and detailed different interaction models RSocket provides.
在本教程中,我们已经介绍了Spring Boot的RSocket支持,并详细介绍了RSocket提供的不同交互模型。
As always, you can check out all the code samples over on GitHub.