1. Introduction
In this tutorial, we’ll learn about Hazelcast Jet. It’s a distributed data processing engine provided by Hazelcast, Inc. and is built on top of Hazelcast IMDG.
If you want to learn about Hazelcast IMDG, here is an article for getting started.
2. What Is Hazelcast Jet?
Hazelcast Jet is a distributed data processing engine that treats data as streams. It can process data at rest, such as records stored in a database or in files, as well as data streamed from a Kafka server.
Moreover, it can compute aggregations over infinite data streams by dividing each stream into finite subsets and applying the aggregation to each subset. In Jet terminology, this concept is known as windowing.
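To make the windowing idea concrete, here's a minimal plain-Java sketch of a tumbling window, with no Jet involved: each event carries a timestamp, is assigned to the one fixed-size window that contains it, and we count events per window. The Event class and the windowSizeMs parameter are hypothetical names for illustration only; Jet's own windowing API is richer and also supports sliding and session windows.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of a tumbling window; this is NOT the Jet API.
class TumblingWindowSketch {

    // Hypothetical event type: a timestamp in milliseconds plus a payload
    static final class Event {
        final long timestampMs;
        final String value;

        Event(long timestampMs, String value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    // Counts events per window; the key is the window's start timestamp.
    // Each event falls into exactly one window [start, start + windowSizeMs).
    static Map<Long, Long> countPerWindow(List<Event> events, long windowSizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - Math.floorMod(e.timestampMs, windowSizeMs);
            counts.merge(windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(0, "a"), new Event(500, "b"), new Event(999, "c"),
                new Event(1000, "d"), new Event(2500, "e"));
        System.out.println(countPerWindow(events, 1000)); // prints {0=3, 1000=1, 2000=1}
    }
}
```

On an infinite stream, a window's result can be emitted once the window has closed, which is what makes aggregation over unbounded data possible.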
We can deploy Jet on a cluster of machines and then submit our data processing jobs to it. Jet automatically makes all the members of the cluster take part in processing the data. Each member consumes a part of the data, so we can increase throughput by adding more members to the cluster.
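The idea of each member consuming its own part of the data can be illustrated with a small single-JVM simulation: we route each item to one of N simulated members by hashing it, let every member count its share locally, and then merge the partial results. This is only a conceptual sketch; in Jet the engine handles partitioning and data exchange between members, and the member count and hash-based routing below are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual simulation of partitioned processing in one JVM; NOT Jet's actual mechanism.
class PartitionedCountSketch {

    // Route each word to a simulated "member" by hash, count per member, then merge.
    static Map<String, Long> count(List<String> words, int memberCount) {
        // One partial-result map per simulated member
        List<Map<String, Long>> partials = new ArrayList<>();
        for (int i = 0; i < memberCount; i++) {
            partials.add(new HashMap<>());
        }
        for (String w : words) {
            // floorMod keeps the index non-negative even for negative hash codes
            int member = Math.floorMod(w.hashCode(), memberCount);
            partials.get(member).merge(w, 1L, Long::sum);
        }
        // The same word always lands on the same member, so each partial count is final
        // for its keys; merging with Long::sum would also tolerate overlapping keys
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> total.merge(k, v, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("a", "b", "a", "c", "a"), 3).get("a")); // prints 3
    }
}
```

Routing by key is what lets each member count its words independently: no two members ever hold partial counts for the same word.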
Here are the typical use cases for Hazelcast Jet:
- Real-Time Stream Processing
- Fast Batch Processing
- Processing Java 8 Streams in a distributed way
- Data processing in Microservices
3. Setup
To set up Hazelcast Jet in our environment, we just need to add a single Maven dependency to our pom.xml.
Here’s how we do it:
<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet</artifactId>
    <version>4.2</version>
</dependency>
Including this dependency downloads a JAR file of about 10 MB, which provides all the infrastructure we need to build a distributed data processing pipeline.
The latest version for Hazelcast Jet can be found here.
4. Sample Application
To learn more about Hazelcast Jet, we’ll create a sample application that takes a list of sentences and a word to look for, and returns the number of times that word occurs in those sentences.
为了了解更多关于Hazelcast Jet的信息,我们将创建一个示例程序,该程序接收输入的句子和要在这些句子中寻找的单词,并返回这些句子中指定单词的数量。
4.1. The Pipeline
A Pipeline forms the basic construct for a Jet application. Processing within a pipeline follows these steps:
- read data from a source
- transform the data
- write data into a sink
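The three steps above can be mirrored in a few lines of plain Java, using a Supplier as the source, a Function as the transform step and a Consumer as the sink. This is only an analogy for the shape of a pipeline: the runPipeline helper is hypothetical and is not part of Jet, which additionally distributes each step across the cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Analogy of source -> transform -> sink using java.util.function types; not the Jet API.
class MiniPipelineSketch {

    // Hypothetical helper: read every item from the source, transform it, write it to the sink
    static <I, O> void runPipeline(Supplier<List<I>> source,
                                   Function<I, O> transform,
                                   Consumer<O> sink) {
        for (I item : source.get()) {
            sink.accept(transform.apply(item));
        }
    }

    public static void main(String[] args) {
        List<Integer> sinkContents = new ArrayList<>();
        Supplier<List<String>> source = () -> Arrays.asList("a", "bb", "ccc");
        Function<String, Integer> transform = String::length;
        Consumer<Integer> sink = sinkContents::add;
        runPipeline(source, transform, sink);
        System.out.println(sinkContents); // prints [1, 2, 3]
    }
}
```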
For our application, the pipeline will read from a distributed List, apply the transformation of grouping and aggregation and finally write to a distributed Map.
Here’s how we write our pipeline:
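import static com.hazelcast.function.Functions.wholeItem;
import static com.hazelcast.jet.Traversers.traverseArray;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
private Pipeline createPipeLine() {
    Pipeline p = Pipeline.create();
    p.readFrom(Sources.<String>list(LIST_NAME))   // read sentences from a distributed list
      // lowercase each sentence and split it into words on runs of non-word characters
      .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
      .filter(word -> !word.isEmpty())            // drop empty tokens
      .groupingKey(wholeItem())                   // group by the word itself
      .aggregate(counting())                      // count the words in each group
      .writeTo(Sinks.map(MAP_NAME));              // write word -> count entries to a distributed map
    return p;
}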
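Once we’ve read from the source, we lowercase each sentence and split it into words using the regular expression \W+, which matches any run of non-word characters such as spaces and punctuation. After that, we filter out the empty strings that the split can produce, for example when a sentence starts with a space or a punctuation mark.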
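To see why the filter step is needed, here's a small plain-Java check of how String.split behaves with the \W+ pattern, outside of Jet. The words helper is a hypothetical name that applies the same lowercase-and-split logic as the pipeline's flatMap step.

```java
import java.util.Arrays;

// Demonstrates String.split with \W+, as used in the pipeline's flatMap step.
class SplitSketch {

    // Same lowercase + \W+ split that the pipeline applies to each sentence
    static String[] words(String sentence) {
        return sentence.toLowerCase().split("\\W+");
    }

    public static void main(String[] args) {
        // Spaces, the comma and the final period all act as separators;
        // the trailing empty string after the period is dropped by split
        System.out.println(Arrays.toString(
                words("The first second was alright, but the second second was tough.")));
        // prints [the, first, second, was, alright, but, the, second, second, was, tough]

        // A leading separator produces an empty first element, which the filter removes
        System.out.println(Arrays.toString(words("...hello world"))); // prints [, hello, world]
    }
}
```

Note that \W also matches apostrophes, so a word like "don't" is split into "don" and "t".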
Finally, we group identical words, count the occurrences in each group, and write the resulting word-to-count entries to a distributed Map.
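For comparison, the same split, filter, group and count logic can be written with plain Java 8 streams on a single machine. This is a local sketch, not Jet code, but it is handy for checking the results we expect from the distributed pipeline: Collectors.groupingBy with Collectors.counting() plays the role of groupingKey(wholeItem()) followed by aggregate(counting()). The LocalWordCount class name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Single-JVM equivalent of the word-count pipeline using java.util.stream; not Jet code.
class LocalWordCount {

    static Map<String, Long> countWords(List<String> sentences) {
        return sentences.stream()
                // split each lowercased sentence into words, like the pipeline's flatMap
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                // drop empty strings produced by a leading separator
                .filter(word -> !word.isEmpty())
                // group identical words and count each group
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords(Arrays.asList(
                "The first second was alright, but the second second was tough."));
        System.out.println(counts.get("second")); // prints 3
    }
}
```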
4.2. The Job
Now that our pipeline is defined, we create a job for executing the pipeline.
Here’s how we write a countWord method that accepts the sentences and the word to look for, runs the job, and returns the word’s count:
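public Long countWord(List<String> sentences, String word) {
    long count = 0;
    // Start an embedded Jet member, which we use to submit the job
    JetInstance jet = Jet.newJetInstance();
    try {
        // Copy the input into the distributed list that the pipeline's source reads
        List<String> textList = jet.getList(LIST_NAME);
        textList.addAll(sentences);
        Pipeline p = createPipeLine();
        // Submit the job and block until it completes
        jet.newJob(p).join();
        // The sink wrote word -> count entries into this distributed map
        Map<String, Long> counts = jet.getMap(MAP_NAME);
        // The pipeline lowercases every word, so look the key up in lowercase;
        // a missing key means the word never occurred (avoids unboxing a null)
        Long found = counts.get(word.toLowerCase());
        count = (found == null) ? 0 : found;
    } finally {
        // Shut down Jet so its threads don't keep the JVM alive
        Jet.shutdownAll();
    }
    return count;
}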
We first create a Jet instance, which we need in order to submit a job that runs our pipeline. Next, we copy the input List into a distributed list so that it’s available to all the members of the cluster.
We then submit a job using the pipeline that we built above. The newJob() method returns a Job that Jet starts asynchronously. Calling join() on it blocks until the job completes and throws an exception if the job fails.
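The submit-then-join pattern behaves much like a CompletableFuture in plain Java: the work starts asynchronously, and join() blocks until it finishes, rethrowing a failure as an exception. The sketch below uses only java.util.concurrent to illustrate those semantics; it is an analogy, not Jet's Job implementation, and the submit method and its fail flag are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Analogy for submit-then-join using CompletableFuture; this is not the Jet API.
class AsyncJoinSketch {

    // Hypothetical "job": starts asynchronously and returns a handle immediately
    static CompletableFuture<Long> submit(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("job failed");
            }
            return 42L;
        });
    }

    public static void main(String[] args) {
        // join() blocks until the result is ready
        System.out.println(submit(false).join()); // prints 42
        try {
            submit(true).join();
        } catch (CompletionException e) {
            // The original failure is available as the cause
            System.out.println("failed: " + e.getCause().getMessage()); // prints failed: job failed
        }
    }
}
```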
When the job completes, its results are available in a distributed Map, as defined by the sink in our pipeline. So, we get the Map from the Jet instance and look up the count for our word.
Lastly, we shut down the Jet instance. It’s important to shut it down once our execution has ended, because the Jet instance starts its own threads; otherwise, our Java process would stay alive even after our method has returned.
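The underlying rule is general Java behavior: the JVM doesn't exit while a non-daemon thread is still running. This plain-Java sketch, which is not Jet code, starts a worker pool as a stand-in for the threads a Jet instance starts, and releases it in a finally block, mirroring the try/finally around Jet.shutdownAll() in countWord. The runAndShutDown method is hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java illustration of why background threads must be shut down; not Jet code.
class ShutdownSketch {

    static int runAndShutDown() {
        // Executors.newFixedThreadPool creates non-daemon threads by default,
        // so a pool that is never shut down keeps the JVM running after main() returns
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> result = workers.submit(() -> 6 * 7);
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("work failed", e);
        } finally {
            // Always release the threads, even if the work above throws
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndShutDown()); // prints 42
    }
}
```

Without the shutdown() call, main() would print its result but the process would keep running.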
Here’s a unit test for the code we’ve written for Jet:
@Test
public void whenGivenSentencesAndWord_ThenReturnCountOfWord() {
    List<String> sentences = new ArrayList<>();
    sentences.add("The first second was alright, but the second second was tough.");
    WordCounter wordCounter = new WordCounter();
    long countSecond = wordCounter.countWord(sentences, "second");
    // "second" occurs three times in the sentence
    assertEquals(3, countSecond);
}
5. Conclusion
In this article, we’ve introduced Hazelcast Jet and used it to build a simple word count application. To learn more about Jet and its features, refer to the manual.
As usual, the code for the examples used in this article can be found over on Github.