Spring Batch 已处理记录的计数示例

Spring Batch 已处理记录的计数示例

原文: https://howtodoinjava.com/spring-batch/records-count-example/

了解如何使用ItemStreamChunkListener计数 Spring Batch 作业处理的记录数,并将记录数记录在日志文件或控制台中。

使用ItemStream计数记录

在给定的ItemStream实现下方,计算定期处理的记录数。

ItemCountItemStream.java

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;

public class ItemCountItemStream implements ItemStream {

	public void open(ExecutionContext executionContext) throws ItemStreamException {
	}

	public void update(ExecutionContext executionContext) throws ItemStreamException {
		System.out.println("ItemCount: "+executionContext.get("FlatFileItemReader.read.count"));
	}

	public void close() throws ItemStreamException {
	}
}

如何使用ItemCountItemStream

Tasklet中使用SimpleStepBuilder.stream()方法注册上面创建的ItemCountItemStream

BatchConfig.java

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job readCSVFilesJob() {
	return jobBuilderFactory
			.get("readCSVFilesJob")
			.incrementer(new RunIdIncrementer())
			.start(step1())
			.build();
}

@Bean
public Step step1() {
	return stepBuilderFactory
			.get("step1")
			.<Employee, Employee>chunk(1)
			.reader(reader())
			.writer(writer())
			.stream(stream())
			.build();
}

@Bean
public ItemCountItemStream stream() {
	return new ItemCountItemStream();
}

使用ChunkListener计数记录

在给定的ChunkListener实现下方,计算定期处理的记录数。

ItemCountListener.java

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;

public class ItemCountListener implements ChunkListener {

	@Override
	public void beforeChunk(ChunkContext context) {
	}

	@Override
	public void afterChunk(ChunkContext context) {

		int count = context.getStepContext().getStepExecution().getReadCount();
		System.out.println("ItemCount: " + count);
	}

	@Override
	public void afterChunkError(ChunkContext context) {
	}
}

如何使用ItemCountListener

Tasklet中使用SimpleStepBuilder.listener()方法注册上面创建的ItemCountListener

BatchConfig.java

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job readCSVFilesJob() {
	return jobBuilderFactory
			.get("readCSVFilesJob")
			.incrementer(new RunIdIncrementer())
			.start(step1())
			.build();
}

@Bean
public Step step1() {
	return stepBuilderFactory
			.get("step1")
			.<Employee, Employee>chunk(1)
			.reader(reader())
			.writer(writer())
			.listener(listener())
			.build();
}

@Bean
public ItemCountListener listener() {
	return new ItemCountListener();
}

计数记录演示

我正在使用上述ItemCountListener配置来处理此 CSV。

inputData.csv

id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
5,David,Walsh

完整的批处理配置如下所示:

BatchConfig.java

package com.howtodoinjava.demo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;

import com.howtodoinjava.demo.model.Employee;

@Configuration
@EnableBatchProcessing
public class BatchConfig 
{
	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Value("/input/inputData.csv")
	private Resource inputResource;

	@Bean
	public Job readCSVFilesJob() {
		return jobBuilderFactory
				.get("readCSVFilesJob")
				.incrementer(new RunIdIncrementer())
				.start(step1())
				.build();
	}

	@Bean
	public Step step1() {
		return stepBuilderFactory
				.get("step1")
				.<Employee, Employee>chunk(1)
				.reader(reader())
				.writer(writer())
				.listener(listner())
				.build();
	}

	@Bean
	public ItemCountListener listner() {
		return new ItemCountListener();
	}

	@Bean
	public FlatFileItemReader<Employee> reader() {
		FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();
		itemReader.setLineMapper(lineMapper());
		itemReader.setLinesToSkip(1);
		itemReader.setResource(inputResource);
		return itemReader;
	}

	@Bean
	public LineMapper<Employee> lineMapper() {
		DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
		DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
		lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });
		lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });
		BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
		fieldSetMapper.setTargetType(Employee.class);
		lineMapper.setLineTokenizer(lineTokenizer);
		lineMapper.setFieldSetMapper(fieldSetMapper);
		return lineMapper;
	}

	@Bean
	public ConsoleItemWriter<Employee> writer() {
		return new ConsoleItemWriter<Employee>();
	}
}

作为 Spring 启动应用程序启动该应用程序。 Spring 任务调度器将开始工作。

App.java

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@SpringBootApplication
@EnableScheduling
public class App
{
    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    public static void main(String[] args)
    {
        SpringApplication.run(App.class, args);
    }

    @Scheduled(cron = "0 */1 * * * ?")
    public void perform() throws Exception
    {
        JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
        jobLauncher.run(job, params);
    }
}

现在观看控制台。

Console

2018-07-11 16:38:00 INFO  - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531307280004}]

2018-07-11 16:38:00 INFO  - Executing step: [step1]

Employee [id=1, firstName=Lokesh, lastName=Gupta]
ItemCount: 1
Employee [id=2, firstName=Amit, lastName=Mishra]
ItemCount: 2
Employee [id=3, firstName=Pankaj, lastName=Kumar]
ItemCount: 3
Employee [id=4, firstName=David, lastName=Miller]
ItemCount: 4
Employee [id=5, firstName=David, lastName=Walsh]
ItemCount: 5
ItemCount: 5

2018-07-11 16:38:00 INFO  - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531307280004}] and the following status: [COMPLETED]

将我的问题放在评论部分。

学习愉快!

参考:

ItemStream Java 文档

ChunkListener Java 文档