MQ

MQ 과금 청구 모델 with RabbitMQ

김 정출 2024. 10. 20. 20:46

MQ 과금 청구 모델 with RabbitMQ

  • 음성인식, 음성합성 및 의도 분류 API 사용 시에 사용량에 따라 과금 청구 정보를 저장하는 시스템이 있습니다.
  • 과금 청구 정보는 MySQL에 저장합니다.
  • RabbitMQ를 활용해 안정성 있게 처리하려고 합니다.
  • DB Write 실패에 대한 에러 처리를 진행하고 재시도를 해야합니다.

1. Exchange와 Queue 설계

  • 과금 정보를 처리하기 위해 RabbitMQ를 기반으로 Exchange와 Queue를 설계해보겠습니다.

1-1. Exchange

  • Exchange Name: billing-exchange
  • Type: topic (주제별로 메시지를 라우팅할 수 있어야 함)
  • Routing Keys:
    • billing.speech-recognition: 음성인식 관련 과금 정보
    • billing.speech-synthesis: 음성합성 관련 과금 정보
    • billing.intent-classification : 의도 분류 관련 과금 정보

1-2. 메시지 구조 설계

모든 메시지는 공통적으로 다음과 같은 필드를 가질 수 있습니다:

  • service_name: 서비스 ID (예: "speech-recognition" 또는 "speech-synthesis")
  • user_id: 사용자 ID
  • transaction_id: 트랜잭션 ID
  • timestamp: 과금 발생 시간

음성인식 메시지 구조

  • service_id: "speech-recognition"
  • duration_in_seconds: 과금 기준이 되는 시간 (초)
  • cost_per_second: 초당 과금 금액

음성합성 메시지 구조

  • service_id: "speech-synthesis"
  • character_count: 과금 기준이 되는 글자 수
  • cost_per_character: 글자당 과금 금액

의도 분류 관련 메시지 구조

  • service_id: "speech-synthesis"
  • character_count: 과금 기준이 되는 글자 수
  • cost_per_character: 글자당 과금 금액

1-3. 메시지 전송 예시

  1. 음성인식 서비스에서 사용자 A의 과금 메시지 전송:
  2. { "service_id": "speech-recognition", "user_id": "0192a9ad-475c-7fd4-9d46-e8b6ef6c11f9", "timestamp": "2024-10-20T12:34:56Z", "transaction_id": "0192a9ad-475c-722c-8b87-aa713c7da6b1", "duration_in_seconds": 30, "cost_per_second": 0.1 }

2. DLX와 DLQ

2-1. Dead Letter Exchange (DLX)

  • Exchange Name: billing-dlx (Dead Letter Exchange)
  • Type: topic
  • Routing Keys:
    • dead.speech-recognition: 음성인식 관련 처리 실패 메시지
    • dead.speech-synthesis: 음성합성 관련 처리 실패 메시지
    • dead.intent-classification : 의도 분류 관련 과금 정보

2-2. Dead Letter Queue (DLQ)

  • Queue for Speech Recognition DLQ: speech-recognition-dlq
    • Binding Key: dead.speech-recognition
    • 설명: 음성인식 서비스의 처리 실패 메시지를 저장하는 Queue
  • Queue for Speech Synthesis DLQ: speech-synthesis-dlq
    • Binding Key: dead.speech-synthesis
    • 설명: 음성합성 서비스의 처리 실패 메시지를 저장하는 Queue
  • Queue for Intent Classification DLQ: intent-classification-dlq
    • Binding Key: dead.intent-classification
    • 설명: 음성합성 서비스의 처리 실패 메시지를 저장하는 Queue

2-3. 기존 Queue와의 연결

speech-recognition-billing-queue 설정

  • Dead Letter Exchange: billing-dlx
  • Dead Letter Routing Key: dead.speech-recognition
  • Max Retry Attempts: 3 (예: 메시지를 3번까지 재시도 후 실패 시 DLQ로 이동)

2-4. 메시지 흐름 설명

  1. Billing Queue의 메시지 처리 실패 시:
    • Consumer가 메시지를 처리하다가 오류가 발생하면, 메시지가 특정 횟수만큼 재시도됩니다.
    • 재시도 후에도 처리에 실패한 메시지는 Dead Letter Exchange(billing-dlx)로 라우팅됩니다.
    • billing-dlx는 메시지의 routing key에 따라 메시지를 speech-recognition-dlq 또는 speech-synthesis-dlq로 전달합니다.
  2. Dead Letter Queue의 역할:
    • DLQ에 저장된 메시지들은 실패 원인 분석을 위해 별도로 처리할 수 있습니다.
    • DLQ에 있는 메시지들은 관리자가 확인하고, 필요 시 복구 작업을 수행하거나 메시지를 재처리할 수 있습니다.

Producer

  • Producer는 음성인식, 음성합성 또는 의도 분류 서비스에서 과금 정보를 생성하여 RabbitMQ로 전송합니다.
  • Python에서 RabbitMQ에 메시지를 전송할 때, 연결 문제나 일시적인 오류로 인해 publish가 실패할 수 있습니다. 이러한 경우를 대비해, 재전송 로직을 포함합니다.
  • RabbitMQ의 basic_publish 메소드는 기본적으로 메시지를 비동기적으로 전송하며, 이는 네트워크 문제나 RabbitMQ의 문제가 있을 때 오류를 바로 포착하기 어렵게 만듭니다. 이 문제를 해결하기 위해, RabbitMQ에서는 확인 모드(Confirmation Mode) 를 제공합니다. 이를 사용하면 Producer가 메시지를 전송한 후, RabbitMQ로부터 메시지가 정상적으로 수신되었다는 acknowledgment(확인) 응답을 받을 수 있습니다.
import pika
import json
from datetime import datetime
import time

# RabbitMQ 연결 설정
RABBITMQ_HOST = 'localhost'
MAX_RETRIES = 3  # 최대 재전송 횟수
RETRY_DELAY = 5  # 재전송 간격 (초)

def send_billing_message(service_type, message_data):
    # Exchange 및 메시지 설정
    exchange_name = 'billing-exchange'
    routing_key = f'billing.{service_type}'

    # 재전송 로직
    attempt = 0
    while attempt < MAX_RETRIES:
        try:
            # RabbitMQ 연결 설정
            connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
            channel = connection.channel()

            # 메시지 전달 확인 모드 활성화
            channel.confirm_delivery()

            # 메시지 전송
             # 메시지 전송
            if channel.basic_publish(
                exchange=exchange_name,
                routing_key=routing_key,
                body=json.dumps(message_data),
                properties=pika.BasicProperties(delivery_mode=2)  # 메시지 영속성을 위해 delivery_mode=2
            ):
                print(f"Successfully sent {service_type} billing message: {message_data}")
                connection.close()
                break  # 메시지 전송 성공 시 루프 종료
            else:
                raise pika.exceptions.UnroutableError("Message could not be confirmed.")

        except (pika.exceptions.UnroutableError, pika.exceptions.AMQPError) as e:
            attempt += 1
            print(f"Failed to send message (Attempt {attempt}/{MAX_RETRIES}): {e}")
            if attempt < MAX_RETRIES:
                print(f"Retrying in {RETRY_DELAY} seconds...")
                time.sleep(RETRY_DELAY)
            else:
                print(f"Exceeded max retries. Failed to send message: {message_data}")

# Example usage
send_billing_message("speech-recognition", {
    "service_id": "speech-recognition",
    "user_id": "0192a9ad-475c-7fd4-9d46-e8b6ef6c11f9",
    "timestamp": "2024-10-20T12:34:56Z",
    "transaction_id": "0192a9ad-475c-722c-8b87-aa713c7da6b1",
    "duration_in_seconds": 30,
    "cost_per_second": 0.1
})

코드 설명

  1. 메시지 전달 확인 모드(confirm_delivery()):
    • confirm_delivery()를 호출하면, 채널이 확인 모드로 전환되어 Producer는 RabbitMQ로부터 메시지 수신 확인(acknowledgment)을 받습니다.
    • 이 설정을 통해 RabbitMQ가 메시지를 성공적으로 수신했는지 확인할 수 있습니다.
    • confirm_delivery()는 RabbitMQ에 추가적인 메시지 확인 작업을 요청하므로, 성능에 영향을 미칠 수 있습니다. 하지만 메시지 전달의 신뢰성을 보장해야 하는 경우 유용하게 사용할 수 있습니다.
  2. 메시지 전송 후 확인:
    • basic_publish가 호출된 후, channel.basic_publish()의 반환 값으로 메시지 전송이 성공했는지 확인할 수 있습니다. 메시지가 정상적으로 전송되었으면 True를 반환하고, 그렇지 않으면 예외를 발생시킵니다.
    • 여기서는 메시지가 확인되지 않으면 pika.exceptions.UnroutableError를 발생시키도록 했습니다.
  3. 재전송 로직:
    • 메시지 전송이 실패했을 때 재시도를 수행하며, 일정 횟수를 초과하면 실패로 간주하고 종료합니다.

MySQL DB Table

CREATE DATABASE billing_system;

USE billing_system;

CREATE TABLE billing_transactions (
    id INT AUTO_INCREMENT PRIMARY KEY,
    service_id VARCHAR(20),
    user_id VARCHAR(36),
    timestamp DATETIME,
    transaction_id VARCHAR(36),
    duration_in_seconds INT DEFAULT NULL,
    cost DECIMAL(10, 2) DEFAULT NULL,
    count INT DEFAULT NULL,
    total_cost DECIMAL(10, 2),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Consumer

  • Consumer는 RabbitMQ에서 메시지를 수신하고, 이를 MySQL에 저장합니다.
  • 처리 실패 시 DLQ로 전송로 전송
import pika
import json
import mysql.connector
from datetime import datetime

# RabbitMQ 연결 설정
RABBITMQ_HOST = 'localhost'
DLX_EXCHANGE = 'billing-dlx'
MAX_RETRIES = 3  # 최대 재시도 횟수

def send_to_dlq(message, reason):
    """
    Dead Letter Queue로 메시지를 전송하는 함수
    """
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
        channel = connection.channel()

        # Dead Letter Exchange 설정
        service_type = message.get('service_id', 'unknown')
        routing_key = f'dead.{service_type}'

        # 이유를 메시지에 추가
        message['error_reason'] = reason

        # DLQ로 메시지 전송
        channel.basic_publish(
            exchange=DLX_EXCHANGE,
            routing_key=routing_key,
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2)  # 메시지 영속성
        )
        print(f"Sent to DLQ: {message}")

    except Exception as e:
        print(f"Failed to send to DLQ: {e}")
    finally:
        if 'connection' in locals() and connection.is_open:
            connection.close()

def save_transaction_to_db(message):
    """
    메시지를 DB에 저장하는 함수
    """
    # MySQL 연결 설정
    db = mysql.connector.connect(
        host="localhost",
        user="yourusername",
        password="yourpassword",
        database="billing_system"
    )
    cursor = db.cursor()

    try:
        # 메시지 데이터 파싱
        data = json.loads(message)

        # 총 비용 계산
        total_cost = 0
        if data['service_id'] == "speech-recognition":
            total_cost = data['duration_in_seconds'] * data['cost_per_second']
        elif data['service_id'] == "speech-synthesis":
            total_cost = data['character_count'] * data['cost_per_character']
        elif data['service_id'] == "intent-classification":
            total_cost = data['character_count'] * data['cost_per_character']

        # SQL INSERT 실행
        sql = """
        INSERT INTO billing_transactions (
            service_id, user_id, timestamp, transaction_id, 
            cost, count, total_cost
        ) VALUES (%s, %s, %s, %s, %s, %s, %s)
        """
        values = (
            data['service_id'], data['user_id'], data['timestamp'], data['transaction_id'],
            data.get('cost'), data.get('count'),
            total_cost
        )

        cursor.execute(sql, values)
        db.commit()
        print(f"Transaction saved: {cursor.rowcount} record inserted.")

    except Exception as e:
        print(f"Error while saving to database: {e}")
        raise  # 에러 발생 시 예외를 다시 던져 재처리하도록 함

    finally:
        cursor.close()
        db.close()

def callback(ch, method, properties, body):
    try:
        print("Received message:", body)
        save_transaction_to_db(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    except Exception as e:
        # 메시지 재시도 횟수 확인
        headers = properties.headers if properties.headers else {}
        retry_count = headers.get('x-retry-count', 0)

        if retry_count >= MAX_RETRIES:
            print(f"Max retries reached for message: {body}")
            # DLQ로 전송
            message = json.loads(body)
            send_to_dlq(message, str(e))
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            # 재시도 횟수를 증가시키고 다시 전송
            retry_count += 1
            headers['x-retry-count'] = retry_count
            print(f"Retrying message {retry_count}/{MAX_RETRIES} times: {body}")

            # 메시지 다시 전송 (현재 Queue로 재전송)
            ch.basic_publish(
                exchange='',
                routing_key=method.routing_key,
                body=body,
                properties=pika.BasicProperties(
                    headers=headers,
                    delivery_mode=2  # 메시지 영속성
                )
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)

def start_consuming(queue_name):
    """
    메시지 수신을 시작하는 함수
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Queue 연결
    channel.queue_declare(queue=queue_name, durable=True)

    # 메시지 수신
    channel.basic_consume(queue=queue_name, on_message_callback=callback)

    print(f"Waiting for messages in {queue_name}. To exit press CTRL+C")
    channel.start_consuming()

# Example usage
start_consuming('speech-recognition-billing-queue')
start_consuming('speech-synthesis-billing-queue')
start_consuming('intent-classification-billing-queue')

코드 설명

  1. 재시도 횟수 관리 (x-retry-count):
    • 메시지의 headers 속성에 x-retry-count를 추가하여, 재시도 횟수를 관리합니다.
    • 처음 메시지가 수신되면 x-retry-count를 0으로 간주하고, 실패할 때마다 1씩 증가시킵니다.
  2. 최대 재시도 횟수 초과 시 DLQ 전송:
    • 재시도 횟수가 MAX_RETRIES를 초과하면, 메시지를 Dead Letter Queue로 전송합니다. 이때, 실패한 이유를 함께 기록하여 DLQ에 전송합니다.
    • DLQ로 전송되면, 기존 Queue에서 해당 메시지를 정상 처리된 것으로 표시하고(Acknowledgement), 추가적인 재처리를 중단합니다.
  3. 메시지 재시도:
    • 재시도 횟수가 MAX_RETRIES에 도달하지 않은 경우, 메시지를 같은 Queue로 다시 전송하여 재처리합니다.
    • 메시지를 재전송할 때, headers 속성의 x-retry-count 값을 증가시킵니다.

DLQ 처리 프로그램

  • DLQ에 있는 메시지들을 읽어서 원래의 Queue로 다시 보내는 프로그램을 작성합니다. 그리고 특정 월말 일자에는 자동화된 재처리를 중단하고 수동으로 처리할 수 있도록 합니다.
import pika
import json
from datetime import datetime
import schedule
import time

# RabbitMQ 연결 설정
RABBITMQ_HOST = 'localhost'
DLX_EXCHANGE = 'billing-dlx'

# 월말 일자 확인 함수
def is_month_end():
    today = datetime.today()
    # 매달 30일 또는 31일을 월말로 간주 (필요에 따라 변경 가능)
    return today.day in [30, 31]

def reprocess_dlq_message(message):
    """
    DLQ 메시지를 재처리하는 로직
    """
    try:
        # RabbitMQ 연결 설정
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
        channel = connection.channel()

        # 메시지의 원래 서비스 ID를 확인
        service_type = message.get('service_id', 'unknown')
        original_routing_key = f'billing.{service_type}'

        # 원래 Exchange로 메시지 재전송
        channel.basic_publish(
            exchange='billing-exchange',
            routing_key=original_routing_key,
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2)
        )
        print(f"Reprocessed message: {message}")

    except Exception as e:
        print(f"Error while reprocessing message: {e}")
    finally:
        if 'connection' in locals() and connection.is_open:
            connection.close()

def fetch_and_reprocess_dlq(queue_name):
    """
    DLQ에서 메시지를 가져와 재처리하는 함수
    """
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
        channel = connection.channel()

        # Queue 연결
        channel.queue_declare(queue=queue_name, durable=True)

        # DLQ에서 메시지를 가져와 재처리
        method_frame, header_frame, body = channel.basic_get(queue_name)

        if method_frame:
            message = json.loads(body)
            reprocess_dlq_message(message)
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)
        else:
            print(f"No messages to reprocess in {queue_name}")

    except Exception as e:
        print(f"Error while fetching messages from DLQ: {e}")
    finally:
        if 'connection' in locals() and connection.is_open:
            connection.close()

def schedule_automated_reprocessing(queue_name):
    """
    자동화된 재처리를 스케줄링하는 함수
    """
    def automated_reprocessing():
        if is_month_end():
            print("Month-end detected. Switching to manual processing mode.")
            return  # 월말에는 자동 재처리 중단
        fetch_and_reprocess_dlq(queue_name)

    # 매 시간마다 자동화된 재처리 수행 (필요에 따라 주기를 변경할 수 있음)
    schedule.every().hour.do(automated_reprocessing)

    while True:
        schedule.run_pending()
        time.sleep(60)

def manual_reprocessing(queue_name):
    """
    수동으로 재처리하는 함수 -> 파일로 저장하여 과금을 수기로 확인하거나 입력하도록 처리도 가능
    """
    print("manual!!!!!!")

def start_dlq_processing(queue_name):
    """
    DLQ 메시지 처리를 시작하는 함수
    """
    print("Starting DLQ processing. You can switch between automated and manual modes.")
    while True:
        mode = input("Enter 'auto' for automated processing or 'manual' for manual processing: ").strip().lower()
        if mode == 'auto':
            schedule_automated_reprocessing(queue_name)
        elif mode == 'manual':
            manual_reprocessing(queue_name)
        else:
            print("Invalid input. Please enter 'auto' or 'manual'.")

# Example usage
start_dlq_processing('speech-recognition-dlq')

읽어주셔서 감사합니다.