Introduction to Apache Flink with Java

Last modified: April 17, 2017


1. Overview


Apache Flink is a Big Data processing framework that lets programmers process vast amounts of data in an efficient and scalable manner.

In this article, we’ll introduce some of the core API concepts and standard data transformations available in the Apache Flink Java API. The fluent style of this API makes it easy to work with Flink’s central construct – the distributed collection.

First, we will take a look at Flink’s DataSet API transformations and use them to implement a word count program. Then we will take a brief look at Flink’s DataStream API, which allows you to process streams of events in a real-time fashion.

2. Maven Dependency


To get started, we’ll need to add Maven dependencies on the flink-java and flink-test-utils libraries.



3. Core API Concepts


When working with Flink, we need to know a couple of things related to its API:


  • Every Flink program performs transformations on distributed collections of data. A variety of functions for transforming data are provided, including filtering, mapping, joining, grouping, and aggregating
  • A sink operation in Flink triggers the execution of a stream to produce the desired result of the program, such as saving the result to the file system or printing it to the standard output
  • Flink transformations are lazy, meaning that they are not executed until a sink operation is invoked
  • The Apache Flink API supports two modes of operations — batch and real-time. If you are dealing with a limited data source that can be processed in batch mode, you will use the DataSet API. Should you want to process unbounded streams of data in real-time, you would need to use the DataStream API
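The lazy behaviour described above can be illustrated without Flink at all. The sketch below is only an analogy: it uses java.util.stream from the JDK, not the Flink API, and the class name LazyPipelineSketch and its members are hypothetical. Declaring the pipeline processes nothing; the work happens only when a terminal operation, which plays the role of a sink here, is invoked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: java.util.stream, not Flink. All names are hypothetical.
class LazyPipelineSketch {

    // Records every element that the filter actually inspects.
    static final List<String> inspected = new ArrayList<>();

    // Declares a pipeline; no element is processed at this point.
    static Stream<String> declarePipeline() {
        return Stream.of("flink", "is", "lazy")
          .filter(word -> {
              inspected.add(word);
              return word.length() > 2;
          });
    }

    public static void main(String[] args) {
        Stream<String> pipeline = declarePipeline();
        System.out.println("inspected before the terminal operation: " + inspected.size());

        // The terminal operation triggers the actual processing.
        List<String> result = pipeline.collect(Collectors.toList());
        System.out.println("result: " + result);
        System.out.println("inspected after the terminal operation: " + inspected.size());
    }
}
```

Flink follows the same idea at cluster scale: the transformations only describe the job, and a sink is what causes it to run.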

4. DataSet API Transformations


The entry point to the Flink program is an instance of the ExecutionEnvironment class — this defines the context in which a program is executed.


Let’s create an ExecutionEnvironment to start our processing:


ExecutionEnvironment env
  = ExecutionEnvironment.getExecutionEnvironment();

Note that when you launch the application on the local machine, it will perform processing on the local JVM. Should you want to start processing on a cluster of machines, you would need to install Apache Flink on those machines and configure the ExecutionEnvironment accordingly.

4.1. Creating a DataSet


To start performing data transformations, we need to supply our program with the data.


Let’s create an instance of the DataSet class using our ExecutionEnvironment:


DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);

You can create a DataSet from multiple sources, such as Apache Kafka, a CSV file, or virtually any other data source.

4.2. Filter and Reduce


Once you create an instance of the DataSet class, you can apply transformations to it.


Let’s say that you want to keep only the numbers that are above a certain threshold and then sum them. You can use the filter() and reduce() transformations to achieve this:


int threshold = 30;
List<Integer> collect = amounts
  .filter(a -> a > threshold)
  .reduce((integer, t1) -> integer + t1)
  .collect();

assertThat(collect.get(0)).isEqualTo(90);


Note that the collect() method is a sink operation that triggers the actual data transformations.
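As a cross-check of the arithmetic, here is the same filter-then-sum logic as a plain Java sketch. It uses java.util.stream rather than Flink, and the class and method names are hypothetical. With the input 1, 29, 40, 50 and a threshold of 30, only 40 and 50 pass the filter, so the reduced value is 90.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Plain-Java sketch of the filter/reduce step; not Flink code.
class FilterReduceSketch {

    // Keeps the amounts above the threshold and sums them pairwise.
    // The result is empty when no amount passes the filter.
    static Optional<Integer> sumAbove(List<Integer> amounts, int threshold) {
        return amounts.stream()
          .filter(a -> a > threshold)
          .reduce((left, right) -> left + right);
    }

    public static void main(String[] args) {
        List<Integer> amounts = Arrays.asList(1, 29, 40, 50);
        System.out.println(sumAbove(amounts, 30).orElse(0)); // prints 90
    }
}
```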

4.3. Map


Let’s say that you have a DataSet of Person objects:


private static class Person {
    private int age;
    private String name;

    // standard constructors/getters/setters
}

Next, let’s create a DataSet of these objects:


DataSet<Person> personDataSource = env.fromCollection(
  Arrays.asList(
    new Person(23, "Tom"),
    new Person(75, "Michael")));

Suppose that you want to extract only the age field from every object of the collection. You can use the map() transformation to get only a specific field of the Person class:

List<Integer> ages = personDataSource
  .map(p -> p.age)
  .collect();

assertThat(ages).contains(23, 75);

4.4. Join


When you have two datasets, you may want to join them on some id field. For this, you can use the join() transformation.


Let’s create collections of transactions and addresses of a user:


Tuple3<Integer, String, String> address
  = new Tuple3<>(1, "5th Avenue", "London");
DataSet<Tuple3<Integer, String, String>> addresses
  = env.fromElements(address);

Tuple2<Integer, String> firstTransaction 
  = new Tuple2<>(1, "Transaction_1");
DataSet<Tuple2<Integer, String>> transactions 
  = env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));

The first field in both tuples is of an Integer type, and this is an id field on which we want to join both data sets.


To perform the actual joining logic, we need to implement a KeySelector interface for address and transaction:


private static class IdKeySelectorTransaction 
  implements KeySelector<Tuple2<Integer, String>, Integer> {
    @Override
    public Integer getKey(Tuple2<Integer, String> value) {
        return value.f0;
    }
}

private static class IdKeySelectorAddress 
  implements KeySelector<Tuple3<Integer, String, String>, Integer> {
    @Override
    public Integer getKey(Tuple3<Integer, String, String> value) {
        return value.f0;
    }
}

Each selector is only returning the field on which the join should be performed.


Unfortunately, it’s not possible to use lambda expressions here because Flink needs generic type info.


Next, let’s implement merging logic using those selectors:


List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>>
  joined = transactions.join(addresses)
  .where(new IdKeySelectorTransaction())
  .equalTo(new IdKeySelectorAddress())
  .collect();

assertThat(joined).contains(new Tuple2<>(firstTransaction, address));
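To make the join semantics concrete, the following plain Java sketch performs the same kind of inner equi-join on the id field using a hash lookup. It is not Flink code: the class, its methods, and the use of Map.Entry in place of Flink's tuples are illustrative assumptions. Transaction_2 has id 12, which matches no address, so the inner join drops it.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of an inner join on an id key; not Flink code.
class InnerJoinSketch {

    // Hypothetical helper that builds an (id, value) row.
    static Map.Entry<Integer, String> row(int id, String value) {
        return new SimpleEntry<>(id, value);
    }

    // Joins (id, transaction) rows with (id, address) rows on equal ids.
    static List<String> join(List<Map.Entry<Integer, String>> transactions,
                             List<Map.Entry<Integer, String>> addresses) {
        // Build side: index the addresses by id.
        Map<Integer, List<String>> addressesById = new HashMap<>();
        for (Map.Entry<Integer, String> a : addresses) {
            addressesById.computeIfAbsent(a.getKey(), k -> new ArrayList<>()).add(a.getValue());
        }
        // Probe side: emit one result per matching pair, skip unmatched ids.
        List<String> joined = new ArrayList<>();
        for (Map.Entry<Integer, String> t : transactions) {
            List<String> matches = addressesById.getOrDefault(t.getKey(), Collections.<String>emptyList());
            for (String address : matches) {
                joined.add(t.getValue() + " @ " + address);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> transactions = Arrays.asList(
          row(1, "Transaction_1"), row(12, "Transaction_2"));
        List<Map.Entry<Integer, String>> addresses = Arrays.asList(
          row(1, "5th Avenue, London"));
        System.out.println(join(transactions, addresses));
    }
}
```

Only the pair with id 1 survives, which mirrors the single element asserted in the Flink test above.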

4.5. Sort


Let’s say that you have the following collection of Tuple2:


Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
DataSet<Tuple2<Integer, String>> transactions = env.fromElements(
  fourthPerson, secondPerson, thirdPerson, firstPerson);

If you want to sort this collection by the first field of the tuple, you can use the sortPartition() transformation:

List<Tuple2<Integer, String>> sorted = transactions
  .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
  .collect();

assertThat(sorted)
  .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);

5. Word Count


The word count problem is one that is commonly used to showcase the capabilities of Big Data processing frameworks. The basic solution involves counting word occurrences in a text input. Let’s use Flink to implement a solution to this problem.


As the first step in our solution, we create a LineSplitter class that splits our input into tokens (words) and emits a Tuple2 key-value pair for each token. In each of these tuples, the key is a word found in the text, and the value is the integer one (1).


This class implements the FlatMapFunction interface that takes String as an input and produces a Tuple2<String, Integer>:

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // lower-case the line and split it on non-word characters
        Stream.of(value.toLowerCase().split("\\W+"))
          .filter(t -> t.length() > 0)
          .forEach(token -> out.collect(new Tuple2<>(token, 1)));
    }
}

We call the collect() method on the Collector class to push data forward in the processing pipeline.

Our next and final step is to group the tuples by their first elements (words) and then perform a sum aggregate on the second elements to produce a count of the word occurrences:


public static DataSet<Tuple2<String, Integer>> startWordCount(
  ExecutionEnvironment env, List<String> lines) throws Exception {
    DataSet<String> text = env.fromCollection(lines);

    return text.flatMap(new LineSplitter())
      .groupBy(0)
      .aggregate(Aggregations.SUM, 1);
}

We are using three types of the Flink transformations: flatMap(), groupBy(), and aggregate().


Let’s write a test to assert that the word count implementation is working as expected:


List<String> lines = Arrays.asList(
  "This is a first sentence",
  "This is a second sentence with a one word");

DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);

List<Tuple2<String, Integer>> collect = result.collect();
assertThat(collect).containsExactlyInAnyOrder(
  new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),
  new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),
  new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));
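For readers who want to check the expected counts by hand, the same split, group, and sum steps can be mirrored in plain Java. This is a sketch built on java.util.stream, not on Flink, and the class and method names are hypothetical. It lower-cases and tokenizes each line the way the text describes, then counts equal words.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java sketch of the word count steps; not Flink code.
class WordCountSketch {

    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
          // flatMap step: one line becomes many lower-cased tokens
          .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
          .filter(token -> !token.isEmpty())
          // groupBy + sum step: group equal words and count each group
          .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
          "This is a first sentence",
          "This is a second sentence with a one word");
        // Iteration order of the resulting map is not specified.
        count(lines).forEach((word, n) -> System.out.println(word + " -> " + n));
    }
}
```

For the two sample sentences this yields nine distinct words, with "a" counted three times and "this", "is", and "sentence" counted twice each.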

6. DataStream API


6.1. Creating a DataStream


Apache Flink also supports the processing of streams of events through its DataStream API. If we want to start consuming events, we first need to use the StreamExecutionEnvironment class:

StreamExecutionEnvironment executionEnvironment
 = StreamExecutionEnvironment.getExecutionEnvironment();

Next, we can create a stream of events using the executionEnvironment from a variety of sources. It could be some message bus like Apache Kafka, but in this example, we will simply create a source from a couple of string elements:

DataStream<String> dataStream = executionEnvironment.fromElements(
  "This is a first sentence", 
  "This is a second sentence with a one word");

We can apply transformations to every element of the DataStream, just as we can with a DataSet:


SingleOutputStreamOperator<String> upperCase = dataStream.map(String::toUpperCase);

To trigger the execution, we need to invoke a sink operation such as print(), which simply prints the result of the transformations to the standard output, followed by a call to the execute() method on the StreamExecutionEnvironment class:

upperCase.print();
executionEnvironment.execute();

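It will print both sentences in upper case. When the job runs with a parallelism greater than one, each printed line is prefixed with the index of the subtask that produced it, for example 1>.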



6.2. Windowing of Events


When processing a stream of events in real time, you may sometimes need to group events together and apply some computation on a window of those events.


Suppose we have a stream of events, where each event is a pair consisting of the event number and the timestamp when the event was sent to our system, and that we can tolerate events that are out-of-order but only if they are no more than twenty seconds late.


For this example, let’s first create a stream simulating two events that are several minutes apart and define a timestamp extractor that specifies our lateness threshold:


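SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed
  = executionEnvironment.fromElements(
  // illustrative timestamps in epoch seconds, several minutes apart
  new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
  new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor
      <Tuple2<Integer, Long>>(Time.seconds(20)) {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element) {
          // the tuple stores epoch seconds; Flink expects milliseconds
          return element.f1 * 1000;
        }
    });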

Next, let’s define a window operation to group our events into five-second windows and apply a transformation on those events:


SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  .maxBy(0, true);
reduced.print();

For every five-second window, maxBy(0, true) keeps the element with the largest value in field 0, so it prints out:


1> (15,1491221519)

Note that we do not see the second event because it arrived later than the specified lateness threshold.
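The lateness rule can be sketched in a few lines of plain Java. This is a simplified model and not Flink's implementation: the class and method names are hypothetical, timestamps are assumed to be non-negative milliseconds, and the event times in main are made-up values rather than the ones from the example above. It shows how a tumbling window start is derived from a timestamp, how a bounded out-of-orderness watermark trails the highest timestamp seen, and when an element counts as late.

```java
// Simplified model of tumbling windows and watermarks; not Flink code.
class TumblingWindowSketch {

    // Start of the tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Watermark: the highest timestamp seen so far minus the tolerated delay.
    static long watermark(long maxTimestampSeenMs, long maxOutOfOrdernessMs) {
        return maxTimestampSeenMs - maxOutOfOrdernessMs;
    }

    // An element is late once the watermark has passed the end of its window.
    static boolean isLate(long timestampMs, long windowSizeMs, long watermarkMs) {
        long lastTimestampInWindow = windowStart(timestampMs, windowSizeMs) + windowSizeMs - 1;
        return lastTimestampInWindow <= watermarkMs;
    }

    public static void main(String[] args) {
        long windowSizeMs = 5_000;   // five-second windows
        long boundMs = 20_000;       // twenty seconds of tolerated disorder

        // Hypothetical event times: the newest event seen so far is at 100 s.
        long currentWatermark = watermark(100_000, boundMs);

        System.out.println(windowStart(1_234_567, windowSizeMs));
        System.out.println(isLate(95_000, windowSizeMs, currentWatermark));
        System.out.println(isLate(60_000, windowSizeMs, currentWatermark));
    }
}
```

In this model an event 5 seconds behind the newest one is still accepted, while an event 40 seconds behind falls outside the twenty-second bound and is dropped. Flink itself emits these watermarks periodically and offers further options, such as allowed lateness, that the sketch leaves out.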


7. Conclusion


In this article, we introduced the Apache Flink framework and looked at some of the transformations supplied with its API.

We implemented a word count program using Flink’s fluent and functional DataSet API. Then we looked at the DataStream API and implemented a simple real-time transformation on a stream of events.

The implementation of all these examples and code snippets can be found over on GitHub – this is a Maven project, so it should be easy to import and run as it is.
