Spring Batch using Partitioner

Last modified: August 15, 2017


1. Overview


In our previous introduction to Spring Batch, we introduced the framework as a batch-processing tool. We also explored the configuration details and the implementation of a single-threaded, single-process job execution.

To implement a job with some parallel processing, a range of options is provided. At a higher level, there are two modes of parallel processing:


  1. Single-Process, multi-threaded
  2. Multi-Process

In this quick article, we’ll discuss the partitioning of a Step, which can be implemented for both single-process and multi-process jobs.


2. Partitioning a Step


Partitioning in Spring Batch lets us divide the execution of a Step:

The above picture shows an implementation of a Job with a partitioned Step.


There’s a Step called “Master”, whose execution is divided into some “Slave” steps. These slaves can take the place of a master, and the outcome will still be unchanged. Both master and slave are instances of Step. Slaves can be remote services or just locally executing threads.

If required, we can pass data from the master to the slave. The metadata (i.e. the JobRepository) makes sure that every slave is executed only once in a single execution of the Job.


Here is the sequence diagram showing how it all works:


As shown, the PartitionStep is driving the execution. The PartitionHandler is responsible for splitting the work of “Master” into the “Slaves”. The rightmost Step is the slave.
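
The flow described above can be approximated in plain Java, without Spring Batch. This is only a sketch of the idea under simplifying assumptions: PartitionFlowSketch, split and runPartitioned are hypothetical names, a plain String stands in for each partition's ExecutionContext, and a fixed thread pool stands in for locally executing slaves.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Plain-Java sketch of the master/slave flow; none of these names are Spring Batch API.
class PartitionFlowSketch {

    // "Partitioner" role: produce one named unit of work per input.
    static Map<String, String> split(List<String> inputs) {
        Map<String, String> partitions = new LinkedHashMap<>();
        for (int i = 0; i < inputs.size(); i++) {
            partitions.put("partition" + i, inputs.get(i));
        }
        return partitions;
    }

    // "PartitionHandler" role: run the slave logic once per partition on a
    // thread pool, then wait for every result before returning to the master.
    static Map<String, String> runPartitioned(Map<String, String> partitions,
      Function<String, String> slave, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Map<String, Future<String>> running = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : partitions.entrySet()) {
                Callable<String> task = () -> slave.apply(e.getValue());
                running.put(e.getKey(), pool.submit(task));
            }
            Map<String, String> results = new LinkedHashMap<>();
            for (Map.Entry<String, Future<String>> e : running.entrySet()) {
                try {
                    results.put(e.getKey(), e.getValue().get());
                } catch (InterruptedException | ExecutionException ex) {
                    throw new IllegalStateException(
                      "Partition " + e.getKey() + " failed", ex);
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, String> partitions = split(List.of("a.csv", "b.csv", "c.csv"));
        Map<String, String> results =
          runPartitioned(partitions, name -> name.replace(".csv", ".xml"), 3);
        System.out.println(results);
    }
}
```

In Spring Batch itself, the JobRepository additionally records each slave's execution, which this sketch does not attempt to model.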

3. The Maven POM


The Maven dependencies are the same as mentioned in our previous article. That is, Spring Core, Spring Batch and the dependency for the database (in our case, SQLite).

4. Configuration


In our introductory article, we saw an example of converting some financial data from a CSV file to an XML file. Let’s extend the same example.


Here, we’ll convert the financial information from 5 CSV files to corresponding XML files, using a multi-threaded implementation.


We can achieve this using a single Job and Step partitioning. We’ll have five threads, one for each of the CSV files.


First of all, let’s create a Job:


@Bean(name = "partitionerJob")
public Job partitionerJob() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return jobs.get("partitioningJob")
      .start(partitionStep())
      .build();
}

As we can see, this Job starts with the PartitioningStep. This is our master step which will be divided into various slave steps:


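public Step partitionStep() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("partitionStep")
      .partitioner("slaveStep", partitioner())
      .step(slaveStep())
      .taskExecutor(taskExecutor()) // assumed: a TaskExecutor bean defined in this configuration
      .build();
}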

Here, we’ll create the PartitioningStep using the StepBuilderFactory. For that, we need to give the information about the SlaveSteps and the Partitioner.
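
For the slave steps to run on separate threads, the partitioned step also needs a TaskExecutor; without one, Spring Batch's default partition handler executes the partitions synchronously. A minimal sketch of such a bean follows, assuming a pool of five threads to match the five CSV files. The class name TaskExecutorSketchConfig and the pool sizes are illustrative choices, not taken from the example project.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Hypothetical configuration class; in practice this bean would sit next to
// the job and step definitions.
@Configuration
public class TaskExecutorSketchConfig {

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // Assumed sizing: one thread per input file.
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(5);
        taskExecutor.setQueueCapacity(5);
        // Spring initializes the pool when it creates the bean.
        return taskExecutor;
    }
}
```

The bean can then be handed to the partitioned step through the builder's taskExecutor(...) method.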


The Partitioner is an interface that lets us define a set of input values for each of the slaves. In other words, the logic that divides the work among the threads goes here.


Let’s create an implementation of it, called CustomMultiResourcePartitioner, where we’ll put the input and output file names in the ExecutionContext to pass on to every slave step:


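public class CustomMultiResourcePartitioner implements Partitioner {
    private static final String PARTITION_KEY = "partition"; // assumed key prefix
    private String keyName = "fileName"; // matches stepExecutionContext[fileName]
    private Resource[] resources = new Resource[0];
    public void setResources(Resource[] resources) {
        this.resources = resources;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<>(gridSize);
        int i = 0, k = 1;
        for (Resource resource : resources) {
            ExecutionContext context = new ExecutionContext();
            Assert.state(resource.exists(), "Resource does not exist: " 
              + resource);
            context.putString(keyName, resource.getFilename());
            context.putString("opFileName", "output" + k++ + ".xml");
            map.put(PARTITION_KEY + i, context);
            i++;
        }
        return map;
    }
}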
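
To make the result of partition() concrete, here is a plain-Java sketch that builds the same kind of map for five input files. It rests on a few assumptions: the partition key prefix is "partition", the input key is "fileName" (which matches the stepExecutionContext[fileName] expression used by the reader), a Map<String, String> stands in for the ExecutionContext, and the class name and file names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the partitioner; not Spring Batch API.
class PartitionMapSketch {

    // Builds one context per input file, keyed partition0, partition1, ...
    static Map<String, Map<String, String>> partition(List<String> fileNames) {
        Map<String, Map<String, String>> map = new LinkedHashMap<>();
        int i = 0, k = 1;
        for (String fileName : fileNames) {
            Map<String, String> context = new LinkedHashMap<>();
            context.put("fileName", fileName);                  // input for the reader
            context.put("opFileName", "output" + k++ + ".xml"); // output for the writer
            map.put("partition" + i++, context);
        }
        return map;
    }

    public static void main(String[] args) {
        List<String> files = List.of("record1.csv", "record2.csv",
          "record3.csv", "record4.csv", "record5.csv");
        partition(files)
          .forEach((name, context) -> System.out.println(name + " -> " + context));
    }
}
```

Each entry becomes the context of one slave step execution, so partition0 carries record1.csv and output1.xml, and so on up to partition4.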

We’ll also create the bean for this class, where we’ll give the source directory for input files:


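@Bean
public CustomMultiResourcePartitioner partitioner() {
    CustomMultiResourcePartitioner partitioner 
      = new CustomMultiResourcePartitioner();
    Resource[] resources;
    try {
        // INPUT_FILE_PATTERN is a hypothetical constant holding the
        // location pattern of the input CSV files
        resources = resoursePatternResolver
          .getResources(INPUT_FILE_PATTERN);
    } catch (IOException e) {
        throw new RuntimeException("I/O problems when resolving"
          + " the input file pattern.", e);
    }
    partitioner.setResources(resources);
    return partitioner;
}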

We define the slave step just like any other step, with a reader and a writer. The reader and writer are the same as in our introductory example, except that they receive the filename parameter from the StepExecutionContext.


Note that these beans need to be step scoped so that they can receive the stepExecutionContext parameters at every step. If they were not step scoped, the beans would be created once at startup and would not pick up the filenames at the step level:


@Bean
@StepScope
public FlatFileItemReader<Transaction> itemReader(
  @Value("#{stepExecutionContext[fileName]}") String filename)
  throws UnexpectedInputException, ParseException {
    FlatFileItemReader<Transaction> reader 
      = new FlatFileItemReader<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    String[] tokens 
      = {"username", "userid", "transactiondate", "amount"};
    tokenizer.setNames(tokens);
    reader.setResource(new ClassPathResource("input/" + filename));
    DefaultLineMapper<Transaction> lineMapper 
      = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
    reader.setLineMapper(lineMapper);
    return reader;
}

@Bean
@StepScope
public ItemWriter<Transaction> itemWriter(Marshaller marshaller, 
  @Value("#{stepExecutionContext[opFileName]}") String filename)
  throws MalformedURLException {
    StaxEventItemWriter<Transaction> itemWriter 
      = new StaxEventItemWriter<Transaction>();
    itemWriter.setMarshaller(marshaller);
    itemWriter.setResource(new ClassPathResource("xml/" + filename));
    return itemWriter;
}

When wiring the reader and writer into the slave step, we can pass null for the filename arguments. Those values are never used, because the step-scoped beans receive the actual filenames from the stepExecutionContext:


@Bean
public Step slaveStep() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("slaveStep").<Transaction, Transaction>chunk(1)
      .reader(itemReader(null))
      .writer(itemWriter(marshaller(), null))
      .build();
}

5. Conclusion


In this tutorial, we discussed how to implement a job with parallel processing using Spring Batch.

As always, the complete implementation for this example is available over on GitHub.
