본문 바로가기

Spring

[Spring Batch] MongoItemWriter 구현

MongoItemWriter를 사용하면 Chunk 사이즈로 bulk 삽입이 되지 않고 한 row 씩 저장되는 것을 확인했다.

bulk로 저장할 수 있도록 custom 하여 사용해보려고 한다.

 

MongoItemWriter

MongoItemWriter의 doWrite를 보면 아래와 같다.

protected void doWrite(List<? extends T> items) {
    if(!CollectionUtils.isEmpty(items)) {
        if(delete) {
            if(StringUtils.hasText(collection)) {
                for(Object object : items) {
                    template.remove(object, collection);
                }
            }
            else {
                for (Object object : items) {
                    template.remove(object);
                }
            }
        }
        else {
            if(StringUtils.hasText(collection)) {
                for(Object object : items) {
                    template.save(object, collection);
                }
            }
            else {
                for(Object object : items) {
                    template.save(Object);
                }
            }
        }
    }
}

item 하나씩 for문을 돌면서 저장을 하고 있다.

Mongo Bulk Operation

Mongodb는 다량의 데이터를 쓰기 위한 연산을 제공하는데, 이것이 bulk operation이다. 이는 단일 collection에서만 영향을 미친다.

Ordered vs Unordered Operations

Bulk operation은 ordered 또는 unordered로 연산을 수행할 수 있는데, ordered로 연산을 수행하면, 단어가 주는 뉘앙스처럼 순차적으로 데이터를 저장한다. 

 

Ordered : 순차적인 작업 목록을 사용하는 MongoDB는 작업을 순차적으로 실행합니다. 쓰기 작업 중 하나를 처리하는 중에 오류가 발생하면 MongoDB는 나머지 쓰기 작업을 처리하지 않고 반환합니다.

 

Unordered : 정렬되지 않은 작업 목록을 사용하는 MongoDB가 작업을 병렬로 실행할 수 있지만 이 동작은 보장되지 않습니다. 쓰기 작업 중 하나가 처리되는 동안 오류가 발생하면 MongoDB는 나머지 쓰기 작업을 계속 처리합니다.

 

bulk 실행시 연산을 묶어서 처리하는데, 이때 하나의 연산 그룹 당 최대 가질 수 있는 연산 수는 1000개이다.

 

MongoBulkWriter 재구현

해당 클래스를 재구현하여 MongoBulkWriter 클래스를 구현해보려 한다.

protected void doWrite(List<? extends T> items) {
    if(!CollectionUtils.isEmpty(items)) {
        if(delete) {
            if(StringUtils.hasText(collection)) {
                for(Object object : items) {
                    template.remove(object, collection);
                }
            }
            else {
                for (Object object : items) {
                    template.remove(object);
                }
            }
        }
        else {
            if(StringUtils.hasText(collection)) {
                BulkOperations bulkOperations = template.bulkOps(BulkOperations.BulkMode.UNORDERED, this.collection);
                bulkOperations.insert(items);
                bulkOperations.execute();
            }
            else {
                for(Object object : items) {
                    BulkOperations bulkOperations = template.bulkOps(BulkOperations.BulkMode.UNORDERED, items.getClass());
                    bulkOperations.insert(items);
                    bulkOperations.execute();
                }
            }
        }
    }
}

 

 

 

 

 

참고

 

https://github.com/dwpark1112/til/blob/master/mongodb/bulk-operation.md

 

'Spring' 카테고리의 다른 글

[Spring Batch] ItemWriter  (0) 2020.01.09
[Spring Batch] ItemReader  (0) 2020.01.09
[Spring Batch] Dynamic Scheduler 다이나믹 스케쥴링  (0) 2020.01.09
[개념] Spring Batch 란?  (0) 2020.01.08