본문 바로가기
💻 개발 이야기/Apache Kafka

[Kafka] 프로듀서의 내부 구조와 최적화 전략

by Jinseong Hwang 2024. 4. 6.

 

안녕하세요.

이번 글에서는 카프카 프로듀서의 내부 구조와 최적화 전략에 대해 알아보겠습니다.

실제 카프카 클라이언트 코드도 어떻게 구현되어 있는지 함께 살펴봅시다.

 

 

[ 들어가기 전에 ]

아래에서 다루는 내용은 Apache Kafka Java Client에 대한 내용입니다. 언어별 Client에 따라 구현 내용 혹은 동작 방식이 다를 수 있습니다. 공식적으로는 Java만 지원하며, Confluent에서 관리하는 librdkafka 라이브러리를 사용하면 C/C++, Go, .NET, Python도 사용 가능한 것으로 보입니다. 그리고 Apache Kafka는 버전이 변경됨에 따라 Default 값, 내부 동작 방식 등 꾸준히 개선되고 변화하고 있습니다. 작성일 기준에 맞춰 작성됐다는 점을 참고해 주세요.

 

 

이전글을 읽으시면 이해하는 데 도움 되실 수 있습니다!

 

 

00 | 이 글에서 얻을 수 있는 것


  • 클라이언트에서 브로커까지 메시지가 어떻게 전달되는지 자세하게 이해할 수 있다.
  • 프로듀서가 어떻게 동작하는지 코드 레벨까지 명확하게 이해할 수 있다.
  • 프로듀서의 다양한 설정값과 최적화 전략에 대해 이해할 수 있다.
  • 카프카에 대해 아는 척할 수 있다 ⭐️

 

 

01 | 프로듀서의 역할


카프카 시스템을 크게 나눠보면 다음과 같이 3개로 나눌 수 있습니다.

 

1) 데이터를 발행하는 프로듀서(Producer)

2) 프로듀서가 발행한 데이터를 안정적으로 관리하며 컨슈머에게 데이터를 전달해 주는 클러스터(Cluster)

3) 클러스터에서 가져온 데이터를 처리하는 컨슈머(Consumer)

 

전체적인 카프카 시스템의 데이터는 프로듀서로부터 시작됩니다.

 

프로듀서가 발행한 데이터는 여러 브로커의 파티션에 복제되어 저장됩니다. 원본 데이터를 저장하며 프로듀서와 직접 통신하는 리더 파티션이 있고, 리더 파티션으로부터 메시지를 받아 복제본을 저장하는 팔로워 파티션이 있습니다. 프로듀서는 반드시 리더 파티션과 통신합니다.

 

 

02 | 프로듀서의 내부 구조


 

 

[ 주요 항목에 대한 설명 ]

 

ProducerRecord

카프카로 전송할 메시지 객체입니다. "어디로 무엇을 언제" 보내는지에 대한 정보가 담겨 있습니다. 다만, "오프셋은 왜 없지?"라는 의문이 생길 수 있습니다. 오프셋은 실제 브로커에 저장되는 시점에 결정됩니다. 따라서 ProducerRecord에서 결정할 수 없고, 전송 후 응답 데이터인 RecordMetadata에서 오프셋 값을 얻을 수 있습니다.

 

send()

레코드를 전송할 때 사용하는 메서드입니다. 하지만 `send()`를 호출한다고 해서 그 즉시 전송되는 건 아닙니다.

 

Partitioner

어떤 파티션으로 전송할지 결정하는 객체입니다. 레코드에 Key가 있으면 항상 같은 파티션으로 보내주고, Key가 없다면 Round Robin으로 분배하는 특징을 가지고 있습니다. 커스텀 파티셔너를 만들어서 사용하는 것도 가능합니다. 메시지의 순서 보장이 중요하다면, 영향을 미칠 수 있으니 유심히 살펴봐야 합니다.

 

ProducerAccumulator

레코드를 모아서 전송하는 역할을 합니다. 프로듀서와 카프카 클러스터가 TCP Connection을 맺고 전송하는 과정은 비용이 비쌉니다. 비용을 아끼기 위해 모아서 한방에 전송하도록 동작합니다. 이 덕분에 카프카 프로듀서는 높은 처리량을 가질 수 있습니다.

 

Sender

특정 시점에 되면 실제로 카프카 클러스터에게 전송합니다.

 

 

[ 메시지 전송 동작 과정 ]

 

카프카 클라이언트가 메시지를 카프카로 전송한다는 상황이라고 가정하겠습니다.

  1. 메시지가 포함된 ProducerRecord 객체를 생성합니다.
  2. `send()` 함수를 통해 ProducerRecord 객체를 전달합니다.
  3. Serializer가 전송할 메시지를 Byte Array 형태로 직렬화합니다.
  4. Partitioner가 전송할 메시지를 어떤 파티션에 저장할지 결정합니다.
  5. RecordAccumulator에서 전송할 메시지를 토픽, 파티션을 기준으로 묶어서 배치로 관리합니다.
  6. Sender는 Accumulator의 배치를 하나씩 브로커로 전송합니다.

 

 

[ 메시지 전송 후 응답 과정 ]

 

이후 카프카 브로커는 메시지 전송 요청에 대한 응답을 내려줍니다.

  1. 메시지 저장에 성공했다면 `send()` 함수는 메타데이터를 담은 Future 객체를 반환합니다.
  2. 메시지 저장에 실패했다면 다시 Accumulator 배치에 넣어서 재전송을 대기합니다.
  3. 메시지 저장에 실패했는데 재시도 횟수를 초과했다면 예외(callback)가 발생합니다.

 

 

[ 알게 된 카프카 프로듀서의 특징 ]

  • RecordAccumulator를 통해 배치 처리를 하면서 레코드 하나씩 전송하는 것보다 훨씬 좋은 성능을 얻을 수 있었습니다. 프로듀서 측 처리량이 높아지고, 압축과 전송 효율도 좋아졌습니다.
  • Sender는 Java IO Multiplexing을 이용해서 전송을 위한 별도의 스레드를 생성하지 않아도 돼서 성능을 크게 향상했습니다.

 

 

03 | 클라이언트에서 브로커까지 메시지가 전송되는 과정


 

[ Spring Kafka 코드 뜯어보기 ]

 

스프링 애플리케이션에서 카프카로 메시지를 전송한다고 가정합시다. 스프링을 사용한다면 대부분의 개발자들은 spring-kafka를 사용할 것입니다. (무려 2.1K⭐️!)

 

Spring Kafka 코드를 살펴봅시다. Spring Kafka에서는 카프카 클라이언트를 `KafkaTemplate`이라는 클래스로 추상화되어 있습니다. `KafkaTemplate`을 사용해서 카프카로 메시지를 전송하기 위해서는 `send()` 메서드를 호출하면 됩니다. 

 

spring-kafka / KafkaTemplate.java

 

다양한 파라미터 타입의 `send()` 메서드가 오버라이드 되어 있습니다. 그중 가장 위에 있는 `send(String topic, V data)`가 가장 기본적인 전송 메서드입니다. 전송 시 레코드가 저장될 토픽과 레코드 값은 필수적으로 지정해줘야 합니다. 반면에 키 값이나 파티션 번호는 필수로 지정하지 않아도 됩니다.

 

모든 `send()` 메서드는 내부적으로 `observeSend()`를 호출하고 있습니다.

따라갑시다.

 

spring-kafka / KafkaTemplate.java

 

레코드 관찰을 위한 Observation이 시작되고 `doSend()` 메서드를 호출합니다. 참고로 Observation 옵션은 따로 설정하지 않았다면 활성화되지 않습니다. `OpservationRegistry` 디폴트 값이 `OpservationRegistry.NOOP`이기 때문입니다. 활성화를 하고 싶다면 `spring.kafka.listener.observation-enabled=true` 프로퍼티를 넣어주시면 됩니다.

 

`doSend()` 메서드를 계속 따라갑시다.

 

spring-kafka / KafkaTemplate.java

 

큼지막한 동작 3가지만 살펴봅시다.

 

1. 토픽에 적절한 프로듀서 객체 가져오기

`getTheProducer()` 메서드 호출 시 토픽 정보를 전달하고 있습니다. 내부적으로 우선 트랜잭션 프로듀서인지 아닌지를 판단합니다. 프로듀서에서 `transactional.id` 값을 설정했다면 트랜잭션 프로듀서로, 별도로 설정하지 않았다면 트랜잭션을 사용하지 않는 일반 프로듀서처럼 동작하게 됩니다.

 

2. ProducerRecord에 인터셉터를 등록/실행하기

메시지를 전송하기 전/후로 실행되는 동작을 제어할 수 있습니다. 카프카 클라이언트의 코드를 고치지 않으면서 모든 애플리케이션에 동일한 동작을 집어넣을 때 유용합니다. ProducerInterceptor의 `onSend()`, `onAcknowledgement()` 메서드를 구현하면 됩니다.

 

Spring Web MVC의 interceptor와 유사합니다.

비교 동작 이전 동작 이후
Apache Kafka onSend(record) onAcknowledgement(metadata, exception)
Spring Web MVC preHandle(req, res, handler) postHandle(req, res, handler)

 

인터셉터는 모니터링, 트레이싱, 공통 헤더 삽입 등의 경우에 사용합니다.

 

3. 메시지 전송하기

(드디어) `producer.send()`를 합니다. 하지만 이 메서드가 실행됐다고 해서 즉시 전송되는 것은 아닙니다. Sender에게 보내주고, Sender 스레드에 여유가 생기면 배치 단위로 전송하게 됩니다.

 

 

[ Kafka 코드 뜯어보기 ]

 

위에서 호출한 `send()` 메서드는 Spring Kafka가 아닌 Apache Kafka의 Producer.java 인터페이스에 있는 메서드입니다.

 

kafka / KafkaProducer.java

 

KafkaProducer의 `send()` 메서드도 내부적으로 `doSend()` 메서드를 호출합니다.

 

kafka / KafkaProducer.java

 

굉장히 긴 함수가 나왔습니다. 중요한 부분만 큼지막히 살펴봅시다.

 

1. 직렬화

가장 먼저 수행되는 것은 파라미터로 전달받은 ProducerRecord의 키와 값이 네트워크 상에서 전송될 수 있도록 직렬화해서 Byte Array로 변환하는 과정입니다.

 

대부분의 경우에는 Kafka 혹은 Spring Kafka에서 제공되는 직렬화 도구로 충분하지만, 직접 구현해야 할 경우에는 Apache Avro, ApacheThrift, Google Protobuf와 같은 범용 직렬화 프레임워크를 사용하는 것을 권장합니다.

 

2. 파티셔닝

파티션을 명시적으로 지정하지 않았다면, 파티셔너에게 ProducerRecord 객체를 전달해서 어떤 파티션으로 전송할지 결정합니다. 파티셔너는 레코드를 어떤 파티션으로 전송할지 결정해 주는 역할을 합니다. 이 부분에 대해서는 아래에서 자세히 알아봅시다.

 

3. Accumulator 배치에 추가

전송할 ProducerRecord의 토픽과 파티션이 결정됐다면, RecordAccumulator의 알맞은 배치에 ProducerRecord를 append 합니다.

 

kafka / RecordAccumulator.java

 

위 코드는 토픽의 정보를 저장하는 TopicInfo 클래스입니다. 말 그대로 토픽의 정보를 저장하는데, 첫 번째 멤버 변수가 `batches`라는 이름을 가지고 있습니다. Map 자료구조인데 Key에는 파티션 넘버가, Value에 레코드 배치가 Deque로 저장됩니다. 즉, 토픽 내 파티션 단위로 배치가 생성되는 것을 알 수 있습니다.

 

kafka / RecordAccumulator.java

 

Producer Record를 적절한 배치에 넣기 위해서는 배치 저장소(Deque)를 가져와야 합니다. RecordAccumulator에 구현된 코드를 보면, 토픽과 파티션 값을 묶어서 저장하는 VO인 TopicPartition 객체를 `getOrCreateDeque()` 메서드가 파라미터로 받습니다.

 

그리고 토픽과 파티션에 해당하는 배치가 있으면 그대로 반환하고 없으면 ArrayDeque를 만들어서 반환합니다.

 

4. Sender의 배치 전송

특정 배치가 가득 차거나, 새로운 배치가 생성됐거나, `linger.ms` 만큼 시간이 흘렀거나, `batch.size` 만큼 가득 찼다면 배치를 전송하게 됩니다.

 

kafka / RecordAccumulator.java

 

이때 Accumulator에는 Sender 스레드를 wakeup만 해주고, Sender의 스레드에 여유가 생겼을 때 배치에 접근해서 실제로 전송을 진행합니다.

 

5. 응답 처리

`send()`는 `Future<RecordMetadata>` 를 반환합니다.

 

메시지가 성공적으로 브로커에 저장됐다면, 브로커는 토픽, 파티션, 레코드가 저장된 위치(순서)값인 오프셋을 담은 RecordMetadata를 반환합니다. 메시지 저장에 실패했다면 재시도를 하고, 재시도 횟수를 초과했을 경우에는 에러가 리턴됩니다.

 

참고로 acks 옵션을 0으로 설정하면 어떤 오프셋에 저장되었는지 관심을 가지지 않습니다. 따라서 RecordMetadata의 offset 값에도 -1이 들어가게 됩니다. 오프셋은 0부터 순차적으로 커지는 값인데도 불구하고요.

 

6. Producer Graceful shutdown

프로듀서를 종료할 때는 `producer.close()`를 해줘야 합니다. ProducerAccumulator의 배치에 저장된 나머지 데이터를 카프카 클러스터로 보내주고 종료해야 합니다. 그래야 데이터 유실이 발생하지 않습니다.

 

 

04 | 파티셔너


 

[ 카프카의 DefaultPartitioner ]

 

카프카 클라이언트에서 기본적으로 제공하는 파티셔너는 UniformStickyPartitioner, RoundRobinPartitioner로 2개가 있습니다. 별도의 파티셔너를 지정하지 않으면 DefaultPartitioner로 지정됩니다.

 

DefaultPartitioner는 Kafka 2.3 버전까지 RoundRobinPartitioner 구현체를 사용했고, 2.4 버전 이후로는 쭉 UniformStickyPartitioner 구현체를 사용합니다. 즉, 이 글을 읽으시는 대부분은 별도의 설정을 하시지 않았다면 파티셔너는 UniformStickyPartitioner 방식으로 동작할 것입니다.

 

 

[ RoundRobinPartitioner와 UniformStickyPartitioner ]

 

https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/

 

두 가지 파티셔너는 서로 다르게 동작합니다.

 

이미지의 왼쪽이 RoundRobinPartitioner의 동작입니다. Accumulator에서 배치로 레코드를 묶지 않고, 메시지 전송 요청이 들어오는 대로 파티션을 순회하면서 전송합니다.

 

이미지의 오른쪽이 UniformStickyPartitioner의 동작입니다. RoundRobinPartitioner의 성능을 개선시킨 형태라고 보시면 됩니다. Accumulator에서 배치로 레코드를 묶어서 한방에 전송하도록 합니다. 배치 크기를 초과하지 않는 수준에서 같은 파티션으로 보내는 레코드는 최대한 묶어서 전송합니다. 이 방식도 결국 모든 파티션을 순회하게 되어 있기 때문에 공정하게 분배되나 성능은 RoundRobinPartitioner보다 좋습니다.

 

실제 성능 측정 표는 다음과 같습니다.

 

https://www.conduktor.io/kafka/producer-default-partitioner-and-sticky-partitioner/

 

Round Robin 방식과 Sticky 방식은 파티션이 많아질수록 성능 차이가 크게 나는 모습을 볼 수 있습니다.

 

 

[ 레코드 Key가 파티셔너에 미치는 영향 ]

 

구현체보다 더 중요한 것은 ProducerRecord의 Key 유무입니다. Key는 Value와 함께 레코드를 표현하는 부가 정보이면서, 파티셔닝에 활용되는 중요한 정보입니다. 그리고 (나중에 알아볼) 로그 Clean Up 정책에 맞춰 압축 진행 시, 기준점이 되는 중요한 정보입니다.

 

Key가 있으면 Key의 해시값(MurmurHash2 Algorithm)으로 파티션이 결정됩니다. Java의 해시 알고리즘과 무관한 알고리즘이므로 Java 버전 업/다운그레이드를 해도 파티션 넘버가 바뀌는 불상사는 발생하지 않습니다.

 

레코드 Key가 없다면, 파티셔너는 레코드를 최대한 균일하게 파티션에 분배합니다.

 

 

[ 순서 보장이 필요한 메시지라면 ]

 

순서 보장이 필요하다면 파티셔너가 중요합니다. 파티셔너 만큼이나 중요한 것이 파티션의 개수입니다. 예를 들어, 레코드 키로 파티션에 들어가는 메시지의 순서를 보장하고 있다고 가정합시다. 어떤 파티션으로 갈지는 Key의 해시값을 파티션 개수로 Mod 연산해서 얻게 됩니다.

 

아래는 예시입니다. 파티션 개수가 바뀌면 동일한 메시지(Key)라도 서로 다른 파티션으로 할당되게 됩니다.

 

파티션 개수 변경 전: 3개

  • 15 (메시지 키 해시값) % 3
    • 0번 파티션으로 들어간다.

 

파티션 개수 변경 후: 4개

  • 15 (메시지 키 해시값) % 4
    • 3번 파티션으로 들어간다.

 

 

[ 문제 예방하기 ]

 

이러한 문제를 예방하기 위해서는 미리 파티션을 충분히 만들어 두는 것입니다.

 

예를 들어 프로듀서 1대가 초당 100개의 메시지를 발행하며, 컨슈머 1대는 초당 메시지 10개를 처리할 수 있다고 가정합시다. 이럴 때 파티션도 10개 만들면 이론상 적합합니다. 하지만 프로듀서가 발생하는 데이터 양이 늘어난다면 파티션 개수를 변경해야 합니다. 따라서 파티션 개수를 처음부터 50~100개 등 널널하게 만들어두면 이런 문제로부터 자유로워질 수 있습니다.

 

 

[ 정리 ]

 

ProducerRecord에 파티션 넘버를 설정했다면, 설정한 파티션으로 들어갑니다.

ProducerRecord에 파티션 넘버를 설정하지 않고 Key를 설정했다면, Key의 해시값을 기반으로 파티션이 결정됩니다.

ProducerRecord에 파티션 넘버를 설정하지 않고 Key도 설정하지 않았다면, UniformStickyPartitioner 방식으로 파티셔닝 됩니다.

 

 

05 | 파티셔너 커스텀하기


 

파티셔너를 커스텀 할 수도 있습니다.

 

kafka / Partitioner.java

 

`org.apache.kafka.clients.producer.Partitioner` 인터페이스를 활용해서 커스텀 파티셔너를 만드실 수 있습니다.

 

 

[ 파티셔너 예시 코드 ]

 

package com.jinseong.producer.partitioner

import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

class MyCustomPartitioner : Partitioner {

    override fun partition(
        topic: String,
        key: Any?,
        keyBytes: ByteArray?,
        value: Any?,
        valueBytes: ByteArray?,
        cluster: Cluster
    ): Int {
        val partitionCount = cluster.partitionsForTopic(topic).size
        return (key.hashCode() & Integer.MAX_VALUE) % partitionCount
    }

    override fun close() {}

    override fun configure(configs: Map<String, *>?) {}
}

 

`partition()`, `close()`, `configure()` 3개의 메서드를 오버라이딩 하면 됩니다.

 

 

[ 파티셔너 적용 ]

 

구현한 클래스의 package 위치를 partitioner.class 프로퍼티에 입력하시면 적용됩니다.

 

partition.class=com.jinseong.producer.partitioner.MyCustomPartitioner

 

 

06 | 프로듀서의 필수 설정값


 

프로듀서를 사용하기 위해 필수적으로 설정해야 하는 값들에 대해 알아보겠습니다.

 

[ bootstrap.servers ]

 

카프카 클러스터와 최초 커넥션을 생성하기 위해 사용하는 host:port 리스트입니다. 모든 브로커의 정보를 작성할 필요는 없지만, 2개 이상 작성하는 것을 권장합니다. 만약 커넥션을 시도하는 시점에 1개가 동작하지 않을 경우 나머지 1개로 접속할 수 있기 때문입니다.

 

최초 커넥션 생성 시, bootstrap.servers에 작성한 첫 번째 주소로 메타데이터 조회를 시도합니다. 특정 브로커는 각 파티션 별 리더 파티션의 정보를 응답으로 내려줍니다. 프로듀서는 해당 정보를 가지고 다른 리더 파티션들이 있는 브로커의 정보를 알 수 있습니다. 그렇기 때문에 모든 브로커에 대한 정보를 작성할 필요가 없습니다.

 

예시) bootstrap.servers=korea-a-server:9092,korea-b-server:9092,korea-c-server:9092

 

[ key.serializer ]

 

ProducerRecord의 Key를 Btye Array로 직렬화하는 클래스입니다. 일반적으로 문자열을 사용하기 때문에 StringSerializer를 사용하시면 됩니다. Key를 사용할 일이 없다면 VoidSerializer로 설정하셔도 됩니다.

 

예시) key.serializer=org.apache.kafka.common.serialization.StringSerializer

 

[ value.serializer ]

 

ProducerRecord의 Value를 Btye Array로 직렬화 하는 클래스입니다. 일반적으로 JSON을 사용하기 때문에 Spring Kafka에서 제공되는 JsonSerializer를 사용하시면 됩니다. 

 

예시) value.serializer=org.springframework.kafka.support.serializer.JsonSerializer

 

kafka / Serializer.java

 

그 외 다른 타입으로 Value를 보내고 싶으시다면 직접 Serializer를 구현해서 설정하셔도 됩니다. Serializer를 직접 구현하려면 `org.apache.kafka.common.serialization.Serializer` 인터페이스를 구현해서 등록하시면 됩니다.

 

 

07 | 프로듀서의 선택 설정값


 

프로듀서와 관련된 옵션들입니다. 프로듀서에서 설정해야 하는 값도 있고 브로커에서 설정해야 하는 값도 있습니다. 반드시 설정할 필요는 없습니다. 디폴트 값 그대로 사용하셔도 문제가 없을 확률이 높고 준수한 성능을 보여줍니다. 아래의 옵션들은 제대로 이해하고 변경하는 것을 추천드립니다.

 

중요하다고 생각하는 몇 가지만 가져왔고, 나머지는 공식 문서를 참고해 주세요!

 

[ acks ]

 

프로듀서가 전송한 레코드가 브로커에 정상적으로 저장됐는지 확인하는 옵션입니다. 이 옵션은 매우 중요하므로 다음 섹션에서 자세히 다루겠습니다. 기본값은 -1(all) 입니다.

 

예시) acks=-1

 

[ linger.ms ]

 

ProducerAccumulator에서 배치를 전송하기 전까지 기다리는 최소 시간입니다. 이 값을 늘리면 배치를 더 오래 모았다가 전송하기 때문에 처리량을 늘릴 수 있습니다. 기본값은 0이라서 메시지 전송에 사용할 수 있는 스레드가 있을 때 곧바로 전송하도록 되어 있습니다. 처리량을 늘리려면 10~100 정도로 늘려도 좋습니다.

 

예시) linger.ms=10

 

[ retries ]

 

레코드(배치)를 브로커로 전송했는데 다양한 이유로 에러가 발생할 수 있다. 에러를 받고 난 뒤 재전송을 시도하는 횟수입니다. 기본값 1은 Int.MAX(약 21억)입니다. 메시지 일부가 유실돼도 상관없는 시스템은 0~10 정도로 낮춰도 좋습니다. 재전송 옵션을 완전히 끄고 싶다면 0으로 설정하면 됩니다.

 

예시) retries=1

 

[ buffer.memory ]

 

프로듀서가 메시지를 전송하기 전, 전송 대기 중인 메시지를 저장하는 버퍼 메모리 크기를 결정합니다. 만약 클러스터에서 받을 수 있는 메시지 개수가 초당 10개까지 가능한데, 프로듀서에서 초당 20개를 전송하려 한다면 버퍼 메모리가 금방 가득 찰 것입니다.

 

버퍼 메모리가 가득 찬 후, 전송을 시도한다면 max.block.ms 동안 블록되어 버퍼 메모리에 공간이 생길 때까지 기다립니다. 하지만 max.block.ms 만큼 기다렸음에도 버퍼 메모리 공간이 확보되지 않으면 예외를 발생시킵니다. 대부분의 프로듀서 예외는 `send()` 메서드가 반환하는 Future 객체에서 발생합니다. 하지만 이 예외는 `send()` 메서드 자체에서 발생한다는 차이가 있습니다.

 

기본값은 33,554,432입니다. (32MB)

 

예시) buffer.memory=33554432

 

[ compression.type ]

 

기본적으로 메시지를 압축하지 않지만, 압축해서 브로커로 전송하고 싶다면 이 옵션을 설정하면 됩니다. 주로 'snappy', 'gzip', 'lz4', 'zstd' 등의 압축 알고리즘으로 설정할 수 있습니다.

 

특히 구글에서 개발한 Snappy 압축 알고리즘은 CPU 부하가 작으면서도 성능이 좋으며 꽤 괜찮은 압축률을 보여줍니다. 어찌 됐든 압축을 하게 되면 CPU 리소스가 추가로 필요하니, 네트워크 대역폭이 제한적일 때 압축 알고리즘을 설정하면 됩니다. 기본값은 none입니다.

 

예시) compression.type=snappy

 

[ batch.size ]

 

같은 토픽 같은 파티션으로 여러 레코드를 전송할 경우 프로듀서의 ProducerAccumulator에 이를 모아서 한꺼번에 전송합니다. batch.size는 각각의 배치에 사용될 메모리 양을 결정합니다.

 

하지만 이 값을 크게 잡는다고 해서 가득 찰 때까지 기다리지는 않는다. 프로듀서는 절반만 찬 배치 혹은 하나의 레코드만 들어있는 배치도 전송합니다. 그렇기 때문에 이 매개변수를 지나치게 크게 잡는다고 해서 메시지 전송이 지연되지는 않는다. linger.ms 만큼 시간이 지나면 전송하기 때문입니다. 하지만 지나치게 작게 잡는다면 너무 자주 전송해야 하기 때문에 오버헤드가 발생할 수 있습니다.

 

만약 batch.size (byte단위)에 설정된 값보다 큰 크기의 레코드가 들어온다면 배치로 모으지 않고 곧바로 전송합니다.

기본값은 16,384입니다. (16MB)

 

예시) batch.size=16384

 

[ max.request.size ]

 

메시지의 최대 크기를 제한하기도 하고, 한 번의 요청에 보낼 수 있는 메시지의 최대 개수 역시 제한하는 값입니다. 예를 들어, 이 값을 1MB로 설정했다고 가정합시다. 이 경우 전송 가능한 메시지의 최대 크기는 1MB가 됩니다. 그리고 1KB 짜리 메시지는 1024개까지 전송할 수 있습니다.

 

그리고 브로커에는 브로커가 받아들일 수 있는 최대 메시지 크기를 결정하는 message.max.bytes 옵션이 있습니다. 이 옵션 역시 조절해서 프로듀서가 브로커가 받아들일 수 있는 크기 내에서 메시지를 전송하도록 강제하는 것이 좋습니다.

 

기본값은 1,048,576입니다. (1MB)

 

예시) max.request.size=1048576

 

[ max.in.flight.requests.per.connection ]

 

프로듀서가 클러스터로부터 응답을 받지 못한 상황에서 전송할 수 있는 최대 레코드 개수입니다. 다시 말해, Sender와 카프카 클러스터 사이에 유지하고 있는 커넥션 개수를 의미합니다. 설정된 값만큼 동시에 전송할 수 있습니다. 처리량을 늘리고 싶다면 이 값을 늘려보는 것도 좋습니다. 물론 메모리 사용량도 비례해서 증가됩니다. 기본값은 5입니다.

 

만약 순서에 민감한 데이터를 카프카로 전송하고 있다면 이 값을 주의해서 설정해야 합니다. retries를 0보다 큰 값으로 설정하고, max.in.flight.requests.per.connection을 1보다 큰 값으로 설정한 경우 순서 보장이 깨지는 경우가 발생할 수 있습니다. 그렇다고 max.in.flight.requests.per.connection을 1로 설정하는 것은 성능상 문제가 생길 수 있습니다. 이 상황에서 가장 합리적인 선택은 enable.idempotence를 true로 설정하는 것입니다.

 

예시) max.in.flight.requests.per.connection=5

 

[ partitioner.class ]

 

레코드가 저장될 파티션을 결정하는 파티셔너를 설정합니다. 기본값은 DefaultPartitioner입니다. DefaultPartitioner는 카프카 2.5 버전부터 내부적으로 UniformStickyPartitioner를 사용합니다.

 

에시) partitioner.class=com.jinseong.producer.MyCustomPartitioner

 

[ enable.idempotence ]

 

카프카의 "정확히 한 번(Exactly once)" 의미 구조의 핵심 옵션입니다. 멱등성 프로듀서로 동작하게 할지 결정합니다. 즉, 같은 메시지를 여러 번 보내도 동일한 상태를 가지게 할지 결정할 수 있습니다. 멱등성 프로듀서를 활성화하면 프로듀서는 레코드를 보낼 때마다 순차적인 번호를 붙여서 보내게 됩니다. 만약 브로커가 동일한 번호를 가진 레코드를 2개 이상 받을 경우 하나만 저장하게 되며, 프로듀서는 DuplicateSequenceException을 발생시킵니다. 기본값은 true입니다.

 

enable.idempotence를 true로 설정하기 위해서는 retries가 1 이상, max.in.flight.requests.per.connection가 5 이하, acks가 all로 설정되어 있어야 합니다. 만약 이 조건을 만족하지 않는다면 ConfigException이 발생합니다.

 

예시) enable.idempotence=true

 

[ transactional.id ]

 

프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 결정합니다. 기본값은 null입니다. 이 부분도 아래에서 조금 더 자세히 알아봅시다.

 

예시) transactional.id=my-tx-id

 

[ client.id ]

 

프로듀서의 식별자입니다. 애플리케이션을 구별하는 데 사용됩니다. 디버깅이나 추적하는 것을 쉽게 하기 위해 설정하곤 합니다. 기본값은 ""입니다.

 

예시) client.id=jinseong-producer

 

 

08 | acks 옵션으로 신뢰도 조절하기


https://www.conduktor.io/kafka/kafka-producer-acks-deep-dive/

ISR(In-Sync-Replicas)은 리더 파티션과 팔로워 파티션이 모두 동기화가 완료된 상태를 의미합니다. 리더 파티션의 오프셋 개수와 팔로워 파티션의 오프셋 개수가 동일하다면 ISR이라고 볼 수 있습니다. ISR 상태가 되어야 온전히 Fail over가 가능해집니다.

 

프로듀서와 컨슈머는 토픽의 리더 파티션과 통신합니다. 만약 리더 파티션에 장애가 발생한다면 어떻게 될까요? ISR이 갖춰진 상태여야만 팔로워 파티션이 리더 파티션을 온전히 대체할 수 있습니다. 당연하게도, 리더 파티션과 팔로워 파티션 간 데이터 복제가 이뤄지려면 시간이 필요합니다. 파티션 간 데이터를 복제하는 데 걸리는 지연 시간을 Replication Lag이라고 부릅니다.

 

acks 옵션을 사용하면 카프카 클러스터의 신뢰도를 조절할 수 있습니다.

 

[ acks 옵션에 따른 동작 ]

 

acks=0

프로듀서가 리더 파티션으로 데이터를 전송하지만, 잘 저장됐는지 확인하지 않겠다는 의미입니다. 데이터 전송 속도는 다른 acks 옵션보다 훨씬 빠릅니다. 하지만 데이터 유실이 발생할 수 있습니다. 예를 들어, 유저 활동 이벤트, GPS 이벤트 등을 다룰 때 적합합니다.

 

acks=1

프로듀서는 리더 파티션에게 데이터를 전송합니다. 그리고 리더 파티션에 데이터가 성공적으로 저장됐는지 확인하겠다는 의미입니다. 만약 리더 파티션에 정상적으로 적재되지 않았다면 재시도를 하게 됩니다. 꽤 빠르며 대부분의 경우에 사용할 수 있습니다.

 

acks=-1 (all)

프로듀서가 리더 파티션에 데이터를 전송하고, 리더 파티션이 팔로워 파티션들에게 데이터를 전송(동기화)하고, 팔로워 파티션과 리더 파티션 모두 정상적으로 적재 됐는지 확인하겠다는 의미입니다. 다른 acks 옵션에 비해 가장 느리지만, 신뢰도가 가장 높습니다. 예를 들어, 돈 관련 작업처럼 절대 유실이 발생하면 안 되는 경우에 이 옵션을 사용하면 적합합니다. 작성일 기준 카프카 최신 버전에서 기본값으로 설정되어 사용됩니다.

 

 

[ min.insync.replicas ]

 

여기서 함께 보면 좋은 옵션이 있습니다. 바로 `min.insync.replicas`입니다. acks가 all 일 때 유의미한 동작을 하게 됩니다.

 

만약 Replication factor를 3으로 설정해서 3개의 파티션(리더 1 + 팔로워 2)에 토픽 데이터가 복제되어 있다고 가정해 봅시다. 반드시 3개의 파티션에 완벽히 복제가 필요한 상황은 나머지 2대(리더 1 + 팔로워 1)에 동시에 장애가 발생한 상황입니다. 하지만 정말! 이런 경우가 생길 가능성은 매우 희박합니다.

 

min.insync.replicas를 2로 설정하면 리더 파티션 1개와 팔로워 파티션 1개에 대해서만 동기화를 보장하고, 나머지 1대에 대해서는 따로 확인하지 않습니다. 하지만 acks를 all로 설정하고 min.insync.replicas를 1로 설정하면 acks=all이 무시되고 acks=1로 설정한 것처럼 동작합니다. 이처럼 이 값을 잘 활용하면 신뢰도와 성능 트레이드오프를 더욱 세밀하게 조절할 수 있게 됩니다.

 

 

[ acks와 end-to-end latency의 상관관계 ]

 

acks 옵션을 사용하면 신뢰도와 성능을 트레이드 오프 할 수 있다는 것을 알게 됐습니다. 하지만 레코드가 생성되어 컨슈머가 읽을 수 있는 상태가 되기까지 시간인 종단 지연(End to end latency)은 acks 옵션과 상관없습니다. 카프카는 일관성을 유지하기 위해 모든 ISR에 복제가 완료된 뒤에 컨슈머가 레코드를 읽어갈 수 있게 하기 때문입니다. 따라서 단순히 프로듀서 처리량을 넘어서 종단 지연이 고려되어야 하는 상황이라면 acks 옵션을 변경할 필요가 없습니다. acks=0과 acks=all이 동일한 종단 지연을 가지기 때문입니다.

 

 

09 | 멱등성 프로듀서


 

멱등성 프로듀서 관련해서 다루고 싶은 내용이 많습니다. 이번 글에서는 가볍게만 알아봅시다.

 

 

[ 멱등성 프로듀서란 ]

 

일반 프로듀서는 하나의 메시지를 카프카 클러스터에게 적어도 한번(At least once) 전달하는 방식으로 동작합니다. 즉, 중복 저장이 가능합니다. 이러한 문제를 멱등성 프로듀서를 사용하면 해결할 수 있습니다.

 

멱등성 프로듀서는 하나의 메시지를 정확히 한번(Exactly once)만 전달되도록 하고 싶을 때 사용합니다. 다시 말해, 동일한 데이터를 여러 번 전송하더라도 클러스터에 단 한 번만 저장됩니다.

 

멱등성 프로듀서로 설정하기 위해서는 `enable.idempotence=true`로 설정하시면 됩니다. 2 버전에서는 false가 기본값이었는데, 3 버전 이후로는 true가 기본값입니다. `enable.idempotence=true`로 설정하면 강제로 reties=Integer.MAX_VALUE, acks=all로 설정됩니다.

 

 

[ 멱등성 프로듀서의 동작 방식 ]

 

  1. 프로듀서가 레코드를 브로커로 전달할 때 PID(Producer unique ID), Epoch 값과 시퀀스 넘버를 함께 전달합니다.
  2. 브로커는 프로듀서의 PID와 시퀀스 넘버를 확인해서 동일한 메시지 적재 요청이 왔을 때 중복 제거 후 단 1건만 저장합니다.

 

 

[ 멱등성 프로듀서 운영 시 고려할 점 ]

 

  • 시퀀스 넘버는 파티션 내에서 순차적으로 증가해야 하는데 넘버 점프가 발생하면 OutOfOrderSequenceException이 발생합니다.
  • 동일한 프로듀서에 대해서만 멱등성 보장이 가능합니다. PID로 판단하기 때문이다. 항상 보장되는 것은 아니다.

 

 

[ 멱등성 프로듀서가 아니라면 ]

 

반드시 멱등성을 프로듀서 쪽에서 보장해 줄 필요는 없습니다. 컨슈머에서 여러 메시지가 들어왔을 때 단 한 번만 처리하도록 해도 무관합니다. 하지만 저장 공간, 네트워크 IO 등 모두 비용이기 때문에 프로듀서 측에서 한 번만 저장되도록 하는 것이 여러모로 저렴합니다.

 

 

10 | 트랜잭션 프로듀서


 

카프카의 트랜잭션을 제대로 사용하기 위해서는 프로듀서뿐만 아니라 컨슈머 쪽에서도 신경을 써줘야 합니다. 이번 글에서는 트랜잭션 프로듀서에 관해서만 가볍게 알아봅시다.

 

 

[ 트랜잭션 프로듀서 ]

 

관계형 데이터베이스에서 트랜잭션의 성질로 ACID를 자주 언급하곤 합니다. 이 중, 특히 메시지 간 원자성(Atomic)을 보장하는 방법으로 카프카에서는 트랜잭션 기능을 제공하고 있습니다. 일반적인 트랜잭션 컨셉과 유사하게, 모든 메시지가 저장에 성공해야 commit이 되고 단 하나의 메시지라도 저장에 실패하면 모두가 실패하게 됩니다.

 

 

[ 트랜잭션 프로듀서의 동작 ]

 

`transactional.id` 값을 설정하면 트랜잭션 프로듀서가 활성화됩니다. 이 값은 프로듀서별로 고유한 값을 사용해야 합니다. 프로듀서를 특정할 수 있는 고유한 문자열이나 UUID를 사용하셔도 나쁘지 않습니다.

 

예시로 다음과 같습니다.

transactional.id=my-tx-producer

 

 

 

[ 트랜잭션 프로듀서 예제 코드 ]

 

config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID());
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

var producer = new KafkaProducer<>(config);

producer.initTransactions();

producer.beginTransaction();
producer.send(new ProducerRecord<>("Topic", "Value"));
producer.commitTransaction();

producer.close();

 

  • `transactional.id` 와 isolation level을 설정해줘야 합니다. 특히 isolation level은 read_committed로 설정해야 커밋된 데이터만 가져옵니다. 기본값은 read_uncommitted입니다.
  • 트랜잭션을 시작하고 메시지를 전송하고 트랜잭션을 커밋해줘야 합니다.

 

 

11 | 맺음말


 

계속해서 공부를 하고 있는 내용인데 결코 간단하지 않다는 걸 많이 느꼈습니다. 따라서 이 글은 비정기적으로 내용을 보충/수정하겠습니다. 혹시 잘못된 정보나 부족한 부분이 있다면 댓글 부탁드려요. 감사한 마음으로 적극 반영하겠습니다. 긴 글 읽어주셔서 감사합니다!

 

그리고 함께 스터디하는 출고팀에 감사함을..

 

 

12 | References