Spring Batch

  • 배치(Batch) : 일괄 처리 방식을 의미
  • 스프링 배치는 로깅/추적, 트랜잭션 관리, 작업 처리 통계, 작업 재시작, 건너뛰기, 리소스 관리 등 대용량 레코드 처리에 필수적인 기능을 제공
  • 배치가 실패하여 작업 재시작을 하게 된다면 처음부터가 아닌 실패한 지점부터 실행
  • 중복 실행을 막기 위해 성공한 이력이 있는 Batch는 동일한 Parameters로 실행 시 Exception이 발생

  • 배치 작업이 필요한 이유
    • 대용량의 비지니스 데이터를 복잡한 작업으로 처리해야하는 경우
    • 특정한 시점에 스케줄러를 통해 자동화된 작업이 필요한 경우
    • 대용량 데이터의 포맷을 변경, 유효성 검사 등의 작업을 트랜잭션 안에서 처리 후 기록해야하는 경우
implementation "org.springframework.boot:spring-boot-starter-batch"

구조

image

ExecutionContext

  • 프레임워크에서 Execution의 상태를 저장하는 공유 객체
  • Job 재 시작시 이미 처리한 Row 데이터는 건너뛰고 이후로 수행하도록 할 때 상태 정보를 활용한다
  • 종류 : JobExecution, StepExecution

  • JobExecution : 각 Job에 생성되며 Job 간에 공유되지 않는다. 다만, Job에 포함된 Step 간에는 공유된다
  • StepExecution : 각 Step에 생성되며 Step 간에 공유되지 않는다.
public class ExecutionContext {
    Map<String, Object> map = new ConcurrentHashMap();
} 

Batch Scope

  • Scope : 스프링 컨테이너에서 빈이 관리되는 범위
    • ex) singleton, prototype, request, session, application 등
  • Spring Batch에서만 제공하는 스코프가 존재한다
    • @JobScope, @StepScpoe
    • Job 과 Step 의 빈 생성과 실행에 관여하는 스코프
    • 해당 스코프가 선언되면 빈이 생성이 어플리케이션 구동시점이 아닌 빈의 실행시점에 이루어진다 (Lazy Binding)
    • 어플리케이션 구동시점에 프록시 객체를 생성해 놓고 실행시점step()에 실제 빈을 생성하면서 value를 바인딩한다
  • @Value를 사용해 빈의 실행시점에 값을 주입할 수 있다
    @Value("#{jobParameters[파라미터명]}")
    @Value("#{jobExecutionContext[파라미터명]”})
    @Value("#{stepExecutionContext[파라미터명]})
    
  • @Value 를 사용할 경우 @Bean에 @JobScope, @StepScope 를 정의하지 않으면 오류 발생

1. @JobScope

  • Step에 정의
  • @Value : jobParameter, jobExecutionContext 사용가능

2. @StepScope

  • Tasklet이나 ItemReader, ItemWriter, ItemProcessor에 정의
  • @Value : jobParameter, jobExecutionContext, stepExecutionContext 사용가능

Job

  • 배치처리 과정을 하나의 단위로 만들어 놓은 객체
  • 1개의 Job에는 1개 이상의 Step이 구성되야 한다 (Job:Step = 1:N)
  • 구현체 : SimpleJob, FlowJob

1. JobLauncher

  • Job과 JobParameter를 사용하여 Job을 실행시키는 객체
  • 배치처리를 완료 한 다음에 Client에게 JobExecution을 반환한다 (동기적 실행)
    • 비동기적 실행도 가능함
public interface JobLauncher {
    JobExecution run(Job job, JobParameters jobParameters)
}

2. JobInstance

  • Job의 논리적 실행단위 객체로써 고유하게 식별 가능한 작업실행
  • JobName + JobParameter 가 같을 경우 동일한 JobInstance를 리턴해 Exception 발생 (중복불가)
  • Job:JobInstance = 1:N

3. JobParameters

  • Job 실행 시 사용되는 파라미터
  • 여러 개의 JobInstance를 구분하기 위한 용도
  • JobParameters:JobInstance = 1:1
public class JobParameters implements Serializable {
    private final Map<String, JobParameter> parameters;

    public JobParameters() {
        this.parameters = new LinkedHashMap<>();
    }
    ...
}

4. JobExecution

  • JobInstance의 실행시도 객체
  • JobInstance 실행상태, 시작시간, 종료시간, 생성시간 등의 정보를 저장
  • ExitStatus가 FAILED이면 재실행 가능하며 새로운 JobExecution이 생성됨
    • COMPLITED이면 재실행이 불가하며 실행시도시 JobInstanceAlreadyCompleteException 발생
  • JobInstance:JobExecution = 1:N

5. JobRepository

  • 모든 배치 정보를 갖고 있는 메커니즘 (메타데이터 저장소)
  • Job이 실행되게 되면 JobRepository에 JobExecution과 StepExecution을 생성하게 되며 JobRepository에서 Execution 정보들을 DB에 저장하고 조회하며 사용한다
@Configuration
@RequiredArgsConstructor
@EnableBatchProcessing
public class ExampleJobConfig {

    public static final String JOB_NAME = "example";

    private final JobBuilderFactory jobBuilderFactory;
    private final Step taskStep;
    private final Step chunkStep;

    @Bean(JOB_NAME)
    public Job ExampleJob(){

        Job exampleJob = jobBuilderFactory  // JobBuilderFactory
                .get(JOB_NAME)              // JobBuilder
                .start(taskStep)         // SimpleJobBuilder
                .next(chunkStep)
 //             .incrementer(new UniqueRunIdIncrementer())
                .incrementer(new CustomJobParametersIncrementer())
                .validator(new CustomJobParametersValidator())
                .preventRestart(false)              // default : true
                .build();                   // SimpleJob

        return exampleJob;
    }
}
  • Validator
public class CustomJobParametersValidator implements JobParametersValidator {

    @Override
    public void validate(JobParameters parameters) throws JobParametersInvalidException {
        if (parameters.getString("name") == null) {
            throw new JobParametersInvalidException();
        }
    }
}
  • Incrementer
public class CustomJobParametersIncrementer implements JobParametersIncrementer {
    static final SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd-hhmmss");

    @Override
    public JobParameters getNext(JobParameters parameters) {
        String id = format.format(new Date());
        return new JobParametersBuilder().addString("run.id", id).toJobParameters();
    }
}

Step

  • Job의 배치처리 과정을 정의하고 순차적으로 실행되는 객체 (Job의 세부 과정)
  • 구현체 : TaskletStep, PartitionStep, JobStep, FlowStep
public interface Step {
    void execute(StepExecution stepExecution) throws JobInterruptedException;
}

1. StepExecution

  • Step의 실행시도 객체, Step별로 StepExecution이 생성됨
  • JobExecution에 저장되는 정보 외에 read 수, write 수, commit 수, skip 수 등의 정보도 저장
  • JobExecution:StepExecution = 1:N
  • Job이 재시작되더라도 이미 성공된 Step은 스킵되고 실패한 Step만 실행된다 (Step의 allowStartIfComplete=true이면 스킵되지 않게 설정가능 )
  • Step의 StepExecution이 하나라도 실패하면 JobExecution은 실패처리 된다 image

  • StepBuilder
    • 호출하는 메서드에 따라 사용하는 하위 StepBuilder가 다르며 그에 따른 Step이 생성된다
public class StepBuilder extends StepBuilderHelper<StepBuilder> {

    // Tasklet 사용
    public TaskletStepBuilder tasklet(Tasklet tasklet) {
        return new TaskletStepBuilder(this).tasklet(tasklet);
    }

    // Chunk 기반의 Tasklet 사용
    public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
        return new SimpleStepBuilder<I, O>(this).chunk(chunkSize);
    }
    public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
        return new SimpleStepBuilder<I, O>(this).chunk(completionPolicy);
    }

    // PartitionStep
    public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) {
        return new PartitionStepBuilder(this).partitioner(stepName, partitioner);
    }
    public PartitionStepBuilder partitioner(Step step) {
        return new PartitionStepBuilder(this).step(step);
    }

    // JobStep
    public JobStepBuilder job(Job job) {
        return new JobStepBuilder(this).job(job);
    }

    // FlowStep
    public FlowStepBuilder flow(Flow flow) {
        return new FlowStepBuilder(this).flow(flow);
    }
}
@Slf4j
@Component
@JobScope
@RequiredArgsConstructor
public class ExampleStepConfig {

    public final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step taskStep(){
        Step taskStep = stepBuilderFactory     // StepBuilderFactory
                .get("taskStep")               // StepBuilder
                .tasklet((contribution, chunkContext) -> {  // TaskletStepBuilder
                    log.info("taskStep is executed!");
                    return RepeatStatus.FINISHED;
                })
                .build();                       // TaskletStep

        return taskStep;
    }

    @Bean
    public Step chunkStep(){
        Step chunkStep = stepBuilderFactory     // StepBuilderFactory
                .get("chunkStep")               // StepBuilder
                .<String, String>chunk(10)      // SimpleStepBuilder
                .reader(new CustomReader())
                .processor(new CustomProcessor())
                .writer(new CustomWriter())
                .build();                       // TaskletStep

        return chunkStep;
    }
}

Tasklet

  • Step 내에서 실행되는 객체, 주로 단일 task를 수행
  • TaskletStep에 의해 반복적으로 실행(While loop)되며 반환값에 따라 실행/종료 된다
    • RepeatStatus.CONTINUABLE : 반복
    • RepeatStatus.FINISHED : 종료
  • Step:Tasklet = 1:1
public interface Tasklet {
    @Nullable
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}

image

1. Task 기반

  • 단일 작업 기반으로 처리하는 방식 image

2. Chunk 기반 (COT)

  • Chunk기반 tasklet의 구현체 ChunkOrientedTasklet 사용
    • Chunk 마다 새로운 ChunkOrientedTasklet을 생성해서 사용한다??? (확인필요)
  • 하나의 큰 덩어리를 N개씩 나누어 처리하는 방식
  • Chunk 단위로 트랜잭션을 처리한다 image
public class Chunk<W> implements Iterable<W> {

    private List<W> items = new ArrayList<>();
    private List<SkipWrapper<W>> skips = new ArrayList<>();
    private List<Exception> errors = new ArrayList<>();

    public void add(W item) {
        items.add(item);
    }
    public class ChunkIterator implements Iterator<W> {
        ...
    }
}
public class ChunkOrientedTasklet<I> implements Tasklet {
    private final ChunkProvider<I> chunkProvider;       // reader
    private final ChunkProcessor<I> chunkProcessor;     // processor, writer
    
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        ...

        Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
        if (inputs == null) {
            inputs = chunkProvider.provide(contribution);       // (1)
            ...
        }

        chunkProcessor.process(contribution, inputs);           // (2)
        chunkProvider.postProcess(contribution, inputs);

        ...

        chunkContext.removeAttribute(INPUTS_KEY);
        chunkContext.setComplete();
        ...

        return RepeatStatus.continueIf(!inputs.isEnd());        // (3)

	}
}

(1) Item 을 Chunk size 만큼 반복해서 읽은 다음 Chunk 에 저장하고 반환
(2) ChunkProvider 로 부터 받은 Chunk 의 아이템 개수만큼 데이터를 가공하고 저장
(3) 읽을 Item 이 더 존재하는지 체크해서 존재하면 Chunk 프로세스 반복하고 null 일경우 RepeatStaus.FINISHED 반환하고 Chunk 프로세스 종료

  • ItemReader, ItemWriter, ItemProcesseor
    • Step 과정에서 Item을 읽어 데이터를 처리한 다음 결과를 처리하는 객체
    • ItemReaderItemProcesseor는 Chunk 내의 개별 item을 처리하지만 ItemWriter는 Chunk 모든 items를 일괄 처리 한다
    • ItemReaderItemWriter는 필수요소 지만 ItemProcesseor는 선택요소이다

1. ItemReader

  • input 데이터를 읽어오는 인터페이스
public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
  • 구현체
    • FlatFileItemReader : txt, csv 등 파일
    • JsonItemReader : json 파일
    • MultiResourceItemReader : 여러 개의 파일 조합
    • JdbcCursorItemReader : JDBC Cursor
    • JpaCursorItemReader : JPA Cursor
    • JdbcPagingItemReader : JDBC Paging
    • JpaPagingItemReader : JPA Paging
    • SynchronizeditemStreamReader : Thread-safe ItemReader
    • CustomItemReader : Custom ItemReader

* JDBC

  • Cursor Base
    • 현재 행에 커서를 유지하며 다음 데이터를 호출하면 다음 행으로 커서를 이동하며 데이터 반환을 이루는 Streaming 방식의 I/O
    • DB connection이 연결되면 배치 처리가 완료될 때까지 데이터를 읽어온다 (Timeout 시간이 충분히 길 필요가 있음)
    • 모든 결과를 메모리에 올리기 때문에 메모리 사용량이 많다
    • Thread-safe 하지 않다
  • Paging Base
    • 페이지 단위로 데이터를 조회하는 방식으로 Page Size 만큼 한번에 데이터를 가지고 온 다음 한 개씩 읽는다 (Offset, Limit 사용)
    • 한 페이지를 읽을 때마다 Connection을 맺고 끊는다
    • 페이징 단위의 결과만 메모리에 올려 메모리 사용량이 적다
    • Thread-safe 하다

2. ItemWriter

  • Chunk 단위로 데이터를 받아 일괄 출력하는 인터페이스
  • 출력이 완료되면 트랜잭션이 종료되고 새로운 Chunk 단위 프로세스로 이동한다
public interface ItemWriter<T> {
    void write(List<? extends T> items) throws Exception;
}
  • 구현체
    • FlatFileItemWriter : txt, csv 등 파일
    • JsonFileItemWriter : json 파일
    • JdbcBatchItemWriter : JDBC
      • JDBC Batch 기능을 사용해 bulk update 처리
    • JpaItemWriter : JPA
      • Entity를 하나씩 chunk 크기만큼 update 처리
    • CustomItemWriter : Custom ItemWriter

3. ItemProcessor

  • 데이터를 출력하기 전에 데이터를 가공, 변형, 필터링 하는 역할

  • 구현체

    • CompositeItemProcessor : ItemProcessor들을 체이닝 실행
    • ClassifierCompositeItemProcessor : 라우팅으로 ItemProcessor들 중 하나를 호출
public interface ItemProcessor<I, O> {
    O process(@NonNull I item) throws Exception;
}

image

image