Introduction to AutoMQ: A Cost-Effective Kafka Alternative
1. 개요
Apache Kafka는 가장 인기 있고 널리 사용되는 메시징 및 이벤트 스트리밍 플랫폼 중 하나로 자리 잡았습니다. 그러나 Kafka 클러스터를 설정하고 관리하는 것은 복잡한 과정으로, 대규모 조직에서는 고가용성, 신뢰성, 로드 밸런싱 및 확장을 보장하기 위해 전담 팀이 일반적으로 수행합니다.
AutoMQ는 Apache Kafka의 클라우드 네이티브 대안으로, 비용 절감과 효율성 증대에 중점을 두고 있습니다. 이 플랫폼은 공유 저장소 아키텍처를 사용하여 데이터를 Amazon Simple Storage Service (S3)에 저장하고, Amazon Elastic Block Store (EBS)를 통해 내구성을 보장합니다.
이 튜토리얼에서는 Spring Boot]() 애플리케이션에 AutoMQ를 통합하는 방법을 탐구해 보겠습니다. 로컬 AutoMQ 클러스터 설정과 기본 프로듀서-소비자 패턴 구현을 진행할 것입니다.
2. Testcontainers를 통한 AutoMQ 설정
로컬 개발 및 테스트를 용이하게 하기 위해 Testcontainers를 사용하여 AutoMQ 클러스터를 설정할 것입니다. Testcontainers를 통해 AutoMQ 클러스터를 실행하기 위한 전제 조건은 활성화된 Docker 인스턴스와 Docker Compose입니다.
AutoMQ는 로컬 배포를 위한 Docker Compose 파일을 제공하며, 이 파일은 LocalStack를 사용하여 Amazon S3 서비스를 에뮬레이트하고 로컬 파일 시스템을 통해 Amazon EBS를 에뮬레이트합니다. 이 Compose 파일을 설정에 사용할 것입니다.
다음의 설정은 프로덕션 환경을 위한 것이 아님을 유념해야 합니다.
2.1. 의존성 추가
우리 프로젝트의 pom.xml 파일에 필요한 의존성을 추가해 보겠습니다:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.3.0</version>
</dependency>
AutoMQ는 Apache Kafka와 완전히 호환되며, 즉 동일한 API를 구현하고 동일한 프로토콜 및 구성 속성을 사용합니다. 이를 통해 친숙한 spring-kafka 의존성을 사용하여 AutoMQ를 애플리케이션에 통합할 수 있습니다.
다음으로 몇 개의 테스트 의존성을 추가하겠습니다:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
spring-boot-testcontainers 의존성는 AutoMQ 클러스터에 필요한 일회성 Docker 인스턴스를 생성하는 데 필요한 클래스를 제공합니다.
추가로, 초래한 비동기 프로듀서-소비자 구현을 테스트하는 데 도움이 되는 awaitility 라이브러리를 추가했습니다.
2.2. Testcontainers 빈 정의
이제 @TestConfiguration 클래스를 생성하여 Testcontainers 빈을 정의해 보겠습니다:
@TestConfiguration(proxyBeanMethods = false)
class TestcontainersConfiguration {
private static final String COMPOSE_URL = "https://download.automq.com/community_edition/standalone_deployment/docker-compose.yaml";
@Bean
public ComposeContainer composeContainer() {
File dockerCompose = downloadComposeFile();
return new ComposeContainer(dockerCompose)
.withLocalCompose(true);
}
private File downloadComposeFile() {
File dockerCompose = Files.createTempFile("docker-compose", ".yaml").toFile();
FileUtils.copyURLToFile(URI.create(COMPOSE_URL).toURL(), dockerCompose);
return dockerCompose;
}
}
여기서 우리는 Testcontainers의 Docker Compose 모듈을 사용합니다. 먼저, AutoMQ Docker Compose 파일을 다운로드하고 그 내용으로 ComposeContainer 빈을 생성합니다.
withLocalCompose() 메서드를 사용하고 true로 설정하여 Testcontainers에 개발 또는 CI 머신에 설치된 Docker Compose 이진 파일을 사용하도록 지시합니다.
그러나 Docker Compose의 container_name 속성은 현재 Testcontainers에서 지원되지 않습니다. 이를 해결하기 위해 임시로 우회 구현을 진행하겠습니다:
private File downloadComposeFile() {
// ... 위와 동일
return removeContainerNames(dockerCompose);
}
private File removeContainerNames(File composeFile) {
List<String> filteredLines = Files.readAllLines(composeFile.toPath())
.stream()
.filter(line -> !line.contains("container_name:"))
.toList();
Files.write(composeFile.toPath(), filteredLines);
return composeFile;
}
개인적인 removeContainerNames() 메서드는 다운로드한 Docker Compose 파일에서 containername 속성을 제거합니다. 이 작업을 통해 ComposeContainer 빈을 인스턴스화하는 데 사용되는 Docker Compose에 containername 속성이 포함되지 않도록 합니다.
마지막으로 애플리케이션이 AutoMQ 클러스터에 연결할 수 있도록 bootstrap-servers 속성을 구성하겠습니다:
@Bean
public DynamicPropertyRegistrar dynamicPropertyRegistrar() {
return registry -> {
registry.add("spring.kafka.bootstrap-servers", () -> "localhost:9094,localhost:9095");
};
}
우리는 DynamicPropertyRegistrar 빈을 정의하면서 기본 AutoMQ 부트스트랩 서버인 localhost:9094,localhost:9095 를 구성합니다.
올바른 연결 세부정보가 구성되면 Spring Boot는 나중에 튜토리얼에서 사용할 KafkaTemplate 빈을 자동으로 생성합니다.
2.3. 개발 중 Testcontainers 사용
Testcontainers는 주로 통합 테스트에 사용되지만, 로컬 개발 중에도 사용할 수 있습니다.
이를 위해 src/test/java 디렉토리에 별도의 메인 클래스를 생성하겠습니다:
public class TestApplication {
public static void main(String[] args) {
SpringApplication.from(Application::main)
.with(TestcontainersConfiguration.class)
.run(args);
}
}
TestApplication 클래스를 만들고 내부의 main() 메서드에서 TestcontainersConfiguration 클래스를 사용하여 우리의 주 Application 클래스를 시작합니다.
이 설정은 외부 서비스를 로컬에서 설정하고 관리하는 데 도움이 됩니다. Spring Boot 애플리케이션을 실행하고 Testcontainers를 통해 시작된 외부 서비스에 연결할 수 있습니다.
3. 프로듀서-소비자 패턴 구현
이제 로컬 AutoMQ 클러스터를 설정했으므로 이를 사용하여 기본 프로듀서-소비자 패턴을 구현하겠습니다.
3.1. AutoMQ 소비자 구성
먼저, 소비자가 듣는 주제 이름을 application.yml 파일에 정의해 보겠습니다:
com:
baeldung:
topic:
onboarding-initiated: user-service.onboarding.initiated.v1
다음으로, 구성된 주제에서 메시지를 소비할 클래스를 생성하겠습니다:
@Configuration
class UserOnboardingInitiatedListener {
private static final Logger log = LoggerFactory.getLogger(UserOnboardingInitiatedListener.class);
@KafkaListener(topics = "${com.baeldung.topic.onboarding-initiated}", groupId = "user-service")
public void listen(User user) {
log.info("Dispatching user account confirmation email to {}", user.email());
}
}
record User(String email) {
}
여기서 우리는 @KafkaListener 주석을 사용하여 listen() 메서드에서 주제 및 소비자 그룹을 지정합니다. 이 메서드는 user-service.onboarding.initiated.v1 주제에 메시지가 게시될 때마다 호출됩니다.
우리는 메시지 페이로드를 나타내기 위해 User 레코드를 정의합니다.
마지막으로 application.yml 파일에 다음 구성을 추가하겠습니다:
spring:
kafka:
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring.json.value.default.type: com.baeldung.automq.User
allow.auto.create.topics: true
소비자와 프로듀서 모두에 대해 키 및 값 직렬화 및 비직렬화 속성을 구성합니다. 추가로, 기본 메시지 페이로드 유형으로 우리의 User 레코드를 지정합니다.
마지막으로 주제의 자동 생성을 활성화하여 AutoMQ가 존재하지 않는 경우 자동으로 생성하도록 합니다.
3.2. 메시지 소비 테스트
이제 소비자를 구성했으므로 로그가 설정된 주제로 게시된 메시지를 소비하는지 확인하겠습니다:
@SpringBootTest
@ExtendWith(OutputCaptureExtension.class)
@Import(TestcontainersConfiguration.class)
class UserOnboardingInitiatedListenerLiveTest {
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
@Value("${com.baeldung.topic.onboarding-initiated}")
private String onboardingInitiatedTopic;
@Test
void whenMessagePublishedToTopic_thenProcessedByListener(CapturedOutput capturedOutput) {
User user = new User("test@baeldung.com");
kafkaTemplate.send(onboardingInitiatedTopic, user);
String expectedConsumerLog = String.format("Dispatching user account confirmation email to %s", user.email());
Awaitility
.await()
.atMost(1, TimeUnit.SECONDS)
.until(() -> capturedOutput.getAll().contains(expectedConsumerLog));
}
}
여기서 우리는 KafkaTemplate 클래스의 인스턴스를 자동 주입하고, @Value를 사용하여 application.yaml 파일에 저장된 주제 이름을 주입합니다.
먼저 User 객체를 생성하고 KafkaTemplate를 사용하여 구성된 주제로 전송합니다. 그런 다음 awaitility 및 OutputCaptureExtension에서 제공하는 CapturedOutput 인스턴스를 사용하여 소비자가 예상 로그 메시지를 기록했는지 확인합니다.
우리의 테스트 케이스는 소비자가 시작하여 주제에 가입하는 데 시간이 걸리므로 간헐적으로 실패할 수 있습니다. 이를 해결하기 위해 테스트 케이스가 실행되기 전에 소비자가 파티션을 할당받을 때까지 기다리겠습니다:
@BeforeAll
void setUp(CapturedOutput capturedOutput) {
String expectedLog = "partitions assigned";
Awaitility
.await()
.atMost(Durations.ONE_MINUTE)
.pollDelay(Durations.ONE_SECOND)
.until(() -> capturedOutput.getAll().contains(expectedLog));
}
@BeforeAll로 주석이 달린 setUp() 메서드에서는 주어진 시간 내에 CapturedOutput 인스턴스에 로그를 포함할 때까지 최대 1분을 기다립니다. 이를 통해 파티션 할당을 확인합니다.
우리의 테스트 클래스는 또한 비동기 작업을 테스트할 때 awaitility 라이브러리의 힘을 잘 보여줍니다.
4. 결론
이 기사에서는 AutoMQ를 Spring Boot 애플리케이션에 통합하는 방법을 탐구했습니다.
Testcontainers의 Docker Compose 모듈을 사용하여 AutoMQ 클러스터를 시작하고 로컬 테스트 환경을 구축했습니다.
그런 다음 기본 프로듀서-소비자 아키텍처를 구현하고 성공적으로 테스트했습니다.
모든 코드 예제는 GitHub에서 확인할 수 있습니다.