Apache Kafka

Apache Kafka 과금 처리 설계

김 정출 2024. 10. 27. 13:25

Apache Kafka 과금 처리 설계

  • Apache Kafka를 사용해 과금 정보를 처리하는 시스템을 설계할 때는 메시지의 순서, 내구성, 오류 처리, 스케일링 등을 고려한 아키텍처가 필요합니다.
  • 여기서 과금 정보는 실시간으로 들어오는 데이터일 수 있으며, 예를 들어 음성 인식이나 음성 합성 같은 서비스의 과금 방식(초당 또는 문자당 과금)에 맞춰야 합니다. 아래는 Kafka를 활용한 과금 처리 시스템 설계에 대한 주요 요소들입니다.

설계

1. Topic 설계

  • 과금 유형에 따른 Topic 분리: 서비스별로 과금이 다르다면, 각 서비스에 대해 다른 Topic을 생성합니다. 예를 들어, billing_speech_recognition, billing_speech_synthesis 등의 Topic을 두고, 각 Topic은 해당 서비스의 과금 정보를 전달합니다.
  • 파티셔닝 전략: 과금 데이터의 처리 속도와 병렬 처리 요구 사항에 따라 파티션을 설정합니다. 파티션 키로는 사용자 ID 또는 세션 ID 등을 사용하여 특정 사용자나 세션의 메시지가 같은 파티션에 들어가도록 하면 순서를 보장할 수 있습니다.

 

 

2. 메시지 구조

  • 표준화된 메시지 스키마: 과금 데이터를 표준화하기 위해 Avro, Protobuf 등의 스키마를 사용하여 메시지 형식을 정의합니다. 메시지에는 사용자 ID, 과금 대상(예: 인식된 문자 수, 처리 시간), 과금 금액 등의 필드를 포함할 수 있습니다.
  • 스키마 레지스트리: 메시지 스키마의 버전을 관리하고 변경에 대응하기 위해 스키마 레지스트리를 사용하는 것이 좋습니다. 이를 통해 소비자가 쉽게 스키마 변경에 적응할 수 있습니다.

 

 

3. Producer 설계

  • 과금 데이터 수집 및 전송: Producer는 서비스에서 발생하는 과금 정보를 실시간으로 Kafka로 전송합니다. 과금 데이터를 수집하는 로직에서 실패 시 재전송 메커니즘을 구현하여 데이터 손실을 방지합니다.
  • 재전송 로직: 메시지 전송이 실패하면 일정 횟수까지 재시도하고, 계속 실패 시에는 별도의 로그 또는 DLQ(Dead Letter Queue)에 저장해 나중에 재처리할 수 있도록 합니다.

 

 

4. Consumer 설계

  • 실시간 과금 계산: Consumer는 각 메시지를 처리하여 사용량과 과금 정보를 계산합니다. 이러한 계산을 통해 사용자의 과금 총액을 업데이트하고, 일정 시간마다 DB에 저장하여 최종 과금 정보를 유지합니다.
  • 동시성 처리: 여러 Consumer 인스턴스를 배포하여 높은 처리량을 지원하고, 파티션을 기반으로 병렬 처리를 최적화합니다.
  • 에러 처리: Consumer에서 처리 중 오류가 발생할 경우, 해당 메시지를 DLQ에 저장하고 이후에 재처리하는 메커니즘을 구현합니다.

 

 

5. 데이터 보관 및 아카이빙

  • 장기 보관: 과금 기록이 필요하다면 Kafka의 Retention 정책을 설정하거나, 별도의 스토리지(예: S3, HDFS)에 저장합니다. 이를 통해 장기적으로 과금 데이터를 관리할 수 있습니다.
  • 지속적인 저장소로 전송: 과금 정보를 Kafka에서 처리한 후 최종적으로 데이터베이스에 저장하거나, 배치 작업으로 주기적인 집계 및 보고서를 생성하는 방식으로 데이터를 관리할 수 있습니다.

 

 

6. 모니터링 및 알림

  • 모니터링: Kafka의 Topic, Consumer Lag, 메시지 처리율 등을 모니터링하여 이상 발생 시 알림을 받을 수 있도록 설정합니다. Prometheus와 Grafana 같은 도구를 사용하여 실시간 모니터링을 구축할 수 있습니다.
  • 이상 감지: 특정 사용자나 세션에서 비정상적으로 높은 과금 데이터가 수집될 경우 알림을 통해 관리자가 확인할 수 있도록 합니다.

구현

위를 기반으로 Apache Kafka와 Golang을 사용하여 과금 정보 처리 시스템을 설계하고 구현해보겠습니다. 이 예제에서는 Protobuf를 사용해 메시지 구조를 정의하며, DLQ(Dead Letter Queue)도 설계에 포함합니다.

 

1. 시스템 설계

  1. 토픽 및 파티션 구성
    • billing_speech_recognition: 음성 인식 과금 정보를 전송하는 Topic.
    • billing_speech_synthesis: 음성 합성 과금 정보를 전송하는 Topic.
    • billing_dlq: Producer 또는 Consumer에서 처리 실패한 메시지를 저장하는 DLQ Topic.
  2. 메시지 구조 (Protobuf 정의)
    • BillingMessage에는 사용자 ID, 서비스 유형, 과금 데이터(예: 사용 시간, 문자 수 등), 발생 시각 등을 포함합니다.
  3. Producer 설계
    • 서비스에서 발생하는 과금 정보를 Kafka Topic에 전송하는 Producer를 구현합니다.
    • 메시지 전송 실패 시 재시도하고, 설정된 재시도 횟수를 초과하면 DLQ에 전송합니다.
  4. Consumer 설계
    • 과금 데이터를 수신하여 필요한 계산을 수행하고, 최종 데이터를 저장합니다.
    • Consumer에서 메시지 처리 오류가 발생하면 DLQ에 메시지를 저장합니다.

 

 

2. Protobuf 메시지 정의

billing.proto 파일을 만들어 Protobuf 메시지 스키마를 정의합니다.

syntax = "proto3";

package billing;

message BillingMessage {
  string user_id = 1;
  string service_type = 2;  // "speech_recognition" or "speech_synthesis"
  int64 usage = 3;          // 사용량 (초 또는 문자 수)
  int64 timestamp = 4;      // 발생 시각 (Unix timestamp)
}

Protobuf 코드 생성

protoc --go_out=. billing.proto

 

 

3.Producer 코드

import (
	"context"
	"fmt"
	"time"

	"github.com/Shopify/sarama"
	"google.golang.org/protobuf/proto"
	"myapp/billingpb" // Protobuf generated Go package
)

func sendMessage(producer sarama.SyncProducer, topic string, message *billingpb.BillingMessage) error {
	data, err := proto.Marshal(message)
	if err != nil {
		return fmt.Errorf("failed to marshal message: %w", err)
	}

	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(data),
	}

	for retries := 0; retries < 3; retries++ { // 최대 3번 재시도
		_, _, err = producer.SendMessage(msg)
		if err == nil {
			fmt.Println("Message sent successfully")
			return nil
		}
		fmt.Printf("Retrying to send message due to error: %v\\n", err)
		time.Sleep(2 * time.Second)
	}

	// 재시도 실패 시 DLQ로 전송
	dlqMsg := &sarama.ProducerMessage{
		Topic: "billing_dlq",
		Value: sarama.ByteEncoder(data),
	}
	_, _, dlqErr := producer.SendMessage(dlqMsg)
	if dlqErr != nil {
		return fmt.Errorf("failed to send message to DLQ: %w", dlqErr)
	}

	return fmt.Errorf("message sent to DLQ after retries")
}

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		panic(fmt.Sprintf("failed to create producer: %v", err))
	}
	defer producer.Close()

	message := &billingpb.BillingMessage{
		UserId:      "user123",
		ServiceType: "speech_recognition",
		Usage:       300, // 사용량 예시 (초)
		Timestamp:   time.Now().Unix(),
	}

	err = sendMessage(producer, "billing_speech_recognition", message)
	if err != nil {
		fmt.Printf("Error sending message: %v\\n", err)
	}
}

 

 

3.2 Consumer 코드

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/Shopify/sarama"
	"google.golang.org/protobuf/proto"
	"myapp/billingpb"
)

func processMessage(msg *billingpb.BillingMessage) error {
	// 메시지 처리 로직 (예: 과금 계산)
	fmt.Printf("Processing message for user %s: %+v\\n", msg.UserId, msg)
	// 오류 발생 시 예제용으로 임의로 오류 발생
	if msg.UserId == "user123" {
		return fmt.Errorf("simulated processing error")
	}
	return nil
}

func consumeMessages(consumer sarama.Consumer, topic string) {
	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatalf("Failed to start consumer for partition 0: %v", err)
	}
	defer partitionConsumer.Close()

	for msg := range partitionConsumer.Messages() {
		var billingMsg billingpb.BillingMessage
		if err := proto.Unmarshal(msg.Value, &billingMsg); err != nil {
			log.Printf("Failed to unmarshal message: %v", err)
			continue
		}

		if err := processMessage(&billingMsg); err != nil {
			// 처리 실패 시 DLQ로 전송
			dlqProducer, _ := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
			defer dlqProducer.Close()

			dlqMsg := &sarama.ProducerMessage{
				Topic: "billing_dlq",
				Value: sarama.ByteEncoder(msg.Value),
			}
			_, _, dlqErr := dlqProducer.SendMessage(dlqMsg)
			if dlqErr != nil {
				log.Printf("Failed to send message to DLQ: %v", dlqErr)
			}
			log.Printf("Message sent to DLQ due to error: %v", err)
		}
	}
}

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatalf("Failed to create consumer: %v", err)
	}
	defer consumer.Close()

	go consumeMessages(consumer, "billing_speech_recognition")
	select {}
}

 

 

4. DLQ 처리 설계

  • DLQ에 저장된 메시지는 별도의 Consumer에서 주기적으로 확인하며 재처리할 수 있습니다.
  • 일정 횟수 이상 실패한 메시지는 로그로 남기거나 관리자에게 알림을 보내는 방식으로 처리할 수 있습니다.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/Shopify/sarama"
	"google.golang.org/protobuf/proto"
	"myapp/billingpb" // Protobuf generated Go package
)

// 처리 실패한 메시지를 파일에 JSON 형식으로 저장
func saveFailedMessage(message *billingpb.BillingMessage) error {
	file, err := os.OpenFile("failed_billing_records.json", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()

	msgJSON, err := json.Marshal(message)
	if err != nil {
		return fmt.Errorf("failed to marshal message to JSON: %w", err)
	}

	_, err = file.WriteString(string(msgJSON) + "\\n")
	if err != nil {
		return fmt.Errorf("failed to write message to file: %w", err)
	}

	log.Printf("Failed message saved to file: %v", message)
	return nil
}

// Kafka DLQ Consumer
func consumeDLQMessages(consumer sarama.Consumer, topic string) {
	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatalf("Failed to start consumer for partition 0: %v", err)
	}
	defer partitionConsumer.Close()

	for msg := range partitionConsumer.Messages() {
		var billingMsg billingpb.BillingMessage
		if err := proto.Unmarshal(msg.Value, &billingMsg); err != nil {
			log.Printf("Failed to unmarshal message from DLQ: %v", err)
			continue
		}

		// 실패 메시지 파일 저장
		if err := saveFailedMessage(&billingMsg); err != nil {
			log.Printf("Failed to save failed message to file: %v", err)
		}
	}
}

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatalf("Failed to create consumer: %v", err)
	}
	defer consumer.Close()

	// Dead Letter Queue를 수신
	go consumeDLQMessages(consumer, "billing_dlq")

	// 프로그램이 종료되지 않도록 대기
	select {}
}

파일 구조 예시

failed_billing_records.json 파일에 저장된 메시지의 예시는 다음과 같습니다.

{"user_id":"user123","service_type":"speech_recognition","usage":300,"timestamp":1698423934}
{"user_id":"user456","service_type":"speech_synthesis","usage":1500,"timestamp":1698424934}

이 프로그램을 통해 DLQ에 저장된 메시지를 별도 파일로 백업함으로써, 재처리가 필요하거나 분석이 필요한 과금 데이터를 손쉽게 확인하고 관리할 수 있습니다.


결론

이 설계와 구현을 통해 Kafka와 Protobuf를 이용해 안정적이고 확장성 있는 과금 정보를 처리하는 시스템을 구축할 수 있습니다.