Notice
Recent Posts
Recent Comments
Link
«   2025/12   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30 31
Archives
Today
Total
관리 메뉴

사부작사부작

JpaPagingItemReader, JpaItemWriter 본문

스터디

JpaPagingItemReader, JpaItemWriter

민철킴 2024. 11. 12. 17:37

https://devocean.sk.com/blog/techBoardDetail.do?ID=166902

 

[SpringBatch 연재 06] JpaPagingItemReader로 DB내용을 읽고, JpaItemWriter로 DB에 쓰기

 

devocean.sk.com

해당 블로그 글을 기반으로 공부한 글입니다. 

 

JpaPagingItemReader

 

소개 : 해당 클래스는 스프링 배치에서 제공하는 ItemReader 구현체 중 하나입니다. JPA를 이용해서 데이터베이스로부터 페이징 단위로 데이터를 읽어올 때 사용합니다.

 

주요 구성 요소 :

  • EntityManagerFactory: JPA 엔티티 매니저 팩토리를 설정합니다.
  • JpaQueryProvider: 데이터를 읽을 JPA 쿼리를 제공합니다.
  • PageSize: 한 번에 읽어올 데이터의 개수 즉, 페이지 크기를 설정합니다.
  • SkippableItemReader: 오류 발생 시 해당 아이템을 건너뛸 수 있도록 설정합니다.
  • ReadListener: 읽기 시작, 종료, 오류 발생 등의 이벤트를 처리할 수 있도록 설정합니다.
  • SaveStateCallback: 잡 중단 시 현재 상태를 저장하여 재시작 시 이어서 처리할 수 있도록 설정합니다.

특징 :

  • 페이징 처리 : 지정된 페이지 크기만큼 나눠서 데이터를 읽어오기 때문에 메모리 사용량을 줄일 수 있습니다. 청크 크기와 한 번에 읽어올 페이지 크기를 같게 맞춰주는게 일반적입니다.
  • JPQL, 네이티브 쿼리 : JPQL을 사용하여 엔티티 기반의 쿼리를 작성할 수도 있고, 네이티브 SQL 쿼리를 사용할 수도 있습니다.
    jpaPagingItemReader.setQueryString("SELECT c FROM Customer c WHERE c.age > :age order by id desc");
    jpaPagingItemReader.setParameterValues(Collections.singletonMap("age", 20));
    
  • 네이티브 쿼리네이티브 쿼리를 사용할 경우 setUseNativeQuery(true) 설정을 해줘야 합니다. 여기서 setPreparedStatementSetter() 메서드는 파라미터에 값을 바인딩하는 방식을 설정합니다. 쿼리의 '?' 위치에 20 이라는 값을 바인딩하게 됩니다.
jpaPagingItemReader.setQueryString("SELECT * FROM customer WHERE age > ? ORDER BY id DESC"); 
jpaPagingItemReader.setUseNativeQuery(true); 
jpaPagingItemReader.setPreparedStatementSetter(new ArgumentPreparedStatementSetter(new Object[]{20}));

 

 

실습 : 

간단한 코드로 실습해 보겠습니다. 

@Entity
@Table(name = "customer")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Customer {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private int id;
    private String name;
    private int age;
    private String gender;
}

 

@Slf4j
@Configuration
public class JpaPagingReaderJobConfig {

    /**
     * CHUNK 크기를 지정한다.
     */
    public static final int CHUNK_SIZE = 2;
    public static final String ENCODING = "UTF-8";
    public static final String JPA_PAGING_CHUNK_JOB = "JPA_PAGING_CHUNK_JOB";

    @Autowired
    DataSource dataSource;

    @Autowired
    EntityManagerFactory entityManagerFactory;

    @Bean
    public JpaPagingItemReader<Customer> customerJpaPagingItemReader() throws Exception {
        JpaPagingItemReader<Customer> jpaPagingItemReader = new JpaPagingItemReader<>();
        jpaPagingItemReader.setQueryString(
                "SELECT c FROM Customer c WHERE c.age > :age order by id desc"
        );
        jpaPagingItemReader.setEntityManagerFactory(entityManagerFactory);
        jpaPagingItemReader.setPageSize(CHUNK_SIZE);
        jpaPagingItemReader.setParameterValues(Collections.singletonMap("age", 20));
        return jpaPagingItemReader;
    }

    @Bean
    public FlatFileItemWriter<Customer> customerJpaFlatFileItemWriter() {

        return new FlatFileItemWriterBuilder<Customer>()
                .name("customerJpaFlatFileItemWriter")
                .resource(new FileSystemResource("./output/customer_new_v2.csv"))
                .encoding(ENCODING)
                .delimited().delimiter("\t")
                .names("Name", "Age", "Gender")
                .build();
    }


    @Bean
    public Step customerJpaPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
        log.info("------------------ Init customerJpaPagingStep -----------------");

        return new StepBuilder("customerJpaPagingStep", jobRepository)
                .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                .reader(customerJpaPagingItemReader())
                .processor(new CustomerItemProcessor())
                .writer(customerJpaFlatFileItemWriter())
                .build();
    }

    @Bean
    public Job customerJpaPagingJob(Step customerJdbcPagingStep, JobRepository jobRepository) {
        log.info("------------------ Init customerJpaPagingJob -----------------");
        return new JobBuilder(JPA_PAGING_CHUNK_JOB, jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(customerJdbcPagingStep)
                .build();
    }
}

 

db의 데이터와 로그들입니다. 나이가 20살이 넘는 고객만 조회해오기 때문에 2개의 데이터에 대한 로그만 존재합니다.

 

실행 결과로 생성된 csv 파일입니다.

 

 

JpaItemWriter

소개 : 해당 클래스는 스프링 배치에서 제공하는 ItemWriter의 구현체 중 하나입니다. JPA를 이용해서 데이터를 데이터베이스에 저장할 때 사용됩니다. 

 

주요 구성 요소

  • EntityManagerFactory: JPA EntityManager 생성을 위한 팩토리 객체
  • JpaQueryProvider: 저장할 엔터티를 위한 JPA 쿼리를 생성하는 역할

실습 :

간단한 코드로 실습해 보겠습니다.

 

CSV 파일의 데이터를 읽어와, customer 테이블에 데이터를 저장해보겠습니다.

읽어올 CSV 파일입니다.

 

@Slf4j
@Configuration
public class JpaItemJobConfig {

    /**
     * CHUNK 크기를 지정한다.
     */
    public static final int CHUNK_SIZE = 200;
    public static final String ENCODING = "UTF-8";
    public static final String JPA_ITEM_WRITER_JOB = "JPA_ITEM_WRITER_JOB";

    @Autowired
    EntityManagerFactory entityManagerFactory;

    @Bean
    public FlatFileItemReader<Customer> flatFileItemReader() {

        return new FlatFileItemReaderBuilder<Customer>()
                .name("FlatFileItemReader")
                .resource(new ClassPathResource("./customer.csv"))
                .encoding(ENCODING)
                .delimited().delimiter(",")
                .names("name", "age", "gender")
                .targetType(Customer.class)
                .build();
    }

    @Bean
    public JpaItemWriter<Customer> jpaItemWriter() {
        return new JpaItemWriterBuilder<Customer>()
                .entityManagerFactory(entityManagerFactory)
                .usePersist(true)
                .build();
    }


    @Bean
    public Step flatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        log.info("------------------ Init flatFileStep -----------------");

        return new StepBuilder("flatFileStep", jobRepository)
                .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                .reader(flatFileItemReader())
                .writer(jpaItemWriter())
                .build();
    }

    @Bean
    public Job flatFileJob(Step flatFileStep, JobRepository jobRepository) {
        log.info("------------------ Init flatFileJob -----------------");
        return new JobBuilder(JPA_ITEM_WRITER_JOB, jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(flatFileStep)
                .build();
    }
}

 

jpaItemWriter() 코드를 보시면 엔티티 매니저를 통해 아이템을 persist하는 것을 보실 수 있습니다. 

그러면 customer 테이블의 결과를 보겠습니다.

 

기존의 데이터에 읽어온 CSV파일의 데이터가 추가된걸 확인할 수 있습니다.