MQ
Message Queue
김 정출
2024. 10. 20. 19:06
Message Queue
- Message Queue는 애플리케이션 간 메시지를 비동기적으로 전달하고 처리할 수 있도록 지원하는 소프트웨어 시스템입니다.
- 보통 생산자(Producer)와 소비자(Consumer) 개념을 통해 메시지를 주고받으며, 메시지의 송수신을 비동기적으로 처리함으로써 두 시스템 간의 결합도를 낮추고 확장성을 높이는 역할을 합니다.
개념
주요 개념과 기능은 다음과 같습니다:
- 비동기 처리: 생산자는 메시지를 큐에 보내기만 하면 되고, 소비자는 메시지를 필요할 때 가져가 처리할 수 있습니다. 따라서 두 시스템 간의 실시간 동기화가 필요하지 않습니다.
- 내구성 및 신뢰성: 메시지가 유실되지 않도록 저장하고, 소비자가 메시지를 정상적으로 처리했는지 확인할 수 있습니다. 이로 인해 서비스의 안정성과 신뢰성을 높입니다.
- 로드 밸런싱: 여러 소비자가 있을 경우, 큐는 메시지를 균등하게 분배하여 로드를 효율적으로 분산시킬 수 있습니다.
- 확장성: 메시지 큐를 사용하면 프로듀서와 컨슈머의 처리 속도를 분리할 수 있어 시스템 확장과 유지보수가 용이해집니다.
프레임워크
- RabbitMQ: AMQP(Advanced Message Queuing Protocol)을 사용하는 대표적인 메시지 브로커입니다. 다양한 언어와 프로토콜을 지원하며, 높은 확장성과 성능을 제공합니다.
- Apache Kafka: 대용량 실시간 데이터 스트리밍을 위해 설계된 분산 메시지 큐 시스템으로, 특히 로그 및 스트리밍 데이터 처리에 많이 사용됩니다.
- AWS SQS: AWS에서 제공하는 완전 관리형 메시지 큐 서비스로, 서버 관리가 필요 없이 확장성과 내구성을 갖춘 큐를 사용할 수 있습니다.
내구성
- 내구성(Durability) 설정
- 대부분의 메시지 큐는 메시지 자체와 큐를 디스크에 영구 저장할 수 있는 옵션을 제공합니다. 예를 들어, RabbitMQ에서는 큐를 내구성 있게 설정(durable)하고, 메시지를 영구적으로 저장(persistent)하도록 설정할 수 있습니다.
- Apache Kafka의 경우, 메시지를 로그에 기록하고, 브로커의 여러 파티션에 복제함으로써 메시지 손실을 방지합니다.
- 확인 응답(ACK, Acknowledgment)
- 메시지 큐는 메시지가 소비자에 의해 정상적으로 처리되었는지 확인하는 ACK 메커니즘을 제공합니다. 소비자가 메시지를 받아 처리한 후 브로커에 ACK를 보내면, 브로커는 해당 메시지를 안전하게 삭제합니다.
- 반대로 소비자가 메시지를 처리하는 도중 문제가 발생하면, 메시지가 큐에 남아 다시 처리될 수 있도록 합니다.
- 메시지 복제 및 클러스터링
- Apache Kafka와 같은 분산형 메시지 큐 시스템에서는 메시지를 여러 브로커에 복제함으로써 특정 브로커 장애 발생 시에도 메시지를 안전하게 보존합니다.
- RabbitMQ에서도 클러스터링을 통해 복제를 구성할 수 있으며, 장애 발생 시 자동으로 장애 조치(Failover)가 이루어집니다.
- 트랜잭션 지원
- 메시지 큐 시스템에서 트랜잭션을 지원하여, 메시지를 생산자에서 큐로 전송하는 과정과 소비자가 메시지를 처리하는 과정 모두 트랜잭션으로 묶어서 처리할 수 있습니다. 이를 통해 한쪽에서 문제가 발생해도 데이터 일관성을 유지할 수 있습니다.
- Dead-Letter Queue (DLQ)
- 메시지 처리가 실패하거나 일정 시간 동안 처리되지 못한 메시지를 별도로 저장하는 Dead-Letter Queue를 설정하여 메시지 유실을 방지할 수 있습니다. 소비자가 여러 번의 시도에도 메시지를 처리하지 못하면 해당 메시지를 DLQ로 보내어 나중에 다시 확인하고 조치할 수 있습니다.
- 프로듀서 측에서의 재전송 로직
- 프로듀서에서 메시지를 전송할 때, 전송 성공 여부를 확인하고 실패 시 재전송(retry) 로직을 구현하는 것이 중요합니다. 이를 통해 네트워크 문제나 일시적인 장애로 인한 메시지 손실을 방지할 수 있습니다.
Producer 측에서 재전송 로직
- 프로듀서에서 전송 성공 여부를 확인하고, 재전송 로직을 구현하는 핵심은 메시지 전송 시 응답을 확인하는 것과, 실패 시 이를 처리하는 로직을 구축하는 것입니다.
- 전송의 성공 여부를 확인하고 재전송하는 방법은 주로 아래와 같은 단계로 이루어집니다.
1. 전송 성공 여부 확인
- 프로듀서는 메시지를 전송한 후, 브로커나 메시지 큐 시스템으로부터 응답을 받아야 합니다.
- 이 응답은 전송이 성공적으로 이루어졌는지 확인할 수 있는 정보가 포함됩니다.
- 메시지 큐 시스템에 따라 다르지만, 대부분의 경우 전송 성공 여부를 확인하기 위한 몇 가지 메커니즘을 제공합니다.
2. 메시지 큐별 전송 확인 방법:
- RabbitMQ: 기본적으로 ACK 응답을 사용하여 메시지가 성공적으로 브로커에 도착했는지 확인할 수 있습니다. RabbitMQ에서는 프로듀서가 브로커로부터 ACK를 받지 못하면 해당 메시지를 다시 보낼 수 있습니다.
- Apache Kafka: ACK 레벨을 설정할 수 있으며, 기본적으로 "acks=1"로 설정되어 있습니다. 이는 메시지가 리더 파티션에 성공적으로 기록되었는지 확인할 수 있습니다. "acks=all"로 설정하면 모든 파티션 복제본에 메시지가 성공적으로 기록되었을 때만 ACK 응답을 받습니다.
- AWS SQS: 전송 후 메시지 ID 또는 MD5 해시 값을 반환하여 전송 성공 여부를 확인할 수 있습니다.
3. 전송 실패 시 재전송 로직
- 메시지 전송이 실패할 때 이를 감지하고, 재전송 로직을 수행해야 합니다. 이를 구현하는 방식은 다음과 같습니다.
- 응답 확인 및 예외 처리: 메시지를 전송한 후, 성공적인 응답(ACK)을 받지 못하면 예외를 발생시키거나 실패로 간주합니다. 이때, 응답이 오지 않거나 네트워크 오류가 발생하면 재전송 로직을 실행해야 합니다.
- Fixed Interval Retry: 고정된 간격으로 재시도
- Exponential Back off: 재시도 시간을 점차 늘리는 방식
Python 예시 코드 (Apache Kafka 프로듀서 사용):
from kafka import KafkaProducer
from kafka.errors import KafkaError
import time
def send_message(producer, topic, message, retries=3, backoff_factor=2):
attempt = 0
while attempt < retries:
try:
# 메시지 전송
future = producer.send(topic, value=message.encode('utf-8'))
# 전송 성공 확인
result = future.get(timeout=10) # ACK 대기
print(f"Message sent to {result.topic}, partition {result.partition}")
return True
except KafkaError as e:
attempt += 1
wait_time = (2 ** attempt) * backoff_factor
print(f"Attempt {attempt} failed. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
print("Failed to send message after multiple attempts.")
return False
# Kafka 프로듀서 생성
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# 메시지 전송 예시
send_message(producer, 'my_topic', 'Hello, Kafka!')
설명: 이 코드는 메시지 전송 후 Kafka 브로커로부터 ACK 응답을 기다립니다. 만약 전송 실패 시 예외가 발생하면, 재전송 로직을 실행합니다.
- 네트워크 문제 처리: 프로듀서 측에서 전송 시 네트워크 오류나 일시적인 장애가 발생할 수 있습니다. 이를 위해 타임아웃을 설정하고, 타임아웃이 발생하면 재전송을 시도하도록 합니다.
- 최대 재시도 횟수 설정: 무한 재시도는 시스템에 부담을 줄 수 있기 때문에, 최대 재시도 횟수를 설정하고, 이를 초과하면 실패한 메시지를 Dead-Letter Queue나 로그로 남기는 방식으로 처리해야 합니다.
- 재전송의 안정성 강화
- Idempotency Key 사용: 중복 전송을 방지하기 위해, 메시지에 Idempotency Key를 포함시켜 메시지 수신 측에서 중복 메시지 처리를 방지합니다.
- 트랜잭션 처리: 프로듀서와 메시지 큐 간의 트랜잭션을 활용해 메시지가 안전하게 전송되도록 보장합니다.
Idempotency Key
- Idempotency Key는 재전송이나 중복 요청 시, 서버 측에서 동일한 작업을 여러 번 처리하지 않도록 하는 고유한 식별자입니다. 이를 통해 시스템이 중복 처리를 방지하고, 일관성을 유지할 수 있습니다.
1. Idempotency Key의 생성
Idempotency Key는 일반적으로 고유한 값으로, 다음과 같은 방법으로 생성할 수 있습니다:
- UUID: 랜덤한 유니버설 고유 식별자(UUID)를 생성합니다.
- 해시값: 메시지의 내용을 기반으로 해시를 생성합니다. 예를 들어, 메시지의 본문과 타임스탬프를 결합한 후 SHA-256 해시값을 생성할 수 있습니다.
- 결합된 정보: 사용자 ID, 요청 시간, 요청 내용을 결합해 고유한 키를 생성할 수 있습니다.
2. Idempotency Key를 메시지에 포함
생성된 Idempotency Key는 메시지의 메타데이터나 헤더에 포함시켜 전송합니다. 이 키를 사용하여 서버 측에서 중복 요청을 감지할 수 있습니다.
예시: Producer 측에서 RabbitMQ 메시지에 Idempotency Key 포함
import uuid
import pika
def send_message_with_idempotency_key(channel, queue, message):
# Idempotency Key 생성 (UUID 사용)
idempotency_key = str(uuid.uuid4())
# 메시지 전송 시 Idempotency Key를 헤더에 포함
properties = pika.BasicProperties(headers={'Idempotency-Key': idempotency_key})
channel.basic_publish(exchange='',
routing_key=queue,
body=message,
properties=properties)
print(f"Message sent with Idempotency Key: {idempotency_key}")
# RabbitMQ 연결 및 채널 생성
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 메시지 전송 예시
send_message_with_idempotency_key(channel, 'my_queue', 'Hello, RabbitMQ!')
3. Consumer 측에서 Idempotency Key 확인
서버(메시지 컨슈머)가 메시지를 수신할 때, 헤더에서 Idempotency Key를 확인합니다. 이 키를 사용해 이전에 처리된 메시지인지 체크한 후, 중복이면 무시하고, 처음 받는 메시지면 처리한 후 데이터베이스나 캐시에 해당 키를 기록합니다.
import redis
# Redis와 같은 캐시를 이용해 Idempotency Key를 저장하고 체크
cache = redis.Redis(host='localhost', port=6379, db=0)
def process_message(ch, method, properties, body):
idempotency_key = properties.headers.get('Idempotency-Key')
if not idempotency_key:
print("No Idempotency Key found, processing message without deduplication.")
# 메시지 처리 로직
return
if cache.exists(idempotency_key):
print(f"Duplicate message detected with Idempotency Key: {idempotency_key}")
# 중복된 메시지 처리 방지
return
# 메시지 처리 후 Idempotency Key를 저장 (TTL 설정 가능)
cache.set(idempotency_key, "processed", ex=3600)
print(f"Message processed with Idempotency Key: {idempotency_key}")
# RabbitMQ에서 메시지 수신 예시
channel.basic_consume(queue='my_queue',
on_message_callback=process_message,
auto_ack=True)
DLQ
- DLQ(Dead-Letter Queue)는 메시지 큐 시스템에서 정상적으로 처리되지 않은 메시지를 따로 모아두기 위해 사용하는 특수한 큐입니다.
- 일반적인 메시지 큐에서 소비자(Consumer)가 특정 메시지를 처리하는데 실패하거나, 일정 조건을 초과하는 경우 그 메시지를 버리지 않고, 나중에 문제를 해결하기 위해 Dead-Letter Queue에 보내서 보관할 수 있습니다.
1. DLQ의 주요 기능과 목적
- 메시지 처리 실패 관리:
- 소비자가 메시지를 처리하는 도중 오류가 발생하거나, 메시지 형식이 잘못되어 처리할 수 없을 때 해당 메시지를 DLQ로 보내어 나중에 다시 처리할 수 있도록 합니다.
- 재시도 한계 초과 관리:
- 일반적으로 메시지 큐는 메시지 소비 시 실패하면 재시도를 수행합니다. 그러나 재시도 횟수가 초과되거나 특정 기간 동안 처리가 되지 않는 메시지는 DLQ로 이동하여 처리 실패를 관리합니다.
- 메시지 손실 방지:
- DLQ는 메시지가 시스템 내에서 완전히 사라지는 것을 방지하고, 후속 분석 및 문제 해결을 가능하게 합니다. 운영자가 DLQ에 쌓인 메시지를 검토하고 문제를 해결하거나 직접 재처리할 수 있습니다.
- 비정상적인 메시지 추적:
- 잘못된 형식의 메시지나 예상치 못한 데이터를 가진 메시지들이 DLQ로 쌓이게 되면, 이를 통해 시스템의 비정상적 동작을 추적하고 원인을 파악할 수 있습니다.
2. DLQ 작동 원리
메시지 큐 시스템에서 DLQ는 일반 큐와 유사하게 동작하지만, 메시지 처리 실패 조건이 발생하면 해당 메시지를 자동으로 DLQ로 이동시킵니다. 각 메시지 큐 시스템은 DLQ를 다르게 지원하고 있으며, 대표적인 예시는 다음과 같습니다
- RabbitMQ: RabbitMQ에서는 특정 큐에 대해 x-dead-letter-exchange와 같은 속성을 설정하여, 메시지의 TTL(Time-To-Live)이 만료되거나, 소비자가 NACK(negative acknowledgment) 응답을 보낸 경우, 해당 메시지를 지정된 DLQ로 이동시킵니다.
# RabbitMQ에서 DLQ 설정 예시 (정책 사용)
policies:
- name: "dlq-policy" # 정책의 이름을 정의
vhost: "/" # 정책을 설정할 가상 호스트 지정 -> 서로 격리된 논리적 메시지 영역
pattern: "^my-queue$" # 정책이 적용될 큐 또는 익스체인지의 이름을 정규 표현식 패턴 지정
definition: # 정책에 적용할 구체적인 속성
ha-mode: "all" # 모든 노드에 복제, "exactly"의 경우 지정한 개수에 복제
ha-params: 3 # 고가용성 모드에서 노드의 수 지정
ha-sync-mode: "automatic" # 복제된 메시지의 동기화 모드
dead-letter-exchange: "my-dlx"
message-ttl: 60000 # 메시지의 TTL을 밀리초 단위로 설정, 소비되지 않으면 삭제됨
expires: 86400000 # 큐의 TTL을 밀리초 단위로 설정, 소비되지 않으면 삭제됨
max-length: 10000 # 큐에 저장할 수 있는 최대 메시지 수를 설정합니다.
max-length-bytes: 10485760 # 큐에 저장할 수 있는 최대 크기를 바이트 단위로 지정
dead-letter-exchange: "my-dlx" # 메시지가 만료되거나 거부될 이동시킬 Exchange
dead-letter-routing-key: "dlq-routing-key" # Exchange로 전송 시 라우팅 키 지정
priority: 0 # 여러 정책이 동일한 큐 또는 익스체인지에 적용될 때 정책의 우선순위를 지정합니다. 값이 높을수록 높은 우선순위를 가지며, 동일한 패턴에 여러 정책이 적용될 때 우선순위가 높은 정책이 적용됩니다.
- Apache Kafka:
- Kafka에서는 DLQ를 별도의 토픽으로 설정할 수 있습니다. 메시지가 처리되지 못했을 때 오프셋과 함께 메시지를 DLQ 토픽으로 전송해, 해당 메시지를 후속 분석 및 처리할 수 있습니다.
- AWS SQS:
- SQS는 기본적으로 DLQ를 지원하며, 표준 큐나 FIFO 큐에 대해 DLQ를 설정할 수 있습니다. 특정 메시지가 여러 번의 재시도에도 실패하면 자동으로 DLQ로 이동됩니다.
3. DLQ 설정 시 고려할 점
- DLQ로 이동할 조건 설정:
- 메시지 큐 시스템마다 DLQ로 이동할 조건을 설정할 수 있습니다. 예를 들어, 메시지가 N번 이상 처리 실패했을 때, 또는 TTL(Time-To-Live)이 만료되었을 때 등 특정 조건을 명확히 정의해야 합니다.
- DLQ 모니터링:
- DLQ에 쌓이는 메시지를 주기적으로 모니터링하고, 해당 메시지의 원인 분석 및 수동 처리 절차를 마련해야 합니다. DLQ에 메시지가 과도하게 쌓이면 시스템의 문제가 지속되고 있다는 신호일 수 있습니다.
- DLQ의 크기와 보존 기간 설정:
- DLQ에 쌓이는 메시지의 양과 보존 기간을 관리해야 합니다. 과도하게 쌓이면 운영상의 문제를 유발할 수 있으므로, 보존 기간이나 크기 제한을 설정하고, 자동으로 삭제되도록 할 수도 있습니다.
4. DLQ 활용 예시
- 고장 분석:
- DLQ에 쌓인 메시지를 분석하여 특정 문제의 원인을 추적할 수 있습니다. 예를 들어, 특정 필드가 누락된 메시지들이 반복적으로 DLQ로 이동하면, 해당 필드가 반드시 필요한 것임을 파악하고 코드 또는 데이터 구조를 수정할 수 있습니다.
- 재처리:
- DLQ에 쌓인 메시지를 운영자가 검토한 후, 문제가 해결된 경우 메시지를 재처리할 수 있습니다. 재처리 시스템을 구축하여 DLQ의 메시지를 원래 큐로 다시 보내는 구조를 설계할 수도 있습니다.
이와 같이 메시지 큐를 사용하면, 시스템 간 데이터 전달의 신뢰성을 높이고, 확장성을 개선할 수 있습니다.