Apache Kafka Partitioning
- Apache Kafka의 파티셔닝(partitioning)은 데이터를 분산 처리하고 시스템의 확장성을 높이기 위한 핵심 개념입니다.
- Kafka의 각 토픽(topic)은 파티션(partition)이라는 여러 개의 부분으로 나누어질 수 있으며, 각 파티션은 독립적인 로그(log)를 형성합니다. 이 구조 덕분에 Kafka는 대량의 데이터와 높은 처리량을 지원할 수 있습니다.
주요 개념과 작동 방식
- 파티션의 역할
- 각 파티션은 독립적인 순서대로 메시지를 저장하는 단일 로그 파일로 작동하며, 이 파일에 메시지가 순차적으로 추가됩니다. 파티션 덕분에 Kafka는 데이터의 병렬 처리와 확장성을 확보할 수 있습니다. 이를 통해 토픽에 메시지를 쓰거나 읽는 작업이 여러 파티션에서 동시에 수행될 수 있습니다.
- 파티션 키와 데이터 분배
- 프로듀서(Producer)가 메시지를 전송할 때 파티션 키(partition key)를 지정할 수 있습니다. 이 키를 기반으로 Kafka는 특정 파티션에 데이터를 할당하여 데이터를 효율적으로 분배합니다. 키를 지정하지 않는 경우, 기본 라운드 로빈 방식이 적용되어 파티션이 무작위로 선택됩니다.
- 파티셔닝의 장점
- 병렬 처리: 여러 파티션에 데이터를 나눠서 저장하기 때문에 다양한 소비자(Consumer)가 병렬로 데이터를 처리할 수 있습니다.
- 확장성: 새로운 파티션을 추가함으로써 처리량이 필요한 경우 쉽게 확장할 수 있습니다.
- 장애 복구: 파티션을 리더-팔로워 구조로 구성하여 고가용성을 지원하며, 리더가 장애가 발생할 경우 팔로워가 리더로 승격되어 서비스가 지속됩니다.
- 파티션과 오프셋
- 각 파티션의 메시지는 고유의 오프셋(offset)을 가지며, 소비자는 오프셋을 사용해 자신이 읽고 있는 위치를 추적합니다. 이를 통해 다양한 소비자 그룹이 서로 다른 위치에서 데이터를 독립적으로 읽을 수 있습니다.
파티셔닝 전략
- 키 기반 파티셔닝: 파티션 키를 사용해 특정 파티션으로 데이터를 일관되게 전송합니다. 예를 들어, 사용자 ID 등을 기반으로 파티션을 나누면 동일한 사용자 ID는 동일 파티션으로 전송되어 데이터 지역성을 유지합니다.
- 라운드 로빈 파티셔닝: 파티션 키 없이 무작위 파티션을 선택해 데이터의 균형을 맞춥니다.
파티셔닝 예제
Apache Kafka에서 파티션을 효율적으로 사용하는 방법과 그 예제를 살펴보겠습니다. Kafka 파티셔닝은 데이터를 분산하여 처리량을 높이고, 특정 데이터 그룹에 일관성을 유지할 수 있도록 합니다.
예제 1: 사용자 ID를 기준으로 한 파티셔닝 (User ID 기반 데이터 처리)
상황)
- 사용자가 많은 대규모 소셜 미디어 애플리케이션에서 유저별로 활동 로그를 Kafka로 전송하는 시스템을 운영한다고 가정합니다. 사용자가 작성하는 댓글, 좋아요, 팔로우 같은 이벤트를 Kafka 토픽에 저장하고 분석합니다.
목표)
- 동일한 사용자 ID를 가진 메시지가 항상 동일한 파티션으로 들어가도록 하여 데이터 일관성을 유지하면서 병렬 처리를 통해 시스템의 성능을 극대화합니다.
파티셔닝 전략)
- 사용자 ID(user_id)를 키로 사용하여 파티션을 설정합니다.
- Kafka는 user_id를 해시한 값을 기반으로 특정 파티션에 데이터를 할당합니다.
효율적인 점)
- 동일 사용자에 대한 이벤트가 한 파티션에 모이기 때문에 순서 보장이 가능합니다.
- 여러 소비자가 동시에 각기 다른 파티션을 처리하므로 병렬 처리 성능이 향상됩니다.
- 분석이나 이벤트 흐름 처리 시 한 사용자의 모든 데이터를 쉽게 추적할 수 있습니다.
// Kafka Producer 예제 (Golang)
package main
import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"log"
"strconv"
)
func main() {
producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
log.Fatalf("Failed to create producer: %s", err)
}
defer producer.Close()
topic := "user_activity"
for i := 0; i < 100; i++ {
userID := "user" + strconv.Itoa(i%10) // 10명의 사용자로 가정
message := kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Key: []byte(userID), // user_id를 파티션 키로 설정
Value: []byte("User activity log data"),
}
err := producer.Produce(&message, nil)
if err != nil {
log.Printf("Failed to produce message: %s", err)
}
}
}
예제 2: 지역별 로그 데이터 처리 (Region-based Data Processing)
상황)
다국적 기업이 각 국가의 지사에서 발생하는 로그 데이터를 Kafka에 저장하여 지역별 분석과 모니터링을 수행합니다.
목표)
국가 또는 지역을 기준으로 파티셔닝하여 지역별 데이터를 독립적으로 처리하고 분석할 수 있게 합니다.
파티셔닝 전략)
- 국가 코드(region_code)를 키로 사용하여 파티션을 설정합니다.
- 예를 들어 US, KR, JP 등 국가 코드를 기준으로 파티션에 데이터를 분산시킵니다.
효율적인 점)
- 국가별로 데이터가 할당되므로 특정 국가의 데이터를 효율적으로 분석하고 모니터링할 수 있습니다.
- 각 지역의 소비자가 병렬로 데이터를 처리할 수 있으므로 글로벌 데이터를 실시간으로 관리할 수 있습니다.
// Kafka Producer 예제 (Golang)
package main
import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"log"
"math/rand"
)
func main() {
producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
log.Fatalf("Failed to create producer: %s", err)
}
defer producer.Close()
topic := "region_logs"
regions := []string{"US", "KR", "JP", "IN"}
for i := 0; i < 100; i++ {
regionCode := regions[rand.Intn(len(regions))] // 무작위 지역 코드 선택
message := kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Key: []byte(regionCode), // region_code를 파티션 키로 설정
Value: []byte("Log data for " + regionCode),
}
err := producer.Produce(&message, nil)
if err != nil {
log.Printf("Failed to produce message: %s", err)
}
}
}
예제 3: 주문 처리 시스템에서의 파티셔닝 (Order Processing System)
상황)
전자 상거래 애플리케이션에서 주문 데이터를 Kafka를 통해 실시간으로 처리하고 있습니다. 각 주문의 order_id를 기준으로 파티션을 분배합니다.
목표)
주문 번호를 기준으로 동일한 주문이 항상 동일한 파티션에 들어가도록 하여, 주문 데이터의 일관성을 유지하고 병렬 처리 성능을 높입니다.
파티셔닝 전략)
- order_id를 키로 사용하여 파티션을 설정합니다.
- 특정 주문의 모든 단계가 동일 파티션으로 들어가게 하여 순서를 보장합니다.
효율적인 점)
- 병렬로 주문을 처리할 수 있어 대량의 주문 데이터를 빠르게 처리할 수 있습니다.
- 주문별 데이터를 추적하기 쉬워 트랜잭션 일관성 유지에 용이합니다.
package main
import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"log"
"strconv"
)
func main() {
// Kafka Producer 설정
producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
log.Fatalf("Failed to create producer: %s", err)
}
defer producer.Close()
topic := "order_processing" // 주문 처리 토픽
for i := 0; i < 100; i++ {
// 주문 ID 설정 (예: order0, order1, ...)
orderID := "order" + strconv.Itoa(i)
// 주문 데이터 메시지 생성
message := kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Key: []byte(orderID), // order_id를 파티션 키로 설정하여 특정 파티션에 할당
Value: []byte("Order details for " + orderID),
}
// Kafka로 메시지 전송
err := producer.Produce(&message, nil)
if err != nil {
log.Printf("Failed to produce message: %s", err)
} else {
log.Printf("Produced message for order ID: %s", orderID)
}
}
// Flush는 메시지가 브로커로 전송될 때까지 대기
producer.Flush(15 * 1000)
}
결론
Kafka 파티셔닝은 대규모 실시간 데이터 처리에 최적화된 아키텍처를 제공하여 높은 처리량을 달성할 수 있게 해줍니다.
'Apache Kafka' 카테고리의 다른 글
Apache Kafka의 오프셋 offset (0) | 2024.10.28 |
---|---|
Apache Kafka Helm 배포 (0) | 2024.10.27 |
Apache Kafka 메시지 중복 처리 관련 EOS (0) | 2024.10.27 |
Apache Kafka 과금 처리 설계 (0) | 2024.10.27 |