Backend/general

Kafka - Consumer 깊게 파헤치기

새우초밥 2024. 6. 3. 21:15

 

1. 개요

 

이번엔 Kafka의 큰 세가지 축(Producer - Broker - Consumer) 중 하나인 Consumer에 대해서 알아보고자 합니다.

Producer에 대해서 아직 안 읽어보셨다면 다음 글을 먼저 읽어보시는 걸 추천드립니다.

https://rawshrimpsushi.tistory.com/46

 

Kafka - Producer

1. 개요 앞서 살펴본 것처럼 Kafka는 여타 Message Queue처럼 Producer - Consumer 구조를 가지고 있으나 세부적인 디테일은 크게 다릅니다. 오늘은 그 중 Producer를 좀 더 심층적으로 알아보고자 합니다. 앞

rawshrimpsushi.tistory.com

 

오늘 글은 다음 글을 기반으로 소스 코드, 다양한 자료를 보며 작성되었습니다!

https://d2.naver.com/helloworld/0974525

 

2. Kafka Consumer란?

 

1) 기본 개념

기본 아키텍처 편에서 살펴봤던 것처럼 Kafka는 push 방식이 아닌 pull 방식으로 동작합니다. 즉 consumer가 수동적으로 메시지를 받는 것이 아닌 consumer가 능동적으로 자신의 상황에 맞게 메시지를 받아올 수 있습니다. (polling 구조라고도 합니다.)

 

또한 RabbitMQ와 같은 메시지 큐는 메시지가 한번 읽히면 삭제되는 것과 달리 Kafka는 메시지가 로그 방식으로 disk에 저장되어 여러 consumer가 동시 구독이 가능해집니다. offset을 기반으로 구독자들이 얼마나 읽었는지 커밋하고 있다가 읽을 때가 오면 그 다음부터 읽기 때문에 가능합니다.

 

그러면서도 Consumer Group을 통해 같은 주제를 처리하는 소비자들 간의 로드 밸런싱을 적절히 하여 서버 부담을 분산하게 할 수도 있습니다. 

 

이래저래 매력적인 Kafka의 Consumer가 실제로 어떻게 구동되는지 살펴보도록 하겠습니다 :)

 

2) 구성 요소

Consumer를 구성하는 6가지

 

KafkaConsumer는 Producer 때처럼 여러 구조로 나눌 수 있습니다. KafkaConsumer 자체를 제외하면 크게 5개로 나눌 수 있습니다.

 

  • ConsumerNetworkClient : KafkaConsumer의 모든 네트워크 통신을 담당합니다.
  • SubscriptionState: KafkaConsumer는 다른 메시지 시스템과 달리 자신이 소비하는 토픽, 파티션, 오프셋 정보를 추적 및 관리합니다. SubscriptionState가 토픽, 파티션, 오프셋 정보 관리를 담당하고 있습니다.
  • ConsumerCoordinator: 컨슈머 리밸런스, 오프셋 초기화(일부), 오프셋 커밋을 담당합니다. (각 개념은 추후에 살펴보겠습니다.)
  • Fetcher: 브로커로부터 데이터를 가져오는 역할을 담당하는 클래스입니다. 
  • HeartBeat Thread: Polling과 HeartBeat 구분을 위해 만들어진 별도의 스레드로 consumer group에서 active 상태인지 체크하기 위해 사용됩니다.

이렇게 구성요소만 봐서는 전체 흐름을 이해하기 힘들 것이라고 생각합니다! 구체적으로 어떻게 데이터를 받아오게 되는지 코드와 함께 각 구성요소를 거치면서 살펴보도록 하겠습니다.

 

3) KafkaConsumer의 poll

KafkaConsumer의 poll 메서드는 사용자가 직접 호출하는 것으로 브로커에서 데이터를 가져올 수 있게 해줍니다. 사용자가 사용할 수 있는 메서드인 만큼 코드와 함께 살펴보겠습니다.

// consumer 설정을 담을 props 선언
final Properties props = new Properties();  
props.put("bootstrap.servers", brokers);  
props.put("group.id", "testGroup");  // group id 입력
// serializer 지정
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");  
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

// KafkaConsumer 생성
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {  
    // 구독할 토픽
    consumer.subscribe(Arrays.asList("topic1"));

    // 무한 루프
    while (true) {
        // poll 메서드를 통해 데이터를 가져온다.
        final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            // processing...
        }
    }
}

 

 

Properties에 다양한 설정값을 넣고 해당 설정을 통해 KafkaConsumer를 선언합니다. 그리고 topic을 구독 후 poll을 통해 지속적으로 데이터를 받아올 수 있습니다.

poll 메서드를 호출하면 KafkaConsumer는 내부 구성 요소들과 협력하여 컨슈머 그룹에 참여한 후 브로커로부터 데이터를 가져옵니다.

3번째 줄을 보면 group.id를 설정하는 부분이 있습니다. 이는 위에서도 잠시 언급된 Consumer Group을 지정하는 것입니다. 메시지(앞으로 Record)는 컨슈머 그룹 내에 오직 1개의 컨슈머로만 전달됩니다. 이를 통해 가용성 확보와 병렬 처리가 가능하게끔 합니다. 

Consumer Group의 처리량을 늘리고 싶다면?
group.id가 같은 새로운 KafkaConsumer를 만들어서 poll 메서드를 호출한다.

이유: 같은 컨슈머 그룹에 속한 KafkaConsumer는 다른 파티션을 할당받기 때문에 컨슈머 그룹 내 데이터 처리를 확장한다. Kafka는 파티션 단위로 데이터를 분배하기 때문에 파티션의 수보다 많은 컨슈머를 그룹에 추가한 경우 파티션의 수를 초과한 컨슈머는 파티션을 할당받지 못하여 데이터를 소비하지 못한다.

 

Consumer Group은 브로커의 GroupCoordinator에 의해 관리됩니다. 각각 할당된 파티션은 컨슈머 리밸런스(Consumer Rebalance)를 통해 재배치되는데 이는 추후에 더 살펴보겠습니다.

 

4) ConsumerNetworkClient

ConsumerNetworkClient는 KafkaConsumer의 모든 네트워크 통신을 담당한다.

 

ConsumerNetworkClient의 모든 요청은 비동기로 동작합니다. 따라서 ConsumerNetworkClient의 응답값은 RequestFuture 클래스로 확인합니다.

 

RequestFuture는 다음과 같은 메서드를 제공합니다.

public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {

	...
    private final AtomicReference<Object> result = new AtomicReference<>(INCOMPLETE_SENTINEL);
    private final ConcurrentLinkedQueue<RequestFutureListener<T>> listeners = new ConcurrentLinkedQueue<>();
    private final CountDownLatch completedLatch = new CountDownLatch(1);
    
    
     // 응답을 처리할 수 있는지 확인
    public boolean isDone() {
        return result.get() != INCOMPLETE_SENTINEL;
    }
    
    // 응답 처리 가능할 때 value를 받아옴.
    public T value() {
        if (!succeeded())
            throw new IllegalStateException("Attempt to retrieve value from future which hasn't successfully completed");
        return (T) result.get();
    }
    
    // 요청이 성공했는지 확인
    public boolean succeeded() {
        return isDone() && !failed();
    }
    
    // 요청이 실패했는지 확인
    public boolean failed() {
        return result.get() instanceof RuntimeException;
    }
    
    /* 요청이 실패한 경우 호출된다. */
    public void raise(RuntimeException e);
    
    /* complete 메서드가 호출되었을 때 호출될 listener를 추가한다. */
    public void addListener(RequestFutureListener<T> listener);
    
    // 요청이 재시도 가능한지 확인
    public boolean isRetriable() {
        return exception() instanceof RetriableException;
    }
    // request future의 type을 다른 것으로 바꿈
    public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter);
}

 

RequestFuture 클래스는 Java에서 비동기 작업에 대한 반환값으로 사용되는 Future 클래스와 유사하지만 다릅니다. RequestFuture 클래스는 RequestFuture 타입을 바꿀 수 있는 compose 메서드와 비동기 요청이 완료되는 시점에 호출될 listener를 추가하는 addListener 메서드를 제공한다는 점이 Java Future 클래스와 다른 점입니다.

 

compose 메서드는 다음과 같이 연속적인 호출로 자주 사용됩니다.

client.send(coordinator, requestBuilder).compose(new JoinGroupResponseHandler());

 

ConsumerNetworkClient의 실질적은 네트워크 처리는 NetworkClient를 통해 이루어집니다. NetworkClient도 내용이 방대하기 때문에 다음 글로 따로 정리하도록 하겠습니다.

 

요청 전송 과정을 그림으로 표현하면 다음과 같습니다.

요청 전송 과정

  1. KafkaConsumer의 모든 요청은 ConsumerNetworkClient의 send 메서드를 통해 시작됩니다. 
  2. ConsumerNetworkClient는 send 메서드를 통해 전달된 모든 요청을 ClientRequest로 바꿉니다.
  3. ConsumerNetworkClient는 ClientRequest를 바로 전송하지 않고 내부 버퍼인 Unsent Map에 먼저 저장합니다.
  4. ClientRequest의 전송은 ConsumerNetworkClient의 poll 메서드가 호출될 때 이루어집니다. (위 그림의 a)

1번의 send 메서드는 다음과 같은 시그니처를 가집니다. 주석에도 2번에 설명한 바로 전송하지 않는다는 말이 담겨 있습니다.

send 메서드

코드를 계속 올리고 있는데 Kafka는 Java로 작성되어 있고 주석도 매우 잘 되어 있어 코드를 직접 읽어보시는 것을 강력 추천드립니다!!

 

 

 

응답 처리 과정을 그림으로 표현하면 다음과 같습니다.

응답 처리 과정

 

  1. 브로커가 응답을 주면 NetworkClient는 ConsumerNetworkClient의 내부 큐에 반환된 RequestFuture를 추가합니다. 여기서 큐 이름은 pendingCompletion입니다.
  2. pendingCompletion에 추가된 RequestFuture는 ConsumerNetworkClient의 poll 메서드가 호출될 때 완료 처리가 됩니다.

5) SubscriptionState

SubscriptionState는 토픽, 파티션, 오프셋 정보 관리를 담당합니다.

 

subscriptionState 동작

 

그림을 보면 KafkaConsumer에 토픽, 파티션 할당은 assign 메서드를 통해 이루어지는 걸 알 수 있습니다.

내부 코드를 보면

/**
 * Change the assignment to the specified partitions provided by the user,
 * note this is different from {@link #assignFromSubscribed(Collection)}
 * whose input partitions are provided from the subscribed topics.
 */
 public synchronized boolean assignFromUser(Set<TopicPartition> partitions);
 
/**
* Change the assignment to the specified partitions returned from the coordinator, note this is
* different from {@link #assignFromUser(Set)} which directly set the assignment from user inputs.
*/
public synchronized void assignFromSubscribed(Collection<TopicPartition> assignments)

 

사용자가 직접 호출할 때 불려지는 assignFromUser와 Group Coordinator가 그룹 관리를 할 때 사용되는 assignFromSubscribed로 나뉜 것을 알 수 있습니다.

사용자가 assign 메서드를 직접 호출한 경우 수동으로 토픽, 파티션을 할당할 수 있는데 이 경우에는 컨슈머 리밸런스가 일어나지 않습니다.

사용자가 구독을 요청한 토픽 정보는 SubscriptionState의 subscription에 저장됩니다. subscription에 저장된 토픽 정보는 컨슈머 리밸런스 과정에서 사용됩니다. 그룹 관리 기능을 사용한 경우에는 컨슈머 리밸런스 과정에서 코디네이터에 의해 토픽, 파티션이 할당됩니다.

 

assign 메서드를 통해 할당된 파티션은 초기 오프셋 값 설정이 필요합니다. seek 메서드를 통해 초기 오프셋 값을 설정한다. 초기 오프셋 설정은 오프셋 초기화 과정을 통해 이루어집니다. 사용자가 KafkaConsumer의 seek 메서드를 사용하여 설정할 수도 있습니다.

 

6) ConsumerCoordinator

 

ConsumerCoordinator는 컨슈머 리밸런스, 오프셋 초기화(일부), 오프셋 커밋을 담당합니다.

 

ConsumerCoordinator

 

위 그림을 보면 다양한 Handler 클래스를 사용하는 것을 알 수 있습니다. 각 클래스는 내부에 private class로 선언되어 있고 RequestFutureAdapter를 상속하고 있습니다.

각 클래스는

 

  • 컨슈머 리밸런스: JoinGroupResponseHandler, SyncGroupResponseHandler
  • 오프셋 초기화: OffsetFetchResponseHandler
  • 오프셋 커밋: OffsetCommitResponseHandler
  • HeartBeat 전송: HeartbeatResponseHandler

 

가 사용됩니다. 각 과정에 대해서 더 자세히 알고 싶다면https://d2.naver.com/helloworld/0974525

에 정말 상세하게 정리되어 있습니다. 여기선 간략하게 짚고 넘어가겠습니다.

 

- 컨슈머 리밸런스

 

  • 정의: 파티션의 소유권이 다른 컨슈머로 이전되는 것
  • 트리거: 토픽에 변경 사항이 생기거나 컨슈머 그룹에 새로운 컨슈머가 추가되거나 컨슈머 그룹에 속해 있던 컨슈머가 제외되는 경우에 발생한다.
  • 단계:
  1. GroupCoordinator 찾기: JoinGroup 요청을 보낼 GroupCoordinator를 찾는다.
  2. Join: GroupCoordinator는 그룹에 참여하는 클라이언트 정보와 그룹 메타데이터를 수집한다. 그 뒤 리더를 선정하고 그 클라이언트가 그룹 내 파티션을 할당한다.
  3. Sync: 파티션 할당을 그룹 내에 전파한다.

 

단계를 보면 유추할 수 있듯이 리밸런스 발생 시 컨슈머 그룹은 데이터를 가져오지 않아야 하며 일시정지한 것처럼 보이게 됩니다. 파티션 할당을 컨슈머 간 조정하는 일이기 때문입니다.

 

- 오프셋 초기화

 

  • 정의: 초기 오프셋 값을 설정한다.
  • 필요 이유: 브로커에서 데이터를 읽기 위해서는 파티션의 초기 오프셋 값이 필요하다. SubscriptionState의 assign 메서드를 통해 할당된 파티션은 초기 오프셋 값이 없다. 
  • 과정: 오프셋 초기화는 커밋된 오프셋을 가져오는 과정과 커밋된 오프셋이 없는 경우 오프셋 초기화 정책에 따라 오프셋을 초기화하기 위해 파티션의 오프셋을 가져오는 과정으로 이루어진다.

 

- 오프셋 커밋

 

  • 정의:  커밋할 토픽, 파티션과 오프셋 정보를 GroupCoordinator에게 보낸다.
  • 필요 이유: Kafka는 다른 메시지 서비스와 다르게 컨슈머가 오프셋 정보를 관리하기 때문에 데이터를 읽은 후 컨슈머는 적절한 시점에 오프셋을 커밋해야 한다.

 

7) Fetcher

Fetcher는 브로커로부터 데이터를 가져오는 역할을 담당하는 클래스입니다.

 

Fetcher

 

Consumer 리밸런스와 오프셋 초기화 과정이 끝나면 KafkaConsumer의 poll 메서드를 통해 브로커로부터 데이터를 가져올 수 있습니다. KafkaConsumer의 poll 메서드가 호출되면 먼저 Fetcher의 fetchedRecords 메서드가 호출됩니다. fetchedRecords 메서드는 내부 캐시인 nextInLineRecords와 completedFetches를 확인하여 브로커로부터 이미 가져온 데이터가 있는 경우에는 max.poll.records 설정 값만큼 레코드를 반환합니다.  브로커에서 가져온 데이터가 없는 경우에는 KafkaConsumer는 Fetcher의 sendFetches 메서드를 호출합니다. Fetcher의 sendFetches 메서드는 Fetch API 요청을 파티션 리더가 위치한 각 브로커에게 보냅니다. KafkaConsumer는 Fetcher가 브로커로부터 응답을 받을 때까지 대기합니다.

 

3. 마무리

 

KafkaConsumer poll 메서드가 호출되면 KafkaConsumer는 분주히 브로커로부터 데이터를 가져올 준비를 합니다.

KafkaConsumer가 올바르게 동작하기 위해서는 리밸런스는 필요하지만 안정적인 데이터 처리를 위해서 불필요한 리밸런스는 줄이는 것이 좋습니다. 불필요한 리밸런스를 줄이기 위해서는 max.poll.interval.ms max.poll.records를 적절히 조정하여 poll 메서드가 일정 간격으로 호출되도록 해야 합니다. 필요한 경우에는 heartbeat.interval.ms session.timeout.ms를 조정합니다.

 

앞에서 언급했듯이 리밸런스가 유발하는 Stop the world 현상을 완화시키기 위해 최근에 Kafka는 컨슈머 리밸런스 과정을 증분으로 진행하는 기능을 추가했습니다. 증분 리밸런스를 통해 리밸런스 과정에서 발생하는 컨슈머 정지 시간을 줄일 수 있습니다. 최신 버전의 KafkaConsumer를 사용한다면 증분 리밸런스 기능 사용을 고려해보면 좋을 것 같습니다.

 

또한 KafkaConsumer는 다양한 Processing guarantees(No guarantee, At most once, At least once, Effectively once)를 지원합니다. At least once 방식이 성능은 높이면서 처리를 보장합니다. 컨슈머가 중복된 데이터를 처리해도 문제가 없다면 At least once 방식을 사용하는 것이 유리합니다. 중복된 데이터를 처리해도 문제가 없기 위해 API의 멱등성을 보장하는 방법도 추천할 수 있습니다.