MQ 과금 청구 모델 with RabbitMQ
- 음성인식, 음성합성 및 의도 분류 API 사용 시에 사용량에 따라 과금 청구 정보를 저장하는 시스템이 있습니다.
- 과금 청구 정보는 MySQL에 저장합니다.
- RabbitMQ를 활용해 안정성 있게 처리하려고 합니다.
- DB Write 실패에 대한 에러 처리를 진행하고 재시도를 해야합니다.
1. Exchange와 Queue 설계
- 과금 정보를 처리하기 위해 RabbitMQ를 기반으로 Exchange와 Queue를 설계해보겠습니다.
1-1. Exchange
- Exchange Name: billing-exchange
- Type: topic (주제별로 메시지를 라우팅할 수 있어야 함)
- Routing Keys:
- billing.speech-recognition: 음성인식 관련 과금 정보
- billing.speech-synthesis: 음성합성 관련 과금 정보
- billing.intent-classification : 의도 분류 관련 과금 정보
1-2. 메시지 구조 설계
모든 메시지는 공통적으로 다음과 같은 필드를 가질 수 있습니다:
- service_name: 서비스 ID (예: "speech-recognition" 또는 "speech-synthesis")
- user_id: 사용자 ID
- transaction_id: 트랜잭션 ID
- timestamp: 과금 발생 시간
음성인식 메시지 구조
- service_id: "speech-recognition"
- duration_in_seconds: 과금 기준이 되는 시간 (초)
- cost_per_second: 초당 과금 금액
음성합성 메시지 구조
- service_id: "speech-synthesis"
- character_count: 과금 기준이 되는 글자 수
- cost_per_character: 글자당 과금 금액
의도 분류 관련 메시지 구조
- service_id: "speech-synthesis"
- character_count: 과금 기준이 되는 글자 수
- cost_per_character: 글자당 과금 금액
1-3. 메시지 전송 예시
- 음성인식 서비스에서 사용자 A의 과금 메시지 전송:
- { "service_id": "speech-recognition", "user_id": "0192a9ad-475c-7fd4-9d46-e8b6ef6c11f9", "timestamp": "2024-10-20T12:34:56Z", "transaction_id": "0192a9ad-475c-722c-8b87-aa713c7da6b1", "duration_in_seconds": 30, "cost_per_second": 0.1 }
2. DLX와 DLQ
2-1. Dead Letter Exchange (DLX)
- Exchange Name: billing-dlx (Dead Letter Exchange)
- Type: topic
- Routing Keys:
- dead.speech-recognition: 음성인식 관련 처리 실패 메시지
- dead.speech-synthesis: 음성합성 관련 처리 실패 메시지
- dead.intent-classification : 의도 분류 관련 과금 정보
2-2. Dead Letter Queue (DLQ)
- Queue for Speech Recognition DLQ: speech-recognition-dlq
- Binding Key: dead.speech-recognition
- 설명: 음성인식 서비스의 처리 실패 메시지를 저장하는 Queue
- Queue for Speech Synthesis DLQ: speech-synthesis-dlq
- Binding Key: dead.speech-synthesis
- 설명: 음성합성 서비스의 처리 실패 메시지를 저장하는 Queue
- Queue for Intent Classification DLQ: intent-classification-dlq
- Binding Key: dead.intent-classification
- 설명: 음성합성 서비스의 처리 실패 메시지를 저장하는 Queue
2-3. 기존 Queue와의 연결
speech-recognition-billing-queue 설정
- Dead Letter Exchange: billing-dlx
- Dead Letter Routing Key: dead.speech-recognition
- Max Retry Attempts: 3 (예: 메시지를 3번까지 재시도 후 실패 시 DLQ로 이동)
2-4. 메시지 흐름 설명
- Billing Queue의 메시지 처리 실패 시:
- Consumer가 메시지를 처리하다가 오류가 발생하면, 메시지가 특정 횟수만큼 재시도됩니다.
- 재시도 후에도 처리에 실패한 메시지는 Dead Letter Exchange(billing-dlx)로 라우팅됩니다.
- billing-dlx는 메시지의 routing key에 따라 메시지를 speech-recognition-dlq 또는 speech-synthesis-dlq로 전달합니다.
- Dead Letter Queue의 역할:
- DLQ에 저장된 메시지들은 실패 원인 분석을 위해 별도로 처리할 수 있습니다.
- DLQ에 있는 메시지들은 관리자가 확인하고, 필요 시 복구 작업을 수행하거나 메시지를 재처리할 수 있습니다.
Producer
- Producer는 음성인식, 음성합성 또는 의도 분류 서비스에서 과금 정보를 생성하여 RabbitMQ로 전송합니다.
- Python에서 RabbitMQ에 메시지를 전송할 때, 연결 문제나 일시적인 오류로 인해 publish가 실패할 수 있습니다. 이러한 경우를 대비해, 재전송 로직을 포함합니다.
- RabbitMQ의 basic_publish 메소드는 기본적으로 메시지를 비동기적으로 전송하며, 이는 네트워크 문제나 RabbitMQ의 문제가 있을 때 오류를 바로 포착하기 어렵게 만듭니다. 이 문제를 해결하기 위해, RabbitMQ에서는 확인 모드(Confirmation Mode) 를 제공합니다. 이를 사용하면 Producer가 메시지를 전송한 후, RabbitMQ로부터 메시지가 정상적으로 수신되었다는 acknowledgment(확인) 응답을 받을 수 있습니다.
import pika
import json
from datetime import datetime
import time
# RabbitMQ 연결 설정
RABBITMQ_HOST = 'localhost'
MAX_RETRIES = 3 # 최대 재전송 횟수
RETRY_DELAY = 5 # 재전송 간격 (초)
def send_billing_message(service_type, message_data):
# Exchange 및 메시지 설정
exchange_name = 'billing-exchange'
routing_key = f'billing.{service_type}'
# 재전송 로직
attempt = 0
while attempt < MAX_RETRIES:
try:
# RabbitMQ 연결 설정
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
# 메시지 전달 확인 모드 활성화
channel.confirm_delivery()
# 메시지 전송
# 메시지 전송
if channel.basic_publish(
exchange=exchange_name,
routing_key=routing_key,
body=json.dumps(message_data),
properties=pika.BasicProperties(delivery_mode=2) # 메시지 영속성을 위해 delivery_mode=2
):
print(f"Successfully sent {service_type} billing message: {message_data}")
connection.close()
break # 메시지 전송 성공 시 루프 종료
else:
raise pika.exceptions.UnroutableError("Message could not be confirmed.")
except (pika.exceptions.UnroutableError, pika.exceptions.AMQPError) as e:
attempt += 1
print(f"Failed to send message (Attempt {attempt}/{MAX_RETRIES}): {e}")
if attempt < MAX_RETRIES:
print(f"Retrying in {RETRY_DELAY} seconds...")
time.sleep(RETRY_DELAY)
else:
print(f"Exceeded max retries. Failed to send message: {message_data}")
# Example usage
send_billing_message("speech-recognition", {
"service_id": "speech-recognition",
"user_id": "0192a9ad-475c-7fd4-9d46-e8b6ef6c11f9",
"timestamp": "2024-10-20T12:34:56Z",
"transaction_id": "0192a9ad-475c-722c-8b87-aa713c7da6b1",
"duration_in_seconds": 30,
"cost_per_second": 0.1
})
코드 설명
- 메시지 전달 확인 모드(confirm_delivery()):
- confirm_delivery()를 호출하면, 채널이 확인 모드로 전환되어 Producer는 RabbitMQ로부터 메시지 수신 확인(acknowledgment)을 받습니다.
- 이 설정을 통해 RabbitMQ가 메시지를 성공적으로 수신했는지 확인할 수 있습니다.
- confirm_delivery()는 RabbitMQ에 추가적인 메시지 확인 작업을 요청하므로, 성능에 영향을 미칠 수 있습니다. 하지만 메시지 전달의 신뢰성을 보장해야 하는 경우 유용하게 사용할 수 있습니다.
- 메시지 전송 후 확인:
- basic_publish가 호출된 후, channel.basic_publish()의 반환 값으로 메시지 전송이 성공했는지 확인할 수 있습니다. 메시지가 정상적으로 전송되었으면 True를 반환하고, 그렇지 않으면 예외를 발생시킵니다.
- 여기서는 메시지가 확인되지 않으면 pika.exceptions.UnroutableError를 발생시키도록 했습니다.
- 재전송 로직:
- 메시지 전송이 실패했을 때 재시도를 수행하며, 일정 횟수를 초과하면 실패로 간주하고 종료합니다.
MySQL DB Table
CREATE DATABASE billing_system;
USE billing_system;
CREATE TABLE billing_transactions (
id INT AUTO_INCREMENT PRIMARY KEY,
service_id VARCHAR(20),
user_id VARCHAR(36),
timestamp DATETIME,
transaction_id VARCHAR(36),
duration_in_seconds INT DEFAULT NULL,
cost DECIMAL(10, 2) DEFAULT NULL,
count INT DEFAULT NULL,
total_cost DECIMAL(10, 2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Consumer
- Consumer는 RabbitMQ에서 메시지를 수신하고, 이를 MySQL에 저장합니다.
- 처리 실패 시 DLQ로 전송로 전송
import pika
import json
import mysql.connector
from datetime import datetime
# RabbitMQ 연결 설정
RABBITMQ_HOST = 'localhost'
DLX_EXCHANGE = 'billing-dlx'
MAX_RETRIES = 3 # 최대 재시도 횟수
def send_to_dlq(message, reason):
"""
Dead Letter Queue로 메시지를 전송하는 함수
"""
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
# Dead Letter Exchange 설정
service_type = message.get('service_id', 'unknown')
routing_key = f'dead.{service_type}'
# 이유를 메시지에 추가
message['error_reason'] = reason
# DLQ로 메시지 전송
channel.basic_publish(
exchange=DLX_EXCHANGE,
routing_key=routing_key,
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2) # 메시지 영속성
)
print(f"Sent to DLQ: {message}")
except Exception as e:
print(f"Failed to send to DLQ: {e}")
finally:
if 'connection' in locals() and connection.is_open:
connection.close()
def save_transaction_to_db(message):
"""
메시지를 DB에 저장하는 함수
"""
# MySQL 연결 설정
db = mysql.connector.connect(
host="localhost",
user="yourusername",
password="yourpassword",
database="billing_system"
)
cursor = db.cursor()
try:
# 메시지 데이터 파싱
data = json.loads(message)
# 총 비용 계산
total_cost = 0
if data['service_id'] == "speech-recognition":
total_cost = data['duration_in_seconds'] * data['cost_per_second']
elif data['service_id'] == "speech-synthesis":
total_cost = data['character_count'] * data['cost_per_character']
elif data['service_id'] == "intent-classification":
total_cost = data['character_count'] * data['cost_per_character']
# SQL INSERT 실행
sql = """
INSERT INTO billing_transactions (
service_id, user_id, timestamp, transaction_id,
cost, count, total_cost
) VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
values = (
data['service_id'], data['user_id'], data['timestamp'], data['transaction_id'],
data.get('cost'), data.get('count'),
total_cost
)
cursor.execute(sql, values)
db.commit()
print(f"Transaction saved: {cursor.rowcount} record inserted.")
except Exception as e:
print(f"Error while saving to database: {e}")
raise # 에러 발생 시 예외를 다시 던져 재처리하도록 함
finally:
cursor.close()
db.close()
def callback(ch, method, properties, body):
try:
print("Received message:", body)
save_transaction_to_db(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# 메시지 재시도 횟수 확인
headers = properties.headers if properties.headers else {}
retry_count = headers.get('x-retry-count', 0)
if retry_count >= MAX_RETRIES:
print(f"Max retries reached for message: {body}")
# DLQ로 전송
message = json.loads(body)
send_to_dlq(message, str(e))
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
# 재시도 횟수를 증가시키고 다시 전송
retry_count += 1
headers['x-retry-count'] = retry_count
print(f"Retrying message {retry_count}/{MAX_RETRIES} times: {body}")
# 메시지 다시 전송 (현재 Queue로 재전송)
ch.basic_publish(
exchange='',
routing_key=method.routing_key,
body=body,
properties=pika.BasicProperties(
headers=headers,
delivery_mode=2 # 메시지 영속성
)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
def start_consuming(queue_name):
"""
메시지 수신을 시작하는 함수
"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Queue 연결
channel.queue_declare(queue=queue_name, durable=True)
# 메시지 수신
channel.basic_consume(queue=queue_name, on_message_callback=callback)
print(f"Waiting for messages in {queue_name}. To exit press CTRL+C")
channel.start_consuming()
# Example usage
start_consuming('speech-recognition-billing-queue')
start_consuming('speech-synthesis-billing-queue')
start_consuming('intent-classification-billing-queue')
코드 설명
- 재시도 횟수 관리 (x-retry-count):
- 메시지의 headers 속성에 x-retry-count를 추가하여, 재시도 횟수를 관리합니다.
- 처음 메시지가 수신되면 x-retry-count를 0으로 간주하고, 실패할 때마다 1씩 증가시킵니다.
- 최대 재시도 횟수 초과 시 DLQ 전송:
- 재시도 횟수가 MAX_RETRIES를 초과하면, 메시지를 Dead Letter Queue로 전송합니다. 이때, 실패한 이유를 함께 기록하여 DLQ에 전송합니다.
- DLQ로 전송되면, 기존 Queue에서 해당 메시지를 정상 처리된 것으로 표시하고(Acknowledgement), 추가적인 재처리를 중단합니다.
- 메시지 재시도:
- 재시도 횟수가 MAX_RETRIES에 도달하지 않은 경우, 메시지를 같은 Queue로 다시 전송하여 재처리합니다.
- 메시지를 재전송할 때, headers 속성의 x-retry-count 값을 증가시킵니다.
DLQ 처리 프로그램
- DLQ에 있는 메시지들을 읽어서 원래의 Queue로 다시 보내는 프로그램을 작성합니다. 그리고 특정 월말 일자에는 자동화된 재처리를 중단하고 수동으로 처리할 수 있도록 합니다.
import pika
import json
from datetime import datetime
import schedule
import time
# RabbitMQ 연결 설정
RABBITMQ_HOST = 'localhost'
DLX_EXCHANGE = 'billing-dlx'
# 월말 일자 확인 함수
def is_month_end():
today = datetime.today()
# 매달 30일 또는 31일을 월말로 간주 (필요에 따라 변경 가능)
return today.day in [30, 31]
def reprocess_dlq_message(message):
"""
DLQ 메시지를 재처리하는 로직
"""
try:
# RabbitMQ 연결 설정
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
# 메시지의 원래 서비스 ID를 확인
service_type = message.get('service_id', 'unknown')
original_routing_key = f'billing.{service_type}'
# 원래 Exchange로 메시지 재전송
channel.basic_publish(
exchange='billing-exchange',
routing_key=original_routing_key,
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"Reprocessed message: {message}")
except Exception as e:
print(f"Error while reprocessing message: {e}")
finally:
if 'connection' in locals() and connection.is_open:
connection.close()
def fetch_and_reprocess_dlq(queue_name):
"""
DLQ에서 메시지를 가져와 재처리하는 함수
"""
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
# Queue 연결
channel.queue_declare(queue=queue_name, durable=True)
# DLQ에서 메시지를 가져와 재처리
method_frame, header_frame, body = channel.basic_get(queue_name)
if method_frame:
message = json.loads(body)
reprocess_dlq_message(message)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
else:
print(f"No messages to reprocess in {queue_name}")
except Exception as e:
print(f"Error while fetching messages from DLQ: {e}")
finally:
if 'connection' in locals() and connection.is_open:
connection.close()
def schedule_automated_reprocessing(queue_name):
"""
자동화된 재처리를 스케줄링하는 함수
"""
def automated_reprocessing():
if is_month_end():
print("Month-end detected. Switching to manual processing mode.")
return # 월말에는 자동 재처리 중단
fetch_and_reprocess_dlq(queue_name)
# 매 시간마다 자동화된 재처리 수행 (필요에 따라 주기를 변경할 수 있음)
schedule.every().hour.do(automated_reprocessing)
while True:
schedule.run_pending()
time.sleep(60)
def manual_reprocessing(queue_name):
"""
수동으로 재처리하는 함수 -> 파일로 저장하여 과금을 수기로 확인하거나 입력하도록 처리도 가능
"""
print("manual!!!!!!")
def start_dlq_processing(queue_name):
"""
DLQ 메시지 처리를 시작하는 함수
"""
print("Starting DLQ processing. You can switch between automated and manual modes.")
while True:
mode = input("Enter 'auto' for automated processing or 'manual' for manual processing: ").strip().lower()
if mode == 'auto':
schedule_automated_reprocessing(queue_name)
elif mode == 'manual':
manual_reprocessing(queue_name)
else:
print("Invalid input. Please enter 'auto' or 'manual'.")
# Example usage
start_dlq_processing('speech-recognition-dlq')
읽어주셔서 감사합니다.
'MQ' 카테고리의 다른 글
Message Queue에서 트랜잭션(Transactions) (1) | 2024.10.28 |
---|---|
RabbitMQ 클러스터와 Mirrored Queue (2) | 2024.10.20 |
RabbitMQ 메시지 패턴방식 (0) | 2024.10.20 |
AMQP 프로토콜 (1) | 2024.10.20 |
Message Queue (0) | 2024.10.20 |