Published on

스프링 배치 도입과 이를 통한 성능 개선기(데이터 3000개 기준)

Authors
  • avatar
    Name
    불타는 라쿤들
    Twitter

🦡 작성자 : 원준

기존 방식

트랜잭션 범위 조절을 통한 갱신

변경된 방식

SpringBatch 적용 WithChunk(Size = 50)

멘토링 자동 완료

기존 코드

@Service
@Slf4j
@RequiredArgsConstructor
public class MentoringManageUseCase {
		...

    @Scheduled(cron = "0 59 23 * * *", zone = "Asia/Seoul")
    public void updateAutoDone() {
        List<Mentoring> expectedMentorings = mentoringGetService.byExpected();
        expectedMentorings.stream()
                .filter(mentoring -> {
                    try {
                        return mentoring.checkAutoDone();
                    } catch (Exception ex) {
                        slackErrorMessage.sendSlackError(mentoring, ex);
                        return false;
                    }
                })
                .forEach(mentoringRenewalUseCase::updateDoneWithAuto);
    }

    ...
}

@RequiredArgsConstructor
@Service
@Transactional
@Slf4j
public class MentoringRenewalUseCase {
    ...

    public void updateDoneWithAuto(Mentoring mentoring) {
        try {
            Mentoring doneMentoring = mentoringGetService.byMentoringIdWithLazy(mentoring.getMentoringId());
            Senior senior = mentoring.getSenior();
            Salary salary = salaryGetService.bySenior(senior);
            salaryUpdateService.plusTotalAmount(salary, doneMentoring.calculateForSenior());
            mentoringUpdateService.updateDone(doneMentoring, salary);
            log.info("mentoringId : {} 자동 완료", mentoring.getMentoringId());
        } catch (Exception ex) {
            slackErrorMessage.sendSlackError(mentoring, ex);
            log.error("mentoringId : {} 자동 완료 실패", mentoring.getMentoringId());
            log.error(ex.getMessage());
            currentTransactionStatus().setRollbackOnly();
        }
    }
}

진행 예정 상태의 멘토링을 조건이 만족되는 경우 완료 상태로 변경하고, 선배 정산 금액을 업데이트 하는 기존의 로직이다.

우선, 해당 상태의 모든 멘토링을 조회하고 stream().filter() 를 통해 특정 조건에 만족하는 멘토링을 골라낸 후 각각의 멘토링에 대해 메소드를 호출하는 방식이다.

이때, 중간에 하나라도 실패하면 전체가 롤백되는 문제가 발생하였고 이를 방지하기 위하여 멘토링 업데이트에 대해 별도의 클래스로 분리 후 각각의 멘토링에 대해 트랜잭션 설정을 하였고, 각각의 멘토링별로 트랜잭션이 걸리도록 하여 만약 중간에 특정 멘토링에서 문제가 발생하면 해당 멘토링만 롤백 처리 후 나머지는 모두 정상처리 될 수 있도록 구현하였다.


변경 코드

@Configuration
@RequiredArgsConstructor
public class DoneJobConfig {
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final DataSource dataSource;
    private final DoneMentoringProcessor doneMentoringProcessor;
    private final DoneMentoringWriter doneMentoringWriter;

    private static final int CHUNK_SIZE = 50;

    @Bean(name = "doneJob")
    public Job doneJob() throws Exception {
        return new JobBuilder("doneJob", jobRepository)
                .start(doneStep())
                .build();
    }

    @Bean(name = "doneStep")
    public Step doneStep() throws Exception {
        return new StepBuilder("doneStep", jobRepository)
                .<DoneMentoring, DoneMentoring>chunk(CHUNK_SIZE, transactionManager)
                .reader(doneReader())
                .processor(doneMentoringProcessor)
                .writer(doneMentoringWriter)
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(Integer.MAX_VALUE)
                .build();
    }

    @Bean(name = "doneReader")
    public JdbcPagingItemReader<DoneMentoring> doneReader() throws Exception {
        return new JdbcPagingItemReaderBuilder<DoneMentoring>()
                .pageSize(CHUNK_SIZE)
                .fetchSize(CHUNK_SIZE)
                .dataSource(dataSource)
                .rowMapper(new DoneMentoringRowMapper())
                .queryProvider(doneQueryProvider())
                .name("doneReader")
                .build();
    }

    @Bean(name = "doneQuery")
    public PagingQueryProvider doneQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource);
        queryProvider.setSelectClause("select m.mentoring_id, m.senior_senior_id, m.salary_salary_id, m.date, p.pay");
        queryProvider.setFromClause("from mentoring m join payment p on m.payment_payment_id = p.payment_id");
        queryProvider.setWhereClause("where m.status = 'EXPECTED'");
        queryProvider.setSortKey("mentoring_id");
        return queryProvider.getObject();
    }
}

@Component
public class DoneMentoringProcessor implements ItemProcessor<DoneMentoring, DoneMentoring> {
    @Override
    public DoneMentoring process(DoneMentoring doneMentoring) {
        DateTimeFormatter formatter = ofPattern("yyyy-MM-dd-HH-mm");
        LocalDateTime doneDate = parse(doneMentoring.date(), formatter);
        if (now().minusDays(3)
                .isAfter(doneDate))
            return doneMentoring;
        return null;
    }
}

@Component
@RequiredArgsConstructor
@Slf4j
public class DoneMentoringWriter implements ItemWriter<DoneMentoring> {
    private final DoneMentoringRepository doneMentoringRepository;

    @Override
    public void write(Chunk<? extends DoneMentoring> chunk) {
        List<DoneMentoring> doneMentorings = new ArrayList<>();
        chunk.forEach(doneMentorings::add);
        doneMentoringRepository.updateAllMentoring(doneMentorings);
        doneMentoringRepository.updateAllSalary(doneMentorings);
        log.info("멘토링 자동 완료 size : {}", chunk.size());
    }
}

@Component
@RequiredArgsConstructor
public class DoneMentoringRepository {
    private final NamedParameterJdbcTemplate jdbcTemplate;
    private static final int CHARGE = TermUnit.SHORT.getCharge();
    private static final String UPDATE_MENTORING = "update mentoring set status = 'DONE' where mentoring_id = :mentoringId";
    private static final String UPDATE_SALARY = "update salary set total_amount = total_amount + :amount where salary_id = :salaryId";

    public void updateAllSalary(List<DoneMentoring> mentorings) {
        jdbcTemplate.batchUpdate(UPDATE_SALARY, generateParameterSource(mentorings));
    }

    public void updateAllMentoring(List<DoneMentoring> mentorings) {
        jdbcTemplate.batchUpdate(UPDATE_MENTORING, generateParameterSource(mentorings));
    }

    private SqlParameterSource[] generateParameterSource(List<DoneMentoring> mentorings) {
        return mentorings.stream()
                .map(mentoring -> new MapSqlParameterSource(generateEntityParams(mentoring)))
                .toArray(SqlParameterSource[]::new);
    }

    private Map<String, Object> generateEntityParams(DoneMentoring doneMentoring) {
        HashMap<String, Object> parameter = new HashMap<>();
        parameter.put("mentoringId", doneMentoring.mentoringId());
        parameter.put("amount", doneMentoring.pay() - CHARGE);
        parameter.put("salaryId", doneMentoring.salaryId());
        return parameter;
    }
}

@Component
@RequiredArgsConstructor
@Slf4j
public class DoneMentoringSkipListener implements SkipListener<DoneMentoring, DoneMentoring> {
    private final SlackErrorMessage slackErrorMessage;

    @Override
    public void onSkipInRead(Throwable t) {
        log.info("멘토링 자동 완료 ItemReader Skip message : {}", t.getMessage());
    }

    @Override
    public void onSkipInProcess(DoneMentoring doneMentoring, Throwable t) {
        log.info("mentoringId : {} 자동 완료 실패, message : {}", doneMentoring.mentoringId(), t.getMessage());
        slackErrorMessage.sendSlackMentoringError(doneMentoring.mentoringId(), t);
    }

    @Override
    public void onSkipInWrite(DoneMentoring doneMentoring, Throwable t) {
        log.info("mentoringId : {} 자동 완료 실패, message : {}", doneMentoring.mentoringId(), t.getMessage());
        slackErrorMessage.sendSlackMentoringError(doneMentoring.mentoringId(), t);
    }
}

기존의 배치 처리를 스프링 배치 프레임 워크를 활용하는 방식으로 리팩토링 하였다.

Chunk 기반으로 트랜잭션 처리를 하는 방식이며 서버의 스펙상 10~100 정도의 ChunkSize를 권장하고 있기에 중간 값인 50으로 설정하여 수행하였다.

reader 에서 기존과 같이 멘토링을 조회하며, processor 에서 stream().filter() 에서 수행하던 역할을 진행하도록 하였다.

그리고 writer 에서 차이가 발생하는데, Chunk 단위로 동작을 하기 때문에 한번에 최대 50개를 한번에 Insert 및 Update를 수행하도록 하였다.

이를 통해 1개씩 수행하던 기존과 다르게 50개 단위로 처리하며 오류 발생시 해당 데이터만 스킵하고 해당 Chunk를 다시 시작하도록 하여 기존의 오류 제어는 그대로 가져가도록 구현하였다.

마찬가지로 skipListener 를 통해 기존과 같이 오류 발생시 슬랙으로 메시지를 보내고 로그 처리하는 방식 또한 유지하도록 하며 성능 향상을 가져왔다.

⭐ 결과 : 2분 12초 → 2초

정산 자동 생성

기존 코드

@Service
@RequiredArgsConstructor
public class SalaryManageUseCase {
    ...

    @Scheduled(cron = "0 0 0 * * 4", zone = "Asia/Seoul")
    public void createSalary() {
        List<Salary> salaries = salaryGetService.findAllLast();
        slackSalaryMessage.sendSlackSalary(salaries);

        List<SeniorAndAccount> seniorAndAccounts = seniorGetService.findAllSeniorAndAccount();
        LocalDate salaryDate = getSalaryDate().plusDays(7);
        seniorAndAccounts.stream()
                .filter(seniorAndAccount -> salaryGetService.bySeniorOptional(seniorAndAccount.senior(), salaryDate).isEmpty())
                .forEach(seniorAndAccount -> salaryRenewalUseCase.createSalaryWithAuto(seniorAndAccount, salaryDate));
    }
}

@RequiredArgsConstructor
@Service
@Transactional
@Slf4j
public class SalaryRenewalUseCase {
    private final SalarySaveService salarySaveService;

    public void createSalaryWithAuto(SeniorAndAccount seniorAndAccount, LocalDate salaryDate) {
        Salary salary = SalaryMapper.mapToSalary(seniorAndAccount.senior(), salaryDate, seniorAndAccount.account());
        salarySaveService.save(salary);
    }
}

1주일에 한번씩 일괄 처리하는 과정으로, 선배 각각에 대해서 새로운 정산 데이터를 생성하는 로직이다.

우선 이번 정산을 모두 조회한 후 슬랙으로 알림을 보내도록 하였고, 이후 각 선배의 계좌와 선배 정보를 조회한 후 해당 정보들을 기반으로 정산을 새롭게 생성하는 것이다.

이때, 멘토링과 마찬가지로 중간에 하나라도 실패하면 전체가 롤백 되는 경우가 발생하였고 이를 방지하기 위해 정산 생성을 별도의 클래스로 분리 후 각각의 선배를 기준으로 트랜잭션 범위를 설정하여 특정 선배의 처리에서 문제가 발생하면 해당 데이터만 롤백하고 나머지는 정상 처리할 수 있도록 구현하였다.


변경 코드

@Configuration
@RequiredArgsConstructor
@Slf4j
public class CreateSalaryJobConfig {
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final SlackSalaryMessage slackSalaryMessage;
    private final SalaryGetService salaryGetService;
    private final CreateSalaryItemWriter createSalaryItemWriter;
    private final DataSource dataSource;

    private static final int CHUNK_SIZE = 50;

    @Bean(name = "salaryJob")
    public Job salaryJob() throws Exception {
        return new JobBuilder("salaryJob", jobRepository)
                .start(sendSlackStep())
                .next(createSalaryStep())
                .build();
    }

    @Bean(name = "sendSlackStep")
    public Step sendSlackStep() {
        return new StepBuilder("sendSlackStep", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    List<Salary> salaries = salaryGetService.findAllLast();
                    slackSalaryMessage.sendSlackSalary(salaries);
                    log.info("salarySize : {}", salaries.size()); // 임시
                    salaries.forEach(salary -> log.info("salary : {}", salary.getTotalAmount())); // 임시
                    return RepeatStatus.FINISHED;
                }, transactionManager)
                .build();
    }

    @Bean(name = "createSalaryStep")
    public Step createSalaryStep() throws Exception {
        return new StepBuilder("createSalaryStep", jobRepository)
                .<CreateSalary, CreateSalary>chunk(CHUNK_SIZE, transactionManager)
                .reader(salaryReader())
                .writer(createSalaryItemWriter)
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(Integer.MAX_VALUE)
                .build();
    }

    @Bean(name = "salaryReader")
    public JdbcPagingItemReader<CreateSalary> salaryReader() throws Exception {
        Map<String, Object> parameters = new HashMap<>();
        parameters.put("salaryDate", getSalaryDate().plusDays(7));
        return new JdbcPagingItemReaderBuilder<CreateSalary>()
                .pageSize(CHUNK_SIZE)
                .fetchSize(CHUNK_SIZE)
                .dataSource(dataSource)
                .rowMapper(new CreateSalaryRowMapper())
                .queryProvider(salaryQueryProvider())
                .parameterValues(parameters)
                .name("salaryReader")
                .build();
    }

    @Bean(name = "salaryQuery")
    public PagingQueryProvider salaryQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource);
        queryProvider.setSelectClause("SELECT s.senior_id, a.bank, a.account_id, a.account_holder, a.account_number");
        queryProvider.setFromClause("FROM senior s\n" +
                "JOIN user u ON s.user_user_id = u.user_id\n" +
                "LEFT JOIN account a ON s.senior_id = a.senior_senior_id");
        queryProvider.setWhereClause("WHERE u.is_delete = false\n" +
                "AND s.senior_id NOT IN (\n" +
                "SELECT senior_senior_id\n" +
                "FROM salary\n" +
                "WHERE salary_date = :salaryDate\n" +
                ")");
        queryProvider.setSortKey("senior_id");
        return queryProvider.getObject();
    }
}

@Component
@RequiredArgsConstructor
@Slf4j
public class CreateSalaryItemWriter implements ItemWriter<CreateSalary> {
    private final CreateSalaryRepository createSalaryRepository;

    @Override
    public void write(Chunk<? extends CreateSalary> chunk) throws Exception {
        List<CreateSalary> createSalaries = new ArrayList<>();
        chunk.forEach(createSalaries::add);
        createSalaryRepository.insertAllSalary(createSalaries);
        log.info("salary 자동 생성 ChunkSize : {}", chunk.size());
    }
}

@Component
@RequiredArgsConstructor
public class CreateSalaryRepository {
    private final NamedParameterJdbcTemplate jdbcTemplate;
    private static final String INSERT_SALARY = "insert into salary " +
            "(salary_date, status, total_amount, senior_senior_id, account_holder, account_number, bank) " +
            "values (:salaryDate, false, 0, :seniorId, :accountHolder, :accountNumber, :bank)";

    public void insertAllSalary(List<CreateSalary> createSalaries) {
        jdbcTemplate.batchUpdate(INSERT_SALARY, generateParameterSource(createSalaries));
    }

    private SqlParameterSource[] generateParameterSource(List<CreateSalary> createSalaries) {
        return createSalaries.stream()
                .map(createSalary -> new MapSqlParameterSource(generateEntityParams(createSalary)))
                .toArray(SqlParameterSource[]::new);
    }

    private Map<String, Object> generateEntityParams(CreateSalary createSalary) {
        HashMap<String, Object> parameter = new HashMap<>();
        parameter.put("salaryDate", getSalaryDate().plusDays(7));
        parameter.put("seniorId", createSalary.seniorId());
        parameter.put("accountHolder", createSalary.accountHolder());
        parameter.put("accountNumber", createSalary.accountNumber());
        parameter.put("bank", createSalary.bank());
        return parameter;
    }
}

기존의 배치 처리 프로세스를 스프링 배치 프레임워크를 활용하는 방식으로 리팩토링 하였다.

ChunkSize 설정은 멘토링과 동일하게 설정하였다.

TaskletChunk 두가지를 함께 사용하였으며 Tasklet 에서 기존과 같이 한번에 모든 정산을 읽은 후 정산 예정 금액을 슬랙으로 전송한 후 Chunk 단위로 새로운 정산을 생성하도록 하였다.

reader 에서 기존과 같이 정산 생성을 위한 조회를 진행하는데, 기존의 stream().ilter() 에서 처리하는 내용을 SQL을 통해 처리한 후 바로 writer 로 넘기도록 하였다.

그리고 writer 에서 차이가 발생하는데, 마찬가지로 한번에 최대 50개를 한번에 Insert 및 Update를 수행하도록 하였으며 1개씩 수행하던 기존과 다르게 50개 단위로 처리하며 오류 발생시 해당 데이터만 스킵하고 해당 Chunk를 다시 시작하도록 하여 기존의 오류 제어는 그대로 가져가도록 구현하였다.

마찬가지로 skipListener 를 통해 기존과 같이 오류 발생시 슬랙으로 메시지를 보내고 로그 처리하는 방식 또한 유지하도록 하며 성능 향상을 가져왔다.

⭐ 결과 : 1분 10초 → 2.5초