사부작사부작
JpaPagingItemReader, JpaItemWriter 본문
https://devocean.sk.com/blog/techBoardDetail.do?ID=166902
[SpringBatch 연재 06] JpaPagingItemReader로 DB내용을 읽고, JpaItemWriter로 DB에 쓰기
devocean.sk.com
해당 블로그 글을 기반으로 공부한 글입니다.
JpaPagingItemReader
소개 : 해당 클래스는 스프링 배치에서 제공하는 ItemReader 구현체 중 하나입니다. JPA를 이용해서 데이터베이스로부터 페이징 단위로 데이터를 읽어올 때 사용합니다.
주요 구성 요소 :
- EntityManagerFactory: JPA 엔티티 매니저 팩토리를 설정합니다.
- JpaQueryProvider: 데이터를 읽을 JPA 쿼리를 제공합니다.
- PageSize: 한 번에 읽어올 데이터의 개수 즉, 페이지 크기를 설정합니다.
- SkippableItemReader: 오류 발생 시 해당 아이템을 건너뛸 수 있도록 설정합니다.
- ReadListener: 읽기 시작, 종료, 오류 발생 등의 이벤트를 처리할 수 있도록 설정합니다.
- SaveStateCallback: 잡 중단 시 현재 상태를 저장하여 재시작 시 이어서 처리할 수 있도록 설정합니다.
특징 :
- 페이징 처리 : 지정된 페이지 크기만큼 나눠서 데이터를 읽어오기 때문에 메모리 사용량을 줄일 수 있습니다. 청크 크기와 한 번에 읽어올 페이지 크기를 같게 맞춰주는게 일반적입니다.
- JPQL, 네이티브 쿼리 : JPQL을 사용하여 엔티티 기반의 쿼리를 작성할 수도 있고, 네이티브 SQL 쿼리를 사용할 수도 있습니다.
jpaPagingItemReader.setQueryString("SELECT c FROM Customer c WHERE c.age > :age order by id desc"); jpaPagingItemReader.setParameterValues(Collections.singletonMap("age", 20)); - 네이티브 쿼리네이티브 쿼리를 사용할 경우 setUseNativeQuery(true) 설정을 해줘야 합니다. 여기서 setPreparedStatementSetter() 메서드는 파라미터에 값을 바인딩하는 방식을 설정합니다. 쿼리의 '?' 위치에 20 이라는 값을 바인딩하게 됩니다.
jpaPagingItemReader.setQueryString("SELECT * FROM customer WHERE age > ? ORDER BY id DESC");
jpaPagingItemReader.setUseNativeQuery(true);
jpaPagingItemReader.setPreparedStatementSetter(new ArgumentPreparedStatementSetter(new Object[]{20}));
실습 :
간단한 코드로 실습해 보겠습니다.
@Entity
@Table(name = "customer")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Customer {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id;
private String name;
private int age;
private String gender;
}
@Slf4j
@Configuration
public class JpaPagingReaderJobConfig {
/**
* CHUNK 크기를 지정한다.
*/
public static final int CHUNK_SIZE = 2;
public static final String ENCODING = "UTF-8";
public static final String JPA_PAGING_CHUNK_JOB = "JPA_PAGING_CHUNK_JOB";
@Autowired
DataSource dataSource;
@Autowired
EntityManagerFactory entityManagerFactory;
@Bean
public JpaPagingItemReader<Customer> customerJpaPagingItemReader() throws Exception {
JpaPagingItemReader<Customer> jpaPagingItemReader = new JpaPagingItemReader<>();
jpaPagingItemReader.setQueryString(
"SELECT c FROM Customer c WHERE c.age > :age order by id desc"
);
jpaPagingItemReader.setEntityManagerFactory(entityManagerFactory);
jpaPagingItemReader.setPageSize(CHUNK_SIZE);
jpaPagingItemReader.setParameterValues(Collections.singletonMap("age", 20));
return jpaPagingItemReader;
}
@Bean
public FlatFileItemWriter<Customer> customerJpaFlatFileItemWriter() {
return new FlatFileItemWriterBuilder<Customer>()
.name("customerJpaFlatFileItemWriter")
.resource(new FileSystemResource("./output/customer_new_v2.csv"))
.encoding(ENCODING)
.delimited().delimiter("\t")
.names("Name", "Age", "Gender")
.build();
}
@Bean
public Step customerJpaPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
log.info("------------------ Init customerJpaPagingStep -----------------");
return new StepBuilder("customerJpaPagingStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(customerJpaPagingItemReader())
.processor(new CustomerItemProcessor())
.writer(customerJpaFlatFileItemWriter())
.build();
}
@Bean
public Job customerJpaPagingJob(Step customerJdbcPagingStep, JobRepository jobRepository) {
log.info("------------------ Init customerJpaPagingJob -----------------");
return new JobBuilder(JPA_PAGING_CHUNK_JOB, jobRepository)
.incrementer(new RunIdIncrementer())
.start(customerJdbcPagingStep)
.build();
}
}
db의 데이터와 로그들입니다. 나이가 20살이 넘는 고객만 조회해오기 때문에 2개의 데이터에 대한 로그만 존재합니다.

실행 결과로 생성된 csv 파일입니다.

JpaItemWriter
소개 : 해당 클래스는 스프링 배치에서 제공하는 ItemWriter의 구현체 중 하나입니다. JPA를 이용해서 데이터를 데이터베이스에 저장할 때 사용됩니다.
주요 구성 요소
- EntityManagerFactory: JPA EntityManager 생성을 위한 팩토리 객체
- JpaQueryProvider: 저장할 엔터티를 위한 JPA 쿼리를 생성하는 역할
실습 :
간단한 코드로 실습해 보겠습니다.
CSV 파일의 데이터를 읽어와, customer 테이블에 데이터를 저장해보겠습니다.
읽어올 CSV 파일입니다.

@Slf4j
@Configuration
public class JpaItemJobConfig {
/**
* CHUNK 크기를 지정한다.
*/
public static final int CHUNK_SIZE = 200;
public static final String ENCODING = "UTF-8";
public static final String JPA_ITEM_WRITER_JOB = "JPA_ITEM_WRITER_JOB";
@Autowired
EntityManagerFactory entityManagerFactory;
@Bean
public FlatFileItemReader<Customer> flatFileItemReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("FlatFileItemReader")
.resource(new ClassPathResource("./customer.csv"))
.encoding(ENCODING)
.delimited().delimiter(",")
.names("name", "age", "gender")
.targetType(Customer.class)
.build();
}
@Bean
public JpaItemWriter<Customer> jpaItemWriter() {
return new JpaItemWriterBuilder<Customer>()
.entityManagerFactory(entityManagerFactory)
.usePersist(true)
.build();
}
@Bean
public Step flatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
log.info("------------------ Init flatFileStep -----------------");
return new StepBuilder("flatFileStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(flatFileItemReader())
.writer(jpaItemWriter())
.build();
}
@Bean
public Job flatFileJob(Step flatFileStep, JobRepository jobRepository) {
log.info("------------------ Init flatFileJob -----------------");
return new JobBuilder(JPA_ITEM_WRITER_JOB, jobRepository)
.incrementer(new RunIdIncrementer())
.start(flatFileStep)
.build();
}
}
jpaItemWriter() 코드를 보시면 엔티티 매니저를 통해 아이템을 persist하는 것을 보실 수 있습니다.
그러면 customer 테이블의 결과를 보겠습니다.

기존의 데이터에 읽어온 CSV파일의 데이터가 추가된걸 확인할 수 있습니다.
'스터디' 카테고리의 다른 글
| Custom ItemReader/ItemWriter 구현방법 알아보기 (1) | 2024.12.09 |
|---|---|
| JdbcPagingItemReader와 JdbcBatchItemWriter (1) | 2024.11.05 |
| 스프링 배치 3주차 (0) | 2024.10.22 |
| 스프링 배치 스터디 2주차 (0) | 2024.10.14 |