Record Help

번외. 카프카 테스트 코드 예제

Consumer 애플리케이션이 데이터를 제대로 수신하는지에 대해 테스트하는 예제이다.

통합 테스트로 작성할 것이며 EmbeddedKafka, TestContainer를 사용하는 두 가지 방법으로 작성할 것이다.

관련 코드는 consumer test에서 확인할 수 있다.

예제 코드

  • Consumer Config

    컨슈머 세팅 코드

    @EnableKafka @EnableConfigurationProperties(KafkaProperties.class) @Configuration @ConditionalOnProperty(name = "spring.kafka.consumer.enabled", havingValue = "true") public class KafkaConsumerConfig { private final KafkaProperties kafkaProperties; public KafkaConsumerConfig(final KafkaProperties kafkaProperties) { this.kafkaProperties = kafkaProperties; } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Chat>> kafkaListenerContainerFactory() { final ConcurrentKafkaListenerContainerFactory<String, Chat> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // factory.setConcurrency(3); return factory; } @Bean public ConsumerFactory<String, Chat> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfig(), new StringDeserializer(), new JsonDeserializer<>(Chat.class)); } @Bean public Map<String, Object> consumerConfig() { return Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers(), ConsumerConfig.GROUP_ID_CONFIG, "chatGroupId", ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false ); } }
  • Consumer

    메시지를 수신하고 처리할 컨슈머

    @Slf4j @Service public class ChatConsumer { private final ChatRepository chatRepository; public ChatConsumer(final ChatRepository chatRepository) { this.chatRepository = chatRepository; } @KafkaListener(id = "chatListener1", topics = "#{'${spring.kafka.topics.chat}'.split(',')}", containerFactory = "kafkaListenerContainerFactory") public void recordChat(final List<Chat> chat, final Acknowledgment ack) { log.info("Consumed size : {}", chat.size()); chat.forEach(chatRepository::save); ack.acknowledge(); } }
  • Chat

    수신할 토픽

    @Getter public class Chat { private String id; private Long createdAt; private String content; public Chat() { } public Chat(final String id, final Long createdAt, final String content) { this.id = id; this.createdAt = createdAt; this.content = content; } }
  • application.yml

    spring: datasource: url: jdbc:postgresql://localhost:5432/postgres username: postgres password: postgres driver-class-name: org.postgresql.Driver kafka: consumer: enabled: true topics: chat: chat
  • schema.sql

    CREATE TABLE IF NOT EXISTS chat ( id VARCHAR(255), created_at BIGINT NOT NULL, content TEXT NOT NULL, PRIMARY KEY (id) );
  • pom.xml

    테스트 컨테이너 관련 의존성 (그외 더 많음)

    <dependency> <groupId>org.testcontainers</groupId> <artifactId>junit-jupiter</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>postgresql</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <scope>test</scope> </dependency>

테스트 코드

EmbeddedKafka

로컬 DB와 임베디드 카프카를 사용하는 방식이다.

@SpringBootTest @EmbeddedKafka(partitions = 1, topics = "${spring.kafka.topics.chat}", brokerProperties = {"listeners=PLAINTEXT://localhost:9092"}, ports = 9092) @TestInstance(TestInstance.Lifecycle.PER_CLASS) class ChatConsumerTestUsingKafkaUtils { @Value("${spring.kafka.topics.chat}") private String topic; @Autowired private EmbeddedKafkaBroker embeddedKafkaBroker; @Autowired private ObjectMapper objectMapper; private Producer<String, String> producer; @Autowired private JdbcTemplate jdbcTemplate; @BeforeAll void setUp() { producer = new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafkaBroker), new StringSerializer(), new StringSerializer()).createProducer(); } @AfterAll void tearDown() { producer.close(); jdbcTemplate.execute("TRUNCATE TABLE chat"); } @Test void recordChat() throws Exception { // given for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<>(topic, objectMapper.writeValueAsString(new Chat(String.valueOf(i), 1624368000000L, "Hello Kafka " + i)))); } // when Thread.sleep(3000L); // then final Integer actual = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM chat", Integer.class); assertThat(actual).isEqualTo(100); } }

TestContainer

@SpringBootTest @Testcontainers @AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE) @Sql(scripts = "classpath:schema.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD) class ChatConsumerTestUsingTestContainer { @Container static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); @Container static PostgreSQLContainer postgreSQLContainer = new PostgreSQLContainer(DockerImageName.parse("postgres:latest")) .withDatabaseName("postgres") .withUsername("postgres") .withPassword("postgres"); @DynamicPropertySource static void kafkaProperties(final DynamicPropertyRegistry registry) { registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers); registry.add("spring.datasource.url", postgreSQLContainer::getJdbcUrl); registry.add("spring.datasource.username", postgreSQLContainer::getUsername); registry.add("spring.datasource.password", postgreSQLContainer::getPassword); registry.add("spring.datasource.driver-class-name", () -> postgreSQLContainer.getDriverClassName()); } @Value("${spring.kafka.topics.chat}") private String topic; @Autowired private ObjectMapper objectMapper; @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Autowired private JdbcTemplate jdbcTemplate; @Test void recordChat() throws Exception { // given for (int i = 0; i < 100; i++) { kafkaTemplate.send(new ProducerRecord<>(topic, objectMapper.writeValueAsString(new Chat(String.valueOf(i), 1624368000000L, "Hello Kafka " + i)))); } // when Thread.sleep(3000L); // then final Integer actual = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM chat", Integer.class); assertThat(actual).isEqualTo(100); } @TestConfiguration static class KafkaTestConfig { @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { return Map.of( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(), ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class ); } } }

마치며

간단하게 메시지를 수신하고 저장하는 예제를 테스트 코드로 작성해보았다.

다만 메시지를 배치로 받는 것, 오프셋이 제대로 들어가는지 이런 것들을 테스트하는 것이 더 중요해보이는데 이런 걸 테스트하는 방법에 대해서는 더 고민이 필요할 것 같다.

Last modified: 14 May 2024