1. Overview
In this article, we’ll take a look at the Mantis platform developed by Netflix.
We’ll explore the main Mantis concepts by creating, running, and investigating a stream processing job.
2. What Is Mantis?
Mantis is a platform for building stream-processing applications (jobs). It provides an easy way to manage the deployment and life-cycle of jobs. Moreover, it facilitates resource allocation, discovery, and communication between these jobs.
Therefore, developers can focus on actual business logic, all the while having the support of a robust and scalable platform to run their high volume, low latency, non-blocking applications.
A Mantis job consists of three distinct parts:
- the source, responsible for retrieving the data from an external source
- one or more stages, responsible for processing the incoming event streams
- and a sink that collects the processed data
Let’s now explore each of them.
3. Setup and Dependencies
Let’s start by adding the mantis-runtime and jackson-databind dependencies:
<dependency>
    <groupId>io.mantisrx</groupId>
    <artifactId>mantis-runtime</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
Now, for setting up our job’s data source, let’s implement the Mantis Source interface:
public class RandomLogSource implements Source<String> {

    @Override
    public Observable<Observable<String>> call(Context context, Index index) {
        return Observable.just(
            Observable
                .interval(250, TimeUnit.MILLISECONDS)
                .map(this::createRandomLogEvent));
    }

    private String createRandomLogEvent(Long tick) {
        // generate a random log entry string
        ...
    }
}
As we can see, it simply generates random log entries multiple times per second.
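The body of createRandomLogEvent is elided above. As a minimal sketch, assuming an index#level#message layout (the three-part, #-delimited format that TransformLogStage splits later on) and using java.util.concurrent.ThreadLocalRandom, it could look like:

private String createRandomLogEvent(Long tick) {
    // hypothetical implementation: emit one index#level#message entry per tick
    String[] levels = {"INFO", "WARN", "ERROR"};
    String[] messages = {"login attempt", "user created"};
    String level = levels[ThreadLocalRandom.current().nextInt(levels.length)];
    String message = messages[ThreadLocalRandom.current().nextInt(messages.length)];
    return tick + "#" + level + "#" + message;
}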
4. Our First Job
Let’s now create a Mantis job that simply collects log events from our RandomLogSource. Later on, we’ll add group and aggregation transformations for a more complex and interesting result.
To begin with, let’s create a LogEvent entity:
public class LogEvent implements JsonType {

    private Long index;
    private String level;
    private String message;

    // ...
}
Then, let’s add our TransformLogStage.
It’s a simple stage that implements the ScalarComputation interface and splits a log entry to build a LogEvent. It also filters out any incorrectly formatted strings:
public class TransformLogStage implements ScalarComputation<String, LogEvent> {

    @Override
    public Observable<LogEvent> call(Context context, Observable<String> logEntry) {
        return logEntry
            .map(log -> log.split("#"))
            .filter(parts -> parts.length == 3)
            .map(LogEvent::new);
    }
}
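Note that .map(LogEvent::new) relies on LogEvent having a constructor that accepts the split parts. A minimal sketch of such a constructor, assuming the index#level#message layout produced by our source:

public LogEvent(String[] parts) {
    // parts[0] = index, parts[1] = level, parts[2] = message
    this.index = Long.valueOf(parts[0]);
    this.level = parts[1];
    this.message = parts[2];
}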
4.1. Running the Job
At this point, we have enough building blocks for putting together our Mantis job:
public class LogCollectingJob extends MantisJobProvider<LogEvent> {

    @Override
    public Job<LogEvent> getJobInstance() {
        return MantisJob
            .source(new RandomLogSource())
            .stage(new TransformLogStage(), new ScalarToScalar.Config<>())
            .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString)))
            .metadata(new Metadata.Builder().build())
            .create();
    }
}
Let’s take a closer look at our job.
As we can see, it extends MantisJobProvider. At first, it fetches data from our RandomLogSource and applies the TransformLogStage to the fetched data. Finally, it sends the processed data to the built-in sink that eagerly subscribes and delivers data over SSE.
Now, let’s configure our job to execute locally on startup:
@SpringBootApplication
public class MantisApplication implements CommandLineRunner {

    // ...

    @Override
    public void run(String... args) {
        LocalJobExecutorNetworked.execute(new LogCollectingJob().getJobInstance());
    }
}
Let’s run the application. We’ll see a log message like:
...
Serving modern HTTP SSE server sink on port: 86XX
Let’s now connect to the sink using curl:
$ curl localhost:86XX
data: {"index":86,"level":"WARN","message":"login attempt"}
data: {"index":87,"level":"ERROR","message":"user created"}
data: {"index":88,"level":"INFO","message":"user created"}
data: {"index":89,"level":"INFO","message":"login attempt"}
data: {"index":90,"level":"INFO","message":"user created"}
data: {"index":91,"level":"ERROR","message":"user created"}
data: {"index":92,"level":"WARN","message":"login attempt"}
data: {"index":93,"level":"INFO","message":"user created"}
...
4.2. Configuring the Sink
So far, we’ve used the built-in sink for collecting our processed data. Let’s see if we can add more flexibility to our scenario by providing a custom sink.
What if, for example, we’d like to filter logs by message?
Let’s create a LogSink that implements the Sink<LogEvent> interface:
public class LogSink implements Sink<LogEvent> {

    @Override
    public void call(Context context, PortRequest portRequest, Observable<LogEvent> logEventObservable) {
        SelfDocumentingSink<LogEvent> sink = new ServerSentEventsSink.Builder<LogEvent>()
            .withEncoder(LogEvent::toJsonString)
            .withPredicate(filterByLogMessage())
            .build();
        logEventObservable.subscribe();
        sink.call(context, portRequest, logEventObservable);
    }

    private Predicate<LogEvent> filterByLogMessage() {
        return new Predicate<>("filter by message",
            parameters -> {
                if (parameters != null && parameters.containsKey("filter")) {
                    return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0));
                }
                return logEvent -> true;
            });
    }
}
In this sink implementation, we configured a predicate that uses the filter query parameter to retrieve only the logs containing the text set in that parameter:
$ curl localhost:8874?filter=login
data: {"index":93,"level":"ERROR","message":"login attempt"}
data: {"index":95,"level":"INFO","message":"login attempt"}
data: {"index":97,"level":"ERROR","message":"login attempt"}
...
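For completeness, wiring this custom sink into a job simply means passing it to the builder in place of the built-in one; a minimal sketch reusing our earlier pipeline:

return MantisJob
    .source(new RandomLogSource())
    .stage(new TransformLogStage(), new ScalarToScalar.Config<>())
    .sink(new LogSink())
    .metadata(new Metadata.Builder().build())
    .create();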
Note that Mantis also offers a powerful query language, MQL, which can be used for querying, transforming, and analyzing stream data in a SQL fashion.
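For a taste of what such a query might look like (illustrative only, not verified against the current MQL grammar):

select level, message from stream where level == 'ERROR'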
5. Stage Chaining
Let’s now suppose we’re interested in knowing how many ERROR, WARN, or INFO log entries we have in a given time interval. For this, we’ll add two more stages to our job and chain them together.
5.1. Grouping
Firstly, let’s create a GroupLogStage.
This stage is a ToGroupComputation implementation that receives a stream of LogEvent data from the existing TransformLogStage. After that, it groups entries by logging level and sends them to the next stage:
public class GroupLogStage implements ToGroupComputation<LogEvent, String, LogEvent> {

    @Override
    public Observable<MantisGroup<String, LogEvent>> call(Context context, Observable<LogEvent> logEvent) {
        return logEvent.map(log -> new MantisGroup<>(log.getLevel(), log));
    }

    public static ScalarToGroup.Config<LogEvent, String, LogEvent> config() {
        return new ScalarToGroup.Config<LogEvent, String, LogEvent>()
            .description("Group event data by level")
            .codec(JacksonCodecs.pojo(LogEvent.class))
            .concurrentInput();
    }
}
We’ve also created a custom stage config by providing a description and the codec to use for serializing the output, and we’ve allowed this stage’s call method to run concurrently by using concurrentInput().
One thing to note is that this stage is horizontally scalable, meaning we can run as many instances of it as needed. Also worth mentioning: when deployed in a Mantis cluster, this stage sends data to the next stage so that all events belonging to a particular group land on the same worker of the next stage.
5.2. Aggregating
Before we move on and create the next stage, let’s first add a LogAggregate entity:
public class LogAggregate implements JsonType {

    private final Integer count;
    private final String level;
}
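The CountLogStage below builds instances via new LogAggregate(count, level), so the entity needs a matching constructor (plus getters for JSON serialization); a minimal sketch:

public LogAggregate(Integer count, String level) {
    this.count = count;
    this.level = level;
}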
Now, let’s create the last stage in the chain.
This stage implements GroupToScalarComputation and transforms a stream of log groups to a scalar LogAggregate. It does this by counting how many times each type of log appears in the stream. In addition, it also has a LogAggregationDuration parameter, which can be used to control the size of the aggregation window:
public class CountLogStage implements GroupToScalarComputation<String, LogEvent, LogAggregate> {

    private int duration;

    @Override
    public void init(Context context) {
        // aggregation window size in ms; falls back to 1000 if the parameter can't be resolved
        // (the parameter registered below carries a default of 5000)
        duration = (int) context.getParameters().get("LogAggregationDuration", 1000);
    }

    @Override
    public Observable<LogAggregate> call(Context context, Observable<MantisGroup<String, LogEvent>> mantisGroup) {
        return mantisGroup
            .window(duration, TimeUnit.MILLISECONDS)
            .flatMap(o -> o.groupBy(MantisGroup::getKeyValue)
                .flatMap(group -> group.reduce(0, (count, value) -> count + 1)
                    .map(count -> new LogAggregate(count, group.getKey()))
                ));
    }

    public static GroupToScalar.Config<String, LogEvent, LogAggregate> config() {
        return new GroupToScalar.Config<String, LogEvent, LogAggregate>()
            .description("sum events for a log level")
            .codec(JacksonCodecs.pojo(LogAggregate.class))
            .withParameters(getParameters());
    }

    public static List<ParameterDefinition<?>> getParameters() {
        List<ParameterDefinition<?>> params = new ArrayList<>();
        params.add(new IntParameter()
            .name("LogAggregationDuration")
            .description("window size for aggregation in milliseconds")
            .validator(Validators.range(100, 10000))
            .defaultValue(5000)
            .build());
        return params;
    }
}
5.3. Configure and Run the Job
The only thing left to do now is to configure our job:
public class LogAggregationJob extends MantisJobProvider<LogAggregate> {

    @Override
    public Job<LogAggregate> getJobInstance() {
        return MantisJob
            .source(new RandomLogSource())
            .stage(new TransformLogStage(), TransformLogStage.stageConfig())
            .stage(new GroupLogStage(), GroupLogStage.config())
            .stage(new CountLogStage(), CountLogStage.config())
            .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString)))
            .metadata(new Metadata.Builder().build())
            .create();
    }
}
As soon as we run the application and execute our new job, we can see the log counts being retrieved every few seconds:
$ curl localhost:8133
data: {"count":3,"level":"ERROR"}
data: {"count":13,"level":"INFO"}
data: {"count":4,"level":"WARN"}
data: {"count":8,"level":"ERROR"}
data: {"count":5,"level":"INFO"}
data: {"count":7,"level":"WARN"}
...
6. Conclusion
To sum up, in this article, we’ve seen what Netflix Mantis is and what it can be used for. Furthermore, we looked at the main concepts, used them to build jobs, and explored custom configurations for different scenarios.
As always, the complete code is available over on GitHub.