Apache Kafka 메시지 중복 처리 관련 EOS
Apache Kafka 메시지 중복 처리 관련 EOS
- Kafka에서는 기본적으로 동일한 메시지를 여러 파티션에서 동시에 처리하는 일이 발생하지 않도록 설계되어 있습니다. 그러나 메시지 중복 처리 가능성은 아래와 같은 상황에서 발생할 수 있습니다.
1. Consumer 그룹 내에서의 중복 처리
- Kafka는 각 파티션을 특정 consumer 그룹의 단일 consumer에게만 할당하여, 동일한 파티션의 메시지를 한 번에 하나의 consumer만 처리하게 합니다. 이 구조 덕분에 동일한 파티션의 메시지가 중복으로 처리되는 일이 없습니다.
- 하지만 동일한 토픽의 여러 파티션을 다른 consumer들이 병렬로 처리할 수는 있습니다. 이 경우 각 파티션이 다른 데이터를 처리하므로 중복 처리는 발생하지 않습니다.
2. Consumer 장애와 재할당
- consumer가 특정 메시지를 읽고 처리 중에 장애가 발생하면 Kafka는 이 consumer의 작업을 같은 consumer 그룹 내의 다른 consumer에게 재할당합니다. 이때 마지막으로 처리된 오프셋(offset)을 기준으로 재시작하므로, 이미 처리한 메시지를 다시 처리할 가능성이 있습니다.
- 이를 방지하려면 consumer는 처리 완료 후 오프셋을 커밋(commit)해야 합니다. 커밋된 오프셋 이후의 메시지만 재할당되므로 중복 처리를 최소화할 수 있습니다.
이 예제에서는 Kafka의 Auto Commit 옵션을 끄고, 수동으로 오프셋을 커밋하는 방식을 사용합니다. 이를 통해 메시지를 처리한 후에만 오프셋을 커밋하여 중복 처리를 방지합니다.
package main
import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"log"
)
func main() {
// Kafka Consumer 설정
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "order_consumer_group",
"enable.auto.commit": false, // 자동 커밋 비활성화
"auto.offset.reset": "earliest",
})
if err != nil {
log.Fatalf("Failed to create consumer: %s", err)
}
defer consumer.Close()
topic := "order_processing"
err = consumer.SubscribeTopics([]string{topic}, nil)
if err != nil {
log.Fatalf("Failed to subscribe to topic: %s", err)
}
for {
msg, err := consumer.ReadMessage(-1)
if err == nil {
// 메시지 처리
fmt.Printf("Processing order: %s\\n", string(msg.Value))
// 메시지 처리 완료 후 오프셋 커밋
_, err := consumer.CommitMessage(msg)
if err != nil {
log.Printf("Failed to commit offset: %s", err)
} else {
fmt.Printf("Offset committed for message at offset %d\\n", msg.TopicPartition.Offset)
}
} else {
// 에러 발생 시
fmt.Printf("Consumer error: %v (%v)\\n", err, msg)
}
}
}
CommitMessage 실패 시 문제점 및 대처 방안
- 중복 처리 위험
오프셋이 커밋되지 않으면 해당 메시지를 다시 읽게 되므로 중복 처리가 발생할 수 있습니다. 이 경우 애플리케이션이 동일한 메시지를 여러 번 처리할 수 있으며, 이는 데이터 일관성 문제를 일으킬 수 있습니다. - 트랜잭션 및 멱등성 처리
중복 처리가 시스템에 부정적인 영향을 미친다면, 중복 처리를 허용하지 않는 멱등(idempotent) 처리를 고려해야 합니다. 예를 들어, 데이터베이스에 같은 메시지가 여러 번 저장되지 않도록 order_id와 같은 고유 식별자를 사용하여 메시지 중복을 방지할 수 있습니다. - 재시도 로직 추가
CommitMessage가 실패했을 때 재시도 로직을 추가할 수 있습니다. 여러 번 재시도한 후에도 실패하면 해당 오프셋을 기록해 두고 다음 번 처리할 때 커밋을 다시 시도할 수 있습니다.
func commitOffsetWithRetry(consumer *kafka.Consumer, msg *kafka.Message, maxRetries int) error {
for attempt := 1; attempt <= maxRetries; attempt++ {
_, err := consumer.CommitMessage(msg)
if err == nil {
fmt.Printf("Offset committed for message at offset %d\\n", msg.TopicPartition.Offset)
return nil // 성공적으로 커밋됨
}
log.Printf("Failed to commit offset (attempt %d/%d): %s", attempt, maxRetries, err)
time.Sleep(1 * time.Second) // 재시도 전 대기 시간
}
return fmt.Errorf("failed to commit offset after %d attempts", maxRetries)
}
4. 오프셋 주기적 커밋
커밋이 실패하는 경우를 대비해 일정한 주기로 배치(batch) 단위로 커밋하거나, 특정 간격마다 커밋하는 방법도 유용합니다. 이 방법은 성능을 높이면서도 커밋 누락을 줄이는 데 도움이 됩니다.
5. 오프셋 로그 기록
오프셋 커밋 실패가 발생할 경우 로그에 남겨 두고, 시스템을 모니터링하면서 필요한 경우 수동 조치를 할 수 있도록 합니다.
3. 일관성 확보를 위한 리트라이 시도
- 메시지를 소비하고 처리하는 동안 오류가 발생하여 다시 처리(retry)를 해야 하는 경우, 메시지가 중복으로 처리될 가능성이 있습니다. 예를 들어, 네트워크 문제나 소비자 장애가 발생했을 때 메시지를 재전송하거나 재처리하면 중복이 발생할 수 있습니다.
- 이를 해결하기 위해 Idempotent Producer 또는 Exactly Once Semantics (EOS)를 설정할 수 있습니다. Kafka는 EOS 설정을 통해 데이터 중복을 방지하고, 중복 없이 한 번만 처리하도록 보장할 수 있습니다.
Kafka의 **Exactly Once Semantics (EOS)**는 메시지를 중복 없이 정확히 한 번만 처리하도록 보장하는 기능입니다. 이 기능은 특히 금융 거래나 주문 처리 시스템 등에서 데이터 중복이 허용되지 않는 환경에서 유용합니다. EOS는 Kafka 내에서 데이터를 소비하는 애플리케이션이 메시지를 한 번만 처리하고 중복이 없도록 보장해 줍니다.
EOS의 필요성
기본적으로 Kafka는 At-Least-Once 또는 At-Most-Once의 메시지 전송을 보장합니다.
- At-Least-Once: 메시지가 한 번 이상 처리될 수 있어 중복이 발생할 가능성이 있습니다.
- At-Most-Once: 메시지가 한 번만 처리되지만, 실패할 경우 일부 메시지가 손실될 수 있습니다.
중복 없이 정확히 한 번만 메시지를 처리해야 하는 경우 EOS가 필요합니다.
EOS 구현 원리
Kafka는 EOS를 지원하기 위해 다음과 같은 기능을 제공합니다.
- Idempotent Producer (멱등 프로듀서)
Kafka의 프로듀서가 동일한 메시지를 중복으로 전송해도 브로커가 중복 메시지를 필터링합니다. 이를 위해 Producer ID(PID)와 시퀀스 번호를 사용해 프로듀서가 동일한 메시지를 여러 번 전송하더라도 브로커가 중복된 메시지를 감지하고 하나만 처리합니다. 이로써 프로듀서가 정확히 한 번만 데이터를 전송할 수 있도록 보장합니다. - Transaction (트랜잭션)
Kafka는 트랜잭션 기능을 통해 프로듀서가 여러 메시지를 전송할 때, 이들이 원자적으로 처리되도록 합니다. 트랜잭션이 완료되지 않으면 모든 메시지가 무효화되고, 반대로 완료되면 모두 커밋되어 처리됩니다. 따라서 여러 파티션에 메시지를 전송할 때도 일관성을 보장하며, 한 트랜잭션 안에서 모든 파티션에 대한 메시지를 한 번에 커밋합니다. - Transactional Consumer (트랜잭션 소비자)
트랜잭션을 사용하는 소비자에서는 커밋되지 않은 메시지를 읽지 않도록 설정할 수 있습니다. 이는 Read-committed 모드로, 트랜잭션이 완료되지 않은 메시지는 무시하고 커밋된 메시지만 읽어들입니다. 따라서 트랜잭션이 완료되지 않았을 때는 그 메시지를 소비하지 않으며, 중복 없이 한 번만 메시지를 처리하게 됩니다. - End-to-End Exactly Once Processing
Kafka Streams와 같은 데이터 처리 라이브러리를 사용하면 소비자와 프로듀서가 End-to-End Exactly Once 보장을 제공할 수 있습니다. Kafka Streams는 트랜잭션과 멱등성을 사용하여 소스부터 최종 소비까지 중복 처리 없이 메시지가 한 번만 처리되도록 지원합니다.
EOS 활성화 방법
Kafka의 EOS는 프로듀서와 소비자 설정에서 활성화할 수 있습니다.
- 프로듀서 설정:
producerConfig := &kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"enable.idempotence": true, // 멱등성 활성화
"transactional.id": "order_processing_txn", // 트랜잭션 ID 설정
}
- 소비자 설정:
consumerConfig := &kafka.ConfigMap{ "bootstrap.servers": "localhost:9092", "isolation.level": "read_committed", // 커밋된 메시지만 읽기 }
- 장점과 제한 사항
- 장점: EOS를 통해 중복 없이 데이터를 정확히 한 번만 처리할 수 있어 데이터의 신뢰성과 일관성을 높입니다. 특히 금융, 결제, 주문 처리 등 중복 처리가 치명적인 시스템에서 유용합니다.
- 제한 사항: EOS는 성능에 영향을 줄 수 있습니다. 멱등성과 트랜잭션 기능을 유지하는 데 추가적인 리소스가 필요하고, 처리 지연이 발생할 수 있습니다. 설정과 관리의 복잡성이 증가하기 때문에, 트랜잭션이 필요한 경우에만 EOS를 사용하는 것이 좋습니다.
4. 프로듀서에서의 중복 전송
- 프로듀서가 메시지를 전송할 때 실패한 것으로 인식되어 재전송하면 중복 메시지가 생성될 수 있습니다. 이를 피하기 위해 Idempotent Producer 설정을 통해 프로듀서가 동일한 메시지를 한 번만 전송하도록 설정할 수 있습니다.
- 프로듀서가 메시지를 중복 없이 한 번만 전송하도록 Producer ID(PID)와 시퀀스 번호를 수동으로 입력할 필요는 없습니다. Kafka가 자동으로 PID와 시퀀스 번호를 관리하여 중복 전송을 방지합니다.
결론
따라서 Kafka는 기본적인 파티셔닝 구조를 통해 중복 처리를 방지하지만, 장애 복구와 재전송 전략에 따라 중복 가능성이 있습니다. 정확히 한 번만 처리를 보장하려면 Exactly Once Semantics를 고려하는 것이 좋습니다.