Apache Kafka
Apache Kafka 과금 처리 설계
김 정출
2024. 10. 27. 13:25
Apache Kafka 과금 처리 설계
- Apache Kafka를 사용해 과금 정보를 처리하는 시스템을 설계할 때는 메시지의 순서, 내구성, 오류 처리, 스케일링 등을 고려한 아키텍처가 필요합니다.
- 여기서 과금 정보는 실시간으로 들어오는 데이터일 수 있으며, 예를 들어 음성 인식이나 음성 합성 같은 서비스의 과금 방식(초당 또는 문자당 과금)에 맞춰야 합니다. 아래는 Kafka를 활용한 과금 처리 시스템 설계에 대한 주요 요소들입니다.
설계
1. Topic 설계
- 과금 유형에 따른 Topic 분리: 서비스별로 과금이 다르다면, 각 서비스에 대해 다른 Topic을 생성합니다. 예를 들어, billing_speech_recognition, billing_speech_synthesis 등의 Topic을 두고, 각 Topic은 해당 서비스의 과금 정보를 전달합니다.
- 파티셔닝 전략: 과금 데이터의 처리 속도와 병렬 처리 요구 사항에 따라 파티션을 설정합니다. 파티션 키로는 사용자 ID 또는 세션 ID 등을 사용하여 특정 사용자나 세션의 메시지가 같은 파티션에 들어가도록 하면 순서를 보장할 수 있습니다.
2. 메시지 구조
- 표준화된 메시지 스키마: 과금 데이터를 표준화하기 위해 Avro, Protobuf 등의 스키마를 사용하여 메시지 형식을 정의합니다. 메시지에는 사용자 ID, 과금 대상(예: 인식된 문자 수, 처리 시간), 과금 금액 등의 필드를 포함할 수 있습니다.
- 스키마 레지스트리: 메시지 스키마의 버전을 관리하고 변경에 대응하기 위해 스키마 레지스트리를 사용하는 것이 좋습니다. 이를 통해 소비자가 쉽게 스키마 변경에 적응할 수 있습니다.
3. Producer 설계
- 과금 데이터 수집 및 전송: Producer는 서비스에서 발생하는 과금 정보를 실시간으로 Kafka로 전송합니다. 과금 데이터를 수집하는 로직에서 실패 시 재전송 메커니즘을 구현하여 데이터 손실을 방지합니다.
- 재전송 로직: 메시지 전송이 실패하면 일정 횟수까지 재시도하고, 계속 실패 시에는 별도의 로그 또는 DLQ(Dead Letter Queue)에 저장해 나중에 재처리할 수 있도록 합니다.
4. Consumer 설계
- 실시간 과금 계산: Consumer는 각 메시지를 처리하여 사용량과 과금 정보를 계산합니다. 이러한 계산을 통해 사용자의 과금 총액을 업데이트하고, 일정 시간마다 DB에 저장하여 최종 과금 정보를 유지합니다.
- 동시성 처리: 여러 Consumer 인스턴스를 배포하여 높은 처리량을 지원하고, 파티션을 기반으로 병렬 처리를 최적화합니다.
- 에러 처리: Consumer에서 처리 중 오류가 발생할 경우, 해당 메시지를 DLQ에 저장하고 이후에 재처리하는 메커니즘을 구현합니다.
5. 데이터 보관 및 아카이빙
- 장기 보관: 과금 기록이 필요하다면 Kafka의 Retention 정책을 설정하거나, 별도의 스토리지(예: S3, HDFS)에 저장합니다. 이를 통해 장기적으로 과금 데이터를 관리할 수 있습니다.
- 지속적인 저장소로 전송: 과금 정보를 Kafka에서 처리한 후 최종적으로 데이터베이스에 저장하거나, 배치 작업으로 주기적인 집계 및 보고서를 생성하는 방식으로 데이터를 관리할 수 있습니다.
6. 모니터링 및 알림
- 모니터링: Kafka의 Topic, Consumer Lag, 메시지 처리율 등을 모니터링하여 이상 발생 시 알림을 받을 수 있도록 설정합니다. Prometheus와 Grafana 같은 도구를 사용하여 실시간 모니터링을 구축할 수 있습니다.
- 이상 감지: 특정 사용자나 세션에서 비정상적으로 높은 과금 데이터가 수집될 경우 알림을 통해 관리자가 확인할 수 있도록 합니다.
구현
위를 기반으로 Apache Kafka와 Golang을 사용하여 과금 정보 처리 시스템을 설계하고 구현해보겠습니다. 이 예제에서는 Protobuf를 사용해 메시지 구조를 정의하며, DLQ(Dead Letter Queue)도 설계에 포함합니다.
1. 시스템 설계
- 토픽 및 파티션 구성
- billing_speech_recognition: 음성 인식 과금 정보를 전송하는 Topic.
- billing_speech_synthesis: 음성 합성 과금 정보를 전송하는 Topic.
- billing_dlq: Producer 또는 Consumer에서 처리 실패한 메시지를 저장하는 DLQ Topic.
- 메시지 구조 (Protobuf 정의)
- BillingMessage에는 사용자 ID, 서비스 유형, 과금 데이터(예: 사용 시간, 문자 수 등), 발생 시각 등을 포함합니다.
- Producer 설계
- 서비스에서 발생하는 과금 정보를 Kafka Topic에 전송하는 Producer를 구현합니다.
- 메시지 전송 실패 시 재시도하고, 설정된 재시도 횟수를 초과하면 DLQ에 전송합니다.
- Consumer 설계
- 과금 데이터를 수신하여 필요한 계산을 수행하고, 최종 데이터를 저장합니다.
- Consumer에서 메시지 처리 오류가 발생하면 DLQ에 메시지를 저장합니다.
2. Protobuf 메시지 정의
billing.proto 파일을 만들어 Protobuf 메시지 스키마를 정의합니다.
syntax = "proto3";
package billing;
message BillingMessage {
string user_id = 1;
string service_type = 2; // "speech_recognition" or "speech_synthesis"
int64 usage = 3; // 사용량 (초 또는 문자 수)
int64 timestamp = 4; // 발생 시각 (Unix timestamp)
}
Protobuf 코드 생성
protoc --go_out=. billing.proto
3.Producer 코드
import (
"context"
"fmt"
"time"
"github.com/Shopify/sarama"
"google.golang.org/protobuf/proto"
"myapp/billingpb" // Protobuf generated Go package
)
func sendMessage(producer sarama.SyncProducer, topic string, message *billingpb.BillingMessage) error {
data, err := proto.Marshal(message)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(data),
}
for retries := 0; retries < 3; retries++ { // 최대 3번 재시도
_, _, err = producer.SendMessage(msg)
if err == nil {
fmt.Println("Message sent successfully")
return nil
}
fmt.Printf("Retrying to send message due to error: %v\\n", err)
time.Sleep(2 * time.Second)
}
// 재시도 실패 시 DLQ로 전송
dlqMsg := &sarama.ProducerMessage{
Topic: "billing_dlq",
Value: sarama.ByteEncoder(data),
}
_, _, dlqErr := producer.SendMessage(dlqMsg)
if dlqErr != nil {
return fmt.Errorf("failed to send message to DLQ: %w", dlqErr)
}
return fmt.Errorf("message sent to DLQ after retries")
}
func main() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(fmt.Sprintf("failed to create producer: %v", err))
}
defer producer.Close()
message := &billingpb.BillingMessage{
UserId: "user123",
ServiceType: "speech_recognition",
Usage: 300, // 사용량 예시 (초)
Timestamp: time.Now().Unix(),
}
err = sendMessage(producer, "billing_speech_recognition", message)
if err != nil {
fmt.Printf("Error sending message: %v\\n", err)
}
}
3.2 Consumer 코드
package main
import (
"context"
"fmt"
"log"
"github.com/Shopify/sarama"
"google.golang.org/protobuf/proto"
"myapp/billingpb"
)
func processMessage(msg *billingpb.BillingMessage) error {
// 메시지 처리 로직 (예: 과금 계산)
fmt.Printf("Processing message for user %s: %+v\\n", msg.UserId, msg)
// 오류 발생 시 예제용으로 임의로 오류 발생
if msg.UserId == "user123" {
return fmt.Errorf("simulated processing error")
}
return nil
}
func consumeMessages(consumer sarama.Consumer, topic string) {
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
if err != nil {
log.Fatalf("Failed to start consumer for partition 0: %v", err)
}
defer partitionConsumer.Close()
for msg := range partitionConsumer.Messages() {
var billingMsg billingpb.BillingMessage
if err := proto.Unmarshal(msg.Value, &billingMsg); err != nil {
log.Printf("Failed to unmarshal message: %v", err)
continue
}
if err := processMessage(&billingMsg); err != nil {
// 처리 실패 시 DLQ로 전송
dlqProducer, _ := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
defer dlqProducer.Close()
dlqMsg := &sarama.ProducerMessage{
Topic: "billing_dlq",
Value: sarama.ByteEncoder(msg.Value),
}
_, _, dlqErr := dlqProducer.SendMessage(dlqMsg)
if dlqErr != nil {
log.Printf("Failed to send message to DLQ: %v", dlqErr)
}
log.Printf("Message sent to DLQ due to error: %v", err)
}
}
}
func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
defer consumer.Close()
go consumeMessages(consumer, "billing_speech_recognition")
select {}
}
4. DLQ 처리 설계
- DLQ에 저장된 메시지는 별도의 Consumer에서 주기적으로 확인하며 재처리할 수 있습니다.
- 일정 횟수 이상 실패한 메시지는 로그로 남기거나 관리자에게 알림을 보내는 방식으로 처리할 수 있습니다.
package main
import (
"encoding/json"
"fmt"
"log"
"os"
"time"
"github.com/Shopify/sarama"
"google.golang.org/protobuf/proto"
"myapp/billingpb" // Protobuf generated Go package
)
// 처리 실패한 메시지를 파일에 JSON 형식으로 저장
func saveFailedMessage(message *billingpb.BillingMessage) error {
file, err := os.OpenFile("failed_billing_records.json", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()
msgJSON, err := json.Marshal(message)
if err != nil {
return fmt.Errorf("failed to marshal message to JSON: %w", err)
}
_, err = file.WriteString(string(msgJSON) + "\\n")
if err != nil {
return fmt.Errorf("failed to write message to file: %w", err)
}
log.Printf("Failed message saved to file: %v", message)
return nil
}
// Kafka DLQ Consumer
func consumeDLQMessages(consumer sarama.Consumer, topic string) {
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
if err != nil {
log.Fatalf("Failed to start consumer for partition 0: %v", err)
}
defer partitionConsumer.Close()
for msg := range partitionConsumer.Messages() {
var billingMsg billingpb.BillingMessage
if err := proto.Unmarshal(msg.Value, &billingMsg); err != nil {
log.Printf("Failed to unmarshal message from DLQ: %v", err)
continue
}
// 실패 메시지 파일 저장
if err := saveFailedMessage(&billingMsg); err != nil {
log.Printf("Failed to save failed message to file: %v", err)
}
}
}
func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
defer consumer.Close()
// Dead Letter Queue를 수신
go consumeDLQMessages(consumer, "billing_dlq")
// 프로그램이 종료되지 않도록 대기
select {}
}
파일 구조 예시
failed_billing_records.json 파일에 저장된 메시지의 예시는 다음과 같습니다.
{"user_id":"user123","service_type":"speech_recognition","usage":300,"timestamp":1698423934}
{"user_id":"user456","service_type":"speech_synthesis","usage":1500,"timestamp":1698424934}
이 프로그램을 통해 DLQ에 저장된 메시지를 별도 파일로 백업함으로써, 재처리가 필요하거나 분석이 필요한 과금 데이터를 손쉽게 확인하고 관리할 수 있습니다.
결론
이 설계와 구현을 통해 Kafka와 Protobuf를 이용해 안정적이고 확장성 있는 과금 정보를 처리하는 시스템을 구축할 수 있습니다.