SpringBatchでChunk、Event Listenerの実装方法と起動まで解説しています

SpringBoot

はじめに

Chunkに関して、学んだことをまとめました。

こちらの記事を読む事によって、簡単なChunkの実装方法と起動までを学ぶことが出来ます。

Chunkの基本概念

  • SpringBatchでは大量のデータを一定のサイズの「チャンク(塊)」に分割して処理します
  • 一度にすべてのデータを処理するのではなく、一定の単位で区切って処理する事でメモリの消費を抑えつつ効率的な処理を実現します。

一定の単位で区切るとは、具体的には…

  • 読み取り

Item Reader : データを読み取るコンポーネント

  • 処理

ItemProcessor : データを読み取るコンポーネント

  • 書き込み

ItemWriter : データを出力/保存するコンポーネント

Chunkの処理フロー

  1. Readerが1件ずつデータを取得する。
  2. Processorがデータを加工する。(必要に応じてフィルタリングも可能)
  3. Chunkサイズに達したら、Writerがバッチでデータを書き込む。
  4. トランザクションがコミットされる。
  5. 上記を繰り返して処理をする。

ItemReaderを実装する

以下のように「chunk」パッケージと「Reader.java」を作成します。

Reader .java
package com.batch.chunk;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@StepScope
@Slf4j

// 文字列を読み込むReader
// 型はString型で読み取る
public class Reader implements ItemReader<String>{
	// 繰り返し用のフィールド
	private int intIndex = 0;
	
	@Override
	public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
		
		// nullになるまで繰り返して表示する
		String[] readDataList = {"rice", "miso", "natto", null};
		
		// @Slf4jを適用しているためlog.infoとなる
		// System.out.printlnでもOKです
		// readDataListを1つずつ出力
		log.info("readData={}" , readDataList[intIndex]);
		
		// インクリメントする
		return readDataList[intIndex++];
	}

}

ItemProcessorを実装する

以下のように「Reader.java」を作成します。

Processor ,java
package com.batch.chunk;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@StepScope
@Slf4j
// 型は文字列を受け取り文字列を返す
public class Processor implements ItemProcessor<String, String>{

	@Override
	public String process(String item) throws Exception {
		
		// 読み込んだ文字列を大文字に変換して返す
		item = item.toUpperCase();
		
		log.info("processor item={}" , item);
		return item;
	}

}

ItemWriterを実装する

以下のように「Writer.java」を作成します。

Writer .java
package com.batch.chunk;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@StepScope
@Slf4j
// 型は受け取った文字列を文字列で返す
public class Writer implements ItemWriter<String>{@Override
	public void write(Chunk<? extends String> chunk) throws Exception {
		log.info("Writer chunk={}" , chunk);
		log.info("---------------------------");
		
	}
	
}

SpringConfigに定義を記述する

ここまでで、「ItemReader」「ItemProcessor」「ItemWriter」三つを実装したと思いますので、「SpringConfig.java」に定義を記述していきます。

SpringConfig .java
package com.batch.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import com.batch.validator.Tasklet1JobParametersValidator;

/**
 * 設定クラス
 * このクラスはアプリケーションの設定を管理します。
 */

@Configuration
public class SpringConfig {
	// ジョブの実行を管理する
	private final JobLauncher jobLauncher;
	// ジョブの実行結果や進行状況をデータベースへ記録する
	private final JobRepository  jobRepository;
	// トランザクションを開始し、コミットまたはロールバックする
	private final PlatformTransactionManager platformTransactionManager;
	
	@Autowired
	@Qualifier("Tasklet1")
	private Tasklet Tasklet1;
	
	@Autowired
	@Qualifier("Tasklet2")
	private Tasklet Tasklet2;
	
	@Autowired
	private ItemReader<String> Reader;
	
	@Autowired
	private ItemProcessor<String , String> Processor;
	
	@Autowired
	private ItemWriter<String> Writer;
	

	public SpringConfig(JobLauncher jobLauncher, JobRepository jobRepository,
			PlatformTransactionManager platformTransactionManager) {
		this.jobLauncher = jobLauncher;
		this.jobRepository = jobRepository;
		this.platformTransactionManager = platformTransactionManager;
	}
	
	// 検証クラスの呼び出し
	@Bean
	public JobParametersValidator jobParametersValidator() {
		return new Tasklet1JobParametersValidator();
	}
	
	
	// Stepを定義する
	@Bean
	public Step tasklet1Step1() {
		// 第一引数はStep名を定義する
		return new StepBuilder("tasklet1Step1" , jobRepository)
				// Taskletを指定する
				.tasklet(Tasklet1 , platformTransactionManager)
				.build();
	}
	
	// Stepを定義する
		@Bean
		public Step tasklet1Step2() {
			// 第一引数はStep名を定義する
			return new StepBuilder("tasklet1Step2" , jobRepository)
					// Taskletを指定する
					.tasklet(Tasklet2 , platformTransactionManager)
					.build();
		}
		
	// 
	@Bean
	public Step ChunkStep() {
		// 第一引数はStep名を定義する
		return new StepBuilder( "ChunkStep" ,jobRepository )
		// 任意のchunkサイズに第一引数で変える事が出来る
				.<String , String>chunk(1 , platformTransactionManager)
				.reader(Reader)
				.processor(Processor)
				.writer(Writer)
				.build();
	}
	
	// Jobを定義する
	@Bean
	public Job tasklet1Job1() {
		// 第一引数はJob名を定義する
		return new JobBuilder("tasklet1Job1" , jobRepository)
				// ジョブを実行するたびに一意のIDを生成
				.incrementer(new RunIdIncrementer())
				// 実行するStepを指定する
				.start(tasklet1Step1())
				.next(tasklet1Step2())
				// chunkを指定する
				.next(ChunkStep())
				// 検証する
				.validator(jobParametersValidator())
				.build();
	}
	
}

chunkサイズを変えて実行する

SpringBatchで大事なのは実行する時のChunkサイズの指定によって、書き込まれるスピードとメモリの消費量が変動していく事です。

以下はchunkサイズ1の場合の実行例です。

Java
.<String , String>chunk(1 , platformTransactionManager)

実行してみます。

2024-12-07 17:20:54,732 INFO Executing step: [ChunkStep] 
2024-12-07 17:20:54,863 INFO readData=rice 
2024-12-07 17:20:54,865 INFO processor item=RICE 
2024-12-07 17:20:54,866 INFO Writer chunk=[items=[RICE], skips=[]] 
2024-12-07 17:20:54,866 INFO --------------------------- 
2024-12-07 17:20:54,921 INFO readData=miso 
2024-12-07 17:20:54,922 INFO processor item=MISO 
2024-12-07 17:20:54,922 INFO Writer chunk=[items=[MISO], skips=[]] 
2024-12-07 17:20:54,922 INFO --------------------------- 
2024-12-07 17:20:55,318 INFO readData=natto 
2024-12-07 17:20:55,318 INFO processor item=NATTO 
2024-12-07 17:20:55,318 INFO Writer chunk=[items=[NATTO], skips=[]] 
2024-12-07 17:20:55,318 INFO --------------------------- 
2024-12-07 17:20:55,360 INFO readData=null 
2024-12-07 17:20:55,484 INFO Step: [ChunkStep] executed in 751ms

では続いて3の場合を見てみましょう。

Java
.<String , String>chunk(3 , platformTransactionManager)
2024-12-07 17:28:56,358 INFO Executing step: [ChunkStep] 
2024-12-07 17:28:56,427 INFO readData=rice 
2024-12-07 17:28:56,429 INFO readData=miso 
2024-12-07 17:28:56,429 INFO readData=natto 
2024-12-07 17:28:56,430 INFO processor item=RICE 
2024-12-07 17:28:56,431 INFO processor item=MISO 
2024-12-07 17:28:56,431 INFO processor item=NATTO 
2024-12-07 17:28:56,431 INFO Writer chunk=[items=[RICE, MISO, NATTO], skips=[]] 
2024-12-07 17:28:56,431 INFO --------------------------- 
2024-12-07 17:28:56,464 INFO readData=null 
2024-12-07 17:28:56,502 INFO Step: [ChunkStep] executed in 143ms 

1の場合は「rice」を全て読み込みから書き込みまで一貫して行い、次の「miso」を処理しているのに対して3の場合は全て読み込んでから書き込みをしています。

このように、chunkサイズが大きいほど、処理速度は上昇しますが、メモリの消費量も上がります。

Event Listener

SpringBatchのイベントリスナーは、ハッチ処理の実行中に発生するイベント(開始、終了、エラー)などに対して処理を追加するための仕組みです。

様々なイベントリスナーがありますが、「JobExecutionListener」を実装していきます。

JobExecutionListenerの実装

以下のように「listener」パッケージと「TestJobExecutionListner.java」を作成しましょう。

TestJobExecutionListner.java
package com.batch.listener;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class TestJobExecutionListner implements JobExecutionListener{

	@Override
	public void beforeJob(JobExecution jobExecution) {
		// jobが開始する時間をログで出力する
		log.info("job start time={}" , jobExecution.getStartTime());
	}

	@Override
	public void afterJob(JobExecution jobExecution) {
		// jobが終了する時間をログで出力する
		log.info("job end time={}" , jobExecution.getEndTime());
	}

}

SpringConfigに定義を記述する

SpringConfig .java
package com.batch.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import com.batch.listener.TestJobExecutionListner;
import com.batch.validator.Tasklet1JobParametersValidator;

/**
 * 設定クラス
 * このクラスはアプリケーションの設定を管理します。
 */

@Configuration
public class SpringConfig {
	// ジョブの実行を管理する
	private final JobLauncher jobLauncher;
	// ジョブの実行結果や進行状況をデータベースへ記録する
	private final JobRepository  jobRepository;
	// トランザクションを開始し、コミットまたはロールバックする
	private final PlatformTransactionManager platformTransactionManager;
	
	@Autowired
	@Qualifier("Tasklet1")
	private Tasklet Tasklet1;
	
	@Autowired
	@Qualifier("Tasklet2")
	private Tasklet Tasklet2;
	
	@Autowired
	private ItemReader<String> Reader;
	
	@Autowired
	private ItemProcessor<String , String> Processor;
	
	@Autowired
	private ItemWriter<String> Writer;
	
	@Autowired
	private TestJobExecutionListner testJobExecutionListner;
	

	public SpringConfig(JobLauncher jobLauncher, JobRepository jobRepository,
			PlatformTransactionManager platformTransactionManager) {
		this.jobLauncher = jobLauncher;
		this.jobRepository = jobRepository;
		this.platformTransactionManager = platformTransactionManager;
	}
	
	// 検証クラスの呼び出し
	@Bean
	public JobParametersValidator jobParametersValidator() {
		return new Tasklet1JobParametersValidator();
	}
	
	
	// Stepを定義する
	@Bean
	public Step tasklet1Step1() {
		// 第一引数はStep名を定義する
		return new StepBuilder("tasklet1Step1" , jobRepository)
				// Taskletを指定する
				.tasklet(Tasklet1 , platformTransactionManager)
				.build();
	}
	
	// Stepを定義する
		@Bean
		public Step tasklet1Step2() {
			// 第一引数はStep名を定義する
			return new StepBuilder("tasklet1Step2" , jobRepository)
					// Taskletを指定する
					.tasklet(Tasklet2 , platformTransactionManager)
					.build();
		}
		
	// 
	@Bean
	public Step ChunkStep() {
		// 第一引数はStep名を定義する
		return new StepBuilder( "ChunkStep" ,jobRepository )
				// 任意のchunkサイズに第一引数で変える事が出来る
				.<String , String>chunk(3 , platformTransactionManager)
				.reader(Reader)
				.processor(Processor)
				.writer(Writer)
				.build();
	}
	
	// Jobを定義する
	@Bean
	public Job tasklet1Job1() {
		// 第一引数はJob名を定義する
		return new JobBuilder("tasklet1Job1" , jobRepository)
				// ジョブを実行するたびに一意のIDを生成
				.incrementer(new RunIdIncrementer())
				// 実行するStepを指定する
				.start(tasklet1Step1())
				.next(tasklet1Step2())
				// chunkを指定する
				.next(ChunkStep())
				// 検証する
				.validator(jobParametersValidator())
				// イベントリスナー(JobExecutionListner)を呼ぶ
				.listener(testJobExecutionListner)
				.build();
	}
	
}

実行する

Java
2024-12-07 19:05:39,973 INFO job start time=2024-12-07T19:05:39.910582 
2024-12-07 19:05:40,074 INFO Executing step: [tasklet1Step1] 
2024-12-07 19:05:40,293 INFO Tasklet1の出力です 
2024-12-07 19:05:40,293 INFO StringParam={}Java 
2024-12-07 19:05:40,293 INFO IntegerParam={}1 
2024-12-07 19:05:40,361 INFO Step: [tasklet1Step1] executed in 286ms 
2024-12-07 19:05:40,830 INFO Executing step: [tasklet1Step2] 
2024-12-07 19:05:40,913 INFO Tasklet2の出力です 
2024-12-07 19:05:40,914 INFO tasklet1JobValue1={}tasklet1JobValue1 
2024-12-07 19:05:40,985 INFO Step: [tasklet1Step2] executed in 154ms 
2024-12-07 19:05:41,183 INFO Executing step: [ChunkStep] 
2024-12-07 19:05:41,272 INFO readData=rice 
2024-12-07 19:05:41,275 INFO readData=miso 
2024-12-07 19:05:41,275 INFO readData=natto 
2024-12-07 19:05:41,276 INFO processor item=RICE 
2024-12-07 19:05:41,276 INFO processor item=MISO 
2024-12-07 19:05:41,276 INFO processor item=NATTO 
2024-12-07 19:05:41,276 INFO Writer chunk=[items=[RICE, MISO, NATTO], skips=[]] 
2024-12-07 19:05:41,276 INFO --------------------------- 
2024-12-07 19:05:41,310 INFO readData=null 
2024-12-07 19:05:41,348 INFO Step: [ChunkStep] executed in 164ms 
2024-12-07 19:05:41,397 INFO job end time=2024-12-07T19:05:41.397988600

1行目と22行目にそれぞれ、ログ出力されているのが確認できました。このように、開始と終了に処理を追加する事ができました。

おわりに

基本的なSpringBatchのChunk及び、Event Listenerについて解説しました。

お読みいただきありがとうございました。

コメント