1. Overview
1.概述
Spring Batch is a powerful framework for batch processing in Java, thus making it a popular choice for data processing activities and scheduled job runs. Depending on the business logic complexity, a job can rely on different configuration values and dynamic parameters.
Spring批处理是一个功能强大的Java批处理框架,因此成为数据处理活动和计划作业运行的热门选择。根据业务逻辑的复杂程度,作业可以依赖于不同的配置值和动态参数。
In this article, we’ll explore how to work with JobParameters and how to access them from essential batch components.
在本文中,我们将探讨如何使用 JobParameters 以及如何从基本批处理组件访问它们。
2. Demo Setup
2.演示设置
We’ll develop a Spring Batch for a pharmacy service. The main business task is to find medications that expire soon, calculate new prices based on sales, and notify consumers about meds that are about to expire. Additionally, we’ll read from the in-memory H2 database and write all processing details to logs to simplify implementation.
我们将为一家药房开发 Spring Batch 服务。主要业务任务是查找即将过期的药品,根据销售情况计算新价格,并通知消费者即将过期的药品。此外,我们将从内存中的 H2 数据库读取数据,并将所有处理细节写入日志以简化实现。
2.1. Dependencies
2.1 依赖性
To start with the demo application, we need to add Spring Batch and H2 dependencies:
要开始演示应用程序,我们需要添加 Spring Batch 和 H2 依赖项:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.224</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.2.0</version>
</dependency>
We can find the latest H2 and Spring Batch versions in the Maven Central repository.
我们可以在 Maven Central 资源库中找到最新的 H2 和 Spring Batch 版本。
2.2. Prepare Test Data
2.2.准备测试数据
Let’s start by defining the schema in schema-all.sql:
首先,让我们在 schema-all.sql 中定义 schema :
DROP TABLE medicine IF EXISTS;
CREATE TABLE medicine (
med_id VARCHAR(36) PRIMARY KEY,
name VARCHAR(30),
type VARCHAR(30),
expiration_date TIMESTAMP,
original_price DECIMAL,
sale_price DECIMAL
);
Initial test data is provided in data.sql:
初始测试数据在 data.sql 中提供:
INSERT INTO medicine VALUES ('ec278dd3-87b9-4ad1-858f-dfe5bc34bdb5', 'Lidocaine', 'ANESTHETICS', DATEADD('DAY', 120, CURRENT_DATE), 10, null);
INSERT INTO medicine VALUES ('9d39321d-34f3-4eb7-bb9a-a69734e0e372', 'Flucloxacillin', 'ANTIBACTERIALS', DATEADD('DAY', 40, CURRENT_DATE), 20, null);
INSERT INTO medicine VALUES ('87f4ff13-de40-4c7f-95db-627f309394dd', 'Amoxicillin', 'ANTIBACTERIALS', DATEADD('DAY', 70, CURRENT_DATE), 30, null);
INSERT INTO medicine VALUES ('acd99d6a-27be-4c89-babe-0edf4dca22cb', 'Prozac', 'ANTIDEPRESSANTS', DATEADD('DAY', 30, CURRENT_DATE), 40, null);
Spring Boot runs these files as part of the application startup and we’ll use these test data in our test executions.
Spring Boot 运行这些文件是应用程序启动的一部分,我们将在测试执行中使用这些测试数据。
2.3. Medicine Domain Class
2.3.医学领域类
For our service, we’ll need a simple Medicine entity class:
对于我们的服务,我们需要一个简单的 Medicine 实体类:
@AllArgsConstructor
@Data
public class Medicine {
private UUID id;
private String name;
private MedicineCategory type;
private Timestamp expirationDate;
private Double originalPrice;
private Double salePrice;
}
ItemReader uses the expirationDate field to calculate if the medication expires soon. The salePrice field will be updated by ItemProcessor when the medication is close to the expiration date.
ItemReader 使用 expirationDate 字段计算药品是否即将过期。当药品接近过期日期时,ItemProcessor 将更新 salePrice 字段。
2.4. Application Properties
2.4.应用程序属性
The application needs multiple properties in the src/main/resources/application.properties file:
应用程序需要 src/main/resources/application.properties 文件中的多个属性:
spring.batch.job.enabled=false
batch.medicine.cron=0 */1 * * * *
batch.medicine.alert_type=LOGS
batch.medicine.expiration.default.days=60
batch.medicine.start.sale.default.days=45
batch.medicine.sale=0.1
As we’ll configure only one job, spring.batch.job.enabled should be set to false to disable the initial job execution. By default, Spring runs the job after the context startup with empty parameters:
由于我们将只配置一个作业,spring.batch.job.enabled应设置为false以禁用初始作业执行。默认情况下,Spring 会在上下文启动后以空参运行作业:
[main] INFO o.s.b.a.b.JobLauncherApplicationRunner - Running default command line with: []
The batch.medicine.cron property defines the cron expression for the scheduled run. Based on the defined scenario, we should run the job daily. However, in our case, the job starts every minute to be able to check the processing behavior easily.
batch.medicine.cron 属性定义了用于计划运行的 cron 表达式。根据定义的方案,我们应该每天运行一次作业。不过,在我们的案例中,为了方便检查处理行为,作业每分钟启动一次。
Other properties are needed for InputReader, InputProcessor, and InpurWriter to perform business logic.
InputReader、InputProcessor 和 InpurWriter 需要其他属性来执行业务逻辑。
3. Job Parameters
3.工作参数
Spring Batch includes a JobParameters class designed to store runtime parameters for a particular job run. This functionality proves beneficial in various situations. For instance, it allows the passing of dynamic variables generated during a specific run. Moreover, it makes it possible to create a controller that can initiate a job based on parameters provided by the client.
Spring批处理包含一个JobParameters类,用于存储特定作业运行时的参数。该功能在各种情况下都证明是有益的。例如,它允许传递在特定运行期间生成的动态变量。此外,它还可以创建一个控制器,根据客户端提供的参数启动作业。
In our scenario, we’ll utilize this class to hold application parameters and dynamics runtime parameters.
在我们的方案中,我们将利用该类来保存应用程序参数和动态运行时参数。
3.1. StepScope and JobScope
3.1. 步骤范围和 任务范围</em
In addition to the well-known bean scopes in regular Spring, Spring Batch introduces two additional scopes: StepScope and JobScope. With these scopes, it becomes possible to create unique beans for each step or job in a workflow. Spring ensures that the resources associated with a particular step/job are isolated and managed independently throughout its lifecycle.
除了常规 Spring 中众所周知的 Bean 作用域外,Spring Batch 还引入了两个额外的作用域:StepScope和JobScope。有了这些作用域,就可以为工作流中的每个步骤或作业创建唯一的 Bean。Spring 可确保与特定步骤/任务相关的资源在其整个生命周期中都是独立隔离和管理的。
Having this feature, we can easily control contexts and share all the needed properties across read, process, and write parts for specific runs. To be able to inject job parameters we need to annotate depending beans with @StepScope or @JobScope.
有了这项功能,我们就可以轻松地控制上下文,并在特定运行的读取、处理和写入部分中共享所有需要的属性。要注入作业参数,我们需要用 @StepScope 或 @JobScope 对依赖 bean 进行注解。
3.2. Populate Job Parameters in Scheduled Execution
3.2.在计划执行中填充任务参数
Let’s define the MedExpirationBatchRunner class that will start our job by cron expression (every 1 minute in our case). We should annotate the class with @EnableScheduling and define the appropriate @Scheduled entry method:
让我们定义MedExpirationBatchRunner类,它将通过cron 表达式(在我们的例子中为每 1 分钟)启动我们的作业。我们应使用 @EnableScheduling 对该类进行注解,并定义适当的 @Scheduled 输入方法:
@Component
@EnableScheduling
public class MedExpirationBatchRunner {
...
@Scheduled(cron = "${batch.medicine.cron}", zone = "GMT")
public void runJob() {
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
launchJob(now);
}
}
As we want to launch the job manually, we should use the JobLaucher class and provide a populated JobParameter in JobLauncher#run() method. In our example, we’ve provided values from application.properties as well as two run-specific parameters (date when the job got triggered and trace id):
由于我们要手动启动作业,因此应使用 JobLaucher 类,并在 JobLauncher#run() 方法中提供已填充的 JobParameter 。在我们的示例中,我们提供了来自 application.properties 的值以及两个特定于运行的参数(触发作业的日期和跟踪 id):
public void launchJob(ZonedDateTime triggerZonedDateTime) {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addString(BatchConstants.TRIGGERED_DATE_TIME, triggerZonedDateTime.toString())
.addString(BatchConstants.ALERT_TYPE, alertType)
.addLong(BatchConstants.DEFAULT_EXPIRATION, defaultExpiration)
.addLong(BatchConstants.SALE_STARTS_DAYS, saleStartDays)
.addDouble(BatchConstants.MEDICINE_SALE, medicineSale)
.addString(BatchConstants.TRACE_ID, UUID.randomUUID().toString())
.toJobParameters();
jobLauncher.run(medExpirationJob, jobParameters);
} catch (Exception e) {
log.error("Failed to run", e);
}
}
After configuring parameters, we have several options for how to use these values in code.
配置参数后,我们有几个选项可以选择如何在代码中使用这些值。
3.3. Read Job Parameters in Bean Definition
3.3.在 Bean 定义中读取任务参数
Using SpEL we can access job parameters from a bean definition in our configuration class. Spring combines all parameters to a regular String to Object map:
使用 SpEL,我们可以从配置类中的 Bean 定义访问任务参数。Spring 将所有参数合并为一个常规的 String 到 Object 的映射:
@Bean
@StepScope
public MedicineProcessor medicineProcessor(@Value("#{jobParameters}") Map<String, Object> jobParameters) {
...
}
Inside the method, we’ll use jobParameters to initiate the proper fields of MedicineProcessor.
在该方法中,我们将使用 jobParameters 来启动 MedicineProcessor 的适当字段。
</em
3.4. Read Job Parameters in Service Directly
3.4.直接读取服务中的任务参数
Another option is to use setter injection in the ItemReader itself. We can fetch the exact parameter value just like from any other map via SpEL expression:
另一种方法是在 ItemReader 本身中使用 setter 注入。我们可以通过 SpEL 表达式像从其他地图一样获取准确的参数值:
@Setter
public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> {
@Value("#{jobParameters['DEFAULT_EXPIRATION']}")
private long defaultExpiration;
}
We just need to ensure that the key used in SpEL is the same as the key used during parameters initialization.
我们只需确保 SpEL 中使用的密钥与参数初始化时使用的密钥相同。
3.5. Read Job Parameters via Before Step
3.5.通过步骤前读取任务参数
Spring Batch provides a StepExecutionListener interface that allows us to listen for step execution phases: before the step starts and once the step is completed. We can utilize this feature, access properties before the step is started, and perform any custom logic. The easiest way is just to use @BeforeStep annotation which corresponds to beforeStep() method from StepExecutionListener:
Spring批处理提供了一个StepExecutionListener接口,允许我们监听步骤执行阶段:步骤开始前和步骤完成后。我们可以利用这一功能,在步骤开始前访问属性,并执行任何自定义逻辑。最简单的方法是使用 @BeforeStep 注解,该注解对应于 StepExecutionListener 中的 beforeStep() 方法:
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
JobParameters parameters = stepExecution.getJobExecution()
.getJobParameters();
...
log.info("Before step params: {}", parameters);
}
4. Job Configuration
4.工作配置
Let’s combine all the parts to see the whole picture.
让我们把所有部分结合起来,看看整个画面。
There are two properties, that are required for the reader, processor, and writer: BatchConstants.TRIGGERED_DATE_TIME and BatchConstants.TRACE_ID.
读取器、处理器和写入器都需要两个属性:BatchConstants.TRIGGERED_DATE_TIME 和 BatchConstants.TRACE_ID. </em
We’ll use the same extraction logic for common parameters from all step bean definitions:
我们将对所有步骤 bean 定义中的通用参数使用相同的提取逻辑:
private void enrichWithJobParameters(Map<String, Object> jobParameters, ContainsJobParameters container) {
if (jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME) != null) {
container.setTriggeredDateTime(ZonedDateTime.parse(jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME)
.toString()));
}
if (jobParameters.get(BatchConstants.TRACE_ID) != null) {
container.setTraceId(jobParameters.get(BatchConstants.TRACE_ID).toString());
}
}
Altogether other parameters are component-specific and don’t have common logic.
其他参数都是针对特定组件的,没有通用逻辑。
</em
4.1. Configuring ItemReader
4.1.配置项目阅读器</em
At first, we want to configure ExpiresSoonMedicineReader and enrich common parameters:
首先,我们要配置 ExpiresSoonMedicineReader 并丰富常用参数:
@Bean
@StepScope
public ExpiresSoonMedicineReader expiresSoonMedicineReader(JdbcTemplate jdbcTemplate, @Value("#{jobParameters}") Map<String, Object> jobParameters) {
ExpiresSoonMedicineReader medicineReader = new ExpiresSoonMedicineReader(jdbcTemplate);
enrichWithJobParameters(jobParameters, medicineReader);
return medicineReader;
}
Let’s take a closer look at the exact reader implementation. TriggeredDateTime and traceId parameters are injected directly during bean construction, defaultExpiration parameter is injected by Spring via setter. For demonstration, we have used all of them in doOpen() method:
让我们仔细看看阅读器的具体实现。TriggeredDateTime 和 traceId 参数在构建 Bean 时直接注入,而 defaultExpiration 参数则由 Spring 通过设置器注入。为便于演示,我们在 doOpen() 方法中使用了所有这些参数:
public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> implements ContainsJobParameters {
private ZonedDateTime triggeredDateTime;
private String traceId;
@Value("#{jobParameters['DEFAULT_EXPIRATION']}")
private long defaultExpiration;
private List<Medicine> expiringMedicineList;
...
@Override
protected void doOpen() {
expiringMedicineList = jdbcTemplate.query(FIND_EXPIRING_SOON_MEDICINE, ps -> ps.setLong(1, defaultExpiration), (rs, row) -> getMedicine(rs));
log.info("Trace = {}. Found {} meds that expires soon", traceId, expiringMedicineList.size());
if (!expiringMedicineList.isEmpty()) {
setMaxItemCount(expiringMedicineList.size());
}
}
@PostConstruct
public void init() {
setName(ClassUtils.getShortName(getClass()));
}
}
ItemReader should not be marked as @Component. Also, we need to call setName() method to set the required reader name.
ItemReader 不应标记为 @Component。此外,我们还需要调用 setName() 方法来设置所需的阅读器名称。
4.2. Configuring ItemProcessor and ItemWriter
4.2.配置 ItemProcessor 和 ItemWriter</em
ItemProcessor and ItemWriter follow the same approaches as ItemReader. So they don’t require any specific configuration to access parameters. The bean definition logic initializes common parameters through the enrichWithJobParameters() method. Other parameters, that are used by a single class and do not need to be populated in all components, are enriched by Spring through setter injection in the corresponding classes.
ItemProcessor 和 ItemWriter 采用与 ItemReader 相同的方法。因此,它们不需要任何特定配置来访问参数。Bean 定义逻辑通过 enrichWithJobParameters() 方法初始化常用参数。其他参数由单个类使用,无需在所有组件中填充,Spring 会通过在相应类中注入设置器来丰富这些参数。
We should mark all properties-dependent beans with @StepScope annotation. Otherwise, Spring will create beans only once at context startup and will not have parameters’ values to inject.
我们应该使用 @StepScope 注解标记所有依赖于属性的 Bean。否则,Spring 将仅在上下文启动时创建一次 Bean,并且不会注入参数值。
4.3. Configuring Complete Flow
4.3.配置完整流程
We don’t need to take any specific action to configure the job with parameters. Therefore we just need to combine all the beans:
我们不需要采取任何特定操作来配置工作参数。因此,我们只需将所有Bean组合起来:
@Bean
public Job medExpirationJob(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
MedicineWriter medicineWriter,
MedicineProcessor medicineProcessor,
ExpiresSoonMedicineReader expiresSoonMedicineReader) {
Step notifyAboutExpiringMedicine = new StepBuilder("notifyAboutExpiringMedicine", jobRepository).<Medicine, Medicine>chunk(10)
.reader(expiresSoonMedicineReader)
.processor(medicineProcessor)
.writer(medicineWriter)
.faultTolerant()
.transactionManager(transactionManager)
.build();
return new JobBuilder("medExpirationJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(notifyAboutExpiringMedicine)
.build();
}
5. Running the Application
5.运行应用程序
Let’s run a complete example and see how the application uses all parameters. We need to start the Spring Boot application from the SpringBatchExpireMedicationApplication class.
让我们运行一个完整的示例,看看应用程序是如何使用所有参数的。我们需要从 SpringBatchExpireMedicationApplication 类启动 Spring Boot 应用程序。
As soon as the scheduled method executes, Spring logs all parameters:
一旦计划方法执行完毕,Spring 就会记录所有参数:
INFO o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=medExpirationJob]] launched with the following parameters: [{'SALE_STARTS_DAYS':'{value=45, type=class java.lang.Long, identifying=true}','MEDICINE_SALE':'{value=0.1, type=class java.lang.Double, identifying=true}','TRACE_ID':'{value=e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, type=class java.lang.String, identifying=true}','ALERT_TYPE':'{value=LOGS, type=class java.lang.String, identifying=true}','TRIGGERED_DATE_TIME':'{value=2023-12-06T22:36:00.011436600Z, type=class java.lang.String, identifying=true}','DEFAULT_EXPIRATION':'{value=60, type=class java.lang.Long, identifying=true}'}]
Firstly, ItemReader writes info about meds that have been found based on the DEFAULT_EXPIRATION parameter:
首先,ItemReader根据DEFAULT_EXPIRATION参数写入已找到的药品信息:
INFO c.b.b.job.ExpiresSoonMedicineReader - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. Found 2 meds that expires soon
Secondly, ItemProcessor uses SALE_STARTS_DAYS and MEDICINE_SALE parameters to calculate new prices:
其次,ItemProcessor使用SALE_STARTS_DAYS和MEDICINE_SALE参数来计算新价格:
INFO c.b.b.job.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 18.0 for medicine 9d39321d-34f3-4eb7-bb9a-a69734e0e372
INFO c.b.b.job.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 36.0 for medicine acd99d6a-27be-4c89-babe-0edf4dca22cb
Lastly, ItemWriter writes updated medications to logs within the same trace:
最后,ItemWriter 将更新后的药物写入同一跟踪内的日志:
</em
INFO c.b.b.job.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=9d39321d-34f3-4eb7-bb9a-a69734e0e372, name=Flucloxacillin, type=ANTIBACTERIALS, expirationDate=2024-01-16 00:00:00.0, originalPrice=20.0, salePrice=18.0)
INFO c.b.b.job.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=acd99d6a-27be-4c89-babe-0edf4dca22cb, name=Prozac, type=ANTIDEPRESSANTS, expirationDate=2024-01-06 00:00:00.0, originalPrice=40.0, salePrice=36.0)
INFO c.b.b.job.MedicineWriter - Finishing job started at 2023-12-07T11:58:00.014430400Z
6. Conclusion
6.结论
In this article, we’ve learned how to work with Job Parameters in Spring Batch. ItemReader, ItemProcessor, and ItemWriter can be manually enriched with parameters during bean initialization or might be enriched by Spring via @BeforeStep or setter injection.
在本文中,我们学习了如何在 Spring Batch 中使用作业参数。ItemReader、ItemProcessor 和 ItemWriter 可以在 Bean 初始化期间手动添加参数,也可以由 Spring 通过 @BeforeStep 或 setter 注入添加参数。
As always, the complete examples are available over on GitHub.
一如既往,完整的示例可在 GitHub 上获取。