Introduction to Apache Flink with Java

Last modified: April 17, 2017

1. Overview

Apache Flink is a Big Data processing framework that allows programmers to process the vast amount of data in a very efficient and scalable manner.

In this article, we’ll introduce some of the core API concepts and standard data transformations available in the Apache Flink Java API. The fluent style of this API makes it easy to work with Flink’s central construct – the distributed collection.

First, we will take a look at Flink’s DataSet API transformations and use them to implement a word count program. Then we will take a brief look at Flink’s DataStream API, which allows you to process streams of events in a real-time fashion.

2. Maven Dependency

To get started, we’ll need to add Maven dependencies for the flink-java and flink-test-utils libraries:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_2.10</artifactId>
    <version>1.2.0</version>
    <scope>test</scope>
</dependency>

3. Core API Concepts

When working with Flink, we need to know a couple of things related to its API:

  • Every Flink program performs transformations on distributed collections of data. A variety of functions for transforming data are provided, including filtering, mapping, joining, grouping, and aggregating
  • A sink operation in Flink triggers the execution of a stream to produce the desired result of the program, such as saving the result to the file system or printing it to the standard output
  • Flink transformations are lazy, meaning that they are not executed until a sink operation is invoked (see the sketch after this list)
  • The Apache Flink API supports two modes of operation — batch and real-time. If you are dealing with a limited data source that can be processed in batch mode, you will use the DataSet API. Should you want to process unbounded streams of data in real-time, you would need to use the DataStream API
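
To make the lazy-evaluation point concrete, here is a minimal sketch (using a local environment and a small made-up data set, not one of the examples from the later sections): the map() call only builds the execution plan, and nothing is computed until the collect() sink is invoked.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// lazy: this only describes the transformation, no data is processed yet
DataSet<Integer> doubled = env.fromElements(1, 2, 3)
  .map(i -> i * 2);

// collect() is a sink operation, so it triggers the actual execution
List<Integer> result = doubled.collect(); // [2, 4, 6]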

4. DataSet API Transformations

The entry point to the Flink program is an instance of the ExecutionEnvironment class — this defines the context in which a program is executed.

Let’s create an ExecutionEnvironment to start our processing:

ExecutionEnvironment env
  = ExecutionEnvironment.getExecutionEnvironment();

Note that when you launch the application on the local machine, it will perform processing on the local JVM. Should you want to start processing on a cluster of machines, you would need to install Apache Flink on those machines and configure the ExecutionEnvironment accordingly.

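As an illustration only (the host name, port, and jar path below are placeholders, not values used elsewhere in this article), such a remote environment can be created explicitly:

// 6123 is the default JobManager RPC port; the jar containing the job classes
// is shipped to the cluster when the program is submitted
ExecutionEnvironment remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
  "flink-master-host", 6123, "/path/to/your-job.jar");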

4.1. Creating a DataSet

To start performing data transformations, we need to supply our program with the data.

Let’s create an instance of the DataSet class using our ExecutionEnvironment:

DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);

You can create a DataSet from multiple sources, such as Apache Kafka, a CSV file, or virtually any other data source.

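For instance, reading a plain text file or a CSV file could look roughly like this (the file paths are placeholders, and this sketch is separate from the examples that follow):

// one String element per line of the file
DataSet<String> lines = env.readTextFile("/path/to/input.txt");

// each CSV row is parsed into a Tuple2 with the declared field types
DataSet<Tuple2<String, Integer>> csvRows = env
  .readCsvFile("/path/to/input.csv")
  .types(String.class, Integer.class);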

4.2. Filter and Reduce

Once you create an instance of the DataSet class, you can apply transformations to it.

Let’s say that you want to filter numbers that are above a certain threshold and then sum them all. You can use the filter() and reduce() transformations to achieve this:

int threshold = 30;
List<Integer> collect = amounts
  .filter(a -> a > threshold)
  .reduce((integer, t1) -> integer + t1)
  .collect();

assertThat(collect.get(0)).isEqualTo(90);

Note that the collect() method is a sink operation that triggers the actual data transformations.

4.3. Map

Let’s say that you have a DataSet of Person objects:

private static class Person {
    private int age;
    private String name;

    // standard constructors/getters/setters
}

Next, let’s create a DataSet of these objects:

DataSet<Person> personDataSource = env.fromCollection(
  Arrays.asList(
    new Person(23, "Tom"),
    new Person(75, "Michael")));

Suppose that you want to extract only the age field from every object of the collection. You can use the map() transformation to get only a specific field of the Person class:

List<Integer> ages = personDataSource
  .map(p -> p.age)
  .collect();

assertThat(ages).hasSize(2);
assertThat(ages).contains(23, 75);

4.4. Join

When you have two datasets, you may want to join them on some id field. For this, you can use the join() transformation.

Let’s create collections of transactions and addresses of a user:

Tuple3<Integer, String, String> address
  = new Tuple3<>(1, "5th Avenue", "London");
DataSet<Tuple3<Integer, String, String>> addresses
  = env.fromElements(address);

Tuple2<Integer, String> firstTransaction 
  = new Tuple2<>(1, "Transaction_1");
DataSet<Tuple2<Integer, String>> transactions 
  = env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));

The first field in both tuples is of an Integer type, and this is an id field on which we want to join both data sets.

To perform the actual joining logic, we need to implement a KeySelector interface for address and transaction:

private static class IdKeySelectorTransaction 
  implements KeySelector<Tuple2<Integer, String>, Integer> {
    @Override
    public Integer getKey(Tuple2<Integer, String> value) {
        return value.f0;
    }
}

private static class IdKeySelectorAddress 
  implements KeySelector<Tuple3<Integer, String, String>, Integer> {
    @Override
    public Integer getKey(Tuple3<Integer, String, String> value) {
        return value.f0;
    }
}

Each selector is only returning the field on which the join should be performed.

Unfortunately, it’s not possible to use lambda expressions here because Flink needs generic type info.

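Since both data sets consist of Flink tuples, the join keys could alternatively be referenced by field position, which avoids the selector classes entirely; here is a sketch of that equivalent variant, reusing the same transactions and addresses data sets:

// joins field 0 of transactions with field 0 of addresses
List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>> joinedByPosition
  = transactions.join(addresses)
    .where(0)
    .equalTo(0)
    .collect();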

Next, let’s implement merging logic using those selectors:

List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>>
  joined = transactions.join(addresses)
  .where(new IdKeySelectorTransaction())
  .equalTo(new IdKeySelectorAddress())
  .collect();

assertThat(joined).hasSize(1);
assertThat(joined).contains(new Tuple2<>(firstTransaction, address));

4.5. Sort

Let’s say that you have the following collection of Tuple2:

Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
DataSet<Tuple2<Integer, String>> transactions = env.fromElements(
  fourthPerson, secondPerson, thirdPerson, firstPerson);

If you want to sort this collection by the first field of the tuple, you can use the sortPartition() transformation:

List<Tuple2<Integer, String>> sorted = transactions
  .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
  .collect();

assertThat(sorted)
  .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);

5. Word Count

The word count problem is one that is commonly used to showcase the capabilities of Big Data processing frameworks. The basic solution involves counting word occurrences in a text input. Let’s use Flink to implement a solution to this problem.

As the first step in our solution, we create a LineSplitter class that splits our input into tokens (words), collecting for each token a Tuple2 of key-value pairs. In each of these tuples, the key is a word found in the text, and the value is the integer one (1).

This class implements the FlatMapFunction interface that takes String as an input and produces a Tuple2<String, Integer>:

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        Stream.of(value.toLowerCase().split("\\W+"))
          .filter(t -> t.length() > 0)
          .forEach(token -> out.collect(new Tuple2<>(token, 1)));
    }
}

We call the collect() method on the Collector class to push data forward in the processing pipeline.

Our next and final step is to group the tuples by their first elements (words) and then perform a sum aggregate on the second elements to produce a count of the word occurrences:

public static DataSet<Tuple2<String, Integer>> startWordCount(
  ExecutionEnvironment env, List<String> lines) throws Exception {
    DataSet<String> text = env.fromCollection(lines);

    return text.flatMap(new LineSplitter())
      .groupBy(0)
      .aggregate(Aggregations.SUM, 1);
}

We are using three types of Flink transformations: flatMap(), groupBy(), and aggregate().

Let’s write a test to assert that the word count implementation is working as expected:

List<String> lines = Arrays.asList(
  "This is a first sentence",
  "This is a second sentence with a one word");

DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);

List<Tuple2<String, Integer>> collect = result.collect();
 
assertThat(collect).containsExactlyInAnyOrder(
  new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),
  new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),
  new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));

6. DataStream API

6.1. Creating a DataStream

Apache Flink also supports the processing of streams of events through its DataStream API. If we want to start consuming events, we first need to use the StreamExecutionEnvironment class:

StreamExecutionEnvironment executionEnvironment
 = StreamExecutionEnvironment.getExecutionEnvironment();

Next, we can create a stream of events using the executionEnvironment from a variety of sources. It could be some message bus like Apache Kafka, but in this example, we will simply create a source from a couple of string elements:

DataStream<String> dataStream = executionEnvironment.fromElements(
  "This is a first sentence", 
  "This is a second sentence with a one word");

We can apply transformations to every element of the DataStream like in the normal DataSet class:

SingleOutputStreamOperator<String> upperCase = dataStream.map(String::toUpperCase);

To trigger the execution, we need to invoke a sink operation such as print() that will just print the result of the transformations to the standard output, followed by the execute() method on the StreamExecutionEnvironment class:

upperCase.print();
executionEnvironment.execute();

It will produce the following output:

1> THIS IS A FIRST SENTENCE
2> THIS IS A SECOND SENTENCE WITH A ONE WORD

6.2. Windowing of Events

When processing a stream of events in real time, you may sometimes need to group events together and apply some computation on a window of those events.

Suppose we have a stream of events, where each event is a pair consisting of the event number and the timestamp when the event was sent to our system. Suppose also that we can tolerate events that are out of order, but only if they are no more than twenty seconds late.

For this example, let’s first create a stream simulating two events that are several minutes apart and define a timestamp extractor that specifies our lateness threshold:

SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed
  = env.fromElements(
  new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
  new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor
      <Tuple2<Integer, Long>>(Time.seconds(20)) {
 
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element) {
          return element.f1 * 1000;
        }
    });

Next, let’s define a window operation to group our events into five-second windows and apply a transformation on those events:

SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  .maxBy(0, true);
reduced.print();

Because of the maxBy() call, it will emit the element with the largest value of the first field from every five-second window, so it prints out:

1> (15,1491221519)

Note that we do not see the second event because it arrived later than the specified lateness threshold.

7. Conclusion

In this article, we introduced the Apache Flink framework and looked at some of the transformations supplied with its API.

We implemented a word count program using Flink’s fluent and functional DataSet API. Then we looked at the DataStream API and implemented a simple real-time transformation on a stream of events.

The implementation of all these examples and code snippets can be found over on GitHub – this is a Maven project, so it should be easy to import and run as it is.
