1. Overview
1.概述
In this article, we will be looking at the akka-streams library that is built atop of the Akka actor framework, which adheres to the reactive streams manifesto. The Akka Streams API allows us to easily compose data transformation flows from independent steps.
在本文中,我们将关注akka-streams库,该库建立在Akka actor框架之上,遵守reactive streams宣言。Akka Streams API使我们能够轻松地从独立的步骤中组成数据转换流。
Moreover, all processing is done in a reactive, non-blocking, and asynchronous way.
此外,所有的处理都是以反应式的、非阻塞的和异步的方式进行。
2. Maven Dependencies
2.Maven的依赖性
To get started, we need to add the akka-stream and akka-stream-testkit libraries into our pom.xml:
为了开始,我们需要将akka-stream和akka-stream-testkit库添加到我们的pom.xml:。
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.11</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-testkit_2.11</artifactId>
<version>2.5.2</version>
</dependency>
3. Akka Streams API
3.Akka Streams API
To work with Akka Streams, we need to be aware of the core API concepts:
为了使用Akka Streams,我们需要了解核心的API概念。
- Source – the entry point to processing in the akka-stream library – we can create an instance of this class from multiple sources; for example, we can use the single() method if we want to create a Source from a single String, or we can create a Source from an Iterable of elements
- Flow – the main processing building block – every Flow instance has one input and one output value
- Materializer – we can use one if we want our Flow to have some side effects like logging or saving results; most commonly, we will be passing the NotUsed alias as a Materializer to denote that our Flow should not have any side effects
- Sink operation – when we are building a Flow, it is not executed until we will register a Sink operation on it – it is a terminal operation that triggers all computations in the entire Flow
4. Creating Flows in Akka Streams
4.在Akka流中创建Flows
Let’s start by building a simple example, where we’ll show how to create and combine multiple Flows – to process a stream of integers and calculate the average moving window of integer pairs from the stream.
让我们从建立一个简单的例子开始,我们将展示如何创建和组合多个Flows–处理一个整数流并计算整数流中整数对的平均移动窗口。
We’ll parse a semicolon-delimited String of integers as input to create our akka-stream Source for the example.
我们将解析一个以分号分隔的String的整数作为输入,以创建我们的akka-stream Source的例子。
4.1. Using a Flow to Parse Input
4.1.使用流程来解析输入
First, let’s create a DataImporter class that will take an instance of the ActorSystem that we will use later to create our Flow:
首先,让我们创建一个DataImporter类,该类将接收ActorSystem的一个实例,我们稍后将使用该实例来创建我们的Flow。
public class DataImporter {
private ActorSystem actorSystem;
// standard constructors, getters...
}
Next, let’s create a parseLine method that will generate a List of Integer from our delimited input String. Keep in mind that we are using Java Stream API here only for parsing:
接下来,让我们创建一个parseLine方法,它将从我们划定的输入String中生成List of Integer。请记住,我们在这里只使用Java Stream API来进行解析。
private List<Integer> parseLine(String line) {
String[] fields = line.split(";");
return Arrays.stream(fields)
.map(Integer::parseInt)
.collect(Collectors.toList());
}
Our initial Flow will apply parseLine to our input to create a Flow with input type String and output type Integer:
我们的初始Flow将应用parseLine到我们的输入,创建一个Flow,输入类型为String,输出类型为Integer。
private Flow<String, Integer, NotUsed> parseContent() {
return Flow.of(String.class)
.mapConcat(this::parseLine);
}
When we call the parseLine() method, the compiler knows that the argument to that lambda function will be a String – same as the input type to our Flow.
当我们调用parseLine()方法时,编译器知道该lambda函数的参数将是String–与我们Flow的输入类型相同。
Note that we are using the mapConcat() method – equivalent to the Java 8 flatMap() method – because we want to flatten the List of Integer returned by parseLine() into a Flow of Integer so that subsequent steps in our processing do not need to deal with the List.
请注意,我们使用的是mapConcat() 方法–相当于Java 8的flatMap() 方法–因为我们想把parseLine()返回的List的Integer平铺到一个Flow的Integer中,这样我们处理的后续步骤就不需要处理这个List。
4.2. Using a Flow to Perform Calculations
4.2.使用流来进行计算
At this point, we have our Flow of parsed integers. Now, we need to implement logic that will group all input elements into pairs and calculate an average of those pairs.
在这一点上,我们有了解析过的整数的Flow。现在,我们需要实现将所有输入元素分组成对并计算这些对的平均值的逻辑。
Now, we’ll create a Flow of Integers and group them using the grouped() method.
现在,我们将创建一个Flow 的Integers,并使用grouped() method将它们分组。
Next, we want to calculate an average.
接下来,我们要计算出一个平均值。
Since we are not interested in the order in which those averages will be processed, we can have averages calculated in parallel using multiple threads by using the mapAsyncUnordered() method, passing the number of threads as an argument to this method.
由于我们对这些平均数的处理顺序不感兴趣,我们可以通过使用mapAsyncUnordered() method,将线程数作为参数传递给该方法,让平均数使用多个线程并行计算。
The action that will be passed as the lambda to the Flow needs to return a CompletableFuture because that action will be calculated asynchronously in the separate thread:
将作为lambda传递给Flow的动作需要返回一个CompletableFuture,因为该动作将在独立的线程中被异步计算。
private Flow<Integer, Double, NotUsed> computeAverage() {
return Flow.of(Integer.class)
.grouped(2)
.mapAsyncUnordered(8, integers ->
CompletableFuture.supplyAsync(() -> integers.stream()
.mapToDouble(v -> v)
.average()
.orElse(-1.0)));
}
We are calculating averages in eight parallel threads. Note that we are using the Java 8 Stream API for calculating an average.
我们在八个并行线程中计算平均数。请注意,我们使用的是Java 8 Stream API来计算平均数。
4.3. Composing Multiple Flows into a Single Flow
4.3.将多个流组合成一个流
The Flow API is a fluent abstraction that allows us to compose multiple Flow instances to achieve our final processing goal. We can have granular flows where one, for example, is parsing JSON, another is doing some transformation, and another is gathering some statistics.
Flow API是一个流畅的抽象,它允许我们组合多个Flow实例来实现我们的最终处理目标。我们可以有细化的流程,例如,一个是解析JSON,另一个是做一些转换,还有一个是收集一些统计数据。
Such granularity will help us create more testable code because we can test each processing step independently.
这样的颗粒度将帮助我们创建更多可测试的代码,因为我们可以独立地测试每个处理步骤。
We created two flows above that can work independently of each other. Now, we want to compose them together.
我们在上面创建了两个可以相互独立工作的流程。现在,我们想把它们组合在一起。
First, we want to parse our input String, and next, we want to calculate an average on a stream of elements.
首先,我们要解析我们的输入String,接下来,我们要计算一个元素流的平均值。
We can compose our flows using the via() method:
我们可以使用via()方法组成我们的流。
Flow<String, Double, NotUsed> calculateAverage() {
return Flow.of(String.class)
.via(parseContent())
.via(computeAverage());
}
We created a Flow having input type String and two other flows after it. The parseContent() Flow takes a String input and returns an Integer as output. The computeAverage() Flow is taking that Integer and calculates an average returning Double as the output type.
我们创建了一个输入类型为String的Flow,并在它之后创建了另外两个flow。parseContent() Flow接收一个String输入并返回一个Integer作为输出。computeAverage() Flow接收该Integer并计算一个平均值,返回Double作为输出类型。
5. Adding Sink to the Flow
5.将水槽添加到流程
As we mentioned, to this point the whole Flow is not yet executed because it is lazy. To start execution of the Flow we need to define a Sink. The Sink operation can, for example, save data into a database, or send results to some external web service.
正如我们所提到的,到目前为止,整个Flow还没有被执行,因为它是懒惰的。为了开始执行Flow,我们需要定义一个Sink。例如,Sink操作可以将数据保存到数据库中,或者将结果发送到一些外部Web服务中。
Suppose we have an AverageRepository class with the following save() method that writes results to our database:
假设我们有一个AverageRepository类,有以下save()方法,将结果写到我们的数据库。
CompletionStage<Double> save(Double average) {
return CompletableFuture.supplyAsync(() -> {
// write to database
return average;
});
}
Now, we want to create a Sink operation that use this method to save the results of our Flow processing. To create our Sink, we first need to create a Flow that takes a result of our processing as the input type. Next, we want to save all our results to the database.
现在,我们要创建一个Sink操作,使用这个方法来保存我们Flow处理的结果。为了创建我们的Sink,我们首先需要创建一个Flow,将我们处理的结果作为输入类型。接下来,我们要将所有的结果保存到数据库中。
Again, we do not care about ordering of the elements, so we can perform the save() operations in parallel using the mapAsyncUnordered() method.
同样,我们不关心元素的排序,所以我们可以使用mapAsyncUnordered() 方法并行执行save()操作。
To create a Sink from the Flow we need to call the toMat() with Sink.ignore() as a first argument and Keep.right() as the second because we want to return a status of the processing:
为了从Flow创建一个Sink,我们需要调用toMat(),将Sink.ignore()作为第一个参数,将Keep.right()作为第二个参数,因为我们想要返回一个处理的状态。
private Sink<Double, CompletionStage<Done>> storeAverages() {
return Flow.of(Double.class)
.mapAsyncUnordered(4, averageRepository::save)
.toMat(Sink.ignore(), Keep.right());
}
6. Defining a Source for Flow
6.为Flow定义一个来源
The last thing that we need to do is to create a Source from the input String. We can apply a calculateAverage() Flow to this source using the via() method.
我们需要做的最后一件事是从输入的字符串创建一个Source。我们可以使用via()方法将calculateAverage() Flow应用于这个源。
Then, to add the Sink to the processing, we need to call the runWith() method and pass the storeAverages() Sink that we just created:
然后,为了将Sink添加到处理中,我们需要调用runWith()方法并传递我们刚刚创建的storeAverages()Sink。
CompletionStage<Done> calculateAverageForContent(String content) {
return Source.single(content)
.via(calculateAverage())
.runWith(storeAverages(), ActorMaterializer.create(actorSystem))
.whenComplete((d, e) -> {
if (d != null) {
System.out.println("Import finished ");
} else {
e.printStackTrace();
}
});
}
Note that when the processing is finished we are adding the whenComplete() callback, in which we can perform some action depending on the outcome of the processing.
注意,当处理完成后,我们要添加whenComplete()回调,在这个回调中,我们可以根据处理的结果执行一些行动。
7. Testing Akka Streams
7.测试Akka流
We can test our processing using the akka-stream-testkit.
我们可以使用akka-stream-testkit来测试我们的处理。
The best way to test the actual logic of the processing is to test all Flow logic and use TestSink to trigger the computation and assert on the results.
测试实际处理逻辑的最佳方式是测试所有的Flow 逻辑,并使用TestSink 来触发计算并对结果进行断言。
In our test, we are creating the Flow that we want to test, and next, we are creating a Source from the test input content:
在我们的测试中,我们正在创建我们想要测试的Flow,接下来,我们将从测试输入内容中创建一个Source。
@Test
public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() {
// given
Flow<String, Double, NotUsed> tested = new DataImporter(actorSystem).calculateAverage();
String input = "1;9;11;0";
// when
Source<Double, NotUsed> flow = Source.single(input).via(tested);
// then
flow
.runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
.request(4)
.expectNextUnordered(5d, 5.5);
}
We are checking that we are expecting four input arguments, and two results that are averages can arrive in any order because our processing is done in the asynchronous and parallel way.
我们要检查的是,我们期待四个输入参数,而作为平均值的两个结果可以以任何顺序到达,因为我们的处理是以异步和并行方式进行的。
8. Conclusion
8.结论
In this article, we were looking at the akka-stream library.
在这篇文章中,我们看的是akka-streamlibrary。
We defined a process that combines multiple Flows to calculate moving average of elements. Then, we defined a Source that is an entry point of the stream processing and a Sink that triggers the actual processing.
我们定义了一个流程,结合多个Flows来计算元素的移动平均。然后,我们定义了一个Source,作为流处理的入口,以及一个Sink,触发实际处理。
Finally, we wrote a test for our processing using the akka-stream-testkit.
最后,我们使用akka-stream-testkit为我们的处理写了一个测试。
The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.
所有这些例子和代码片段的实现都可以在GitHub项目中找到–这是一个Maven项目,所以应该很容易导入并按原样运行。