A Guide to Apache Crunch

Last modified: August 31, 2018

1. Introduction

In this tutorial, we’ll demonstrate Apache Crunch with an example data processing application. We’ll run this application using the MapReduce framework.

We’ll start by covering briefly some Apache Crunch concepts. Then we’ll jump into a sample app. In this app we’ll do text processing:

  • First of all, we’ll read the lines from a text file
  • Later, we’ll split them into words and remove some common words
  • Then, we’ll group the remaining words to get a list of unique words and their counts
  • Finally, we’ll write this list to a text file

2. What Is Crunch?

MapReduce is a distributed, parallel programming framework for processing large amounts of data on a cluster of servers. Software frameworks such as Hadoop and Spark implement MapReduce.

Crunch provides a framework for writing, testing and running MapReduce pipelines in Java. Here, we don’t write the MapReduce jobs directly. Rather, we define a data pipeline (i.e. the operations to perform the input, processing, and output steps) using the Crunch APIs. The Crunch Planner maps them to MapReduce jobs and executes them when needed.

Therefore, every Crunch data pipeline is coordinated by an instance of the Pipeline interface. This interface also defines methods for reading data into a pipeline via Source instances and writing data out from a pipeline to Target instances.

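For illustration, here is a minimal sketch of the generic read and write methods, using the in-memory pipeline covered later and the text-file helpers from Crunch’s From and To factory classes (the file paths are placeholders):

Pipeline pipeline = MemPipeline.getInstance();

// Read data into the pipeline via a Source instance.
PCollection<String> lines = pipeline.read(From.textFile("books/input.txt"));

// Write data out of the pipeline to a Target instance.
pipeline.write(lines, To.textFile("books/output"));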

We have 3 interfaces for representing data (the short sketch after the list shows how they relate):

  1. PCollection – an immutable, distributed collection of elements
  2. PTable<K, V> – an immutable, distributed, unordered multi-map of keys and values
  3. PGroupedTable<K, V> – a distributed, sorted map of keys of type K to an Iterable V that may be iterated over exactly once
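
For instance, here is a small in-memory sketch (not part of the example app) showing how the three types relate:

PCollection<String> words = MemPipeline.typedCollectionOf(Writables.strings(), "apache", "crunch", "apache");

// count() turns a PCollection into a PTable of elements to their counts.
PTable<String, Long> counts = words.count();

// groupByKey() turns a PTable into a PGroupedTable of keys to their grouped values.
PGroupedTable<String, Long> grouped = counts.groupByKey();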

DoFn is the base class for all data processing functions. It corresponds to the Mapper, Reducer and Combiner classes in MapReduce. We spend most of the development time writing and testing logical computations using it.

Now that we’re more familiar with Crunch, let’s use it to build the example application.

3. Setting up a Crunch Project

First of all, let’s set up a Crunch Project with Maven. We can do so in two ways:

  1. Add the required dependencies in the pom.xml file of an existing project
  2. Use an archetype to generate a starter project

Let’s have a quick look at both approaches.

3.1. Maven Dependencies

In order to add Crunch to an existing project, let’s add the required dependencies in the pom.xml file.

First, let’s add the crunch-core library:

<dependency>
    <groupId>org.apache.crunch</groupId>
    <artifactId>crunch-core</artifactId>
    <version>0.15.0</version>
</dependency>

Next, let’s add the hadoop-client library to communicate with Hadoop. We use the version matching Hadoop installation:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>

We can check Maven Central for the latest versions of the crunch-core and hadoop-client libraries.

3.2. Maven Archetype

Another approach is to quickly generate a starter project using the Maven archetype provided by Crunch:

mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype

When prompted by the above command, we provide the Crunch version and the project artifact details.

4. Crunch Pipeline Setup

After setting up the project, we need to create a Pipeline object. Crunch has 3 Pipeline implementations:

  • MRPipeline – executes within Hadoop MapReduce
  • SparkPipeline – executes as a series of Spark pipelines
  • MemPipeline – executes in-memory on the client and is useful for unit testing

Usually, we develop and test using an instance of MemPipeline. Later we use an instance of MRPipeline or SparkPipeline for actual execution.

If we needed an in-memory pipeline, we could use the static method getInstance to get the MemPipeline instance:

Pipeline pipeline = MemPipeline.getInstance();

But for now, let’s create an instance of MRPipeline to execute the application with Hadoop:

Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
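
For comparison, switching the same pipeline to Spark would mainly be a matter of swapping the implementation. Here is a sketch, assuming the crunch-spark dependency is on the classpath; the "local" master and the application name are illustrative values:

// Run the pipeline on a local Spark master with an arbitrary application name.
Pipeline pipeline = new SparkPipeline("local", "crunch-word-count");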

5. Read Input Data

After creating the pipeline object, we want to read input data. The Pipeline interface provides a convenience method to read input from a text file: readTextFile(pathName).

Let’s call this method to read the input text file:

PCollection<String> lines = pipeline.readTextFile(inputPath);

The above code reads the text file as a collection of String.

As the next step, let’s write a test case for reading input:

@Test
public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() {
    Pipeline pipeline = MemPipeline.getInstance();
    PCollection<String> lines = pipeline.readTextFile(INPUT_FILE_PATH);

    assertEquals(21, lines.asCollection()
      .getValue()
      .size());
}

In this test, we verify that we get the expected number of lines when reading a text file.

6. Data Processing Steps

After reading the input data, we need to process it. The Crunch API contains a number of subclasses of DoFn to handle common data processing scenarios:

  • FilterFn – filters members of a collection based on a boolean condition
  • MapFn – maps each input record to exactly one output record (see the short sketch after this list)
  • CombineFn – combines a number of values into a single value
  • JoinFn – performs joins such as inner join, left outer join, right outer join and full outer join
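
As an example of the MapFn variant mentioned above, a minimal sketch (a hypothetical ToLowerCaseFn, not used in this application) only needs to implement the map method:

public class ToLowerCaseFn extends MapFn<String, String> {

    @Override
    public String map(String word) {
        // Exactly one output record is produced for each input record.
        return word.toLowerCase();
    }
}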

Let’s implement the following data processing logic by using these classes:

  1. Split each line in the input file into words
  2. Remove the stop words
  3. Count the unique words

6.1. Split a Line of Text Into Words

First of all, let’s create the Tokenizer class to split a line into words.

We’ll extend the DoFn class. This class has an abstract method called process. This method processes the input records from a PCollection and sends the output to an Emitter. 

We need to implement the splitting logic in this method:

public class Tokenizer extends DoFn<String, String> {
    private static final Splitter SPLITTER = Splitter
      .onPattern("\\s+")
      .omitEmptyStrings();

    @Override
    public void process(String line, Emitter<String> emitter) {
        for (String word : SPLITTER.split(line)) {
            emitter.emit(word);
        }
    }
}

In the above implementation, we’ve used the Splitter class from the Guava library to extract words from a line.

Next, let’s write a unit test for the Tokenizer class:

@RunWith(MockitoJUnitRunner.class)
public class TokenizerUnitTest {
 
    @Mock
    private Emitter<String> emitter;

    @Test
    public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() {
        Tokenizer splitter = new Tokenizer();
        splitter.process("  hello  world ", emitter);

        verify(emitter).emit("hello");
        verify(emitter).emit("world");
        verifyNoMoreInteractions(emitter);
    }
}

The above test verifies that the correct words are returned.

Finally, let’s split the lines read from the input text file using this class.

The parallelDo method of the PCollection interface applies the given DoFn to all the elements and returns a new PCollection.

Let’s call this method on the lines collection and pass an instance of Tokenizer:

PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());
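
The second argument is a PType that tells Crunch how to serialize the elements; here we use the Writable-based family. If we preferred Avro serialization instead, the call could look like this (a sketch using the Avros factory class):

PCollection<String> words = lines.parallelDo(new Tokenizer(), Avros.strings());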

As a result, we get the list of words in the input text file. We’ll remove the stop words in the next step.

6.2. Remove Stop Words

Similarly to the previous step, let’s create a StopWordFilter class to filter out stop words.

However, we’ll extend FilterFn instead of DoFn. FilterFn has an abstract method called accept. We need to implement the filtering logic in this method:

public class StopWordFilter extends FilterFn<String> {

    // English stop words, borrowed from Lucene.
    private static final Set<String> STOP_WORDS = ImmutableSet
      .copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by",
        "for", "if", "in", "into", "is", "it", "no", "not", "of", "on",
        "or", "s", "such", "t", "that", "the", "their", "then", "there",
        "these", "they", "this", "to", "was", "will", "with" });

    @Override
    public boolean accept(String word) {
        return !STOP_WORDS.contains(word);
    }
}

Next, let’s write the unit test for the StopWordFilter class:

public class StopWordFilterUnitTest {

    @Test
    public void givenFilter_whenStopWordPassed_thenFalseReturned() {
        FilterFn<String> filter = new StopWordFilter();
 
        assertFalse(filter.accept("the"));
        assertFalse(filter.accept("a"));
    }

    @Test
    public void givenFilter_whenNonStopWordPassed_thenTrueReturned() {
        FilterFn<String> filter = new StopWordFilter();
 
        assertTrue(filter.accept("Hello"));
        assertTrue(filter.accept("World"));
    }

    @Test
    public void givenWordCollection_whenFiltered_thenStopWordsRemoved() {
        PCollection<String> words = MemPipeline
          .collectionOf("This", "is", "a", "test", "sentence");
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        assertEquals(ImmutableList.of("This", "test", "sentence"),
         Lists.newArrayList(noStopWords.materialize()));
    }
}

This test verifies that the filtering logic is performed correctly.

Finally, let’s use StopWordFilter to filter the list of words generated in the previous step. The filter method of the PCollection interface applies the given FilterFn to all the elements and returns a new PCollection.

Let’s call this method on the words collection and pass an instance of StopWordFilter:

PCollection<String> noStopWords = words.filter(new StopWordFilter());

As a result, we get the filtered collection of words.

6.3. Count Unique Words

After getting the filtered collection of words, we want to count how often each word occurs. The PCollection interface has a number of methods to perform common aggregations (a short sketch follows the list):

  • min – returns the minimum element of the collection
  • max – returns the maximum element of the collection
  • length – returns the number of elements in the collection
  • count – returns a PTable that contains the count of each unique element of the collection
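
The first three return PObject handles whose values we could inspect later; a quick sketch on the noStopWords collection (not part of the pipeline we are building):

PObject<Long> numberOfWords = noStopWords.length();  // number of elements
PObject<String> smallestWord = noStopWords.min();    // minimum element
PObject<String> largestWord = noStopWords.max();     // maximum element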

Let’s use the count method to get the unique words along with their counts:

// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = noStopWords.count();

7. Specify Output

As a result of the previous steps, we have a table of words and their counts. We want to write this result to a text file. The Pipeline interface provides convenience methods to write output:

void write(PCollection<?> collection, Target target);

void write(PCollection<?> collection, Target target,
  Target.WriteMode writeMode);

<T> void writeTextFile(PCollection<T> collection, String pathName);
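
The first two variants take an explicit Target; a sketch using the To factory class and the overwrite mode could look like this:

pipeline.write(counts, To.textFile(outputPath), Target.WriteMode.OVERWRITE);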

For our word counts, the writeTextFile convenience method is the simplest option, so let’s call it:

pipeline.writeTextFile(counts, outputPath);

8. Manage Pipeline Execution

All the steps so far have just defined the data pipeline. No input has been read or processed. This is because Crunch uses a lazy execution model.

It doesn’t run the MapReduce jobs until a method that controls job planning and execution is invoked on the Pipeline interface:

  • run – prepares an execution plan to create the required outputs and then executes it synchronously
  • done – runs any remaining jobs required to generate outputs and then cleans up any intermediate data files created
  • runAsync – similar to run method, but executes in a non-blocking fashion

Therefore, let’s call the done method to execute the pipeline as MapReduce jobs:

PipelineResult result = pipeline.done();

The above statement runs the MapReduce jobs that read the input, process it and write the result to the output directory.
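
If we did not want to block, runAsync would instead hand back a PipelineExecution that we could monitor or wait on; a brief sketch:

PipelineExecution execution = pipeline.runAsync();
execution.waitUntilDone();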

9. Putting the Pipeline Together

So far we have developed and unit tested the logic to read input data, process it and write it to the output file.

Next, let’s put them together to build the entire data pipeline:

public int run(String[] args) throws Exception {
    String inputPath = args[0];
    String outputPath = args[1];

    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(inputPath);

    // Define a function that splits each line in a PCollection of Strings into
    // a PCollection made up of the individual words in the file.
    // The second argument sets the serialization format.
    PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

    // Take the collection of words and remove known stop words.
    PCollection<String> noStopWords = words.filter(new StopWordFilter());

    // The count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    PTable<String, Long> counts = noStopWords.count();

    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, outputPath);

    // Execute the pipeline as a MapReduce.
    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
}

10. Hadoop Launch Configuration

The data pipeline is thus ready.

However, we need the code to launch it. Therefore, let’s write the main method to launch the application:

public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

    // The run method from the previous section goes here.
}

ToolRunner.run parses the Hadoop configuration from the command line and executes the MapReduce job.

11. Run Application

The complete application is now ready. Let’s run the following command to build it:

mvn package

As a result of the above command, we get the packaged application and a special job jar in the target directory.

Let’s use this job jar to execute the application on Hadoop:

hadoop jar target/crunch-1.0-SNAPSHOT-job.jar <input file path> <output directory>
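
Since the main class delegates to ToolRunner, we could also pass generic Hadoop options before the application arguments, for example to tune the number of reducers (a hypothetical invocation):

hadoop jar target/crunch-1.0-SNAPSHOT-job.jar -D mapreduce.job.reduces=2 <input file path> <output directory>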

The application reads the input file and writes the result to the output file. The output file contains unique words along with their counts similar to the following:

[Add,1]
[Added,1]
[Admiration,1]
[Admitting,1]
[Allowance,1]

In addition to Hadoop, we can run the application within an IDE, as a stand-alone application or as unit tests.

12. Conclusion

In this tutorial, we created a data processing application running on MapReduce. Apache Crunch makes it easy to write, test and execute MapReduce pipelines in Java.

As usual, the full source code can be found over on GitHub.