Spring Batch using Partitioner

Last modified: August 15, 2017

1. Overview

In our previous introduction to Spring Batch, we presented the framework as a batch-processing tool. We also explored the configuration details and the implementation of a single-threaded, single-process job execution.

Spring Batch provides a range of options for implementing a job with some parallel processing. At a high level, there are two modes of parallel processing:

  1. Single-Process, multi-threaded
  2. Multi-Process

In this quick article, we’ll discuss the partitioning of a Step, which can be implemented for both single-process and multi-process jobs.

2. Partitioning a Step

Spring Batch with partitioning provides us with the facility to divide the execution of a Step:

The above picture shows an implementation of a Job with a partitioned Step.

There’s a Step called the “Master”, whose execution is divided into some “Slave” steps. These slaves can take the place of the master, and the outcome will still be unchanged. Both the master and the slaves are instances of Step. Slaves can be remote services or just locally executing threads.

If required, we can pass data from the master to the slaves. The metadata (i.e., the JobRepository) makes sure that every slave is executed only once in a single execution of the Job.

Here is the sequence diagram showing how it all works:

As shown, the PartitionStep drives the execution. The PartitionHandler is responsible for splitting the work of the “Master” into the “Slaves”. The rightmost Step is the slave.

3. The Maven POM

The Maven dependencies are the same as those mentioned in our previous article: Spring Core, Spring Batch, and the dependency for the database (in our case, SQLite).

4. Configuration

In our introductory article, we saw an example of converting some financial data from a CSV file to an XML file. Let’s extend the same example.

Here, we’ll convert the financial information from 5 CSV files to corresponding XML files, using a multi-threaded implementation.

We can achieve this using a single Job and Step partitioning. We’ll have five threads, one for each of the CSV files.
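
Each of the five files follows the layout from the introductory example. Here is a hypothetical sample (the values are illustrative; the header row, which the reader skips, matches the column names configured on the tokenizer below):

username, userid, transactiondate, amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321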

First of all, let’s create a Job:

// 'jobs' is the injected JobBuilderFactory
@Bean(name = "partitionerJob")
public Job partitionerJob() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return jobs.get("partitionerJob")
      .start(partitionStep())
      .build();
}

As we can see, this Job starts with the partitionStep. This is our master step, which will be divided into various slave steps:

// 'steps' is the injected StepBuilderFactory
@Bean
public Step partitionStep() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("partitionStep")
      .partitioner("slaveStep", partitioner())
      .step(slaveStep())
      .taskExecutor(taskExecutor())
      .build();
}

Here, we create the partitionStep using the StepBuilderFactory. For that, we need to give it the information about the slave steps and the Partitioner.
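
The taskExecutor() bean referenced above is not shown in this article; here is a minimal sketch, assuming a SimpleAsyncTaskExecutor with a concurrency limit of five (one thread per CSV file) is acceptable:

@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("partition-");
    // limit concurrency to five threads, one per CSV file
    taskExecutor.setConcurrencyLimit(5);
    return taskExecutor;
}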

The Partitioner is an interface that provides the facility to define a set of input values for each of the slaves. In other words, the logic for dividing tasks into respective threads goes here.
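
For reference, the Partitioner interface declares a single method, returning one ExecutionContext per partition, keyed by a unique partition name:

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}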

Let’s create an implementation of it, called CustomMultiResourcePartitioner, where we’ll put the input and output file names in the ExecutionContext to pass on to every slave step:

public class CustomMultiResourcePartitioner implements Partitioner {
 
    private static final String PARTITION_KEY = "partition";
 
    private Resource[] resources = new Resource[0];
    private String keyName = "fileName";
 
    public void setResources(Resource[] resources) {
        this.resources = resources;
    }
 
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<>(gridSize);
        int i = 0, k = 1;
        for (Resource resource : resources) {
            ExecutionContext context = new ExecutionContext();
            Assert.state(resource.exists(), "Resource does not exist: " 
              + resource);
            // input file name, read by the step-scoped reader
            context.putString(keyName, resource.getFilename());
            // output file name, read by the step-scoped writer
            context.putString("opFileName", "output" + k++ + ".xml");
            map.put(PARTITION_KEY + i, context);
            i++;
        }
        return map;
    }
}
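
To make the mapping concrete, here is a hypothetical illustration of what partition() returns for two input files named record1.csv and record2.csv:

// "partition0" -> { fileName = "record1.csv", opFileName = "output1.xml" }
// "partition1" -> { fileName = "record2.csv", opFileName = "output2.xml" }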

We’ll also create the bean for this class, where we’ll give the source directory for input files:

@Bean
public CustomMultiResourcePartitioner partitioner() {
    CustomMultiResourcePartitioner partitioner 
      = new CustomMultiResourcePartitioner();
    Resource[] resources;
    try {
        // 'resourcePatternResolver' is an injected ResourcePatternResolver
        resources = resourcePatternResolver
          .getResources("file:src/main/resources/input/*.csv");
    } catch (IOException e) {
        throw new RuntimeException("I/O problems when resolving"
          + " the input file pattern.", e);
    }
    partitioner.setResources(resources);
    return partitioner;
}

We’ll define the slave step, just like any other step, with its reader and writer. The reader and writer will be the same as those in our introductory example, except that they will receive the file name parameter from the StepExecutionContext.

Note that these beans need to be step-scoped so that they can receive the stepExecutionContext params at every step. If they were not step-scoped, their beans would be created once at startup and would not accept the file names at the step level:

@Bean
@StepScope
public FlatFileItemReader<Transaction> itemReader(
  @Value("#{stepExecutionContext[fileName]}") String filename)
  throws UnexpectedInputException, ParseException {
 
    FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    String[] tokens 
      = {"username", "userid", "transactiondate", "amount"};
    tokenizer.setNames(tokens);
    reader.setResource(new ClassPathResource("input/" + filename));
    DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
    // skip the CSV header row
    reader.setLinesToSkip(1);
    reader.setLineMapper(lineMapper);
    return reader;
}
 
@Bean
@StepScope
public ItemWriter<Transaction> itemWriter(Marshaller marshaller, 
  @Value("#{stepExecutionContext[opFileName]}") String filename)
  throws MalformedURLException {
    StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
    itemWriter.setMarshaller(marshaller);
    itemWriter.setRootTagName("transactionRecord");
    // note: a ClassPathResource is only writable when it resolves to a file
    // on disk (e.g. target/classes); a FileSystemResource is a safer choice
    itemWriter.setResource(new ClassPathResource("xml/" + filename));
    return itemWriter;
}

When referencing the reader and writer in the slave step, we can pass the arguments as null, because these file names will not be used; the beans will receive the file names from the stepExecutionContext instead:

@Bean
public Step slaveStep() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    // chunk(1) processes one record per chunk; the null arguments are
    // placeholders, since the step-scoped beans receive the real file
    // names from the stepExecutionContext at runtime
    return steps.get("slaveStep").<Transaction, Transaction>chunk(1)
      .reader(itemReader(null))
      .writer(itemWriter(marshaller(), null))
      .build();
}
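
Finally, here is a minimal sketch of launching the job, assuming an injected JobLauncher and the partitionerJob bean (the variable names are illustrative):

JobParameters params = new JobParametersBuilder()
  .addLong("time", System.currentTimeMillis())
  .toJobParameters();
// run(...) throws JobExecutionException subtypes if the job cannot start
JobExecution execution = jobLauncher.run(partitionerJob, params);
System.out.println("Status: " + execution.getStatus());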

5. Conclusion

In this tutorial, we discussed how to implement a job with parallel processing using Spring Batch.

As always, the complete implementation for this example is available over on GitHub.
