Extra. Kafka test code example
This example tests whether a Consumer application receives data correctly.
The tests are written as integration tests in two ways: one using EmbeddedKafka and one using TestContainer.
The related code can be found in consumer test.
Example code
Consumer Config
Consumer configuration code
@EnableKafka
@EnableConfigurationProperties(KafkaProperties.class)
@Configuration
@ConditionalOnProperty(name = "spring.kafka.consumer.enabled", havingValue = "true")
public class KafkaConsumerConfig {

    private final KafkaProperties kafkaProperties;

    public KafkaConsumerConfig(final KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Chat>> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, Chat> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        // factory.setConcurrency(3);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Chat> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig(), new StringDeserializer(), new JsonDeserializer<>(Chat.class));
    }

    @Bean
    public Map<String, Object> consumerConfig() {
        return Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers(),
                ConsumerConfig.GROUP_ID_CONFIG, "chatGroupId",
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false
        );
    }
}
Consumer
The consumer that receives and processes messages
@Slf4j
@Service
public class ChatConsumer {

    private final ChatRepository chatRepository;

    public ChatConsumer(final ChatRepository chatRepository) {
        this.chatRepository = chatRepository;
    }

    @KafkaListener(id = "chatListener1", topics = "#{'${spring.kafka.topics.chat}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void recordChat(final List<Chat> chat, final Acknowledgment ack) {
        log.info("Consumed size : {}", chat.size());
        chat.forEach(chatRepository::save);
        ack.acknowledge();
    }
}
Chat
The message payload received from the topic
@Getter
public class Chat {

    private String id;
    private Long createdAt;
    private String content;

    public Chat() {
    }

    public Chat(final String id, final Long createdAt, final String content) {
        this.id = id;
        this.createdAt = createdAt;
        this.content = content;
    }
}
application.yml
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/postgres
    username: postgres
    password: postgres
    driver-class-name: org.postgresql.Driver
  kafka:
    consumer:
      enabled: true
    topics:
      chat: chat
schema.sql
CREATE TABLE IF NOT EXISTS chat (
    id VARCHAR(255),
    created_at BIGINT NOT NULL,
    content TEXT NOT NULL,
    PRIMARY KEY (id)
);
pom.xml
Testcontainers-related dependencies (the full pom.xml contains many more)
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>postgresql</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <scope>test</scope>
</dependency>
Test code
EmbeddedKafka
This approach uses a local database together with an embedded Kafka broker.
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "${spring.kafka.topics.chat}", brokerProperties = {"listeners=PLAINTEXT://localhost:9092"}, ports = 9092)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ChatConsumerTestUsingKafkaUtils {

    @Value("${spring.kafka.topics.chat}")
    private String topic;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private ObjectMapper objectMapper;

    private Producer<String, String> producer;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @BeforeAll
    void setUp() {
        producer = new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafkaBroker), new StringSerializer(), new StringSerializer()).createProducer();
    }

    @AfterAll
    void tearDown() {
        producer.close();
        jdbcTemplate.execute("TRUNCATE TABLE chat");
    }

    @Test
    void recordChat() throws Exception {
        // given
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>(topic, objectMapper.writeValueAsString(new Chat(String.valueOf(i), 1624368000000L, "Hello Kafka " + i))));
        }

        // when
        Thread.sleep(3000L);

        // then
        final Integer actual = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM chat", Integer.class);
        assertThat(actual).isEqualTo(100);
    }
}
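The fixed Thread.sleep(3000L) in the test above always waits the full three seconds, even when the rows arrive sooner, and it may still be too short on a slow machine. One possible alternative is to poll the condition until a deadline passes. The following is a minimal sketch that uses only the JDK; PollingWait and waitUntil are hypothetical names introduced here for illustration and are not part of the original code or of any library.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper (not from the original post): polls a condition instead of sleeping a fixed time.
final class PollingWait {

    private PollingWait() {
    }

    // Re-checks the condition every pollInterval until it holds or the timeout elapses.
    // Returns true if the condition held in time, false on timeout or interruption.
    static boolean waitUntil(final BooleanSupplier condition,
                             final Duration timeout,
                             final Duration pollInterval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the test, the sleep and the assertion could then become something like assertThat(PollingWait.waitUntil(() -> countRows() == 100, Duration.ofSeconds(10), Duration.ofMillis(100))).isTrue(), where countRows() is a hypothetical helper wrapping the SELECT COUNT(*) query. Libraries such as Awaitility offer the same idea with a richer API.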
TestContainer
@SpringBootTest
@Testcontainers
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@Sql(scripts = "classpath:schema.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
class ChatConsumerTestUsingTestContainer {

    @Container
    static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @Container
    static PostgreSQLContainer postgreSQLContainer = new PostgreSQLContainer(DockerImageName.parse("postgres:latest"))
            .withDatabaseName("postgres")
            .withUsername("postgres")
            .withPassword("postgres");

    @DynamicPropertySource
    static void kafkaProperties(final DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
        registry.add("spring.datasource.url", postgreSQLContainer::getJdbcUrl);
        registry.add("spring.datasource.username", postgreSQLContainer::getUsername);
        registry.add("spring.datasource.password", postgreSQLContainer::getPassword);
        registry.add("spring.datasource.driver-class-name", () -> postgreSQLContainer.getDriverClassName());
    }

    @Value("${spring.kafka.topics.chat}")
    private String topic;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Test
    void recordChat() throws Exception {
        // given
        for (int i = 0; i < 100; i++) {
            kafkaTemplate.send(new ProducerRecord<>(topic, objectMapper.writeValueAsString(new Chat(String.valueOf(i), 1624368000000L, "Hello Kafka " + i))));
        }

        // when
        Thread.sleep(3000L);

        // then
        final Integer actual = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM chat", Integer.class);
        assertThat(actual).isEqualTo(100);
    }

    @TestConfiguration
    static class KafkaTestConfig {

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }

        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public Map<String, Object> producerConfigs() {
            return Map.of(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(),
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
            );
        }
    }
}
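Closing thoughts
I wrote test code for a simple example that receives messages and stores them.
However, testing that messages really arrive in batches and that offsets are committed correctly seems more important, and how to test those things still needs more thought.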
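As one possible starting point for the batch question, a test could route each delivered batch through a small recording wrapper and then assert on the recorded batch sizes. The sketch below is only an assumption about how that might look: BatchRecorder, awaitAll and batchSizes are hypothetical names, the class uses only the JDK, and wiring it into the actual listener (for example through a test-only bean) is left open. It does not cover offset verification.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical test helper (not from the original post): wraps a batch handler,
// remembers the size of every batch, and lets the test wait for a record count.
final class BatchRecorder<T> implements Consumer<List<T>> {

    private final Consumer<List<T>> delegate;
    private final CountDownLatch remaining;
    private final List<Integer> batchSizes = new CopyOnWriteArrayList<>();

    BatchRecorder(final Consumer<List<T>> delegate, final int expectedRecords) {
        this.delegate = delegate;
        this.remaining = new CountDownLatch(expectedRecords);
    }

    @Override
    public void accept(final List<T> batch) {
        delegate.accept(batch);        // run the real handling first
        batchSizes.add(batch.size());  // remember how large this delivery was
        for (int i = 0; i < batch.size(); i++) {
            remaining.countDown();     // one count per handled record
        }
    }

    // Waits until the expected number of records has been handled.
    // Returns false on timeout or interruption.
    boolean awaitAll(final long timeout, final TimeUnit unit) {
        try {
            return remaining.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Snapshot of the batch sizes seen so far, in delivery order.
    List<Integer> batchSizes() {
        return List.copyOf(batchSizes);
    }
}
```

With something like this in place, a test could wait on awaitAll(10, TimeUnit.SECONDS) and then check, for instance, that no recorded batch is larger than the configured MAX_POLL_RECORDS_CONFIG of 200, or that at least one batch held more than a single record.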
Last modified: 14 May 2024