Kafka - Producer 깊게 파헤치기

1. 개요

 

앞서 살펴본 것처럼 Kafka는 여타 Message Queue처럼 Producer - Consumer 구조를 가지고 있으나 세부적인 디테일은 크게 다릅니다. 오늘은 그 중 Producer를 좀 더 심층적으로 알아보고자 합니다. 앞으로 RabbitMQ와의 비교까지 갈 길이 머니 얼른 정리해보겠습니다!

 

오늘 글은 다음 글을 기반으로 소스 코드를 보며 작성된 글입니다!

https://d2.naver.com/helloworld/6560422

 

 

2. Kafka Producer란?

 

Kafka는 Producer의 데이터 전송을 위해서 Producer API를 제공합니다. Producer는 브로커에 특정 Topic(partition까지도 가능)을 지정하여 메시지를 전달합니다. 오늘은 Producer에서 유저에게 직접적으로 제공하는 API뿐만 아니라 연관된 전체 흐름을 살펴보고자 합니다.

 

1)  기본 구성 요소

  1. KafkaProducer: 사용자가 직접 사용하는 파트. 이 클래스의 send()를 호출함으로써 Record를 전송합니다.
  2. RecordAccumulator: KafkaProducer가 send()를 호출하면 임시로 저장하는 Buffer. 실제 Broker로의 전송은 이후 비동기적으로 이루어집니다.
  3. Sender: KafkaProducer가 생성하는 별도의 Sender Thread. RecordAccumulator에 저장한 Record들을 Broker로 전송하고 응답을 받아 설정된 콜백을 실행한다. 또한 결과를 Future를 통해 사용자에게 전달합니다.

사용자가 KafkaProducer의 send를 호출하면 RecordAccumulator에 저장되었다가 비동기적으로 Sender Thread가 전송한다.


2) 전송 흐름

 

- send API 콜에서 RecordAccumulator에 저장되기 까지

 

1. KafkaProducer send();

 

사용자는 send를 통해 Kafka Broker로 메시지를 전송할 수 있습니다. 전송은 비동기로 이루어집니다. 이 비동기 구현은 전송 흐름 파트 전반을 살펴보면 이해할 수 있을 것입니다 :) 응답은 callback 형식으로 이루어집니다.

 

비동기에 대해 이해가 부족하시다면 제 예전 글인 동기, 비동기에 대한 글을 참고해주시면 되고

https://rawshrimpsushi.tistory.com/35

 

소켓 프로그래밍 - 블로킹, 논 블로킹, 비동기, epoll, IOCP, IO Uring

1. 들어가며 게임을 만들며 게임 서버가 필요한 로직이 들어간다면 Photon, 프라우드넷 등 다양한 네트워크 엔진을 고려하게 됩니다. 모두 유용한 도구이고 특히 프라우드넷의 경우 마비노기 영웅

rawshrimpsushi.tistory.com

추후에 비동기 소켓 프로그래밍에 대해서도 글을 작성할 예정입니다. 해당 글이 완료되면 같이 첨부하겠습니다.

 

콜백에 대해 이해가 부족하다면 다음 글을 추천드립니다.

https://velog.io/@pllap/Java%EC%97%90%EC%84%9C%EC%9D%98-%EB%B9%84%EB%8F%99%EA%B8%B0-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D

 

Java에서의 비동기 프로그래밍

동기? 비동기? 글을 시작하기에 앞서, 동기와 비동기가 무엇인지 간단하게 설명해 보자면 다음과 같다. > 작업을 수행하는 두 주체 A, B가 있다고 가정하자. 동기 (sync) A가 작업을 끝내는 시간에

velog.io

 

 

우선 send 메소드의 시그니처부터 살펴볼까요?

send 메소드 내부

 

사용자는 send() 호출 시 전송할 Record와 전송 완료 후 실행할 콜백을 지정할 수 있습니다.

Record는 topic, partition를 가지고 key와 value 형태로 이루어져 있습니다.

 

콜백 인터페이스 내부 메소드

 

위의 콜백 인터페이스를 보면 onCompletion 메소드를 가지고 있음을 알 수 있습니다. onCompletion 메소드는 RecordMetaData와 Exception이 있는데, RecordMetaData에는 Record가 저장된 partition과 offset 등이 담겨 있습니다. 예외가 발생했다면 담겨온 Exception에 대해 처리해주면 됩니다.

 

위 코드에선 doSend로 내부 구현이 은닉되어 있는데요. 내부 동작은 여러 단계로 나뉘어 있습니다.

send 호출 시 Serialization(직렬화), Partitioning(파티션 결정), Compression(압축) 작업이 순서대로 이루어집니다.

그리고 최종적으로 RecordAccumulator에 Record가 저장됩니다.

그림으로 표현하면 다음과 같습니다.

Producer.send(new ProducerRecord(“topic0”, “hello”), callback); 호출 시

 

그럼 각 단계를 하나씩 살펴보겠습니다.

 

2. Serialization (직렬화)

 

앞서 살펴본 것처럼 Record는 key, value 형태로 되어 있습니다. Record Key, Value는 지정된 Serializer에 의해서 byte array로 변환됩니다. Serializer key.serializer, value.serializer 설정값으로 지정하거나 KafkaProducer 생성 시 지정할 수 있습니다.

StringSerializer 외에 ByteArraySerializer, ByteBufferSerializer, BytesSerializer, DoubleSerializer, IntegerSerializer, LongSerializer를 제공하고 있습니다.

 

혹시 직렬화가 낯선 분이라면 다음 글을 참조해주세요!
https://inpa.tistory.com/entry/JAVA-%E2%98%95-%EC%A7%81%EB%A0%AC%ED%99%94Serializable-%EC%99%84%EB%B2%BD-%EB%A7%88%EC%8A%A4%ED%84%B0%ED%95%98%EA%B8%B0

 

3, Partitioning (파티션 지정)

 

Kafka Topic은 여러 개의 Partition으로 나뉘어 있습니다. 사용자의 Record는 지정된 Partioner에 의해서 어떤 파티션으로 보내질지 정해집다. partitioner.class를 설정하여 Partitioner를 지정할 수 있으며, Partitioner를 지정하지 않으면 DefaultPartitioner가 사용됩니다. DefaultPartitioner는 다음과 같이 동작합니다.

 Key 값이 있는 경우: Key 값의 Hash 값을 이용해서 Partition을 할당한다.
 Key 값이 없는 경우: Round-Robin 방식으로 Partition이 할당된다.

 

4. Compression (압축)

 

Kafka는 기본적으로 1MB 이하 크기의 메시지를 전송하도록 설계가 되어 있습니다. 그 이상의 용량을 전송할 땐 애초에 적합하지도 않습니다만 압축이라는 선택지도 있습니다.  메시지 용량이 1MB를 초과하지 않더라도 메시지 크기는 전송 속도에 영향을 주기 때문에 좋은 선택입니다. 사용자가 전송하려는 Record는 압축을 함으로써 네트워크 전송 비용도 줄일 수 있고 저장 비용도 줄일 수 있습니다.

compression.type을 설정하여 압축 시 사용할 코덱을 지정할 수 있습니다. 기본값은 none으로 gzip, snappy, Iz4가 사용 가능합니다. 일반적으로 gZip은 압축률이 높은만큼 CPU 사용량이 높고, LZ4는 반대입니다.

 

- RecordAccumulator에 저장되는 방식

 

위 과정을 거쳐서 Record는 전송되기 전 RecordAccumulator에 먼저 저장됩니다. 이번엔 어떤 절차를 거쳐 저장되는지 살펴보겠습니다.

 

1. RecordAccumulator batches

 

RecordAccumulator batches라는 Map을 가지고 있는데, Map Key TopicPartition이고, ValueDeque<RecordBatch>입니다.

batches MAP

2. 크기 검증

 

RecordAccumulator에 저장하기 전에 Record Serialized Size를 검사합니다. Serialized Size max.request.size 설정값 또는 buffer.memory 설정값보다 크면 RecordTooLargeException이 발생합니다. 크기가 문제없으면, RecordAccumulator append()를 이용해서 저장합니다.

 

3. RecordAccumulator append()

 

RecordAccumulator의 append()가 호출되면 batches에서 추가될 Record에 해당하는 TopicPartition의 Deque를 찾는다.

 

Deque Last에서 RecordBatch 하나를 꺼내서 Record를 저장할 공간이 있는지 확인합니다. 여유 공간이 있으면 해당 RecordBatch Record를 추가하고, 여유 공간이 없으면 새로운 RecordBatch를 생성해서 Last쪽으로 저장합니다. Queue를 사용하지 않고 Deque가 사용된 이유는 append() 시에 가장 최근에 들어간 RecordBatch를 꺼내서 봐야 하기 때문입니다.

 

 

- Broker로의 전송

 

RecordAccumulator에 저장되었다면 이제 드디어 전송할 준비를 마쳤습니다.

 

1. RecordAccumulator drain()

 

Broker Record를 전송하기 위해서 RecordAccumulator에서 Record를 꺼냅니다. RecordAccumulator drain()을 통해서 각 Broker별로 전송할 RecordBatch List를 얻을 수 있습니다.

 

2. Sender Thread

 

RecordAccumulator에서 꺼낸 Record의 실제 전송은 Sender Thread가 맡는다. Sender Thread RecordAccumulator에 저장된 Record를 꺼내서 Broker로 전송하고 응답을 처리한다.

 

전송은 max.request.size 설정값을 넘지 않을 때까지 Batch 단위로 모아서 전송됩니다. 이렇게 모인 RecordBatch List는 하나의 ProduceRequest로 만들어져서 Broker Node로 전송됩니다. ProduceRequest InFlightRequests라는 Node Deque에 먼저 저장된다. 그리고 이렇게 저장된 순서대로 실제 Broker Node로 전송이 이루어집니다

Broker Node로의 전송은 Java IO Multiplexing 방식으로 별도의 Thread를 사용하지 않고 Sender Thread에서 비동기적으로 이루어집니다. (Java NIO)

 

참고! 요청이 실패할 경우 retries 설정값이 1 이상인 경우 재시도하기 때문에 max.in.flight.requests.per.connection 값이 1보다 크면 순서가 바뀔 수 있다. 순서를 보장하려면 max.in.flight.requests.per.connection 값을 1로 설정해야 한다. 하지만 이렇게 설정하면 동시에 1개의 요청만 처리할 수 있기 때문에 전송 성능이 떨어질 수 있다.

 

'Backend > general' 카테고리의 다른 글

Kafka - Network Client 깊게 파헤치기  (0) 2024.06.19
Kafka - Consumer 깊게 파헤치기  (1) 2024.06.03
Kafka - 기본 아키텍처 알아보기  (0) 2024.06.03
트랜잭션과 락  (2) 2024.03.07
MySQL 아키텍쳐  (0) 2024.03.06