Spring Batch – Tasklets vs Chunks

Last modified: February 20, 2018

1. Introduction

Spring Batch provides two ways of implementing a job: tasklets and chunks.

In this article, we’ll learn how to configure and implement both methods using a simple real-life example.

2. Dependencies

Let’s get started by adding the required dependencies:

<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-core</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-test</artifactId>
    <version>4.3.0</version>
    <scope>test</scope>
</dependency>

To get the latest version of spring-batch-core and spring-batch-test, please refer to Maven Central.

3. Our Use Case

Let’s consider a CSV file with the following content:

Mae Hodges,10/22/1972
Gary Potter,02/22/1953
Betty Wise,02/17/1968
Wayne Rose,04/06/1977
Adam Caldwell,09/27/1995
Lucille Phillips,05/14/1992

The first field of each line is a person’s name, and the second is that person’s date of birth.

Our use case is to generate another CSV file that contains each person’s name and age:

Mae Hodges,45
Gary Potter,64
Betty Wise,49
Wayne Rose,40
Adam Caldwell,22
Lucille Phillips,25
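
The age column is the number of whole years between the date of birth and the day the job runs. As a minimal sketch, the hypothetical AgeExample helper below parses one input line and computes the age against a fixed reference date. The reference date of 2018-01-01 is an assumption, chosen so that the result matches the sample output above; the job itself uses the current date, so its output changes over time.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical helper for illustration; it is not part of the article's project.
final class AgeExample {

    private static final DateTimeFormatter DOB_FORMAT =
      DateTimeFormatter.ofPattern("MM/dd/yyyy");

    // Turns "name,MM/dd/yyyy" into "name,age", counting whole years up to referenceDate.
    static String toNameAndAge(String csvLine, LocalDate referenceDate) {
        String[] fields = csvLine.split(",");
        LocalDate dob = LocalDate.parse(fields[1], DOB_FORMAT);
        long age = ChronoUnit.YEARS.between(dob, referenceDate);
        return fields[0] + "," + age;
    }
}
```

For example, AgeExample.toNameAndAge("Mae Hodges,10/22/1972", LocalDate.of(2018, 1, 1)) returns "Mae Hodges,45".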

Now that our domain is clear let’s go ahead and build a solution using both approaches. We’ll start with tasklets.

4. Tasklets Approach

4.1. Introduction and Design

Tasklets are meant to perform a single task within a step. Our job will consist of several steps that execute one after the other. Each step should perform only one defined task.

Our job will consist of three steps:

  1. Read lines from the input CSV file.
  2. Calculate age for every person in the input CSV file.
  3. Write name and age of each person to a new output CSV file.

Now that the big picture is ready, let’s create one class per step.

LinesReader will be in charge of reading data from the input file:

public class LinesReader implements Tasklet {
    // ...
}

LinesProcessor will calculate the age for every person in the file:

public class LinesProcessor implements Tasklet {
    // ...
}

Finally, LinesWriter will have the responsibility of writing names and ages to an output file:

public class LinesWriter implements Tasklet {
    // ...
}

At this point, all our steps implement the Tasklet interface, which forces us to implement its execute method:

@Override
public RepeatStatus execute(StepContribution stepContribution, 
  ChunkContext chunkContext) throws Exception {
    // ...
}

This method is where we’ll add the logic for each step. Before starting with that code, let’s configure our job.

4.2. Configuration

We need to add some configuration to Spring’s application context. After adding standard bean declarations for the classes created in the previous section, we’re ready to create our job definition:

@Configuration
@EnableBatchProcessing
public class TaskletsConfig {

    @Autowired 
    private JobBuilderFactory jobs;

    @Autowired 
    private StepBuilderFactory steps;

    @Bean
    protected Step readLines() {
        return steps
          .get("readLines")
          .tasklet(linesReader())
          .build();
    }

    @Bean
    protected Step processLines() {
        return steps
          .get("processLines")
          .tasklet(linesProcessor())
          .build();
    }

    @Bean
    protected Step writeLines() {
        return steps
          .get("writeLines")
          .tasklet(linesWriter())
          .build();
    }

    @Bean
    public Job job() {
        return jobs
          .get("taskletsJob")
          .start(readLines())
          .next(processLines())
          .next(writeLines())
          .build();
    }

    // ...

}

This means that our “taskletsJob” will consist of three steps. The first one (readLines) will execute the tasklet defined in the bean linesReader and move on to the next step, processLines. That step will execute the tasklet defined in the bean linesProcessor and move on to the final step, writeLines.

Our job flow is defined, and we’re ready to add some logic!

4.3. Model and Utils

As we’ll be manipulating lines in a CSV file, we’re going to create a class Line:

public class Line implements Serializable {

    private String name;
    private LocalDate dob;
    private Long age;

    // standard constructor, getters, setters and toString implementation

}

Please note that Line implements Serializable. That is because Line will act as a DTO to transfer data between steps. According to Spring Batch, objects that are transferred between steps must be serializable.
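
To see what "serializable" means in practice, the sketch below pushes an object through standard Java serialization and reads it back. PersonRecord is a hypothetical stand-in for Line, and the round trip only illustrates the Serializable contract; it is not a description of how Spring Batch stores its execution context.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.LocalDate;

// Hypothetical stand-in for Line, reduced to two fields.
final class PersonRecord implements Serializable {
    private static final long serialVersionUID = 1L;

    final String name;
    final LocalDate dob; // LocalDate is itself Serializable

    PersonRecord(String name, LocalDate dob) {
        this.name = name;
        this.dob = dob;
    }
}

final class SerializationExample {

    // Serializes the object to bytes and deserializes it into a new instance.
    static PersonRecord roundTrip(PersonRecord original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
              new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PersonRecord) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // Wrapped so callers of this sketch need no checked exceptions
            throw new IllegalStateException(e);
        }
    }
}
```

If PersonRecord did not implement Serializable, writeObject would fail with a NotSerializableException.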

Next, we can start thinking about reading and writing lines.

For that, we’ll make use of OpenCSV:

<dependency>
    <groupId>com.opencsv</groupId>
    <artifactId>opencsv</artifactId>
    <version>4.1</version>
</dependency>

Look for the latest OpenCSV version in Maven Central.

Once OpenCSV is included, we’re also going to create a FileUtils class. It will provide methods for reading and writing CSV lines:

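public class FileUtils {

    private static final DateTimeFormatter DOB_FORMAT =
      DateTimeFormatter.ofPattern("MM/dd/yyyy");

    // Created lazily by initReader() and initWriter() on first use
    private CSVReader csvReader;
    private CSVWriter csvWriter;

    public Line readLine() throws Exception {
        if (csvReader == null) {
            initReader();
        }
        String[] fields = csvReader.readNext();
        if (fields == null) {
            // End of file
            return null;
        }
        return new Line(fields[0], LocalDate.parse(fields[1], DOB_FORMAT));
    }

    public void writeLine(Line line) throws Exception {
        if (csvWriter == null) {
            initWriter();
        }
        String[] fields = { line.getName(), line.getAge().toString() };
        csvWriter.writeNext(fields);
    }

    // constructor taking the file name, initReader, initWriter,
    // closeReader and closeWriter
}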

Notice that readLine acts as a wrapper over OpenCSV’s readNext method and returns a Line object.

Similarly, writeLine wraps OpenCSV’s writeNext method and takes a Line object. The full implementation of this class can be found in the GitHub project.
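
The wrapper idea does not depend on OpenCSV. As a rough illustration only, the hypothetical SimpleCsvLines class below applies the same pattern with plain JDK readers and writers: the underlying stream is wrapped lazily on first use, and readLine returns null once the input is exhausted. It splits on commas naively and ignores quoting, which a real CSV library handles.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;

// Hypothetical, simplified analogue of FileUtils built on the JDK only.
final class SimpleCsvLines {

    private final Reader source;
    private final Writer target;
    private BufferedReader reader; // created on first read
    private PrintWriter writer;    // created on first write

    SimpleCsvLines(Reader source, Writer target) {
        this.source = source;
        this.target = target;
    }

    // Returns the next row split into fields, or null at end of input.
    String[] readLine() {
        if (reader == null) {
            reader = new BufferedReader(source);
        }
        try {
            String row = reader.readLine();
            return row == null ? null : row.split(",");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Joins the fields with commas and writes them as one row.
    void writeLine(String... fields) {
        if (writer == null) {
            writer = new PrintWriter(target);
        }
        writer.print(String.join(",", fields));
        writer.print('\n');
        writer.flush();
    }
}
```

Callers loop on readLine until it returns null, which is the same contract the tasklets in the following sections rely on.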

At this point, we’re all set to start with each step implementation.

4.4. LinesReader

Let’s go ahead and complete our LinesReader class:

public class LinesReader implements Tasklet, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LinesReader.class);

    private List<Line> lines;
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        lines = new ArrayList<>();
        fu = new FileUtils(
          "taskletsvschunks/input/tasklets-vs-chunks.csv");
        logger.debug("Lines Reader initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, 
      ChunkContext chunkContext) throws Exception {
        Line line = fu.readLine();
        while (line != null) {
            lines.add(line);
            logger.debug("Read line: " + line.toString());
            line = fu.readLine();
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeReader();
        stepExecution
          .getJobExecution()
          .getExecutionContext()
          .put("lines", this.lines);
        logger.debug("Lines Reader ended.");
        return ExitStatus.COMPLETED;
    }
}

LinesReader’s beforeStep method creates a FileUtils instance over the input file path. Then, execute adds lines to a list until there are no more lines to read.

Our class also implements StepExecutionListener, which provides two extra methods: beforeStep and afterStep. We’ll use those methods to initialize resources before execute runs and to release them afterwards.

If we take a look at the afterStep code, we’ll notice the line where the result list (lines) is put in the job’s context to make it available to the next step:

stepExecution
  .getJobExecution()
  .getExecutionContext()
  .put("lines", this.lines);
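
The hand-off between steps can be pictured without any framework. In the sketch below, a plain Map plays the role of the job's execution context: the first step stores a list under the "lines" key, and the second step looks it up and changes it in place. ContextHandOffExample and its method names are made up for this illustration, and the Map is only a stand-in for Spring Batch's ExecutionContext.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation; a Map stands in for the job's execution context.
final class ContextHandOffExample {

    // First step: collect the data and publish it under the "lines" key.
    static void readStep(Map<String, Object> context) {
        List<String> lines = new ArrayList<>();
        lines.add("Mae Hodges");
        lines.add("Gary Potter");
        context.put("lines", lines);
    }

    // Second step: fetch the same list instance and modify it in place.
    @SuppressWarnings("unchecked")
    static void processStep(Map<String, Object> context) {
        List<String> lines = (List<String>) context.get("lines");
        lines.replaceAll(String::toUpperCase);
    }

    // Runs both steps in order against one shared context.
    @SuppressWarnings("unchecked")
    static List<String> run() {
        Map<String, Object> context = new HashMap<>();
        readStep(context);
        processStep(context);
        return (List<String>) context.get("lines");
    }
}
```

Because the second step works on the same list object, it does not have to put anything back into the context for a later step to see the changes.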

At this point, our first step has already fulfilled its responsibility: load CSV lines into a List in memory. Let’s move to the second step and process them.

4.5. LinesProcessor

LinesProcessor will also implement StepExecutionListener and, of course, Tasklet. That means it will implement the beforeStep, execute and afterStep methods as well:

public class LinesProcessor implements Tasklet, StepExecutionListener {

    private Logger logger = LoggerFactory.getLogger(
      LinesProcessor.class);

    private List<Line> lines;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        ExecutionContext executionContext = stepExecution
          .getJobExecution()
          .getExecutionContext();
        this.lines = (List<Line>) executionContext.get("lines");
        logger.debug("Lines Processor initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, 
      ChunkContext chunkContext) throws Exception {
        for (Line line : lines) {
            long age = ChronoUnit.YEARS.between(
              line.getDob(), 
              LocalDate.now());
            logger.debug("Calculated age " + age + " for line " + line.toString());
            line.setAge(age);
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        logger.debug("Lines Processor ended.");
        return ExitStatus.COMPLETED;
    }
}

It loads the lines list from the job’s context and calculates the age of each person.

There’s no need to put another result list in the context as modifications happen on the same object that comes from the previous step.

And we’re ready for our last step.

4.6. LinesWriter

LinesWriter’s task is to go over the lines list and write each name and age to the output file:

public class LinesWriter implements Tasklet, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LinesWriter.class);

    private List<Line> lines;
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        ExecutionContext executionContext = stepExecution
          .getJobExecution()
          .getExecutionContext();
        this.lines = (List<Line>) executionContext.get("lines");
        fu = new FileUtils("output.csv");
        logger.debug("Lines Writer initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, 
      ChunkContext chunkContext) throws Exception {
        for (Line line : lines) {
            fu.writeLine(line);
            logger.debug("Wrote line " + line.toString());
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeWriter();
        logger.debug("Lines Writer ended.");
        return ExitStatus.COMPLETED;
    }
}

We’re done with our job’s implementation! Let’s create a test to run it and see the results.

4.7. Running the Job

To run the job, we’ll create a test:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TaskletsConfig.class)
public class TaskletsTest {

    @Autowired 
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void givenTaskletsJob_whenJobEnds_thenStatusCompleted()
      throws Exception {
 
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
    }
}

The @ContextConfiguration annotation points to the Spring context configuration class that holds our job definition.

We’ll need to add a couple of extra beans before running the test:

@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
    return new JobLauncherTestUtils();
}

@Bean
public JobRepository jobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource());
    factory.setTransactionManager(transactionManager());
    return factory.getObject();
}

@Bean
public DataSource dataSource() {
    DriverManagerDataSource dataSource = new DriverManagerDataSource();
    dataSource.setDriverClassName("org.sqlite.JDBC");
    dataSource.setUrl("jdbc:sqlite:repository.sqlite");
    return dataSource;
}

@Bean
public PlatformTransactionManager transactionManager() {
    return new ResourcelessTransactionManager();
}

@Bean
public JobLauncher jobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository());
    return jobLauncher;
}

Everything is ready! Go ahead and run the test!

After the job has finished, output.csv has the expected content and logs show the execution flow:

[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized.
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended.
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized.
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.

That’s it for Tasklets. Now we can move on to the Chunks approach.

5. Chunks Approach

5.1. Introduction and Design

As the name suggests, this approach performs actions over chunks of data. That is, instead of reading, processing and writing all the lines at once, it’ll read, process and write a fixed number of records (a chunk) at a time.

Then, it’ll repeat the cycle until there’s no more data in the file.

As a result, the flow will be slightly different:

  1. While there are lines:
    • Read X lines, one at a time.
    • Process those X lines, one at a time.
    • Write the X processed lines together.
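
The loop can be sketched in plain Java. The hypothetical ChunkLoopExample below is a simplified model, not Spring Batch's actual implementation: it ignores transactions, skipping and restart. It reads up to chunkSize items, processes them, and hands the whole chunk to the writer, which is the order the log output in section 5.6 shows.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified model of a chunk-oriented step.
final class ChunkLoopExample {

    static <I, O> void run(Iterator<I> reader, Function<I, O> processor,
      Consumer<List<O>> writer, int chunkSize) {
        while (reader.hasNext()) {
            // Read up to chunkSize items
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // Process each item that was read
            List<O> processed = new ArrayList<>();
            for (I item : items) {
                processed.add(processor.apply(item));
            }
            // Write the whole chunk in one call
            writer.accept(processed);
        }
    }

    // Records each written chunk so the grouping is visible.
    static List<List<String>> demo(List<String> input, int chunkSize) {
        List<List<String>> chunks = new ArrayList<>();
        ChunkLoopExample.<String, String>run(
          input.iterator(), String::toUpperCase, chunks::add, chunkSize);
        return chunks;
    }
}
```

With five input items and a chunk size of 2, the writer is called three times and receives two, two and one items.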

So, we also need to create three beans for the chunk-oriented approach:

public class LineReader {
     // ...
}
public class LineProcessor {
    // ...
}
public class LinesWriter {
    // ...
}

Before moving to implementation, let’s configure our job.

5.2. Configuration

The job definition will also look different:

@Configuration
@EnableBatchProcessing
public class ChunksConfig {

    @Autowired 
    private JobBuilderFactory jobs;

    @Autowired 
    private StepBuilderFactory steps;

    @Bean
    public ItemReader<Line> itemReader() {
        return new LineReader();
    }

    @Bean
    public ItemProcessor<Line, Line> itemProcessor() {
        return new LineProcessor();
    }

    @Bean
    public ItemWriter<Line> itemWriter() {
        return new LinesWriter();
    }

    @Bean
    protected Step processLines(ItemReader<Line> reader,
      ItemProcessor<Line, Line> processor, ItemWriter<Line> writer) {
        return steps.get("processLines").<Line, Line> chunk(2)
          .reader(reader)
          .processor(processor)
          .writer(writer)
          .build();
    }

    @Bean
    public Job job() {
        return jobs
          .get("chunksJob")
          .start(processLines(itemReader(), itemProcessor(), itemWriter()))
          .build();
    }

}

In this case, there’s only one step performing only one tasklet.

However, that tasklet defines a reader, a writer and a processor that will act over chunks of data.

Note that the commit interval, the value passed to chunk() in the step definition, sets the number of items processed in one chunk. With a value of 2, our job will read, process and write two lines at a time.

Now we’re ready to add our chunk logic!

5.3. LineReader

LineReader will be in charge of reading one record and returning a Line instance with its content.

To become a reader, our class has to implement the ItemReader interface:

public class LineReader implements ItemReader<Line> {
     @Override
     public Line read() throws Exception {
         Line line = fu.readLine();
         if (line != null) 
           logger.debug("Read line: " + line.toString());
         return line;
     }
}

The code is straightforward: it just reads one line and returns it. We’ll also implement StepExecutionListener for the final version of this class:

public class LineReader implements 
  ItemReader<Line>, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LineReader.class);
 
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
        logger.debug("Line Reader initialized.");
    }

    @Override
    public Line read() throws Exception {
        Line line = fu.readLine();
        if (line != null) logger.debug("Read line: " + line.toString());
        return line;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeReader();
        logger.debug("Line Reader ended.");
        return ExitStatus.COMPLETED;
    }
}

Note that beforeStep and afterStep execute once each, before and after the whole step respectively, not once per chunk.

5.4. LineProcessor

LineProcessor follows pretty much the same logic as LineReader.

However, in this case, we’ll implement ItemProcessor and its method process():

public class LineProcessor implements ItemProcessor<Line, Line> {

    private Logger logger = LoggerFactory.getLogger(LineProcessor.class);

    @Override
    public Line process(Line line) throws Exception {
        long age = ChronoUnit.YEARS
          .between(line.getDob(), LocalDate.now());
        logger.debug("Calculated age " + age + " for line " + line.toString());
        line.setAge(age);
        return line;
    }

}

The process() method takes an input line, processes it and returns an output line. Again, we’ll also implement StepExecutionListener:

public class LineProcessor implements 
  ItemProcessor<Line, Line>, StepExecutionListener {

    private Logger logger = LoggerFactory.getLogger(LineProcessor.class);

    @Override
    public void beforeStep(StepExecution stepExecution) {
        logger.debug("Line Processor initialized.");
    }
    
    @Override
    public Line process(Line line) throws Exception {
        long age = ChronoUnit.YEARS
          .between(line.getDob(), LocalDate.now());
        logger.debug(
          "Calculated age " + age + " for line " + line.toString());
        line.setAge(age);
        return line;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        logger.debug("Line Processor ended.");
        return ExitStatus.COMPLETED;
    }
}

5.5. LinesWriter

Unlike the reader and the processor, LinesWriter writes an entire chunk of lines at once, so it receives a List of Line objects:

public class LinesWriter implements 
  ItemWriter<Line>, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LinesWriter.class);
 
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        fu = new FileUtils("output.csv");
        logger.debug("Line Writer initialized.");
    }

    @Override
    public void write(List<? extends Line> lines) throws Exception {
        for (Line line : lines) {
            fu.writeLine(line);
            logger.debug("Wrote line " + line.toString());
        }
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeWriter();
        logger.debug("Line Writer ended.");
        return ExitStatus.COMPLETED;
    }
}

LinesWriter code speaks for itself. And again, we’re ready to test our job.

5.6. Running the Job

We’ll create a new test, same as the one we created for the tasklets approach:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = ChunksConfig.class)
public class ChunksTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void givenChunksJob_whenJobEnds_thenStatusCompleted() 
      throws Exception {
 
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
 
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); 
    }
}

After adding to ChunksConfig the same extra beans we added to TaskletsConfig, we’re all set to run the test!

Once the job is done, we can see that output.csv contains the expected result again, and the logs describe the flow:

[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized.
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized.
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended.
[main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.

We have the same result and a different flow. The logs show each chunk of two lines being read, processed and written before the next chunk is read.

6. Conclusion

Different contexts will call for one approach or the other. While tasklets feel more natural for “one task after the other” scenarios, chunks provide a simple solution for paginated reads or for situations where we don’t want to keep a significant amount of data in memory.

The complete implementation of this example can be found in the GitHub project.
