Intro to Apache Storm

Last modified: October 25, 2018

1. Overview

This tutorial will be an introduction to Apache Storm, a distributed real-time computation system.

We’ll cover:

  • What exactly Apache Storm is and what problems it solves
  • Its architecture, and
  • How to use it in a project

2. What Is Apache Storm?

Apache Storm is a free and open-source distributed system for real-time computation.

It provides fault tolerance and scalability, guarantees data processing, and is especially good at processing unbounded streams of data.

Good use cases for Storm include processing credit card operations for fraud detection or processing data from smart homes to detect faulty sensors.

Storm allows integration with various databases and queuing systems available on the market.

3. Maven Dependency

Before we use Apache Storm, we need to include the storm-core dependency in our project:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>

We should only use the provided scope if we intend to run our application on the Storm cluster.

To run the application locally, we can use a so-called local mode that simulates the Storm cluster in a local process; in that case, we should remove the provided scope.
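
For instance, for local runs the same dependency would simply drop the provided scope:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
</dependency>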

4. Data Model

Apache Storm’s data model consists of two elements: tuples and streams.

4.1. Tuple

A tuple is an ordered list of named fields with dynamic types. This means that we don’t need to explicitly declare the types of the fields.

Storm needs to know how to serialize all values that are used in a tuple. By default, it can already serialize primitive types, Strings and byte arrays.

Since Storm uses Kryo for serialization, we need to register a serializer using Config in order to use custom types. We can do this in one of two ways:

First, we can register the class to serialize using its full name:

Config config = new Config();
config.registerSerialization(User.class);

In such a case, Kryo will serialize the class using FieldSerializer. By default, this will serialize all non-transient fields of the class, both private and public.

Or instead, we can provide both the class to serialize and the serializer we want Storm to use for that class:

Config config = new Config();
config.registerSerialization(User.class, UserSerializer.class);

To create a custom serializer, we need to extend the generic class Serializer, which has two methods, write and read.
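
As an illustration, here is a minimal sketch of what such a UserSerializer could look like, assuming a hypothetical User class with name and age fields (the class shape and its accessors are assumptions for this example):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class UserSerializer extends Serializer<User> {

    @Override
    public void write(Kryo kryo, Output output, User user) {
        // write the fields in a fixed order
        output.writeString(user.getName());
        output.writeInt(user.getAge());
    }

    @Override
    public User read(Kryo kryo, Input input, Class<User> type) {
        // read the fields back in the same order they were written
        return new User(input.readString(), input.readInt());
    }
}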

4.2. Stream

The stream is the core abstraction in the Storm ecosystem. A stream is an unbounded sequence of tuples.

Storm allows processing multiple streams in parallel.

Every stream has an id that is provided and assigned during declaration.
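
For example, besides the default stream, a component can declare an additional named stream in declareOutputFields. This is only a sketch; the "errors" stream id and its field are illustrative:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // the unnamed declaration goes to the default stream (id "default")
    declarer.declare(new Fields("operation", "timestamp"));
    // a second, explicitly named stream
    declarer.declareStream("errors", new Fields("errorMessage"));
}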

5. Topology

The logic of a real-time Storm application is packaged into a topology. A topology consists of spouts and bolts.

5.1. Spout

Spouts are the sources of the streams. They emit tuples to the topology.

Tuples can be read from various external systems like Kafka, Kestrel or ActiveMQ.

Spouts can be reliable or unreliable. Reliable means that the spout can replay a tuple that Storm has failed to process. Unreliable means that the spout doesn’t replay tuples, since it uses a fire-and-forget mechanism to emit them.

To create a custom spout, we need to implement the IRichSpout interface or extend any class that already implements it, for example, the abstract BaseRichSpout class.

Let’s create an unreliable spout:

public class RandomIntSpout extends BaseRichSpout {

    private Random random;
    private SpoutOutputCollector outputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext,
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        outputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
    }
}

Our custom RandomIntSpout will generate a random integer and a timestamp every second.
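
For contrast, a reliable spout emits each tuple with a message id and overrides ack and fail. The following is a minimal sketch; a real implementation would keep pending tuples somewhere so that fail could re-emit them:

public class ReliableIntSpout extends BaseRichSpout {

    private Random random;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map map, TopologyContext topologyContext,
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        int value = random.nextInt();
        // passing a message id as the last argument makes this emit trackable
        collector.emit(new Values(value), value);
    }

    @Override
    public void ack(Object messageId) {
        // the tuple was fully processed, nothing to retry
    }

    @Override
    public void fail(Object messageId) {
        // processing failed or timed out, a real spout would re-emit here
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("value"));
    }
}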

5.2. Bolt

Bolts process tuples in the stream. They can perform various operations like filtering, aggregations or custom functions.

Some operations require multiple steps, and thus we will need to use multiple bolts in such cases.

To create a custom bolt, we need to implement the IRichBolt interface or, for simpler operations, the IBasicBolt interface.

There are also multiple helper classes available for implementing bolts. In this case, we’ll use BaseBasicBolt:

public class PrintingBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        System.out.println(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
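        // nothing to declare: this bolt only prints tuples and doesn't emit anything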

    }
}

This custom PrintingBolt will simply print all tuples to the console.

6. Creating a Simple Topology

Let’s put these ideas together into a simple topology. Our topology will have one spout and three bolts.

6.1. RandomNumberSpout

In the beginning, we’ll create an unreliable spout. It will generate random integers from the range [0, 100] every second:

public class RandomNumberSpout extends BaseRichSpout {
    private Random random;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map map, TopologyContext topologyContext, 
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
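        // nextInt(101) produces a value from 0 to 100 inclusive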
        int operation = random.nextInt(101);
        long timestamp = System.currentTimeMillis();

        Values values = new Values(operation, timestamp);
        collector.emit(values);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

6.2. FilteringBolt

Next, we’ll create a bolt that will filter out all elements with operation equal to 0:

public class FilteringBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        int operation = tuple.getIntegerByField("operation");
        if (operation > 0) {
            basicOutputCollector.emit(tuple.getValues());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

6.3. AggregatingBolt

Next, let’s create a more complicated bolt that will aggregate all positive operations from each time window.

For this purpose, we’ll use BaseWindowedBolt, a class created especially for implementing bolts that operate on windows instead of on single tuples.

Windows are an essential concept in stream processing, splitting the infinite streams into finite chunks. We can then apply computations to each chunk. There are generally two types of windows:

Time windows are used to group elements from a given time period using timestamps. Time windows may have a different number of elements.

Count windows are used to create windows with a defined size. In such a case, all windows will have the same size, and the window will not be emitted if there are fewer elements than the defined size.
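
As a quick sketch of the difference, BaseWindowedBolt (which our next bolt extends) can be configured with either kind of window:

// a sliding time window: tuples from the last 5 seconds, evaluated every 2 seconds
new AggregatingBolt().withWindow(BaseWindowedBolt.Duration.seconds(5),
  BaseWindowedBolt.Duration.seconds(2));

// a sliding count window: the last 50 tuples, evaluated every 10 tuples
new AggregatingBolt().withWindow(BaseWindowedBolt.Count.of(50),
  BaseWindowedBolt.Count.of(10));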

Our AggregatingBolt will generate the sum of all positive operations from a time window along with its beginning and end timestamps:

public class AggregatingBolt extends BaseWindowedBolt {
    private OutputCollector outputCollector;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
    }

    @Override
    public void execute(TupleWindow tupleWindow) {
        List<Tuple> tuples = tupleWindow.get();
        tuples.sort(Comparator.comparing(this::getTimestamp));

        int sumOfOperations = tuples.stream()
          .mapToInt(tuple -> tuple.getIntegerByField("operation"))
          .sum();
        Long beginningTimestamp = getTimestamp(tuples.get(0));
        Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1));

        Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
        outputCollector.emit(values);
    }

    private Long getTimestamp(Tuple tuple) {
        return tuple.getLongByField("timestamp");
    }
}

Note that, in this case, getting the first element of the list directly is safe. That’s because each window is calculated using the timestamp field of the Tuple, so there has to be at least one element in each window.

6.4. FileWritingBolt

Finally, we’ll create a bolt that will take all elements with sumOfOperations greater than 2000, serialize them, and write them to a file:

public class FileWritingBolt extends BaseRichBolt {
    public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
    private BufferedWriter writer;
    private String filePath;
    private ObjectMapper objectMapper;

    @Override
    public void cleanup() {
        try {
            writer.close();
        } catch (IOException e) {
            logger.error("Failed to close writer!");
        }
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, 
      OutputCollector outputCollector) {
        objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        
        try {
            writer = new BufferedWriter(new FileWriter(filePath));
        } catch (IOException e) {
            logger.error("Failed to open a file for writing.", e);
        }
    }

    @Override
    public void execute(Tuple tuple) {
        int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
        long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
        long endTimestamp = tuple.getLongByField("endTimestamp");

        if (sumOfOperations > 2000) {
            AggregatedWindow aggregatedWindow = new AggregatedWindow(
                sumOfOperations, beginningTimestamp, endTimestamp);
            try {
                writer.write(objectMapper.writeValueAsString(aggregatedWindow));
                writer.newLine();
                writer.flush();
            } catch (IOException e) {
                logger.error("Failed to write data to file.", e);
            }
        }
    }
    
    // public constructor and other methods
}

Note that we don’t need to declare the output, as this will be the last bolt in our topology.

6.5. Running the Topology

Finally, we can pull everything together and run our topology:

public static void runTopology() {
    TopologyBuilder builder = new TopologyBuilder();

    RandomNumberSpout random = new RandomNumberSpout();
    builder.setSpout("randomNumberSpout", random);

    FilteringBolt filtering = new FilteringBolt();
    builder.setBolt("filteringBolt", filtering)
      .shuffleGrouping("randomNumberSpout");

    BaseWindowedBolt aggregating = new AggregatingBolt()
      .withTimestampField("timestamp")
      .withLag(BaseWindowedBolt.Duration.seconds(1))
      .withWindow(BaseWindowedBolt.Duration.seconds(5));
    builder.setBolt("aggregatingBolt", aggregating)
      .shuffleGrouping("filteringBolt"); 
      
    String filePath = "./src/main/resources/data.txt";
    FileWritingBolt file = new FileWritingBolt(filePath);
    builder.setBolt("fileBolt", file)
      .shuffleGrouping("aggregatingBolt");

    Config config = new Config();
    config.setDebug(false);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Test", config, builder.createTopology());
}

To make the data flow through each piece in the topology, we need to indicate how to connect them. shuffleGrouping allows us to state that data for filteringBolt will be coming from randomNumberSpout.

For each bolt, we need to add shuffleGrouping, which defines the source of elements for that bolt. The source may be a spout or another bolt. And if we set the same source for more than one bolt, the source will emit all elements to each of them.
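
For example, if we also wired the PrintingBolt from earlier to filteringBolt, both bolts would receive every tuple that filteringBolt emits. This is only a sketch; printingBolt isn’t part of the topology above:

builder.setBolt("printingBolt", new PrintingBolt())
  .shuffleGrouping("filteringBolt");
builder.setBolt("aggregatingBolt", aggregating)
  .shuffleGrouping("filteringBolt");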

In this case, our topology will use the LocalCluster to run the job locally.
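
To run on a real cluster instead, we would package the application and submit it through StormSubmitter rather than LocalCluster (a sketch; the topology name is a placeholder):

StormSubmitter.submitTopology("my-topology", config, builder.createTopology());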

7. Conclusion

In this tutorial, we introduced Apache Storm, a distributed real-time computation system. We created a spout, some bolts, and pulled them together into a complete topology.

And, as always, all the code samples can be found over on GitHub.