Spring Boot With Spring Batch

Last modified: December 5, 2020

1. Overview

Spring Batch is a powerful framework for developing robust batch applications. In our previous tutorial, we introduced Spring Batch.

In this tutorial, we’ll build on the previous one and learn how to set up and create a basic batch-driven application using Spring Boot.

2. Maven Dependencies

First, let’s add the spring-boot-starter-batch to our pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>2.7.2</version>
</dependency>

We’ll also add the org.hsqldb dependency, which is available from Maven Central as well:

<dependency>
    <groupId>org.hsqldb</groupId>
    <artifactId>hsqldb</artifactId>
    <version>2.5.1</version>
    <scope>runtime</scope>
</dependency>

3. Defining a Simple Spring Batch Job

We’re going to build a job that imports a coffee list from a CSV file, transforms it using a custom processor, and stores the final results in an in-memory database.

3.1. Getting Started

Let’s start by defining our application entry point:

@SpringBootApplication
public class SpringBootBatchProcessingApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootBatchProcessingApplication.class, args);
    }
}

As we can see, this is a standard Spring Boot application. As we want to use default configuration values where possible, we’re going to use a very light set of application configuration properties.

We’ll define these properties in our src/main/resources/application.properties file:

file.input=coffee-list.csv

This property contains the location of our input coffee list. Each line contains the brand, origin, and some characteristics of our coffee:

Blue Mountain,Jamaica,Fruity
Lavazza,Colombia,Strong
Folgers,America,Smokey

As we’re going to see, this is a flat CSV file, which means Spring can handle it without any special customization.
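
To make the mapping concrete, here is a plain-Java sketch of how one delimited line corresponds to the three expected columns. This is only an illustration of the file format, not Spring's actual parsing code:

```java
public class CsvLineSketch {

    // Split one comma-delimited line into the three expected columns:
    // brand, origin, characteristics
    static String[] parseLine(String line) {
        String[] columns = line.split(",");
        if (columns.length != 3) {
            throw new IllegalArgumentException("Expected brand,origin,characteristics but got: " + line);
        }
        return columns;
    }

    public static void main(String[] args) {
        String[] fields = parseLine("Blue Mountain,Jamaica,Fruity");
        System.out.println("brand=" + fields[0] + ", origin=" + fields[1]
          + ", characteristics=" + fields[2]);
    }
}
```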

Next, we’ll add a SQL script schema-all.sql to create our coffee table to store the data:

DROP TABLE coffee IF EXISTS;

CREATE TABLE coffee  (
    coffee_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    brand VARCHAR(20),
    origin VARCHAR(20),
    characteristics VARCHAR(30)
);

Conveniently, Spring Boot runs this script automatically during startup; for an embedded database such as HSQLDB, script-based initialization is enabled by default.
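
If we later switched to a non-embedded database, we'd need to opt in to script-based initialization explicitly. A sketch of the relevant property (assuming Spring Boot 2.5 or later, which this tutorial's starter version satisfies):

```properties
# Embedded databases are initialized by default ("embedded");
# "always" forces the schema-*.sql scripts to run for any datasource.
spring.sql.init.mode=always
```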

3.2. Coffee Domain Class

Subsequently, we’ll need a simple domain class to hold our coffee items:

public class Coffee {

    private String brand;
    private String origin;
    private String characteristics;

    // BeanWrapperFieldSetMapper needs a default constructor to instantiate the target type
    public Coffee() {
    }

    public Coffee(String brand, String origin, String characteristics) {
        this.brand = brand;
        this.origin = origin;
        this.characteristics = characteristics;
    }

    // getters and setters
}

As previously mentioned, our Coffee object contains three properties:

  • A brand
  • An origin
  • Some additional characteristics
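
One detail the listing elides: the job's log output later in the article prints coffees as Coffee [brand=..., origin=..., characteristics=...], which implies a toString roughly like the one below. This is our assumption, trimmed to the relevant parts:

```java
public class Coffee {

    private String brand;
    private String origin;
    private String characteristics;

    public Coffee(String brand, String origin, String characteristics) {
        this.brand = brand;
        this.origin = origin;
        this.characteristics = characteristics;
    }

    // Assumed toString, matching the "Coffee [brand=..., origin=..., characteristics=...]"
    // format seen in the job's log output
    @Override
    public String toString() {
        return "Coffee [brand=" + brand + ", origin=" + origin
          + ", characteristics=" + characteristics + "]";
    }

    public static void main(String[] args) {
        System.out.println(new Coffee("Lavazza", "Colombia", "Strong"));
    }
}
```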

4. Job Configuration

Now, on to the key component, our job configuration. We’ll go step by step, building up our configuration and explaining each part along the way:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    
    @Value("${file.input}")
    private String fileInput;
    
    // ...
}

Firstly, we start with a standard Spring @Configuration class. Next, we add an @EnableBatchProcessing annotation to the class. Notably, this gives us access to many useful beans that support jobs and will save us a lot of legwork.

Furthermore, using this annotation also provides us with access to two useful factories that we’ll use later when building our job configuration and job steps.

For the last part of our initial configuration, we include a reference to the file.input property we declared previously.

4.1. A Reader and Writer for Our Job

Now, we can go ahead and define a reader bean in our configuration:

@Bean
public FlatFileItemReader<Coffee> reader() {
    BeanWrapperFieldSetMapper<Coffee> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(Coffee.class);

    return new FlatFileItemReaderBuilder<Coffee>().name("coffeeItemReader")
      .resource(new ClassPathResource(fileInput))
      .delimited()
      .names("brand", "origin", "characteristics")
      .fieldSetMapper(fieldSetMapper)
      .build();
}

In short, our reader bean defined above looks for a file called coffee-list.csv and parses each line item into a Coffee object.

Likewise, we define a writer bean:

@Bean
public JdbcBatchItemWriter<Coffee> writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Coffee>()
      .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
      .sql("INSERT INTO coffee (brand, origin, characteristics) VALUES (:brand, :origin, :characteristics)")
      .dataSource(dataSource)
      .build();
}

This time around, we include the SQL statement needed to insert a single coffee item into our database, driven by the Java bean properties of our Coffee object. Handily, Spring Boot creates the dataSource for us automatically, since HSQLDB is on the classpath.
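
Conceptually, BeanPropertyItemSqlParameterSourceProvider fills each :name placeholder from the matching bean property. The following plain-Java sketch illustrates that idea; it is not Spring's implementation, and real code binds values as JDBC parameters rather than splicing strings:

```java
import java.util.Map;

public class NamedParameterSketch {

    // Replace each :name placeholder with the corresponding property value.
    // Illustration only -- real code binds values as JDBC parameters.
    static String bind(String sql, Map<String, String> properties) {
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            sql = sql.replace(":" + entry.getKey(), "'" + entry.getValue() + "'");
        }
        return sql;
    }

    public static void main(String[] args) {
        String sql = "INSERT INTO coffee (brand, origin, characteristics) "
          + "VALUES (:brand, :origin, :characteristics)";
        System.out.println(bind(sql,
          Map.of("brand", "LAVAZZA", "origin", "COLOMBIA", "characteristics", "STRONG")));
    }
}
```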

4.2. Putting Our Job Together

Lastly, we need to add the actual job steps and configuration:

@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
    return jobBuilderFactory.get("importUserJob")
      .incrementer(new RunIdIncrementer())
      .listener(listener)
      .flow(step1)
      .end()
      .build();
}

@Bean
public Step step1(JdbcBatchItemWriter<Coffee> writer) {
    return stepBuilderFactory.get("step1")
      .<Coffee, Coffee> chunk(10)
      .reader(reader())
      .processor(processor())
      .writer(writer)
      .build();
}

@Bean
public CoffeeItemProcessor processor() {
    return new CoffeeItemProcessor();
}

As we can see, our job is relatively simple and consists of one step defined in the step1 method.

Let’s take a look at what this step is doing:

  • First, we configure our step so that it will write up to ten records at a time using the chunk(10) declaration
  • Then, we read in the coffee data using our reader bean, which we set using the reader method
  • Next, we pass each of our coffee items to a custom processor where we apply some custom business logic
  • Finally, we write each coffee item to the database using the writer we saw previously
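
The chunk-oriented flow above can be sketched as a simple loop. This is a conceptual illustration of read-process-write in chunks, not Spring Batch's actual implementation:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChunkLoopSketch {

    // Read items one by one, "process" each (here: uppercase it, like our
    // CoffeeItemProcessor), and "write" a full chunk of chunkSize at a time.
    static List<String> runStep(Iterator<String> reader, int chunkSize) {
        List<String> written = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(reader.next().toUpperCase());
            if (chunk.size() == chunkSize || !reader.hasNext()) {
                written.addAll(chunk); // write the whole chunk at once
                chunk.clear();
            }
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(runStep(
          List.of("Lavazza", "Folgers", "Blue Mountain").iterator(), 2));
    }
}
```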

On the other hand, our importUserJob contains our job definition, which gets a fresh run id for each execution from the built-in RunIdIncrementer class. We also set a JobCompletionNotificationListener, which we use to get notified when the job completes.

To complete our job configuration, we list each step (though this job has only one). We now have a fully configured job!

5. A Custom Coffee Processor

Let’s take a look in detail at the custom processor we defined previously in our job configuration:

public class CoffeeItemProcessor implements ItemProcessor<Coffee, Coffee> {

    private static final Logger LOGGER = LoggerFactory.getLogger(CoffeeItemProcessor.class);

    @Override
    public Coffee process(final Coffee coffee) throws Exception {
        String brand = coffee.getBrand().toUpperCase();
        String origin = coffee.getOrigin().toUpperCase();
        String characteristics = coffee.getCharacteristics().toUpperCase();

        Coffee transformedCoffee = new Coffee(brand, origin, characteristics);
        LOGGER.info("Converting ( {} ) into ( {} )", coffee, transformedCoffee);

        return transformedCoffee;
    }
}

Of particular interest, the ItemProcessor interface provides us with a mechanism to apply some specific business logic during our job execution.

To keep things simple, we define our CoffeeItemProcessor, which takes an input Coffee object and transforms each of the properties to uppercase.

6. Job Completion

Additionally, we’ll write a JobCompletionNotificationListener to provide some feedback when our job finishes:

@Override
public void afterJob(JobExecution jobExecution) {
    if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
        LOGGER.info("!!! JOB FINISHED! Time to verify the results");

        String query = "SELECT brand, origin, characteristics FROM coffee";
        jdbcTemplate.query(query, (rs, row) -> new Coffee(rs.getString(1), rs.getString(2), rs.getString(3)))
          .forEach(coffee -> LOGGER.info("Found < {} > in the database.", coffee));
    }
}

In the above example, we override the afterJob method and check that the job completed successfully. Moreover, we run a trivial query to verify that each coffee item was stored in the database.

7. Running Our Job

Now that we have everything in place to run our job, here comes the fun part. Let’s go ahead and run our job:

...
17:41:16.336 [main] INFO  c.b.b.JobCompletionNotificationListener -
  !!! JOB FINISHED! Time to verify the results
17:41:16.336 [main] INFO  c.b.b.JobCompletionNotificationListener -
  Found < Coffee [brand=BLUE MOUNTAIN, origin=JAMAICA, characteristics=FRUITY] > in the database.
17:41:16.337 [main] INFO  c.b.b.JobCompletionNotificationListener -
  Found < Coffee [brand=LAVAZZA, origin=COLOMBIA, characteristics=STRONG] > in the database.
17:41:16.337 [main] INFO  c.b.b.JobCompletionNotificationListener -
  Found < Coffee [brand=FOLGERS, origin=AMERICA, characteristics=SMOKEY] > in the database.
...

As we can see, our job ran successfully, and each coffee item was stored in the database as expected.

8. Conclusion

In this article, we’ve learned how to create a simple Spring Batch job using Spring Boot. First, we started by defining some basic configuration.

Then, we saw how to add a file reader and database writer. Finally, we took a look at how to apply some custom processing and check our job was executed successfully.

As always, the full source code of the article is available over on GitHub.