Apache Kafka

Apache Kafka Partitioning

김 정출 2024. 10. 27. 13:36

Apache Kafka Partitioning

  • Apache Kafka의 파티셔닝(partitioning)은 데이터를 분산 처리하고 시스템의 확장성을 높이기 위한 핵심 개념입니다.
  • Kafka의 각 토픽(topic)은 파티션(partition)이라는 여러 개의 부분으로 나누어질 수 있으며, 각 파티션은 독립적인 로그(log)를 형성합니다. 이 구조 덕분에 Kafka는 대량의 데이터와 높은 처리량을 지원할 수 있습니다.

주요 개념과 작동 방식

  1. 파티션의 역할
    • 각 파티션은 독립적인 순서대로 메시지를 저장하는 단일 로그 파일로 작동하며, 이 파일에 메시지가 순차적으로 추가됩니다. 파티션 덕분에 Kafka는 데이터의 병렬 처리와 확장성을 확보할 수 있습니다. 이를 통해 토픽에 메시지를 쓰거나 읽는 작업이 여러 파티션에서 동시에 수행될 수 있습니다.
  2. 파티션 키와 데이터 분배
    • 프로듀서(Producer)가 메시지를 전송할 때 파티션 키(partition key)를 지정할 수 있습니다. 이 키를 기반으로 Kafka는 특정 파티션에 데이터를 할당하여 데이터를 효율적으로 분배합니다. 키를 지정하지 않는 경우, 기본 라운드 로빈 방식이 적용되어 파티션이 무작위로 선택됩니다.
  3. 파티셔닝의 장점
    • 병렬 처리: 여러 파티션에 데이터를 나눠서 저장하기 때문에 다양한 소비자(Consumer)가 병렬로 데이터를 처리할 수 있습니다.
    • 확장성: 새로운 파티션을 추가함으로써 처리량이 필요한 경우 쉽게 확장할 수 있습니다.
    • 장애 복구: 파티션을 리더-팔로워 구조로 구성하여 고가용성을 지원하며, 리더가 장애가 발생할 경우 팔로워가 리더로 승격되어 서비스가 지속됩니다.
  4. 파티션과 오프셋
    • 각 파티션의 메시지는 고유의 오프셋(offset)을 가지며, 소비자는 오프셋을 사용해 자신이 읽고 있는 위치를 추적합니다. 이를 통해 다양한 소비자 그룹이 서로 다른 위치에서 데이터를 독립적으로 읽을 수 있습니다.

파티셔닝 전략

  • 키 기반 파티셔닝: 파티션 키를 사용해 특정 파티션으로 데이터를 일관되게 전송합니다. 예를 들어, 사용자 ID 등을 기반으로 파티션을 나누면 동일한 사용자 ID는 동일 파티션으로 전송되어 데이터 지역성을 유지합니다.
  • 라운드 로빈 파티셔닝: 파티션 키 없이 무작위 파티션을 선택해 데이터의 균형을 맞춥니다.

파티셔닝 예제

Apache Kafka에서 파티션을 효율적으로 사용하는 방법과 그 예제를 살펴보겠습니다. Kafka 파티셔닝은 데이터를 분산하여 처리량을 높이고, 특정 데이터 그룹에 일관성을 유지할 수 있도록 합니다.

 

예제 1: 사용자 ID를 기준으로 한 파티셔닝 (User ID 기반 데이터 처리)

상황)

  • 사용자가 많은 대규모 소셜 미디어 애플리케이션에서 유저별로 활동 로그를 Kafka로 전송하는 시스템을 운영한다고 가정합니다. 사용자가 작성하는 댓글, 좋아요, 팔로우 같은 이벤트를 Kafka 토픽에 저장하고 분석합니다.

목표)

  • 동일한 사용자 ID를 가진 메시지가 항상 동일한 파티션으로 들어가도록 하여 데이터 일관성을 유지하면서 병렬 처리를 통해 시스템의 성능을 극대화합니다.

파티셔닝 전략)

  1. 사용자 ID(user_id)를 키로 사용하여 파티션을 설정합니다.
  2. Kafka는 user_id를 해시한 값을 기반으로 특정 파티션에 데이터를 할당합니다.

효율적인 점)

  • 동일 사용자에 대한 이벤트가 한 파티션에 모이기 때문에 순서 보장이 가능합니다.
  • 여러 소비자가 동시에 각기 다른 파티션을 처리하므로 병렬 처리 성능이 향상됩니다.
  • 분석이나 이벤트 흐름 처리 시 한 사용자의 모든 데이터를 쉽게 추적할 수 있습니다.
// Kafka Producer 예제 (Golang)
package main

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "log"
    "strconv"
)

func main() {
    producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        log.Fatalf("Failed to create producer: %s", err)
    }
    defer producer.Close()

    topic := "user_activity"
    for i := 0; i < 100; i++ {
        userID := "user" + strconv.Itoa(i%10) // 10명의 사용자로 가정
        message := kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Key:            []byte(userID), // user_id를 파티션 키로 설정
            Value:          []byte("User activity log data"),
        }
        err := producer.Produce(&message, nil)
        if err != nil {
            log.Printf("Failed to produce message: %s", err)
        }
    }
}

 

 

예제 2: 지역별 로그 데이터 처리 (Region-based Data Processing)

상황)

다국적 기업이 각 국가의 지사에서 발생하는 로그 데이터를 Kafka에 저장하여 지역별 분석과 모니터링을 수행합니다.

목표)

국가 또는 지역을 기준으로 파티셔닝하여 지역별 데이터를 독립적으로 처리하고 분석할 수 있게 합니다.

파티셔닝 전략)

  1. 국가 코드(region_code)를 키로 사용하여 파티션을 설정합니다.
  2. 예를 들어 US, KR, JP 등 국가 코드를 기준으로 파티션에 데이터를 분산시킵니다.

효율적인 점)

  • 국가별로 데이터가 할당되므로 특정 국가의 데이터를 효율적으로 분석하고 모니터링할 수 있습니다.
  • 각 지역의 소비자가 병렬로 데이터를 처리할 수 있으므로 글로벌 데이터를 실시간으로 관리할 수 있습니다.
// Kafka Producer 예제 (Golang)
package main

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "log"
    "math/rand"
)

func main() {
    producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        log.Fatalf("Failed to create producer: %s", err)
    }
    defer producer.Close()

    topic := "region_logs"
    regions := []string{"US", "KR", "JP", "IN"}
    for i := 0; i < 100; i++ {
        regionCode := regions[rand.Intn(len(regions))] // 무작위 지역 코드 선택
        message := kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Key:            []byte(regionCode), // region_code를 파티션 키로 설정
            Value:          []byte("Log data for " + regionCode),
        }
        err := producer.Produce(&message, nil)
        if err != nil {
            log.Printf("Failed to produce message: %s", err)
        }
    }
}

 

 

예제 3: 주문 처리 시스템에서의 파티셔닝 (Order Processing System)

상황)

전자 상거래 애플리케이션에서 주문 데이터를 Kafka를 통해 실시간으로 처리하고 있습니다. 각 주문의 order_id를 기준으로 파티션을 분배합니다.

목표)

주문 번호를 기준으로 동일한 주문이 항상 동일한 파티션에 들어가도록 하여, 주문 데이터의 일관성을 유지하고 병렬 처리 성능을 높입니다.

파티셔닝 전략)

  1. order_id를 키로 사용하여 파티션을 설정합니다.
  2. 특정 주문의 모든 단계가 동일 파티션으로 들어가게 하여 순서를 보장합니다.

효율적인 점)

  • 병렬로 주문을 처리할 수 있어 대량의 주문 데이터를 빠르게 처리할 수 있습니다.
  • 주문별 데이터를 추적하기 쉬워 트랜잭션 일관성 유지에 용이합니다.
package main

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "log"
    "strconv"
)

func main() {
    // Kafka Producer 설정
    producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        log.Fatalf("Failed to create producer: %s", err)
    }
    defer producer.Close()

    topic := "order_processing" // 주문 처리 토픽
    for i := 0; i < 100; i++ {
        // 주문 ID 설정 (예: order0, order1, ...)
        orderID := "order" + strconv.Itoa(i)

        // 주문 데이터 메시지 생성
        message := kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Key:            []byte(orderID), // order_id를 파티션 키로 설정하여 특정 파티션에 할당
            Value:          []byte("Order details for " + orderID),
        }

        // Kafka로 메시지 전송
        err := producer.Produce(&message, nil)
        if err != nil {
            log.Printf("Failed to produce message: %s", err)
        } else {
            log.Printf("Produced message for order ID: %s", orderID)
        }
    }

    // Flush는 메시지가 브로커로 전송될 때까지 대기
    producer.Flush(15 * 1000)
}

결론

Kafka 파티셔닝은 대규모 실시간 데이터 처리에 최적화된 아키텍처를 제공하여 높은 처리량을 달성할 수 있게 해줍니다.