Apache Kafka의 오프셋 offset
Apache Kafka의 오프셋 offset
- Kafka에서 오프셋 관리는 각 컨슈머가 어디까지 메시지를 읽었는지를 추적하여 메시지 처리가 중복되거나 누락되지 않도록 하는 중요한 메커니즘입니다.
- 이를 통해 Kafka는 높은 데이터 일관성을 유지하면서도 분산 환경에서 효율적으로 메시지를 관리할 수 있습니다.
1. 오프셋(Offset)의 개념
오프셋은 Kafka의 각 파티션에서 특정 메시지의 위치를 나타내는 숫자로, 각 메시지에는 파티션 내에서 고유한 오프셋이 할당됩니다. 예를 들어, 파티션 A에서 세 번째 메시지의 오프셋은 2(0부터 시작)이며, 이 오프셋을 기반으로 Kafka는 컨슈머가 메시지를 읽은 위치를 추적할 수 있습니다.
2. 오프셋 관리 방식
Kafka에서는 오프셋을 저장하고 관리하는 방식이 컨슈머 그룹의 상태와 메시지 재처리 요구에 따라 다르게 운영됩니다. Kafka의 오프셋 관리 방식은 다음과 같습니다:
자동 커밋과 수동 커밋
- 자동 커밋: Kafka는 기본적으로 오프셋을 자동으로 커밋하도록 설정할 수 있습니다. 이 경우 컨슈머가 주기적으로 (기본값은 5초) 마지막으로 읽은 메시지의 오프셋을 Kafka에 자동으로 저장합니다. 하지만 이 방식은 중간에 실패가 발생해도 커밋이 완료된 오프셋 이후의 메시지는 이미 읽은 것으로 간주되기 때문에 메시지의 일부가 누락될 위험이 있습니다.
- 수동 커밋: 컨슈머는 오프셋을 수동으로 커밋할 수도 있습니다. 이를 통해 오프셋을 메시지 처리 완료 시점에 정확히 커밋하여 중복 처리를 방지하거나, 필요한 경우 메시지를 재처리할 수 있습니다. 주로 중복이 없는 메시지 처리가 필요한 금융 거래 시스템 등에서 유용하게 사용됩니다.
수동 커밋 예시
Kafka에서 오프셋을 수동으로 커밋하는 예시를 Golang으로 구현하려면, 주로 Kafka 클라이언트 라이브러리인 confluent-kafka-go를 사용합니다. 이 라이브러리를 통해 Kafka 컨슈머에서 메시지를 처리한 후 수동으로 오프셋을 커밋할 수 있습니다.
다음은 confluent-kafka-go를 사용하여 Kafka 메시지를 수동 커밋하는 예제입니다.
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
config := &kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "my-group",
"auto.offset.reset": "earliest",
"enable.auto.commit": false, // 수동 커밋 설정
}
c, err := kafka.NewConsumer(config)
if err != nil {
log.Fatalf("Failed to create consumer: %s", err)
}
defer c.Close()
// 구독할 토픽 설정
topics := []string{"my-topic"}
err = c.SubscribeTopics(topics, nil)
if err != nil {
log.Fatalf("Failed to subscribe to topics: %s", err)
}
fmt.Println("Consumer started. Waiting for messages...")
for {
// 메시지 폴링
msg, err := c.ReadMessage(10 * time.Second)
if err != nil {
if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut {
fmt.Println("Polling timed out, continuing...")
continue
}
log.Printf("Consumer error: %v (%v)\\n", err, msg)
continue
}
// 메시지 처리
fmt.Printf("Received message: %s\\n", string(msg.Value))
// 메시지 처리 완료 후 오프셋 수동 커밋
_, err = c.CommitMessage(msg)
if err != nil {
log.Printf("Failed to commit offset: %v\\n", err)
} else {
fmt.Printf("Offset committed: %v\\n", msg.TopicPartition)
}
}
}
커밋 저장 위치
Kafka에서 오프셋 정보는 기본적으로 **__consumer_offsets**라는 내부 토픽에 저장됩니다. 이 토픽은 각 컨슈머 그룹과 파티션에 대해 읽은 마지막 오프셋을 기록하여, 컨슈머가 재시작될 때 해당 위치에서 이어서 읽을 수 있도록 합니다.
3. 컨슈머 그룹과 오프셋 관리
Kafka에서는 컨슈머가 하나의 컨슈머 그룹(Consumer Group) 내에 속하게 되며, 각 컨슈머 그룹은 개별적으로 오프셋을 관리합니다. 컨슈머 그룹이 다르면 같은 메시지를 읽더라도 각 그룹마다 별도의 오프셋이 관리되므로, 동일한 메시지를 여러 컨슈머 그룹에서 개별적으로 처리할 수 있습니다.
- 리밸런싱(Rebalancing): 컨슈머 그룹 내에서 컨슈머가 추가되거나 제거될 때 파티션이 재할당되면서 리밸런싱이 발생합니다. 리밸런싱이 발생하면 오프셋 정보가 새로운 컨슈머에게 이전되고, 새로운 컨슈머는 이전 컨슈머의 마지막 커밋된 오프셋부터 메시지를 읽기 시작합니다.
4. 트랜잭션과 정확히 한 번 전송 (Exactly-Once Semantics)
Kafka는 트랜잭션과 연계해 오프셋을 안전하게 관리하여 정확히 한 번 전송(exactly-once semantics, EOS)을 보장합니다. 이 기능은 트랜잭션 내에서 오프셋을 커밋하고, 커밋되지 않은 트랜잭션의 오프셋은 무시하여 정확히 한 번만 메시지를 처리할 수 있게 합니다.
5. 장애 복구와 오프셋 관리
Kafka에서 컨슈머가 장애로 인해 중단되거나 재시작되는 경우, 이전에 커밋된 오프셋을 기준으로 메시지 처리를 다시 시작할 수 있습니다. 이는 컨슈머가 __consumer_offsets 토픽에 저장된 마지막 오프셋 정보를 기반으로 메시지 처리를 이어나가도록 도와주며, 데이터 손실 없이 안정적인 메시지 처리가 가능하도록 보장합니다.
- 오프셋 초기화(Offset Reset): 장애 복구 시, 오프셋을 특정 위치로 초기화할 수도 있습니다. 이를 통해 **최신 오프셋(earliest)**부터 읽어 재처리하거나, **가장 마지막 오프셋(latest)**에서 이어서 처리할 수 있습니다.
6. 예시: 오프셋 관리를 통한 메시지 처리 과정
Kafka의 오프셋 관리 과정을 요약한 예시는 다음과 같습니다:
- 컨슈머가 특정 메시지의 오프셋 위치를 읽음.
- 메시지를 처리한 후 오프셋을 커밋.
- 만약 중간에 컨슈머 장애가 발생한 경우, 마지막으로 커밋된 오프셋 이후부터 다시 메시지를 읽음.
- 수동 커밋 방식에서는, 컨슈머가 메시지 처리가 확정되었을 때만 오프셋을 커밋하여 중복을 방지함.
결론
Kafka의 오프셋 관리 메커니즘은 각 컨슈머가 어디까지 읽었는지를 기록하여 메시지의 정확성과 일관성을 보장하는 핵심적인 역할을 합니다.