Configuring Retry Logic in Spring Batch

Last modified: February 22, 2020

1. Overview

By default, a Spring Batch job fails when any error is raised during its execution. At times, however, we may want to make our application more resilient to intermittent failures.

In this quick tutorial, we’ll explore how to configure retry logic in the Spring Batch framework.

2. An Example Use Case

Let’s say we have a batch job that reads an input CSV file:

username, userid, transaction_date, transaction_amount
sammy, 1234, 31/10/2015, 10000
john, 9999, 3/12/2015, 12321
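
To make the record layout concrete, here is a minimal sketch of how one line of this file could map to an item. The Transaction class below is a hypothetical stand-in for the job's item type (its fields simply mirror the CSV header), parseLine is an illustrative helper rather than the reader the job actually uses, and the d/M/uuuu date pattern is an assumption inferred from the two sample rows:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class TransactionCsvSketch {

    // Hypothetical item type; the fields mirror the CSV header columns.
    public static class Transaction {
        public final String username;
        public final int userId;
        public final LocalDate transactionDate;
        public final double amount;

        public Transaction(String username, int userId,
                           LocalDate transactionDate, double amount) {
            this.username = username;
            this.userId = userId;
            this.transactionDate = transactionDate;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Transaction[username=" + username + ", userId=" + userId
                + ", transactionDate=" + transactionDate + ", amount=" + amount + "]";
        }
    }

    // Assumption: dates are day/month/year, with or without leading zeros.
    private static final DateTimeFormatter DATE_FORMAT =
        DateTimeFormatter.ofPattern("d/M/uuuu");

    // Parses one data line (not the header) into a Transaction.
    public static Transaction parseLine(String line) {
        String[] fields = line.split(",");
        if (fields.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got: " + line);
        }
        return new Transaction(
            fields[0].trim(),
            Integer.parseInt(fields[1].trim()),
            LocalDate.parse(fields[2].trim(), DATE_FORMAT),
            Double.parseDouble(fields[3].trim()));
    }

    public static void main(String[] args) {
        System.out.println(parseLine("sammy, 1234, 31/10/2015, 10000"));
        System.out.println(parseLine("john, 9999, 3/12/2015, 12321"));
    }
}
```

In a real Spring Batch job, this mapping would normally be handled by the step's item reader; the sketch only shows which values each column carries.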

Then, it processes each record by hitting a REST endpoint to fetch the user’s age and postCode attributes:

public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> {
    
    @Override
    public Transaction process(Transaction transaction) throws IOException {
        log.info("RetryItemProcessor, attempting to process: {}", transaction);
        HttpResponse response = fetchMoreUserDetails(transaction.getUserId());
        //parse user's age and postCode from response and update transaction
        ...
        return transaction;
    }
    ...
}

And finally, it generates a consolidated output XML:

<transactionRecord>
    <transactionRecord>
        <amount>10000.0</amount>
        <transactionDate>2015-10-31 00:00:00</transactionDate>
        <userId>1234</userId>
        <username>sammy</username>
        <age>10</age>
        <postCode>430222</postCode>
    </transactionRecord>
    ...
</transactionRecord>

3. Adding Retries to ItemProcessor

Now, what if the connection to the REST endpoint times out because the network is slow? In that case, our batch job will fail.

In such cases, we'd prefer the failed item processing to be retried a couple of times. So, let's configure our batch job to make up to three attempts per item in case of failures:

@Bean
public Step retryStep(
  ItemProcessor<Transaction, Transaction> processor,
  ItemWriter<Transaction> writer) throws ParseException {
    return stepBuilderFactory
      .get("retryStep")
      .<Transaction, Transaction>chunk(10)
      .reader(itemReader(inputCsv))
      .processor(processor)
      .writer(writer)
      .faultTolerant()
      .retryLimit(3)
      .retry(ConnectTimeoutException.class)
      .retry(DeadlockLoserDataAccessException.class)
      .build();
}

Here, the call to faultTolerant() enables the retry functionality. We then use retry to declare the exceptions that qualify for a retry and retryLimit to cap the number of attempts made for an item.
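
To illustrate the behavior this configuration asks for, here is a framework-free sketch of a retry loop. It is not how Spring Batch implements retries internally (it ignores chunks and transactions, for instance). The names callWithRetry and FlakyEndpoint are hypothetical, java.net.SocketTimeoutException stands in for ConnectTimeoutException so that the example needs only the JDK, and the limit is treated as the total number of attempts, which is an assumption consistent with the three-call test scenario in this article:

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.concurrent.Callable;

public class RetrySketch {

    // Calls the task until it succeeds, throws a non-retryable exception,
    // or has failed maxAttempts times (the first call counts as an attempt).
    public static <T> T callWithRetry(Callable<T> task, int maxAttempts,
            List<Class<? extends Exception>> retryable) throws Exception {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                return task.call();
            } catch (Exception e) {
                boolean qualifies = retryable.stream()
                    .anyMatch(type -> type.isInstance(e));
                if (!qualifies || attempt >= maxAttempts) {
                    throw e; // not retryable, or attempts exhausted
                }
            }
        }
    }

    // Hypothetical endpoint stub: times out a fixed number of times, then answers.
    public static class FlakyEndpoint implements Callable<String> {
        private final int failuresBeforeSuccess;
        private int calls = 0;

        public FlakyEndpoint(int failuresBeforeSuccess) {
            this.failuresBeforeSuccess = failuresBeforeSuccess;
        }

        @Override
        public String call() throws IOException {
            calls++;
            if (calls <= failuresBeforeSuccess) {
                throw new SocketTimeoutException("Timeout count " + calls);
            }
            return "{ \"age\":10, \"postCode\":\"430222\" }";
        }

        public int calls() {
            return calls;
        }
    }

    public static void main(String[] args) throws Exception {
        List<Class<? extends Exception>> retryable = List.of(SocketTimeoutException.class);
        FlakyEndpoint endpoint = new FlakyEndpoint(2);
        String body = callWithRetry(endpoint, 3, retryable);
        System.out.println(body + " after " + endpoint.calls() + " calls");
    }
}
```

With two initial failures and a limit of three, the third call returns the response. If the endpoint never recovers, the loop gives up after the third failed call and rethrows the last exception, and an exception type that is not in the list is rethrown immediately.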

4. Testing the Retries

Let's set up a test scenario in which the REST endpoint returning age and postCode is down for a short while. In this scenario, we'll get a ConnectTimeoutException only for the first two API calls, and the third call will succeed:

@Test
public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception {
    FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT);
    FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT);

    when(httpResponse.getEntity())
      .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }"));
 
    //fails for first two calls and passes third time onwards
    when(httpClient.execute(any()))
      .thenThrow(new ConnectTimeoutException("Timeout count 1"))
      .thenThrow(new ConnectTimeoutException("Timeout count 2"))
      .thenReturn(httpResponse);

    JobExecution jobExecution = jobLauncherTestUtils
      .launchJob(defaultJobParameters());
    JobInstance actualJobInstance = jobExecution.getJobInstance();
    ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

    assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
    assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED"));
    AssertFile.assertFileEquals(expectedResult, actualResult);
}

Here, our job completed successfully. Additionally, the logs show that the first record, with id=1234, failed twice and finally succeeded on the third attempt:

19:06:57.742 [main] INFO  o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep]
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999
19:06:57.773 [main] INFO  o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms

Similarly, let's add another test case to see what happens when all retries are exhausted:

@Test
public void whenEndpointAlwaysFail_thenJobFails() throws Exception {
    when(httpClient.execute(any()))
      .thenThrow(new ConnectTimeoutException("Endpoint is down"));

    JobExecution jobExecution = jobLauncherTestUtils
      .launchJob(defaultJobParameters());
    JobInstance actualJobInstance = jobExecution.getJobInstance();
    ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

    assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
    assertThat(actualJobExitStatus.getExitCode(), is("FAILED"));
    assertThat(actualJobExitStatus.getExitDescription(),
      containsString("org.apache.http.conn.ConnectTimeoutException"));
}

In this case, three attempts were made for the first record before the job finally failed with a ConnectTimeoutException.

5. Configuring Retries Using XML

Finally, let's look at the XML equivalent of the above configuration:

<batch:job id="retryBatchJob">
    <batch:step id="retryStep">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
              processor="retryItemProcessor" commit-interval="10"
              retry-limit="3">
                <batch:retryable-exception-classes>
                    <batch:include class="org.apache.http.conn.ConnectTimeoutException"/>
                    <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
                </batch:retryable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

6. Conclusion

In this article, we learned how to configure retry logic in Spring Batch. We looked at both Java and XML configurations.

We also used a unit test to see how the retries worked in practice.

As always, the example code for this tutorial is available over on GitHub.
