Import CSV in Elasticsearch Using Spring Boot
1. 소개
이 튜토리얼에서는 Spring Boot를 사용하여 CSV 파일에서 Elasticsearch로 데이터를 가져오는 방법을 배웁니다. CSV 파일에서 데이터를 가져오는 것은 레거시 시스템이나 외부 출처에서 데이터를 마이그레이션하거나 테스트 데이터셋을 준비해야 할 때 일반적인 사용 사례입니다.
2. Docker로 Elasticsearch 설정하기
Elasticsearch를 사용하기 위해 Docker를 사용하여 로컬에서 설정하겠습니다. Elasticsearch 컨테이너를 시작하려면 다음 단계를 따르세요:
docker pull docker.elastic.co/elasticsearch/elasticsearch:8.17.0
다음으로, 아래 명령어를 사용하여 컨테이너를 실행합니다:
docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:8.17.0
이제 “products.csv”라는 샘플 Excel 파일을 다음과 같은 데이터로 생성해봅시다:
id,name,category,price,stock
1,Microwave,Appliances,705.77,136
2,Vacuum Cleaner,Appliances,1397.23,92
...
3. 수동 for 루프를 사용하여 CSV 데이터 처리하기
첫 번째 방법은 수동 for 루프를 사용하여 CSV 파일에서 레코드를 읽고 Elasticsearch에 인덱싱하는 것입니다. 이 방법을 구현하기 위해 Apache Commons CSV 라이브러리를 사용하여 CSV 파일을 구문 분석하고, Elasticsearch Rest High-Level Client를 사용하여 Elasticsearch 검색 엔진과 통합하겠습니다.
먼저 pom.xml 파일에 필요한 종속성을 추가합니다:
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.11</version>
</dependency>
종속성을 추가한 후, Elasticsearch 구성을 설정해야 합니다. RestHighLevelClient를 설정하기 위해 구성 클래스를 생성합니다:
@Configuration
public class ElasticsearchConfig {
@Bean
public RestHighLevelClient restHighLevelClient() {
return RestClients.create(ClientConfiguration.builder()
.connectedTo("localhost:9200")
.build()).rest();
}
}
그 다음, CSV 데이터를 표현하기 위해 Product 클래스를 생성합니다:
@Document(indexName = "products")
public class Product {
@Id
private String id;
private String name;
private String category;
private double price;
private int stock;
// Getters and setters
}
그 후, Spring Boot 애플리케이션 내에서 CSV 가져오기 프로세스를 처리하는 서비스를 생성합니다. 서비스 내에서 CSV 파일의 각 레코드를 반복하기 위해 for 루프를 사용합니다:
@Autowired
private RestHighLevelClient restHighLevelClient;
public void importCSV(File file) {
try (Reader reader = new FileReader(file)) {
Iterable<CSVRecord> records = CSVFormat.DEFAULT
.withHeader("id", "name", "category", "price", "stock")
.withFirstRecordAsHeader()
.parse(reader);
for (CSVRecord record : records) {
IndexRequest request = new IndexRequest("products")
.id(record.get("id"))
.source(Map.of(
"name", record.get("name"),
"category", record.get("category"),
"price", Double.parseDouble(record.get("price")),
"stock", Integer.parseInt(record.get("stock"))
));
restHighLevelClient.index(request, RequestOptions.DEFAULT);
}
} catch (Exception e) {
// exception 처리
}
}
각 레코드에 대해 IndexRequest 객체를 구성하여 Elasticsearch에 인덱싱할 데이터를 준비합니다. 그리고 RestHighLevelClient를 사용하여 데이터를 인덱싱합니다.
CSV 파일에서 Elasticsearch 인덱스로 데이터를 가져와 봅시다:
File csvFile = Paths.get("src", "test", "resources", "products.csv").toFile();
importCSV(csvFile);
그 다음, 첫 번째 인덱싱을 쿼리하여 내용이 예상 값과 일치하는지 확인합시다:
IndexRequest firstRequest = captor.getAllValues().get(0);
assertEquals(Map.of(
"name", "Microwave",
"category", "Appliances",
"price", 705.77,
"stock", 136
), firstRequest.sourceAsMap());
이 접근법은 간단하며 프로세스를 완벽하게 제어할 수 있습니다. 그러나 대량의 파일에 대해서는 비효율적이고 시간이 많이 소요될 수 있기 때문에 소규모 데이터셋에 더욱 적합합니다.
4. Spring Batch를 사용하여 확장 가능한 데이터 가져오기
Spring Batch는 Java에서 배치 처리를 위한 강력한 프레임워크입니다. 이는 대규모 데이터 가져오기를 블록 단위로 처리하기에 적합합니다.
Spring Batch를 사용하기 위해 pom.xml 파일에 Spring Batch 의존성을 추가해야 합니다:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.4.1</version>
</dependency>
4.1. Spring 구성 파일 정의하기
다음으로, 배치 작업을 정의하기 위해 구성 클래스를 생성합니다. 이 구성에서 @EnableBatchProcessing 주석을 사용하여 배치 작업을 생성하고 관리할 수 있는 Spring Batch 기능을 활성화합니다.
FlatFileItemReader를 사용하여 CSV 파일을 읽고, ItemWriter를 사용하여 데이터를 Elasticsearch에 쓰도록 설정합니다. 또한 Spring 구성 파일에서 RestHighLevelClient 빈을 생성하고 구성합니다:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
// ...
@Autowired
private RestHighLevelClient restHighLevelClient;
}
4.2. 리더 정의하기
CSV 파일에서 데이터를 읽기 위해 reader() 메서드를 생성하고 FlatFileItemReader를 정의합니다. 다양한 설정으로 리더를 구성하기 위해 FlatFileItemReaderBuilder를 사용합니다:
@Bean
public FlatFileItemReader<Product> reader() {
return new FlatFileItemReaderBuilder<Product>()
.name("productReader")
.resource(new FileSystemResource("products.csv"))
.delimited()
.names("id", "name", "category", "price", "stock")
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(Product.class);
}})
.build();
}
리더의 이름을 name() 메서드를 사용하여 지정하여 배치 작업 내에서 식별할 수 있게 합니다. 또한 resource() 메서드는 CSV 파일의 위치인 “products.csv”를 FileSystemResource를 사용하여 지정합니다. 이 파일은 구분된(쉼표로 구분된) 형식으로 되어 있어 delimited() 메서드를 통해 명시합니다.
names() 메서드는 CSV 파일의 열 헤더를 나열하고 이를 Product 클래스의 필드와 매핑합니다. 마지막으로, fieldSetMapper() 메서드는 CSV 파일의 각 줄을 BeanWrapperFieldSetMapper를 사용하여 Product 객체로 매핑합니다.
4.3. 라이터 정의하기
다음으로, 처리된 데이터를 Elasticsearch에 쓰기 위해 writer() 메서드를 생성합니다. 이 메서드는 Product 객체 목록을 수신하는 ItemWriter를 정의합니다. RestHighLevelClient를 사용하여 Elasticsearch와 상호작용합니다:
@Bean
public ItemWriter<Product> writer(RestHighLevelClient restHighLevelClient) {
return products -> {
for (Product product : products) {
IndexRequest request = new IndexRequest("products")
.id(product.getId())
.source(Map.of(
"name", product.getName(),
"category", product.getCategory(),
"price", product.getPrice(),
"stock", product.getStock()
));
restHighLevelClient.index(request, RequestOptions.DEFAULT);
}
};
}
목록의 각 제품에 대해 IndexRequest를 생성하여 Elasticsearch 인덱스와 문서 구조를 지정합니다. id() 메서드는 Product 객체의 ID를 사용하여 각 문서에 고유 식별자를 할당합니다.
source() 메서드는 Product 객체의 name, category, price, stock 등의 필드를 Elasticsearch가 저장할 수 있는 키-값 형식으로 매핑합니다. 요청을 구성한 후, client.index() 메서드를 사용하여 Product 레코드를 Elasticsearch에 전송하여 해당 product를 검색 및 검색할 수 있도록 인덱싱합니다.
4.4. Spring Batch 작업 정의하기
마지막으로, importJob() 메서드를 생성하고 Spring Batch의 JobBuilder와 StepBuilder를 사용하여 작업 및 단계를 구성합니다:
@Bean
public Job importJob(JobRepository jobRepository, PlatformTransactionManager transactionManager,
RestHighLevelClient restHighLevelClient) {
return new JobBuilder("importJob", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.<Product, Product>chunk(10, transactionManager)
.reader(reader())
.writer(writer(restHighLevelClient))
.build())
.build();
}
이 예제에서는 JobBuilder를 사용하여 작업을 구성합니다. 작업 이름 “importJob”과 JobRepository를 인자로 사용합니다. 또한 “step1″이라는 단계를 구성하고, 이 작업이 한 번에 10개의 레코드를 처리하도록 지정합니다. transactionManager는 청크 처리 중 데이터 일관성을 보장합니다.
reader() 및 writer() 메서드는 단계에 데이터 흐름을 처리하도록 통합됩니다. 다음으로, start() 메서드를 사용하여 작업과 단계를 연결합니다. 이 연결은 단계가 작업의 일부로 실행되도록 보장합니다. 이 구성이 완료되면 Spring의 JobLauncher를 사용하여 작업을 실행할 수 있습니다.
4.5. 배치 작업 실행하기
JobLauncher를 사용하여 Spring Batch 작업을 실행하는 코드를 살펴봅시다. 애플리케이션이 시작될 때 작업을 실행하는 CommandLineRunner 빈을 생성합니다:
@Configuration
public class JobRunnerConfig {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importJob;
@Bean
public CommandLineRunner runJob() {
return args -> {
try {
JobExecution execution = jobLauncher.run(importJob, new JobParameters());
} catch (Exception e) {
// 예외 처리
}
};
}
}
작업을 성공적으로 실행한 후, curl을 사용하여 요청을 수행하여 결과를 테스트할 수 있습니다:
curl -X GET "http://localhost:9200/products/_search" \
-H "Content-Type: application/json" \
-d '{
"query": {
"match_all": {}
}
}'
예상 결과를 확인해봅시다:
{
...
"hits": {
"total": {
"value": 25,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "products",
"_type": "_doc",
"_id": "1",
"_score": 1.0,
"_source": {
"id": "1",
"name": "Microwave",
"category": "Appliances",
"price": 705.77,
"stock": 136
}
},
{
"_index": "products",
"_type": "_doc",
"_id": "2",
"_score": 1.0,
"_source": {
"id": "1",
"name": "Vacuum Cleaner",
"category": "Appliances",
"price": 1397.23,
"stock": 92
}
},
...
]
}
}
이 방법은 이전 방법보다 설정이 더 복잡하지만 데이터 가져오기에 대한 확장성과 유연성을 제공합니다.
5. Logstash를 사용하여 CSV 데이터 가져오기
Logstash는 Elastic 스택의 일부로 데이터 처리 및 수집을 위해 설계되었습니다.
Docker를 사용하여 Logstash를 신속하게 설정할 수 있습니다. 먼저 Logstash 이미지를 풀하고 실행합니다:
docker pull docker.elastic.co/logstash/logstash:8.17.0
이미지를 다운로드한 후, Logstash를 위한 구성 파일 “csv-to-es.conf”를 생성합니다. 이 파일은 Logstash가 CSV 파일을 읽고 데이터를 Elasticsearch로 전송하는 방법을 정의합니다:
input {
file {
path => "/path/to/your/products.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
csv {
separator => ","
columns => ["id", "name", "category", "price", "stock"]
}
mutate {
convert => { "price" => "float" }
convert => { "stock" => "integer" }
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "products"
}
stdout {
codec => json_lines
}
}
이 파일에서는 데이터 파이프라인의 입력, 필터 및 출력 단계를 정의합니다. 입력 단계는 읽을 CSV 파일을 지정하고, 필터 단계는 데이터를 처리하고 변환합니다. 마지막으로 출력 단계는 처리된 데이터를 Elasticsearch에 전송합니다.
구성 파일을 설정한 후, Logstash 파이프라인을 실행하기 위해 docker run 명령을 호출해야 합니다:
docker run --rm -v $(pwd)/csv-to-es.conf:/usr/share/logstash/pipeline/logstash.conf \
-v $(pwd)/products.csv:/usr/share/logstash/products.csv \
docker.elastic.co/logstash/logstash:8.17.0
이 명령은 설정 및 CSV 파일을 Logstash 컨테이너에 마운트하고 데이터를 Elasticsearch로 가져오기 위해 데이터 파이프라인을 실행합니다. 명령을 성공적으로 실행한 후, 결과를 확인하기 위해 curl 쿼리를 다시 실행할 수 있습니다.
Logstash는 사용자 정의 코드 없이 CSV 데이터를 Elasticsearch에 효율적으로 가져오며, 대규모 데이터 세트를 처리하고 자동화된 데이터 파이프라인을 설정하는 데 인기가 있습니다.
6. 요약
이제 CSV 파일에서 Elasticsearch로 데이터를 가져오는 세 가지 방법을 살펴보았으므로 각 방법의 장단점을 비교해 보겠습니다:
방법 | 장점 | 단점 |
---|---|---|
수동 for 루프 | 구현이 쉽고 완전한 제어 가능 | 대량 파일에 비효율적 |
Spring Batch | 대규모 데이터셋에 맞게 확장 가능 | 초보자에게 복잡한 설정 |
Logstash | 코딩 필요 없이 높은 성능 | Logstash 설치 필요 |
7. 결론
이 기사에서는 CSV 데이터를 Elasticsearch로 가져오는 방법을 수동 for 루프, Spring Batch 및 Logstash의 세 가지 방법으로 다루었습니다. 각 접근법은 그 강점을 가지고 있으며 다양한 사용 사례에 적합합니다.
이 기사의 전체 구현은 GitHub에서 확인할 수 있습니다.