Building a Data Pipeline with Flink and Kafka

Last modified: September 15, 2018

1. Overview

Apache Flink is a stream processing framework that can be used easily with Java. Apache Kafka is a distributed stream processing system supporting high fault-tolerance.

In this tutorial, we're going to have a look at how to build a data pipeline using those two technologies.

2. Installation

To install and configure Apache Kafka, please refer to the official guide. After installing, we can use the following commands to create the new topics called flink_input and flink_output:

 bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic flink_output

 bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic flink_input
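
We can check that both topics were created by listing them (again assuming the default local ZooKeeper address):

 bin/kafka-topics.sh --list --zookeeper localhost:2181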

For the sake of this tutorial, we’ll use default configuration and default ports for Apache Kafka.

3. Flink Usage

Apache Flink is a real-time stream processing framework. The framework allows using multiple third-party systems as stream sources or sinks.

In Flink, there are various connectors available:

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)

To add Flink to our project, we need to include the following Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.5.0</version>
</dependency>

Adding those dependencies will allow us to consume from and produce to Kafka topics. You can find the current version of Flink on Maven Central.

4. Kafka String Consumer

To consume data from Kafka with Flink we need to provide a topic and a Kafka address. We should also provide a group id which will be used to hold offsets so we won’t always read the whole data from the beginning.

Let’s create a static method that will make the creation of FlinkKafkaConsumer easier:

public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
  String topic, String kafkaAddress, String kafkaGroup ) {
 
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    props.setProperty("group.id",kafkaGroup);
    FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
      topic, new SimpleStringSchema(), props);

    return consumer;
}

This method takes a topic, kafkaAddress, and kafkaGroup and creates the FlinkKafkaConsumer that will consume data from the given topic as a String, since we have used SimpleStringSchema to decode the data.

The number 011 in the class name refers to the Kafka version.

5. Kafka String Producer

To produce data to Kafka, we need to provide the Kafka address and the topic that we want to use. Again, we can create a static method that will help us to create producers for different topics:

public static FlinkKafkaProducer011<String> createStringProducer(
  String topic, String kafkaAddress){

    return new FlinkKafkaProducer011<>(kafkaAddress,
      topic, new SimpleStringSchema());
}

This method takes only topic and kafkaAddress as arguments since there's no need to provide a group id when we are producing to a Kafka topic.

6. String Stream Processing

When we have a fully working consumer and producer, we can try to process data from Kafka and then save our results back to Kafka. The full list of functions that can be used for stream processing can be found here.

In this example, we’re going to capitalize words in each Kafka entry and then write it back to Kafka.

For this purpose we need to create a custom MapFunction:

public class WordsCapitalizer implements MapFunction<String, String> {
    @Override
    public String map(String s) {
        return s.toUpperCase();
    }
}

After creating the function, we can use it in stream processing:

public static void capitalize() throws Exception {
    String inputTopic = "flink_input";
    String outputTopic = "flink_output";
    String consumerGroup = "baeldung";
    String address = "localhost:9092";
    StreamExecutionEnvironment environment = StreamExecutionEnvironment
      .getExecutionEnvironment();
    FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
      inputTopic, address, consumerGroup);
    DataStream<String> stringInputStream = environment
      .addSource(flinkKafkaConsumer);

    FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(
      outputTopic, address);

    stringInputStream
      .map(new WordsCapitalizer())
      .addSink(flinkKafkaProducer);
    // Trigger the job; without this call the pipeline is only defined, never run
    environment.execute();
}

The application will read data from the flink_input topic, perform operations on the stream and then save the results to the flink_output topic in Kafka.

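To run the job, we can call capitalize from an ordinary main method. This is only a minimal sketch; the companion code may organize the entry point differently:

public static void main(String[] args) throws Exception {
    capitalize();
}
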
We’ve seen how to deal with Strings using Flink and Kafka. But often it’s required to perform operations on custom objects. We’ll see how to do this in the next chapters.

7. Custom Object Deserialization

The following class represents a simple message with information about sender and recipient:

@JsonSerialize
public class InputMessage {
    String sender;
    String recipient;
    LocalDateTime sentAt;
    String message;
}
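
The listing above omits the accessors. As a minimal sketch of what the rest of the article assumes (Jackson needs a way to populate the fields, and the timestamp assigner later calls getSentAt()), plain getters and setters are enough:

    // inside InputMessage
    public String getSender() { return sender; }
    public void setSender(String sender) { this.sender = sender; }
    public String getRecipient() { return recipient; }
    public void setRecipient(String recipient) { this.recipient = recipient; }
    public LocalDateTime getSentAt() { return sentAt; }
    public void setSentAt(LocalDateTime sentAt) { this.sentAt = sentAt; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }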

Previously, we were using SimpleStringSchema to deserialize messages from Kafka, but now we want to deserialize data directly to custom objects.

To do this, we need a custom DeserializationSchema:

public class InputMessageDeserializationSchema implements
  DeserializationSchema<InputMessage> {

    static ObjectMapper objectMapper = new ObjectMapper()
      .registerModule(new JavaTimeModule());

    @Override
    public InputMessage deserialize(byte[] bytes) throws IOException {
        return objectMapper.readValue(bytes, InputMessage.class);
    }

    @Override
    public boolean isEndOfStream(InputMessage inputMessage) {
        return false;
    }

    @Override
    public TypeInformation<InputMessage> getProducedType() {
        return TypeInformation.of(InputMessage.class);
    }
}

We are assuming here that the messages are held as JSON in Kafka.

Since we have a field of type LocalDateTime, we need to specify the JavaTimeModule, which takes care of mapping LocalDateTime objects to JSON.

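To make the expected payload concrete, here is a small, hypothetical example; the field values are made up, and we assume sentAt arrives as an ISO-8601 string, which the JavaTimeModule can parse:

public static InputMessage parseSampleMessage() throws IOException {
    // Hypothetical JSON payload, for illustration only
    String json = "{\"sender\":\"alice\",\"recipient\":\"bob\","
      + "\"sentAt\":\"2018-09-15T10:15:30\",\"message\":\"hello\"}";
    return new InputMessageDeserializationSchema().deserialize(json.getBytes());
}
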
Flink schemas can’t have fields that aren’t serializable because all operators (like schemas or functions) are serialized at the start of the job.

There are similar issues in Apache Spark. One of the known fixes for this issue is initializing fields as static, as we did with ObjectMapper above. It isn’t the prettiest solution, but it’s relatively simple and does the job.

The isEndOfStream method can be used for the special case when the stream should be processed only until some specific data is received. It isn't needed in our case.

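The backup pipeline in Section 11 references a createInputMessageConsumer helper that the article doesn't list. A minimal sketch, assuming it simply mirrors createStringConsumerForTopic with our custom schema plugged in, could look like this:

public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(
  String topic, String kafkaAddress, String kafkaGroup) {

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", kafkaAddress);
    properties.setProperty("group.id", kafkaGroup);
    return new FlinkKafkaConsumer011<>(
      topic, new InputMessageDeserializationSchema(), properties);
}
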
8. Custom Object Serialization

Now, let's assume that we want our system to be able to create a backup of messages. We want the process to be automatic, and each backup should be composed of the messages sent during one whole day.

Also, a backup message should have a unique id assigned.

For this purpose, we can create the following class:

public class Backup {
    @JsonProperty("inputMessages")
    List<InputMessage> inputMessages;
    @JsonProperty("backupTimestamp")
    LocalDateTime backupTimestamp;
    @JsonProperty("uuid")
    UUID uuid;

    public Backup(List<InputMessage> inputMessages, 
      LocalDateTime backupTimestamp) {
        this.inputMessages = inputMessages;
        this.backupTimestamp = backupTimestamp;
        this.uuid = UUID.randomUUID();
    }
}

Please mind that the UUID generation mechanism isn’t perfect, as it allows duplicates. However, this is enough for the scope of this example.

We want to save our Backup object as JSON to Kafka, so we need to create our SerializationSchema:

public class BackupSerializationSchema
  implements SerializationSchema<Backup> {

    ObjectMapper objectMapper;
    Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);

    @Override
    public byte[] serialize(Backup backupMessage) {
        if(objectMapper == null) {
            objectMapper = new ObjectMapper()
              .registerModule(new JavaTimeModule());
        }
        try {
            return objectMapper.writeValueAsString(backupMessage).getBytes();
        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
            logger.error("Failed to serialize Backup to JSON", e);
        }
        return new byte[0];
    }
}
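
Similarly, Section 11 uses a createBackupProducer helper that isn't shown in the article. A minimal sketch, assuming it mirrors createStringProducer with the schema above, might be:

public static FlinkKafkaProducer011<Backup> createBackupProducer(
  String topic, String kafkaAddress) {

    return new FlinkKafkaProducer011<>(
      kafkaAddress, topic, new BackupSerializationSchema());
}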

9. Timestamping Messages

Since we want to create a backup for all messages of each day, messages need a timestamp.

Flink provides three different time characteristics: EventTime, ProcessingTime, and IngestionTime.

In our case, we need to use the time at which the message has been sent, so we’ll use EventTime.

To use EventTime we need a TimestampAssigner which will extract timestamps from our input data:

public class InputMessageTimestampAssigner 
  implements AssignerWithPunctuatedWatermarks<InputMessage> {
 
    @Override
    public long extractTimestamp(InputMessage element, 
      long previousElementTimestamp) {
        ZoneId zoneId = ZoneId.systemDefault();
        return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
    }

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(InputMessage lastElement, 
      long extractedTimestamp) {
        return new Watermark(extractedTimestamp - 1500);
    }
}

We need to transform our LocalDateTime into an epoch timestamp, as this is the format expected by Flink. After assigning timestamps, all time-based operations will use the time from the sentAt field.

Since Flink expects timestamps to be in milliseconds and toEpochSecond() returns time in seconds, we needed to multiply it by 1000 so that Flink will create the windows correctly.

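As a side note, ZonedDateTime can also give us milliseconds directly; this is just a variation on the assigner above, and it additionally preserves any sub-second part of sentAt:

long timestampMillis = element.getSentAt()
  .atZone(ZoneId.systemDefault())
  .toInstant()
  .toEpochMilli(); // epoch milliseconds, without the manual * 1000
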
Flink defines the concept of a Watermark. Watermarks are useful when data doesn't arrive in the order it was sent. A watermark defines the maximum lateness that is allowed for elements to be processed.

Elements that have timestamps lower than the watermark won’t be processed at all.

10. Creating Time Windows

To assure that our backup gathers only messages sent during one day, we can use the timeWindowAll method on the stream, which will split messages into windows.

However, we’ll still need to aggregate messages from each window and return them as Backup.

To do this, we’ll need a custom AggregateFunction:

public class BackupAggregator 
  implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
 
    @Override
    public List<InputMessage> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<InputMessage> add(
      InputMessage inputMessage,
      List<InputMessage> inputMessages) {
        inputMessages.add(inputMessage);
        return inputMessages;
    }

    @Override
    public Backup getResult(List<InputMessage> inputMessages) {
        return new Backup(inputMessages, LocalDateTime.now());
    }

    @Override
    public List<InputMessage> merge(List<InputMessage> inputMessages,
      List<InputMessage> acc1) {
        inputMessages.addAll(acc1);
        return inputMessages;
    }
}

11. Aggregating Backups

After assigning proper timestamps and implementing our AggregateFunction, we can finally take our Kafka input and process it:

public static void createBackup() throws Exception {
    String inputTopic = "flink_input";
    String outputTopic = "flink_output";
    String consumerGroup = "baeldung";
    String kafkaAddress = "192.168.99.100:9092";
    StreamExecutionEnvironment environment
      = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer
      = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
    flinkKafkaConsumer.setStartFromEarliest();

    flinkKafkaConsumer.assignTimestampsAndWatermarks(
      new InputMessageTimestampAssigner());
    FlinkKafkaProducer011<Backup> flinkKafkaProducer
      = createBackupProducer(outputTopic, kafkaAddress);

    DataStream<InputMessage> inputMessagesStream
      = environment.addSource(flinkKafkaConsumer);

    inputMessagesStream
      .timeWindowAll(Time.hours(24))
      .aggregate(new BackupAggregator())
      .addSink(flinkKafkaProducer);

    environment.execute();
}
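
Once the job is running, we can peek at the backups written to the output topic with the console consumer, assuming the broker address used above:

 bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 \
  --topic flink_output --from-beginning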

12. Conclusion

In this article, we’ve presented how to create a simple data pipeline with Apache Flink and Apache Kafka.

As always, the code can be found over on GitHub.
