3. 카프카 프로듀서 구현
Producer
Spring Boot 3.1.2, Spring Kafka 3.4.1 버전을 기준으로 진행한다.
관련 코드는 producer에서 확인할 수 있다.
내부 동작 방식
프로듀서 API 호출
Key-Value 형태로 이뤄진 Kafka 구성 정보를 전달 받음
메시징 데이터를 바이트 배열로 직렬화
전송할 파티션 결정(옵션)
메시지를 보내기 위해 브로커 결정
API에 쓰기 요청
응답 수신 (응답이 수신되명 성공으로 간주) 오류 코드 수신 (예외를 발생시키거나 설정에 따른 재시도 수행)
프로듀서가 하는 역할은 다음과 같다.
카프카 프로커 URL 부트스트랩
프로듀서는 카프카 클러스터에 대한 메타데이터를 가져오기 위해 최소 하나의 브로커에 연결한다.
보통 다수의 브로커에 연결한다.데이터 직렬화
브로커에게 메시지를 보내기 전 TCP 기반 데이터 송수신을 위하여 데이터 객체를 바이트 직렬화한다.
토픽 파티션 결정
일반적으로 메시지 데이터 객체의 키 해시를 이용하여 어떤 파티션으로 데이터가 전송되어야 하는지 결정한다.
처리 실패/재시도
프로듀서 API 설정을 통해 재시도 횟수나 예외 처리가 가능하다. 예외 종류에 따른 데이터 흐름을 결정할 수 있다.
배치 처리
배치 처리는 입출력 횟수를 줄이고 프로듀서 메모리를 최적화한다.
보통 메세지 수를 결정하여 처리하며, 전반적인 처리 속도는 배치에서의 메시지 수에 비례해 증가한다.
설정
batch.size
배치 크기. 배치가 다 차면 전송한다.
batch.size가 너무 작으면 한 번에 처리할 수 있는 데이터가 너무 적어지므로 처리량이 감소한다.
linger.ms
전송 대기 시간(기본값 O)
linger.ms 설정으로 어느 정도 대기 시간을 가진 후에 배치를 처리하므로 한 번에 처리할 수 있는 메시지가 늘어 처리량이 증가한다.
max.block.ms
주요 속성은 아닌데 send()로 데이터를 전송할 때, buffer.memory 값을 초과하는 경우 딜레이가 발생한다.
프로듀서가 브로커에 전송하는 속도보다 버퍼에 더 빠르게 쌓일 경우 max.block.ms 시간만큼 send() 요청을 블로킹한다.
buffer.memory
카프카 브로커로 메시지 전송 전에 사용할 최대 버퍼 메모리 크기
설정 한계에 도달하면 프로듀서는 예외를 발생시키기 전에 max.block.ms 시간 동안 대기
batch.size가 크다면 프로듀서에 더 많은 메모리를 할당해야 함
request.timeout.ms를 사용해 시간 초과 설정을 적용하는 좋음
ack
0, 1, all 사용
0 : ack를 기다리지 않음 (fire and forget)
1 : 리더가 메시지를 기록하자마자 ack를 받음
all : 리더가 복제하는 팔로워까지 완료된 경우 ack를 받음
compression.type
프로듀서는 압축을 사용하지 않고 메시지를 전송이 Default임
GZIP, Snappy, LZA 형식의 압축이 가능하며, 배치 처리가 많을수록 압축 효과가 좋다.
되도록 사용하길 권장 Logstash에서 압축과 비압축 차이가 컸다
retres
메시지 전송이 실패했을시 예외를 발생시키기전 재전송 시도 값
partitioner.class
사용자 정의 파티션 생성 시 허용
timeout.ms
프로듀서에서 오류를 보내기 전에 팔로워의 Ack를 리더가 기다리는 시간
KafkaTemplate
Kafka에 메시지를 전송하기 위해서는 KafkaProducer 인스턴스를 생성하여 send()
메소드를 호출해야 한다.
KafkaTemplate은 KafkaProducer를 감싸고 있는 인스턴스라고 보면 좋다. 따라서 이 KafkaTemplate 구성을 구현해야 한다.
구성을 구현하는 방법은 크게 2가지가 있다.
KafkaTemplate 빈 등록
설정 파일을 통한 구성
이렇게 두 방법이 있으며 두 방법 모두 동일한 결과를 만들어낸다.
Kafka의 데이터 저장 방식은 직렬화를 통한 바이너리 배열 방식이다. 따라서 데이터를 전송하기 위해서는 Key, Value를 직렬화할 타입을 선언해야 한다.
Producer 구현
Spring Kafka 3.0 버전 이상에서의 예외처리는 CompletableFuture
를 사용하고 이전에는 ListenableFuture
를 사용한다.
비동기 방식으로 Callback을 받는 방식이고 동기 방식으로는 future.get()을 통해 결과를 받을 수 있다.
고려해야 할 점
데이터 유효성 검증을 해야 한다.
프로듀서 어플리케이션을 제작할 때 데이터 스키마의 정합성, null이 아닌 키항목 등 유효성 검증을 간과하기 쉽다.
이런 유효성 검증을 하지 않으면 메시지 데이터가 올바르게 파티션에 할당되지 않으므로, 브로커에 대한 부하 분산에 영향을 줄 수 있다.
예외처리를 해야 한다.
어떤 이유에서건 전송에 실패할 경우 상황에 대한 적절한 흐름 제어를 해야 한다.
재시도 횟수
카프카 클러스터 연결 타임아웃 예외 발생 시 메시지 손실이 날 수 있으니 재시도 횟수를 설정한다.
부트스트랩 URL 수
카프카 브로커 설정은 반드시 두 개 이상으로 설정한다.
프로덕션 환경에서 최소 카프카 클러스터 구성은 카프카 3노드, 주키퍼 3노드를 사용하기 때문에 세 개를 적어주면 된다.
잘못된 파티셔닝 방식의 사용 방지
사용자 정의 파티션을 잘못 사용하게 되면 토픽의 모든 파티션으로 균일하게 분배되지 않을 수 있고, 최적화된 병렬처리가 불가능 하도록 만들 수 있다.
사용자 정의 파티션 사용시 라운드 로빈을 기본으로 넣자.
메시지의 임시 보존
카프카 클러스터 장애로 인하여 재시도 횟수를 넘었을 시 메시지 소실이 발생할 수 있으므로 메시지를 다시 전송할 있도록 디스크나, 데이터베이스를 이용하자.
마치며
카프카의 이론을 넘어 카프카 프로듀서를 직접 구현해보았다.