1. Introduction
1.绪论
RxJava is one of the most popular reactive programming libraries out there.
RxJava是目前最流行的反应式编程库之一。
And Ratpack is a collection of Java libraries for creating lean and powerful web applications built on Netty.
而Ratpack是一个Java库的集合,用于创建基于Netty的精简而强大的Web应用程序。
In this tutorial, we’ll discuss the incorporation of RxJava in a Ratpack application to create a nice reactive web app.
在本教程中,我们将讨论在Ratpack应用程序中加入RxJava,以创建一个漂亮的反应式Web应用程序。
2. Maven Dependencies
2.Maven的依赖性
Now, we first need the ratpack-core and ratpack-rx dependencies:
现在,我们首先需要ratpack-core和ratpack-rx依赖。
<dependency>
<groupId>io.ratpack</groupId>
<artifactId>ratpack-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>io.ratpack</groupId>
<artifactId>ratpack-rx</artifactId>
<version>1.6.0</version>
</dependency>
Note, by the way, that ratpack-rx imports the rxjava dependency for us.
顺便注意,ratpack-rx为我们导入了rxjava依赖。
3. Initial Setup
3.初始设置
RxJava supports the integration of 3rd party libraries, using its plugin system. So, we can incorporate different execution strategies into RxJava’s execution model.
RxJava支持使用其插件系统来集成第三方库。因此,我们可以将不同的执行策略纳入RxJava的执行模型中。
Ratpack plugs into this execution model via RxRatpack, which we initialize at startup:
Ratpack通过RxRatpack插入这个执行模型,我们在启动时初始化它。
RxRatpack.initialise();
Now, it’s important to note that the method needs to be called only once per JVM run.
现在,需要注意的是,该方法在每个JVM运行中只需要被调用一次。
The result is that we’ll be able to map RxJava’s Observables into RxRatpack’s Promise types and vice versa.
其结果是,我们将能够把RxJava的Observables映射到RxRatpack的Promise类型,反之亦然。
4. Observables to Promises
4.可观察到的s到承诺s
We can convert an Observable in RxJava into a Ratpack Promise.
我们可以将RxJava中的Observable转换成Ratpack的Promise.。
However, there is a bit of a mismatch. See, a Promise emits a single value, but an Observable can emit a stream of them.
然而,有一点不匹配。看,Promise发射一个单一的值,但是Observable可以发射一个流。
RxRatpack handles this by offering two different methods: promiseSingle() and promise().
RxRatpack通过提供两种不同的方法来处理这个问题。promiseSingle()和promise()。
So, let’s assume we have a service named MovieService that emits a single promise on getMovie(). We’d use promiseSingle() since we know it will only emit once:
因此,让我们假设我们有一个名为MovieService的服务,它在getMovie()上发出了一个承诺。我们将使用promiseSingle(),因为我们知道它将只发射一次。
Handler movieHandler = (ctx) -> {
MovieService movieSvc = ctx.get(MovieService.class);
Observable<Movie> movieObs = movieSvc.getMovie();
RxRatpack.promiseSingle(movieObs)
.then(movie -> ctx.render(Jackson.json(movie)));
};
On the other hand, if getMovies() can return a stream of movie results, we’d use promise():
另一方面,如果getMovies()可以返回一个电影结果流,我们会使用promise()。
Handler moviesHandler = (ctx) -> {
MovieService movieSvc = ctx.get(MovieService.class);
Observable<Movie> movieObs = movieSvc.getMovies();
RxRatpack.promise(movieObs)
.then(movie -> ctx.render(Jackson.json(movie)));
};
Then, we can add these handlers to our Ratpack server like normal:
然后,我们可以像平常一样把这些处理程序添加到我们的Ratpack服务器上。
RatpackServer.start(def -> def.registryOf(rSpec -> rSpec.add(MovieService.class, new MovieServiceImpl()))
.handlers(chain -> chain
.get("movie", movieHandler)
.get("movies", moviesHandler)));
5. Promises to Observables
5.承诺到可观察的承诺
Conversely, we can map a Promise type in Ratpack back to an RxJava Observable.
相反,我们可以将Ratpack中的Promise类型映射到RxJava的Observable。
RxRatpack again has two methods: observe() and observeEach().
RxRatpack又有两个方法。observe()和observeEach()。
In this case, we’ll imagine we have a movie service that returns Promises instead of Observables.
在这种情况下,我们将想象我们有一个电影服务,它返回Promises而不是Observables。
With our getMovie(), we’d use observe():
对于我们的getMovie(),我们会使用observe()。
Handler moviePromiseHandler = ctx -> {
MoviePromiseService promiseSvc = ctx.get(MoviePromiseService.class);
Promise<Movie> moviePromise = promiseSvc.getMovie();
RxRatpack.observe(moviePromise)
.subscribe(movie -> ctx.render(Jackson.json(movie)));
};
And when we get back a list, like with getMovies(), we’d use observeEach():
而当我们得到一个列表时,比如用getMovies(),我们会使用observeEach()。
Handler moviesPromiseHandler = ctx -> {
MoviePromiseService promiseSvc = ctx.get(MoviePromiseService.class);
Promise<List<Movie>> moviePromises = promiseSvc.getMovies();
RxRatpack.observeEach(moviePromises)
.toList()
.subscribe(movie -> ctx.render(Jackson.json(movie)));
};
Then, again, we can add the handlers as expected:
然后,同样地,我们可以像预期的那样添加处理程序。
RatpackServer.start(def -> def.registryOf(regSpec -> regSpec
.add(MoviePromiseService.class, new MoviePromiseServiceImpl()))
.handlers(chain -> chain
.get("movie", moviePromiseHandler)
.get("movies", moviesPromiseHandler)));
6. Parallel Processing
6.平行处理
RxRatpack supports parallelism with the help of the fork() and forkEach() methods.
RxRatpack在fork()和forkEach()方法的帮助下支持并行。
And it follows a pattern we’ve already seen with each.
而且它遵循我们已经看到的每个人的模式。
fork() takes a single Observable and parallelizes its execution onto a different compute thread. Then, it automatically binds the data back to the original execution.
fork()接收一个Observable,并将其执行平行化到不同的计算线程。然后,它自动将数据绑定到原始执行中。
On the other hand, forkEach() does the same for each element emitted by an Observable‘s stream of values.
另一方面,forkEach()对Observable的值流所发射的每个元素做同样的处理。
Let’s imagine for a moment that we want to capitalize our movie titles and that such is an expensive operation.
让我们设想一下,我们想把我们的电影标题资本化,而这是一个昂贵的操作。
Simply put, we can use forkEach() to off-load the execution of each onto a thread pool:
简单地说,我们可以使用forkEach()来卸载每个线程池的执行。
Observable<Movie> movieObs = movieSvc.getMovies();
Observable<String> upperCasedNames = movieObs.compose(RxRatpack::forkEach)
.map(movie -> movie.getName().toUpperCase())
.serialize();
7. Implicit Error Handling
7. 隐性错误处理
Lastly, implicit error handling is one of the key features in RxJava integration.
最后,隐式错误处理是RxJava集成中的关键功能之一。
By default, RxJava observable sequences will forward any exception to the execution context exception handler. For this reason, error handlers don’t need to be defined in observable sequences.
默认情况下,RxJava可观察序列将把任何异常转发给执行上下文的异常处理程序。由于这个原因,错误处理程序不需要在可观察序列中定义。
So, we can configure Ratpack to handle these errors raised by RxJava.
因此,我们可以配置Ratpack来处理这些由RxJava引发的错误。
Let’s say, for example, that we wanted each error to be printed in the HTTP response.
比方说,我们希望每个错误都被打印在HTTP响应中。
Note that the exception we throw via the Observable gets caught and handled by our ServerErrorHandler:
请注意,我们通过Observable抛出的异常被我们的ServerErrorHandler捕获并处理。
RatpackServer.start(def -> def.registryOf(regSpec -> regSpec
.add(ServerErrorHandler.class, (ctx, throwable) -> {
ctx.render("Error caught by handler : " + throwable.getMessage());
}))
.handlers(chain -> chain
.get("error", ctx -> {
Observable.<String> error(new Exception("Error from observable")).subscribe(s -> {});
})));
Note that any subscriber-level error handling takes precedence, though. If our Observable wanted to do its own error handling, it could, but since it doesn’t, the exception percolates up to Ratpack.
注意,任何订阅者级别的错误处理都是优先的。如果我们的 Observable 想要做它自己的错误处理,它可以,但是因为它没有,所以异常会渗透到 Ratpack。
8. Conclusion
8.结语
In this article, we talked about how to configure RxJava with Ratpack.
在这篇文章中,我们谈到了如何用Ratpack配置RxJava。
We explored the conversions of Observables in RxJava to Promise types in Ratpack and vice versa. We also looked into the parallelism and implicit error handling features supported by the integration.
我们探讨了RxJava中的Observables到Ratpack中的Promise类型的转换,反之亦然。我们还研究了集成所支持的并行性和隐式错误处理功能。
All code samples used for the article can be found over at Github.
文章中使用的所有代码样本都可以在Github上找到。