Apache Kafka

Apache Kafka의 오프셋 offset

김 정출 2024. 10. 28. 14:38

Apache Kafka의 오프셋 offset

  • Kafka에서 오프셋 관리는 각 컨슈머가 어디까지 메시지를 읽었는지를 추적하여 메시지 처리가 중복되거나 누락되지 않도록 하는 중요한 메커니즘입니다.
  • 이를 통해 Kafka는 높은 데이터 일관성을 유지하면서도 분산 환경에서 효율적으로 메시지를 관리할 수 있습니다.

1. 오프셋(Offset)의 개념

오프셋은 Kafka의 각 파티션에서 특정 메시지의 위치를 나타내는 숫자로, 각 메시지에는 파티션 내에서 고유한 오프셋이 할당됩니다. 예를 들어, 파티션 A에서 세 번째 메시지의 오프셋은 2(0부터 시작)이며, 이 오프셋을 기반으로 Kafka는 컨슈머가 메시지를 읽은 위치를 추적할 수 있습니다.


2. 오프셋 관리 방식

Kafka에서는 오프셋을 저장하고 관리하는 방식이 컨슈머 그룹의 상태와 메시지 재처리 요구에 따라 다르게 운영됩니다. Kafka의 오프셋 관리 방식은 다음과 같습니다:

자동 커밋과 수동 커밋

  • 자동 커밋: Kafka는 기본적으로 오프셋을 자동으로 커밋하도록 설정할 수 있습니다. 이 경우 컨슈머가 주기적으로 (기본값은 5초) 마지막으로 읽은 메시지의 오프셋을 Kafka에 자동으로 저장합니다. 하지만 이 방식은 중간에 실패가 발생해도 커밋이 완료된 오프셋 이후의 메시지는 이미 읽은 것으로 간주되기 때문에 메시지의 일부가 누락될 위험이 있습니다.
  • 수동 커밋: 컨슈머는 오프셋을 수동으로 커밋할 수도 있습니다. 이를 통해 오프셋을 메시지 처리 완료 시점에 정확히 커밋하여 중복 처리를 방지하거나, 필요한 경우 메시지를 재처리할 수 있습니다. 주로 중복이 없는 메시지 처리가 필요한 금융 거래 시스템 등에서 유용하게 사용됩니다.

수동 커밋 예시

Kafka에서 오프셋을 수동으로 커밋하는 예시를 Golang으로 구현하려면, 주로 Kafka 클라이언트 라이브러리인 confluent-kafka-go를 사용합니다. 이 라이브러리를 통해 Kafka 컨슈머에서 메시지를 처리한 후 수동으로 오프셋을 커밋할 수 있습니다.

다음은 confluent-kafka-go를 사용하여 Kafka 메시지를 수동 커밋하는 예제입니다.

package main

import (
    "fmt"
    "log"
    "os"
    "time"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    config := &kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "my-group",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": false,  // 수동 커밋 설정
    }

    c, err := kafka.NewConsumer(config)
    if err != nil {
        log.Fatalf("Failed to create consumer: %s", err)
    }

    defer c.Close()

    // 구독할 토픽 설정
    topics := []string{"my-topic"}
    err = c.SubscribeTopics(topics, nil)
    if err != nil {
        log.Fatalf("Failed to subscribe to topics: %s", err)
    }

    fmt.Println("Consumer started. Waiting for messages...")

    for {
        // 메시지 폴링
        msg, err := c.ReadMessage(10 * time.Second)
        if err != nil {
            if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut {
                fmt.Println("Polling timed out, continuing...")
                continue
            }
            log.Printf("Consumer error: %v (%v)\\n", err, msg)
            continue
        }

        // 메시지 처리
        fmt.Printf("Received message: %s\\n", string(msg.Value))

        // 메시지 처리 완료 후 오프셋 수동 커밋
        _, err = c.CommitMessage(msg)
        if err != nil {
            log.Printf("Failed to commit offset: %v\\n", err)
        } else {
            fmt.Printf("Offset committed: %v\\n", msg.TopicPartition)
        }
    }
}

커밋 저장 위치

Kafka에서 오프셋 정보는 기본적으로 **__consumer_offsets**라는 내부 토픽에 저장됩니다. 이 토픽은 각 컨슈머 그룹과 파티션에 대해 읽은 마지막 오프셋을 기록하여, 컨슈머가 재시작될 때 해당 위치에서 이어서 읽을 수 있도록 합니다.


3. 컨슈머 그룹과 오프셋 관리

Kafka에서는 컨슈머가 하나의 컨슈머 그룹(Consumer Group) 내에 속하게 되며, 각 컨슈머 그룹은 개별적으로 오프셋을 관리합니다. 컨슈머 그룹이 다르면 같은 메시지를 읽더라도 각 그룹마다 별도의 오프셋이 관리되므로, 동일한 메시지를 여러 컨슈머 그룹에서 개별적으로 처리할 수 있습니다.

  • 리밸런싱(Rebalancing): 컨슈머 그룹 내에서 컨슈머가 추가되거나 제거될 때 파티션이 재할당되면서 리밸런싱이 발생합니다. 리밸런싱이 발생하면 오프셋 정보가 새로운 컨슈머에게 이전되고, 새로운 컨슈머는 이전 컨슈머의 마지막 커밋된 오프셋부터 메시지를 읽기 시작합니다.

4. 트랜잭션과 정확히 한 번 전송 (Exactly-Once Semantics)

Kafka는 트랜잭션과 연계해 오프셋을 안전하게 관리하여 정확히 한 번 전송(exactly-once semantics, EOS)을 보장합니다. 이 기능은 트랜잭션 내에서 오프셋을 커밋하고, 커밋되지 않은 트랜잭션의 오프셋은 무시하여 정확히 한 번만 메시지를 처리할 수 있게 합니다.


5. 장애 복구와 오프셋 관리

Kafka에서 컨슈머가 장애로 인해 중단되거나 재시작되는 경우, 이전에 커밋된 오프셋을 기준으로 메시지 처리를 다시 시작할 수 있습니다. 이는 컨슈머가 __consumer_offsets 토픽에 저장된 마지막 오프셋 정보를 기반으로 메시지 처리를 이어나가도록 도와주며, 데이터 손실 없이 안정적인 메시지 처리가 가능하도록 보장합니다.

  • 오프셋 초기화(Offset Reset): 장애 복구 시, 오프셋을 특정 위치로 초기화할 수도 있습니다. 이를 통해 **최신 오프셋(earliest)**부터 읽어 재처리하거나, **가장 마지막 오프셋(latest)**에서 이어서 처리할 수 있습니다.

6. 예시: 오프셋 관리를 통한 메시지 처리 과정

Kafka의 오프셋 관리 과정을 요약한 예시는 다음과 같습니다:

  1. 컨슈머가 특정 메시지의 오프셋 위치를 읽음.
  2. 메시지를 처리한 후 오프셋을 커밋.
  3. 만약 중간에 컨슈머 장애가 발생한 경우, 마지막으로 커밋된 오프셋 이후부터 다시 메시지를 읽음.
  4. 수동 커밋 방식에서는, 컨슈머가 메시지 처리가 확정되었을 때만 오프셋을 커밋하여 중복을 방지함.

결론

Kafka의 오프셋 관리 메커니즘은 각 컨슈머가 어디까지 읽었는지를 기록하여 메시지의 정확성과 일관성을 보장하는 핵심적인 역할을 합니다.