Introduction to AutoMQ: A Cost-Effective Kafka Alternative

1. 개요

Apache Kafka는 가장 인기 있고 널리 사용되는 메시징 및 이벤트 스트리밍 플랫폼 중 하나로 자리 잡았습니다. 그러나 Kafka 클러스터를 설정하고 관리하는 것은 복잡한 작업으로, 대규모 조직에서는 고가용성, 신뢰성, 로드 밸런싱 및 확장을 보장하기 위해 전담 팀이 수행합니다.

AutoMQ는 비용 절감과 효율성 증대에 중점을 둔 Apache Kafka의 클라우드 네이티브 대안입니다. AutoMQ는 공유 스토리지 아키텍처를 사용하여 데이터를 Amazon Simple Storage Service (S3)에 저장하며, Amazon Elastic Block Store (EBS)를 통해 내구성을 보장합니다.

이 튜토리얼에서는 AutoMQ를 Spring Boot 애플리케이션에 통합하는 방법을 살펴보겠습니다. 로컬 AutoMQ 클러스터 설정 과정을 안내하고 기본 프로듀서-소비자 패턴을 구현해 보겠습니다.

2. Testcontainers를 이용한 AutoMQ 설정

로컬 개발 및 테스트를 용이하게 하기 위해 Testcontainers를 사용하여 AutoMQ 클러스터를 설정합니다. Testcontainers를 통해 AutoMQ 클러스터를 실행하려면 활성화된 Docker 인스턴스와 Docker Compose가 필요합니다.

AutoMQ는 Amazon S3 서비스를 에뮬레이트하는 LocalStack와 Amazon EBS를 에뮬레이트하는 로컬 파일 시스템을 사용하는 Docker Compose 파일을 제공합니다. 이 Compose 파일을 설정에 사용하겠습니다.

다음 설정은 프로덕션 환경을 위한 것이 아님을 주의하시기 바랍니다.

2.1. 종속성 추가

우리 프로젝트의 pom.xml 파일에 필요한 종속성을 추가해보겠습니다:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.3.0</version>
</dependency>

AutoMQ는 Apache Kafka와 완벽하게 호환되며, 동일한 API를 구현하고 동일한 프로토콜과 구성 속성을 사용합니다. 이 덕분에 친숙한 spring-kafka 종속성을 사용하여 AutoMQ를 애플리케이션에 통합할 수 있습니다.

다음으로, 몇 가지 테스트 종속성을 추가하겠습니다:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-testcontainers</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>

spring-boot-testcontainers 종속성는 AutoMQ 클러스터에 필요한 덤퍼 Docker 인스턴스를 생성하는 데 필요한 클래스를 제공합니다. 추가로, awaitility 라이브러리를 추가했습니다. 이는 비동기 프로듀서-소비자 구현을 테스트하는 데 도움이 될 것입니다.

2.2. Testcontainers Beans 정의

이제 @TestConfiguration 클래스를 생성하여 Testcontainers beans를 정의해 보겠습니다:

@TestConfiguration(proxyBeanMethods = false)
class TestcontainersConfiguration {

    private static final String COMPOSE_URL = "https://download.automq.com/community_edition/standalone_deployment/docker-compose.yaml";

    @Bean
    public ComposeContainer composeContainer() {
        File dockerCompose = downloadComposeFile();
        return new ComposeContainer(dockerCompose)
                .withLocalCompose(true);
    }

    private File downloadComposeFile() {
        File dockerCompose = Files.createTempFile("docker-compose", ".yaml").toFile();
        FileUtils.copyURLToFile(URI.create(COMPOSE_URL).toURL(), dockerCompose);
        return dockerCompose;
    }
}

여기서는 Testcontainers의 Docker Compose 모듈을 사용합니다. 먼저, AutoMQ Docker Compose 파일을 다운로드하고 그 내용으로 ComposeContainer bean을 생성합니다.

withLocalCompose() 메서드를 사용하여 true로 설정하여, Testcontainers에 개발 또는 CI 머신에 설치된 Docker Compose 바이너리를 사용하도록 지시합니다.

그러나 Docker Compose의 container_name 속성은 현재 Testcontainers에서 지원하지 않습니다. 임시 해법을 구현해 보겠습니다:

private File downloadComposeFile() {
    // ... 위와 동일
    return removeContainerNames(dockerCompose);
}

private File removeContainerNames(File composeFile) {
    List<String> filteredLines = Files.readAllLines(composeFile.toPath())
            .stream()
            .filter(line -> !line.contains("container_name:"))
            .toList();
    Files.write(composeFile.toPath(), filteredLines);
    return composeFile;
}

비공식 removeContainerNames() 메서드는 다운로드한 Docker Compose 파일에서 containername 속성을 제거합니다. 이 우회 방법은 ComposeContainer bean을 인스턴스화하는 데 사용하는 Docker Compose가 containername 속성을 포함하지 않도록 보장합니다.

마지막으로, 애플리케이션이 AutoMQ 클러스터에 연결할 수 있도록 bootstrap-servers 속성을 설정합니다:

@Bean
public DynamicPropertyRegistrar dynamicPropertyRegistrar() {
    return registry -> {
        registry.add("spring.kafka.bootstrap-servers", () -> "localhost:9094,localhost:9095");
    };
}

기본 AutoMQ bootstrap 서버로 localhost:9094,localhost:9095를 구성하면서 DynamicPropertyRegistrar bean을 정의합니다.

올바른 연결 세부정보가 구성되면 Spring Boot는 나중에 튜토리얼에서 사용할 KafkaTemplate bean을 자동으로 생성합니다.

2.3. 개발 중 Testcontainers 사용하기

Testcontainers는 주로 통합 테스트에 사용되지만, 로컬 개발 중에도 사용할 수 있습니다.

이를 위해 src/test/java 디렉터리에 별도의 메인 클래스를 생성합니다:

public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.from(Application::main)
                .with(TestcontainersConfiguration.class)
                .run(args);
    }
}

TestApplication 클래스를 생성하고, 그 내부의 main() 메서드에서 TestcontainersConfiguration 클래스를 사용하여 메인 Application 클래스를 시작합니다.

이 설정은 외부 서비스를 로컬에서 설정하고 관리하는 데 도움이 됩니다. Spring Boot 애플리케이션을 실행하며 Testcontainers를 통해 시작된 외부 서비스에 연결할 수 있습니다.

3. 프로듀서-소비자 패턴 구현하기

로컬 AutoMQ 클러스터를 설정했으니, 이를 사용하는 기본 프로듀서-소비자 패턴을 구현해 보겠습니다.

3.1. AutoMQ 소비자 구성

먼저 소비자가 수신할 주제 이름을 application.yml 파일에 정의합니다:

com:
  baeldung:
    topic:
      onboarding-initiated: user-service.onboarding.initiated.v1

다음으로, 구성된 주제로부터 메시지를 소비하는 클래스를 생성합니다:

@Configuration
class UserOnboardingInitiatedListener {

    private static final Logger log = LoggerFactory.getLogger(UserOnboardingInitiatedListener.class);

    @KafkaListener(topics = "${com.baeldung.topic.onboarding-initiated}", groupId = "user-service")
    public void listen(User user) {
        log.info("Dispatching user account confirmation email to {}", user.email());
    }

}

record User(String email) {}

여기서 listen() 메서드에 @KafkaListener 주석을 사용하여 주제와 소비자 그룹을 지정합니다. 이 메서드는 user-service.onboarding.initiated.v1 주제로 메시지가 게시될 때마다 호출됩니다.

메시지 페이로드를 나타내기 위해 User record를 정의합니다.

마지막으로 application.yml 파일에 다음 구성을 추가합니다:

spring:
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    properties:
      spring.json.value.default.type: com.baeldung.automq.User
      allow.auto.create.topics: true

소비자와 프로듀서 모두에 대해 키 및 값 직렬화 및 역직렬화 속성을 구성합니다. 또한 User record를 기본 메시지 페이로드 유형으로 지정합니다.

마지막으로 토픽의 자동 생성을 활성화하여 AutoMQ가 존재하지 않을 경우 자동으로 생성하게 합니다.

3.2. 메시지 소비 테스트하기

소비자를 구성했으니, 이제 구성된 주제에 게시된 메시지를 소비하고 로그에 기록하는지 확인해 보겠습니다:

@SpringBootTest
@ExtendWith(OutputCaptureExtension.class)
@Import(TestcontainersConfiguration.class)
class UserOnboardingInitiatedListenerLiveTest {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    @Value("${com.baeldung.topic.onboarding-initiated}")
    private String onboardingInitiatedTopic;

    @Test
    void whenMessagePublishedToTopic_thenProcessedByListener(CapturedOutput capturedOutput) {
        User user = new User("test@baeldung.com");
        kafkaTemplate.send(onboardingInitiatedTopic, user);

        String expectedConsumerLog = String.format("Dispatching user account confirmation email to %s", user.email());
        Awaitility
                .await()
                .atMost(1, TimeUnit.SECONDS)
                .until(() -> capturedOutput.getAll().contains(expectedConsumerLog));
    }
}

여기서 KafkaTemplate 인스턴스를 자동 주입하고, @Value를 사용해 application.yaml 파일에 저장된 구성된 주제 이름을 주입합니다.

먼저 User 객체를 생성하고, 이를 KafkaTemplate를 사용해 구성된 주제로 전송합니다. 그런 다음 awaitility와 CapturedOutput 인스턴스를 사용하여 소비자가 예상 로그 메시지를 로그에 기록했는지 확인합니다.

소비자가 시작되고 주제로 구독하는 데 약간의 시간이 걸릴 수 있기 때문에, 테스트 사례가 간헐적으로 실패할 수 있습니다. 이를 해결하기 위해 테스트 사례가 실행되기 전에 소비자가 파티션을 할당받을 때까지 기다리겠습니다:

@BeforeAll
void setUp(CapturedOutput capturedOutput) {
    String expectedLog = "partitions assigned";
    Awaitility
            .await()
            .atMost(Durations.ONE_MINUTE)
            .pollDelay(Durations.ONE_SECOND)
            .until(() -> capturedOutput.getAll().contains(expectedLog));
}

@BeforeAll로 주석 처리된 setUp() 메서드에서는 최대 1분 동안 대기하며, 매초마다 폴링하여 CapturedOutput 인스턴스에 파티션 할당 확인 로그가 포함될 때까지 대기합니다.

우리의 테스트 클래스는 비동기 작업을 테스트하는 awaitility 라이브러리의 강력을 보여줍니다.

4. 결론

이 기사에서는 AutoMQ를 Spring Boot 애플리케이션에 통합하는 방법을 살펴보았습니다.

Testcontainers의 Docker Compose 모듈을 사용하여 AutoMQ 클러스터를 시작하고, 로컬 테스트 환경을 만들었습니다.

그런 다음 기본 프로듀서-소비자 아키텍처를 구현하고 성공적으로 테스트했습니다.

모든 코드 예제는 GitHub에서 확인할 수 있습니다.

원본 출처

You may also like...

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다