Introduction to Spring Batch

Last modified: December 28, 2015

1. Overview

In this tutorial, we’re going to look at a practical, code-focused intro to Spring Batch. Spring Batch is a processing framework designed for robust execution of jobs.

Its current version, 4.3, supports Spring 5 and Java 8. It also supports JSR-352, the Java specification for batch processing.

Here are a few interesting and practical use cases of the framework.

2. Workflow Basics

Spring Batch follows the traditional batch architecture, where a job repository keeps track of jobs and the state of their executions, and a job launcher starts them.

A job can have more than one step, and every step typically follows the sequence of reading data, processing it and writing it.
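
To make that structure concrete, here's a minimal, JDK-only sketch. The names SketchJob, runStep and runJob are hypothetical and only mirror the concepts; they are not Spring Batch's API, which adds chunking, transactions and restart support on top of this basic idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, JDK-only model of the job/step structure; not Spring Batch's API.
class SketchJob {

    // One step: pull each item from the reader, transform it, hand the result to the writer.
    static <I, O> int runStep(Iterator<I> reader, Function<I, O> processor, Consumer<O> writer) {
        int written = 0;
        while (reader.hasNext()) {
            writer.accept(processor.apply(reader.next()));
            written++;
        }
        return written; // number of items handed to the writer
    }

    // A job: an ordered list of steps, executed one after another.
    static void runJob(List<Runnable> steps) {
        for (Runnable step : steps) {
            step.run();
        }
    }

    public static void main(String[] args) {
        List<String> output = new ArrayList<>();
        Runnable upperCase = () -> runStep(Arrays.asList("a", "b").iterator(), s -> s.toUpperCase(), output::add);
        Runnable withLength = () -> runStep(Arrays.asList("ccc").iterator(), s -> s + ":" + s.length(), output::add);
        runJob(Arrays.asList(upperCase, withLength));
        System.out.println(output); // [A, B, ccc:3]
    }
}
```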

The framework does most of the heavy lifting for us, especially the low-level persistence work of keeping track of jobs. In this example, we use SQLite for the job repository.

2.1. Example Use Case

The simple use case we’re going to tackle here is migrating some financial transaction data from CSV to XML.

The input file has a very simple structure.

It contains a transaction per line, made up of a username, the user id, the date of the transaction and the amount:

username, userid, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411
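
As a quick illustration of this layout, the JDK-only sketch below skips the header line, splits each data line on commas, trims the space after each comma, and parses the date with the d/M/yyyy pattern, which accepts one- or two-digit days and months. CsvSketch and Row are hypothetical names; in the Spring Batch job, this work is done by the tokenizer and field set mapper configured later.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical JDK-only parser for the sample layout above; not Spring Batch's API.
class CsvSketch {

    // d/M/yyyy matches 31/10/2015, 3/12/2015 and 2/02/2015 alike
    static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("d/M/yyyy");

    // One parsed line: username, user id, transaction date, amount
    static final class Row {
        final String username;
        final int userId;
        final LocalDate date;
        final double amount;

        Row(String username, int userId, LocalDate date, double amount) {
            this.username = username;
            this.userId = userId;
            this.date = date;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return username + " " + userId + " " + date + " " + amount;
        }
    }

    // Skips the header line, then splits every remaining line on commas and trims each field.
    static List<Row> parse(List<String> lines) {
        List<Row> rows = new ArrayList<>();
        for (String line : lines.subList(1, lines.size())) {
            String[] fields = line.split(",");
            rows.add(new Row(fields[0].trim(), Integer.parseInt(fields[1].trim()),
                LocalDate.parse(fields[2].trim(), DATE), Double.parseDouble(fields[3].trim())));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> csv = Arrays.asList(
            "username, userid, transaction_date, transaction_amount",
            "devendra, 1234, 31/10/2015, 10000",
            "john, 2134, 3/12/2015, 12321",
            "robin, 2134, 2/02/2015, 23411");
        parse(csv).forEach(System.out::println);
        // devendra 1234 2015-10-31 10000.0
        // john 2134 2015-12-03 12321.0
        // robin 2134 2015-02-02 23411.0
    }
}
```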

3. The Maven POM

The dependencies required for this project are Spring Batch Core, Spring JDBC, Spring OXM (for the XML marshalling) and the SQLite JDBC driver:

<!-- SQLite database driver -->
<dependency>
    <groupId>org.xerial</groupId>
    <artifactId>sqlite-jdbc</artifactId>
    <version>3.15.1</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
    <version>5.3.0</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>5.3.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-core</artifactId>
    <version>4.3.0</version>
</dependency>

4. Spring Batch Config

The first thing we’ll do is configure Spring Batch with XML:

<!-- connect to SQLite database -->
<bean id="dataSource"
  class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="org.sqlite.JDBC" />
    <property name="url" value="jdbc:sqlite:repository.sqlite" />
    <property name="username" value="" />
    <property name="password" value="" />
</bean>

<!-- create job-meta tables automatically -->
<jdbc:initialize-database data-source="dataSource">
    <jdbc:script
      location="org/springframework/batch/core/schema-drop-sqlite.sql" />
    <jdbc:script location="org/springframework/batch/core/schema-sqlite.sql" />
</jdbc:initialize-database>

<!-- alternative: store job metadata in memory
     (MapJobRepositoryFactoryBean is deprecated as of Spring Batch 4.3) -->
<!-- 
<bean id="jobRepository" 
  class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> 
    <property name="transactionManager" ref="transactionManager" />
</bean> 
-->

<!-- store job metadata in the SQLite database -->
<bean id="jobRepository"
  class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="dataSource" ref="dataSource" />
    <property name="transactionManager" ref="transactionManager" />
    <property name="databaseType" value="sqlite" />
</bean>

<bean id="transactionManager" class=
  "org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

<bean id="jobLauncher"
  class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>

Of course, a Java configuration is also available:

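import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableBatchProcessing
@Profile("spring")
public class SpringConfig {

    // SQL scripts bundled in the spring-batch-core jar
    @Value("org/springframework/batch/core/schema-drop-sqlite.sql")
    private Resource dropRepositoryTables;

    @Value("org/springframework/batch/core/schema-sqlite.sql")
    private Resource dataRepositorySchema;

    @Bean
    public DataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("org.sqlite.JDBC");
        dataSource.setUrl("jdbc:sqlite:repository.sqlite");
        return dataSource;
    }

    // Drops and recreates the job-meta tables on startup; failing DROP statements
    // (for example, on the first run, before the tables exist) are ignored
    @Bean
    public DataSourceInitializer dataSourceInitializer(DataSource dataSource) {
        ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
        databasePopulator.addScript(dropRepositoryTables);
        databasePopulator.addScript(dataRepositorySchema);
        databasePopulator.setIgnoreFailedDrops(true);

        DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDataSource(dataSource);
        initializer.setDatabasePopulator(databasePopulator);

        return initializer;
    }

    // The helpers below build a JobRepository and a JobLauncher by hand. They are not
    // @Bean methods: @EnableBatchProcessing already registers jobRepository and jobLauncher beans
    private JobRepository getJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource());
        factory.setTransactionManager(getTransactionManager());
        factory.afterPropertiesSet();
        return (JobRepository) factory.getObject();
    }

    private PlatformTransactionManager getTransactionManager() {
        return new ResourcelessTransactionManager();
    }

    public JobLauncher getJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}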

5. Spring Batch Job Config

Let’s now write the job definition for the CSV-to-XML migration:

<import resource="spring.xml" />

<bean id="record" class="com.baeldung.spring_batch_intro.model.Transaction"></bean>
<bean id="itemReader"
  class="org.springframework.batch.item.file.FlatFileItemReader">

    <property name="resource" value="input/record.csv" />

    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class=
                  "org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="names" value="username,userid,transactiondate,amount" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.baeldung.spring_batch_intro.service.RecordFieldSetMapper" />
            </property>
        </bean>
    </property>
</bean>

<bean id="itemProcessor"
  class="com.baeldung.spring_batch_intro.service.CustomItemProcessor" />

<bean id="itemWriter"
  class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" value="file:xml/output.xml" />
    <property name="marshaller" ref="recordMarshaller" />
    <property name="rootTagName" value="transactionRecord" />
</bean>

<bean id="recordMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound">
        <list>
            <value>com.baeldung.spring_batch_intro.model.Transaction</value>
        </list>
    </property>
</bean>
<batch:job id="firstBatchJob">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
              processor="itemProcessor" commit-interval="10">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

And here’s the similar Java-based job config:

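import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.Resource;
import org.springframework.oxm.Marshaller;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;

import com.baeldung.spring_batch_intro.model.Transaction;
import com.baeldung.spring_batch_intro.service.CustomItemProcessor;
import com.baeldung.spring_batch_intro.service.RecordFieldSetMapper;

@Configuration
@Profile("spring")
public class SpringBatchConfig {

    // builder factories registered by @EnableBatchProcessing (declared on SpringConfig)
    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Value("input/record.csv")
    private Resource inputCsv;

    @Value("file:xml/output.xml")
    private Resource outputXml;

    // Reads record.csv line by line: the tokenizer splits each line on commas and
    // names the fields, then RecordFieldSetMapper turns them into a Transaction
    @Bean
    public ItemReader<Transaction> itemReader() {
        FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        String[] tokens = { "username", "userid", "transactiondate", "amount" };
        tokenizer.setNames(tokens);
        reader.setResource(inputCsv);
        DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean
    public ItemProcessor<Transaction, Transaction> itemProcessor() {
        return new CustomItemProcessor();
    }

    // Marshals each Transaction to XML inside a <transactionRecord> root element
    @Bean
    public ItemWriter<Transaction> itemWriter(Marshaller marshaller) {
        StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
        itemWriter.setMarshaller(marshaller);
        itemWriter.setRootTagName("transactionRecord");
        itemWriter.setResource(outputXml);
        return itemWriter;
    }

    @Bean
    public Marshaller marshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(new Class[] { Transaction.class });
        return marshaller;
    }

    // chunk(10) is the Java equivalent of commit-interval="10" in the XML config
    @Bean
    protected Step step1(ItemReader<Transaction> reader,
      ItemProcessor<Transaction, Transaction> processor,
      ItemWriter<Transaction> writer) {
        return steps.get("step1").<Transaction, Transaction> chunk(10)
          .reader(reader).processor(processor).writer(writer).build();
    }

    @Bean(name = "firstBatchJob")
    public Job job(@Qualifier("step1") Step step1) {
        return jobs.get("firstBatchJob").start(step1).build();
    }
}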

Now that we have the whole config, let’s break it down and start discussing it.

5.1. Read Data and Create Objects With ItemReader

First, we configured the itemReader bean, a FlatFileItemReader that reads the data from record.csv and converts each line into a Transaction object. Here's the Transaction class:

import java.time.LocalDateTime;

// JAXB ships with Java 8; on Java 11+ the JAXB API and a runtime must be added as dependencies
import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement(name = "transactionRecord")
public class Transaction {
    private String username;
    private int userId;
    private LocalDateTime transactionDate;
    private double amount;

    /* getters and setters for the attributes */

    @Override
    public String toString() {
        return "Transaction [username=" + username + ", userId=" + userId
          + ", transactionDate=" + transactionDate + ", amount=" + amount
          + "]";
    }
}

To do so, it uses a custom mapper:

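import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {

    // d/M/yyyy accepts one- or two-digit days and months, e.g. 3/12/2015 and 31/10/2015
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("d/M/yyyy");

    @Override
    public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
        Transaction transaction = new Transaction();

        // field names match the ones given to the DelimitedLineTokenizer
        transaction.setUsername(fieldSet.readString("username"));
        transaction.setUserId(fieldSet.readInt("userid"));
        transaction.setAmount(fieldSet.readDouble("amount"));

        // the CSV holds only a date, so use midnight of that day as the LocalDateTime
        String dateString = fieldSet.readString("transactiondate");
        transaction.setTransactionDate(LocalDate.parse(dateString, FORMATTER).atStartOfDay());
        return transaction;
    }
}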
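
Because the date is parsed inside the mapper, a malformed value fails the whole line, and FlatFileItemReader reports it as a FlatFileParseException. The JDK-only sketch below isolates just the date handling (pattern d/M/yyyy, then atStartOfDay()) so it can be checked on its own; unlike the mapper, it returns an empty Optional instead of throwing. DateFieldSketch and tryParse are hypothetical names.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Optional;

// Hypothetical helper mirroring the date handling in RecordFieldSetMapper.
class DateFieldSketch {

    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("d/M/yyyy");

    // Parses a d/M/yyyy date and converts it to midnight of that day;
    // returns Optional.empty() instead of throwing when the value is malformed.
    static Optional<LocalDateTime> tryParse(String value) {
        try {
            return Optional.of(LocalDate.parse(value.trim(), FORMATTER).atStartOfDay());
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryParse("3/12/2015"));  // Optional[2015-12-03T00:00]
        System.out.println(tryParse("31/13/2015")); // Optional.empty (there is no month 13)
    }
}
```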

5.2. Processing Data With ItemProcessor

We’ve created our own item processor, CustomItemProcessor. It doesn’t transform the transaction object in any way.

All it does is pass the original object from the reader to the writer unchanged:

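import org.springframework.batch.item.ItemProcessor;

public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {

    // Pass-through: returns the item unchanged (returning null would filter it out)
    @Override
    public Transaction process(Transaction item) {
        return item;
    }
}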
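
A pass-through processor is mostly a placeholder for business logic. In Spring Batch, when an ItemProcessor returns null, the item is filtered out and never reaches the writer. The JDK-only sketch below models both behaviors with java.util.function.Function; ProcessorSketch and the "positive amounts only" rule are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical JDK-only model of item processing; not Spring Batch's API.
class ProcessorSketch {

    // Pass-through processor, equivalent in spirit to CustomItemProcessor.
    static final Function<Double, Double> PASS_THROUGH = amount -> amount;

    // Hypothetical filtering rule: returning null drops the item,
    // mirroring how Spring Batch treats a null result from process().
    static final Function<Double, Double> POSITIVE_ONLY = amount -> amount > 0 ? amount : null;

    // Applies the processor to every item and keeps only non-null results for the writer.
    static List<Double> process(List<Double> items, Function<Double, Double> processor) {
        List<Double> toWrite = new ArrayList<>();
        for (Double item : items) {
            Double result = processor.apply(item);
            if (result != null) {
                toWrite.add(result);
            }
        }
        return toWrite;
    }

    public static void main(String[] args) {
        List<Double> amounts = Arrays.asList(10000.0, -5.0, 12321.0);
        System.out.println(process(amounts, PASS_THROUGH));  // [10000.0, -5.0, 12321.0]
        System.out.println(process(amounts, POSITIVE_ONLY)); // [10000.0, 12321.0]
    }
}
```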

5.3. Writing Objects to the File System With ItemWriter

Finally, we store the transactions in an XML file located at xml/output.xml:

<bean id="itemWriter"
  class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" value="file:xml/output.xml" />
    <property name="marshaller" ref="recordMarshaller" />
    <property name="rootTagName" value="transactionRecord" />
</bean>
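
StaxEventItemWriter writes the rootTagName element once when the writer is opened, appends one marshalled element per item, and closes the root element when the step finishes. Since Transaction is annotated with @XmlRootElement(name = "transactionRecord"), each item is itself a transactionRecord element. The JDK-only sketch below approximates that nesting with javax.xml.stream.XMLStreamWriter; it writes two fields by hand instead of using JAXB, so XmlWriterSketch and its child element names (username, amount) are assumptions, not the exact marshaller output.

```java
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

// Hypothetical JDK-only approximation of the output shape; no JAXB, no Spring Batch.
class XmlWriterSketch {

    // Each item is a {username, amount} pair, written as a <transactionRecord>
    // element inside a single <transactionRecord> root element.
    static String write(List<String[]> items) {
        StringWriter out = new StringWriter();
        try {
            XMLStreamWriter xml = XMLOutputFactory.newInstance().createXMLStreamWriter(out);
            xml.writeStartDocument();
            xml.writeStartElement("transactionRecord"); // root tag, opened once
            for (String[] item : items) {
                xml.writeStartElement("transactionRecord"); // one element per item
                writeField(xml, "username", item[0]);
                writeField(xml, "amount", item[1]);
                xml.writeEndElement();
            }
            xml.writeEndElement(); // root tag, closed at the end
            xml.writeEndDocument();
            xml.flush();
            xml.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException(e);
        }
        return out.toString();
    }

    private static void writeField(XMLStreamWriter xml, String name, String value)
            throws XMLStreamException {
        xml.writeStartElement(name);
        xml.writeCharacters(value);
        xml.writeEndElement();
    }

    public static void main(String[] args) {
        System.out.println(write(Arrays.asList(
            new String[] { "devendra", "10000.0" },
            new String[] { "john", "12321.0" })));
    }
}
```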

5.4. Configuring the Batch Job

So, all we have to do is connect the dots with a job using the batch:job syntax.

Note the commit-interval. It’s the chunk size: the number of items (here, transactions) that are read and processed before the whole chunk is handed to the itemWriter and the transaction is committed.

It will hold the transactions in memory until that point (or until the end of the input data is encountered):

<batch:job id="firstBatchJob">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
              processor="itemProcessor" commit-interval="10">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
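
To see what commit-interval="10" means in numbers, here's a JDK-only sketch that groups items the way a chunk-oriented step hands them to the writer: up to 10 items per chunk, plus a final, smaller chunk when the input runs out. ChunkSketch is a hypothetical name, and transactions, retries and skips are not modeled. With 23 input items, it produces chunks of 10, 10 and 3, so the writer would be called three times.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical JDK-only model of commit-interval chunking (no transactions or retries).
class ChunkSketch {

    // Buffers items until `commitInterval` are collected (or the input ends),
    // then hands the whole buffer over as one chunk.
    static <T> List<List<T>> chunk(Iterator<T> reader, int commitInterval) {
        List<List<T>> writes = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        while (reader.hasNext()) {
            buffer.add(reader.next());
            if (buffer.size() == commitInterval) {
                writes.add(buffer); // one writer call, then a commit
                buffer = new ArrayList<>();
            }
        }
        if (!buffer.isEmpty()) {
            writes.add(buffer); // final, partial chunk at the end of the input
        }
        return writes;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 23; i++) {
            items.add(i);
        }
        for (List<Integer> written : chunk(items.iterator(), 10)) {
            System.out.println("wrote " + written.size() + " items"); // 10, then 10, then 3
        }
    }
}
```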

5.5. Running the Batch Job

Now let’s set up and run everything:

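import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class App {
    public static void main(String[] args) {
        // Spring Java config; both classes are @Profile("spring"), so that profile must be active
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(SpringConfig.class);
        context.register(SpringBatchConfig.class);
        context.refresh();

        // jobLauncher is registered by @EnableBatchProcessing, firstBatchJob by SpringBatchConfig
        JobLauncher jobLauncher = context.getBean("jobLauncher", JobLauncher.class);
        Job job = context.getBean("firstBatchJob", Job.class);
        System.out.println("Starting the batch job");
        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Job Status : " + execution.getStatus());
            System.out.println("Job completed");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Job failed");
        } finally {
            context.close();
        }
    }
}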

We run our Spring application with the spring profile active, by passing -Dspring.profiles.active=spring.

In the next section, we configure our example in a Spring Boot application.

6. Spring Boot Configuration

In this section, we’ll create a Spring Boot application and convert the previous Spring Batch config to run in the Spring Boot environment. The result is roughly equivalent to the previous Spring Batch example.

6.1. Maven Dependencies

Let’s start by declaring the spring-boot-starter-batch dependency in the pom.xml of our Spring Boot application:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

We need a database to store the Spring Batch job information. In this tutorial, we use an in-memory HSQLDB database, so we add the hsqldb dependency:

<dependency>
    <groupId>org.hsqldb</groupId>
    <artifactId>hsqldb</artifactId>
    <version>2.7.0</version>
    <scope>runtime</scope>
</dependency>

6.2. Spring Boot Config

We use the @Profile annotation to distinguish between the Spring and Spring Boot configurations. We activate the spring-boot profile in our application’s main method:

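import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBatchApplication {

    public static void main(String[] args) {
        SpringApplication springApp = new SpringApplication(SpringBatchApplication.class);
        // activates the configuration classes annotated with @Profile("spring-boot")
        springApp.setAdditionalProfiles("spring-boot");
        springApp.run(args);
    }

}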

6.3. Spring Batch Job Config

The batch job configuration is essentially the same as in the SpringBatchConfig class from earlier:

@Configuration
@EnableBatchProcessing
@Profile("spring-boot")
public class SpringBootBatchConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Value("input/record.csv")
    private Resource inputCsv;

    @Value("input/recordWithInvalidData.csv")
    private Resource invalidInputCsv;

    @Value("file:xml/output.xml")
    private Resource outputXml;

    // ...
}

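We start with a Spring @Configuration annotation. Then, we add the @EnableBatchProcessing annotation to the class. @EnableBatchProcessing registers the Spring Batch infrastructure beans, such as the JobRepository, JobLauncher, JobBuilderFactory and StepBuilderFactory, backed by the application’s DataSource. Because HSQLDB is on the classpath, Spring Boot auto-configures that DataSource as an in-memory database, so we don’t need to define one ourselves.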

7. Conclusion

In this article, we learned how to work with Spring Batch and how to use it in a simple use case.

We saw how we can easily develop our batch processing pipeline and how we can customize different stages in reading, processing, and writing.

As always, the full implementation of this article can be found over on GitHub.
