Conditional Flow in Spring Batch – Spring Batch中的条件流

最后修改: 2020年 4月 6日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.绪论

We use Spring Batch to compose jobs from multiple steps that read, transform, and write data. If the steps in a job have multiple paths, similar to using an if statement in our code, we say that the job flow is conditional.

我们使用Spring Batch来将作业从读取、转换和写入数据的多个步骤组成。如果作业中的步骤有多条路径,类似于在我们的代码中使用if语句,我们就说该作业流程是conditional

In this tutorial, we’ll look at two ways to create Spring Batch jobs with a conditional flow.

在本教程中,我们将看看用条件流创建Spring Batch作业的两种方法。

2. Exit Status and Batch Status

2.退出状态和批处理状态

When we specify a conditional step with Spring’s Batch framework, we’re using the exit status of a step or job. Therefore, we need to understand the difference between Batch Status and Exit Status in our steps and jobs:

当我们用Spring的Batch框架指定一个条件步骤时,我们使用的是步骤或作业的退出状态。因此,我们需要了解步骤和工作中的批处理状态和退出状态的区别。

  • BatchStatus is an Enum representing the status of a step/job and is used by the Batch framework internally
  • Possible values are: ABANDONED, COMPLETED, FAILED, STARTED, STARTING, STOPPED, STOPPING, UNKNOWN
  • ExitStatus is the status of a step when execution is completed and is used to conditionally determine the flow

By default, the ExitStatus of a step or job is the same as its BatchStatus. We can also set a custom ExitStatus to drive flow.

默认情况下,一个步骤或工作的ExitStatus与它的BatchStatus相同。我们也可以设置一个自定义的ExitStatus来驱动流程。

3. Conditional Flow

3.条件性流动

Let’s say we have an IOT device that sends us measurements. Our device measurements are arrays of integers, and we need to send notifications if any of our measurements contain positive integers.

假设我们有一个物联网设备,它向我们发送测量值。我们的设备测量值是整数的数组,如果我们的任何测量值包含正整数,我们就需要发送通知。

In other words, we need to send a notification when we detect a positive measurement.

换句话说,当我们检测到一个积极的测量时,我们需要发送一个通知。

3.1. ExitStatus

3.1.退出状态

Importantly, we use the exit status of a step to drive conditional flow.

重要的是,我们使用一个步骤的退出状态来驱动条件流

To set the exit status of a step, we need to use the StepExecution object’s setExitStatus method. In order to do that, we need to create an ItemProcessor that extends ItemListenerSupport and gets the step’s StepExecution.

为了设置一个步骤的退出状态,我们需要使用StepExecution对象的setExitStatus方法。为了做到这一点,我们需要创建一个ItemProcessor,它扩展了ItemListenerSupport并获得该步骤的StepExecution

We use this to set our step’s exit status to NOTIFY when we find a positive number. When we determine our exit status based on data within the batch job, we can use an ItemProcessor.

当我们找到一个正数时,我们用它来将我们的步骤的退出状态设置为NOTIFY当我们根据批处理作业中的数据来确定我们的退出状态时,我们可以使用ItemProcessor

Let’s look at our NumberInfoClassifier to see the three methods we need:

让我们看看我们的NumberInfoClassifier,看看我们需要的三个方法。

public class NumberInfoClassifier extends ItemListenerSupport<NumberInfo, Integer>
  implements ItemProcessor<NumberInfo, Integer> {
 
    private StepExecution stepExecution;

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
        this.stepExecution.setExitStatus(new ExitStatus(QUIET));
    }

    @Override
    public Integer process(NumberInfo numberInfo) throws Exception {
        return Integer.valueOf(numberInfo.getNumber());
    }

    @Override
    public void afterProcess(NumberInfo item, Integer result) {
        super.afterProcess(item, result);
        if (item.isPositive()) {
            stepExecution.setExitStatus(new ExitStatus(NOTIFY));
        }
    }
}

Note: we use the ItemProcessor to set the ExitStatus in this example, but we could just as easily do it in our step’s ItemReader or ItemWriter.

注意:在这个例子中,我们使用ItemProcessor来设置ExitStatus,但我们也可以在我们步骤的ItemReaderItemWriter中轻松完成。

Finally, when we create our job, we tell our JobBuilderFactory to send notifications for any step that exits with a status of NOTIFY:

最后,当我们创建作业时,我们告诉我们的JobBuilderFactory,为任何退出状态为NOTIFY的步骤发送通知。

jobBuilderFactory.get("Number generator - second dataset")
    .start(dataProviderStep)
    .on("NOTIFY").to(notificationStep)
    .end()
    .build();

Also note that when we have additional conditional branches and multiple exit codes, we can add them to our job with the from and on methods of the JobBuilderFacotry:

还要注意的是,当我们有额外的条件分支和多个退出代码时,我们可以通过JobBuilderFacotryfon方法将它们添加到我们的作业。

jobBuilderFactory.get("Number generator - second dataset")
    .start(dataProviderStep)
    .on("NOTIFY").to(notificationStep)
    .from(step).on("LOG_ERROR").to(errorLoggingStep)
    .end()
    .build();

Now, any time our ItemProcessor sees a positive number, it will direct our job to run the notificationStep, which simply prints a message to System.out:

现在,只要我们的ItemProcessor看到一个正数,它就会指示我们的作业运行notificationStep,这只是向System.out打印一条消息。

Second Dataset Processor 11
Second Dataset Processor -2
Second Dataset Processor -3
[Number generator - second dataset] contains interesting data!!

If we had a data set without a positive number, we would not see our notificationStep message:

如果我们有一个没有正数的数据集,我们就不会看到我们的notificationStep消息。

Second Dataset Processor -1
Second Dataset Processor -2
Second Dataset Processor -3

3.2. Programmatic Branching With JobExecutionDecider

3.2.使用JobExecutionDecider的程序化分支

Alternatively, we can use a class that implements JobExecutionDecider to determine job flow. This is especially useful if we have external factors for determining execution flow.

另外,我们可以使用一个实现JobExecutionDecider的类来确定作业流程。如果我们有确定执行流程的外部因素,这一点尤其有用

To use this method, we first need to modify our ItemProcessor to remove the ItemListenerSupport interface and @BeforeStep method:

要使用这个方法,我们首先需要修改我们的ItemProcessor,删除ItemListenerSupport接口和@BeforeStep方法。

public class NumberInfoClassifierWithDecider extends ItemListenerSupport<NumberInfo, Integer>
  implements ItemProcessor<NumberInfo, Integer> {

    @Override
    public Integer process(NumberInfo numberInfo) throws Exception {
        return Integer.valueOf(numberInfo.getNumber());
    }
}

Next, we create a decider class that determines the notification status of our step:

接下来,我们创建一个决定者类,决定我们步骤的通知状态。

public class NumberInfoDecider implements JobExecutionDecider {

    private boolean shouldNotify() {
        return true;
    }

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        if (shouldNotify()) {
            return new FlowExecutionStatus(NOTIFY);
        } else {
            return new FlowExecutionStatus(QUIET);
        }
    }
}

Then, we set up our Job to use the decider in the flow:

然后,我们设置我们的Job来使用流程中的决定者。

jobBuilderFactory.get("Number generator - third dataset")
    .start(dataProviderStep)
    .next(new NumberInfoDecider()).on("NOTIFY").to(notificationStep)
    .end()
    .build();

4. Conclusion

4.总结

In this quick tutorial, we explored two options for implementing conditional flows with Spring Batch. First, we looked at how to use the ExitStatus to control the flow of our job.

在这个快速教程中,我们探讨了用Spring Batch实现条件流的两个选项。首先,我们研究了如何使用ExitStatus来控制作业的流程。

Then we took a look at how we can control flow programmatically by defining our own JobExecutionDecider.

然后,我们看了一下如何通过定义我们自己的JobExecutionDecider来以编程方式控制流程。

As always, the full source code of the article is available over on GitHub.

一如既往,文章的完整源代码可在GitHub上获得