JangBaGeum.gif

[RabbitMQ] '메시지 발행자' 입장에서의 메시지 배달 보장과 성능의 절충점 본문

Backend/RabbitMQ

[RabbitMQ] '메시지 발행자' 입장에서의 메시지 배달 보장과 성능의 절충점

장바금 2025. 6. 14. 21:26

담당하여 운영 중인 도메인 서비스에서 RabbitMQ로 이벤트 메시지를 발행하는 과정을 하드하게 운영하고 있다.

주요 메시지는 주문/결제 등 고객 서비스에 직결되는 데이터였기 때문에, 단 한 건의 메시지도 누락되면 안 된다는 목표를 가질 수밖에 없었다.

하지만 RabbitMQ는 메시지를 보장하는 여러 기법을 제공하고 있고, 이를 잘못 설계하거나 오용하면 "메시지를 보장하려다가 오히려 시스템이 느려지고 불안정해지는" 결과를 초래할 수도 있다.

그래서 이번 과정에서 RabbitMQ의 메시지 발행자 입장에서 가능한 메시지 배달 보장 방식들을 전부 정리해 보고, 각 단계의 장단점과 성능 비용에 대한 감을 잡고 서비스에 맞는 골디락스 존을 찾고자 한다.

 

 

# RabbitMQ 발행자 입장에서의 메시지 배달 보장 8단계

메시지를 절대 잃지 않기 위한 방법은 단순하지 않은 것 같다.

메시지 배당 보장 수준은 8단계로 나누어 정리해 보았다. 각 기능은 독립적인 기능이며 함께 조합함으로써 보장 수준을 높일 수 있다.

 

## 배당 보장 8단계

1단계. 보장하지 않음 (Fire-and-Forget)

2단계: 실패 통보받기 (mandatory 옵션 + return 리스너)

3단계: 발행자 확인 (Publisher Confirms)

4단계 대체 익스체인지 (Alternate Exchange)

5단계: HA큐 (Hight Availabilty Queue)

6단계: 트랜잭션

7단계: 트랜잭션 HA 큐

8단계: 메시지 디스크에 저장 (Durable Queue + Delivery-Mode:2)

 

이어서 각 배달 보장 8단계에 대해 설명한다.

예시 코드의 경우 내가 주로 사용하는 Node.js의 amqplib 모듈을 사용하여 설명한다.

 

### 1단계. 보장하지 않음 (Fire-and-Forget)

가장 기본적인 방식이다. 그냥 `channel.publish()` 를 호출하면 끝이다.

이 방식은 어떠한 보장 방식도 사용하지 않았기 때문에 속도는 가장 빠르다고 할 수 있을 것 같다.

하지만 브로커에 제대로 전달됐는지, 큐에 라우팅 됐는지, 실패했는지 조차 알 수 없다. 메시지가 사라져도 우리는 알 방법이 없다.

메시지가 일부 누락되어도 괜찮은 서비스라면 괜찮겠지만 중요한 메시지에는 적합하지 않다.

 

### 2단계: 실패 통보받기 (mandatory 옵션 + return 리스너)

조금 더 신경을 써서, 메시지가 큐에 도달하지 못한 경우에만 알림을 받도록 설정할 수 있다.

 

메시지 발행 시, `mandatory: true` 옵션을 설정하면, 라우팅 실패 시 `return` 이벤트가 발생하며 전달한 메시지도 함께 반환한다. 브로커에는 도달했지만, 큐가 없거나 바인딩이 잘못됐을 경우 경고를 받을 수 있다.

 

이는 비동기로 동작하며 메시지를 발행한 애플리케이션에서 별도의 비동기 처리가 필요하다.

라우팅 문제가 있을 수 있는 환경이라면, 최소한 이 정도 설정은 해두는 것이 좋을 것 같다.

 

아래 예시는 익스체인지와 큐가 바인딩 돼있지 않다고 했을 때, return 이벤트를 핸들링하는 코드이다.

const amqp = require('amqplib');

(async () => {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel();

  const exchange = 'test_exchange';
  const routingKey = 'not_exist_queue'; // 이 키에 매칭되는 큐가 없다고 가정

  await channel.assertExchange(exchange, 'direct', { durable: false });

  // return 이벤트 리스너 등록 (반환된 메시지 처리)
  channel.on('return', (msg) => {
    // 별도 처리 로직 구현 필요
  });

  const success = channel.publish(
    exchange,
    routingKey,
    Buffer.from('Hello, this may fail to be routed!'),
    {
      mandatory: true // 큐에 라우팅되지 않으면 return으로 알림 받음
    }
  );

  console.log(`Message published (mandatory: true): ${success}`);
})();

 

### 3단계: 발행자 확인 (Publisher Confirms)

발행자가 확실히 메시지를 전달했는지를 확인하려면, RabbitMQ의 Confirm 모드를 사용하는 것이 좋을 것 같다.

메시지를 보낸 후, 브로커로부터 ack(성공) 또는 nack(실패) 신호를 받게 된다.

비동기로 처리할 수 있고, `waitForConfirms()`를 통해 순차적으로 보장할 수 있다.

만약 ack가 오지 않거나 nack가 오면, 이를 감지하여 별도의 처리 로직을 동작시킬 수 있다.

성능은 다소 저하되지만, 실질적인 신뢰성을 확보하는 첫 단계가 아닌가 싶다.

 

아래는 confirmChannel을 생성하고 waitForConfirms()를 사용하는 예시이다.

const amqp = require('amqplib');

(async () => {
  const conn = await amqp.connect('amqp://localhost');

  // ConfirmChannel을 생성해야 함
  const channel = await conn.createConfirmChannel();

  const exchange = 'confirm_exchange';
  const routingKey = 'valid_key';

  await channel.assertExchange(exchange, 'direct', { durable: true });

  // 메시지 발행
  const success = channel.publish(
    exchange,
    routingKey,
    Buffer.from('This message should be confirmed by broker'),
    { persistent: true } // durable queue와 함께 쓰면 디스크에 저장됨
  );

  console.log(`Message sent to exchange: ${success}`);

  // 브로커의 ACK/NACK 확인
  try {
    await channel.waitForConfirms();
    console.log('Broker acknowledged message (ack)');
  } catch (err) {
    console.error('Broker did not confirm message (nack)');
    console.error(err);
  }

  await channel.close();
  await conn.close();
})();

 

`waitForConfirms()` 는 채널의 모든 publish에 대한 confirm을 기다리므로 메시지 발행량이 많다면 비효율적일 수 있다.

고성능이 요구되는 시스템에서는 `channel.publish()` 시, `channel.once('ack', callback)` 을 활용한 비동기 처리하는 것이 괜찮을 것 같다.

 

 

### 4단계 대체 익스체인지 (Alternate Exchange)

메시지가 어디에도 전달되지 못했을 때, 그냥 버릴 것이 아니라 대체 익스체인지로 전달되도록 설정할 수 있다.

`x-alternate-exchange` 를 설정해 두면, 라우팅 실패 메시지를 지정된 fallback exchange로 보내 후속 처리할 수 있다.

예를 들어, 메시지가 발행됐지만 매칭되는 바인딩 큐가 없을 경우 등의 이유로 실패 메시지를 로그로 저장하거나, 관리자에게 알림을 보내는 등의 후처리가 가능하다.

 

실패 메시지를 놓치지 않고 관리하고 싶다면 좋은 선택으로 보인다.

const amqp = require('amqplib');

(async () => {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel();

  const mainExchange = 'main_exchange';
  const altExchange = 'alternate_exchange';
  const validRoutingKey = 'valid_key';
  const invalidRoutingKey = 'invalid_key';
  const failedQueue = 'unroutable_messages';

  // 1. 대체 Exchange 먼저 선언
  await channel.assertExchange(altExchange, 'fanout', { durable: true });

  // 2. 대체 Exchange에 연결될 큐 선언
  await channel.assertQueue(failedQueue, { durable: true });
  await channel.bindQueue(failedQueue, altExchange, '');

  // 3. 주 Exchange 선언하면서 x-alternate-exchange 설정
  await channel.assertExchange(mainExchange, 'direct', {
    durable: true,
    arguments: {
      'x-alternate-exchange': altExchange
    }
  });

  // 4. 주 Exchange에 연결되는 정상 큐 (optional)
  // await channel.assertQueue('valid_queue');
  // await channel.bindQueue('valid_queue', mainExchange, validRoutingKey);

  // 5. 유효하지 않은 라우팅 키로 메시지 전송 (fallback 발생)
  const msg = 'This message will be routed to alternate exchange';
  channel.publish(mainExchange, invalidRoutingKey, Buffer.from(msg), {
    persistent: true
  });

  console.log(`Sent message with invalid routingKey → should go to [${failedQueue}]`);

  // 확인을 위해 fallback 큐에서 메시지 수신 (1초 대기)
  setTimeout(async () => {
    const msg = await channel.get(failedQueue, { noAck: true });
    if (msg) {
      console.log(`Received in fallback queue: ${msg.content.toString()}`);
    } else {
      console.log(`No message in fallback queue.`);
    }
    await channel.close();
    await conn.close();
  }, 1000);
})();

 

대체 익스체인지로 가는 메시지에는 원래 라우팅 키와 익스체인지 정보가 유지된다. 이는 후처리 시, 활용이 가능하며 fallback exchange에도 바인딩된 큐가 없으면 이 메시지는 유실되어 버린다.

 

#### 대체 익스체인지와 mandatory 옵션을 함께 사용할 때의 주의점

대체 익스체인지와 실패 통보(return 이벤트)를 받을 수 있는 mandatory:true 옵션을 함께 사용할 시 중요하게 인지해야 하는 점이 있다.

 

- mandatory:true와 대체 익스체인지가 설정된 경우, 메시지는 반환되지 않는다.

mandatory 플래그를 true로 주고 메시지를 발행해도, 익스체인지에서 대체 익스체인지가 지정되어 있으면 라우팅 되지 못한 메시지는 발행자에게 반환(Basic.Return)되지 않고 대체 키로 전달된다.

즉, 발행자가 라우팅 실패를 감지하고 싶어서 mandatory를 true로 설정했다면, 대체 익스체인지가 있으면 이 동작이 일어나지 않으니 주의해야 한다.

 

- 대체 익스체인지도 라우팅 실패 시 메시지 손실이 가능하다.

대체 익스체인지로 전달된 메시지도 대체 익스체인지에 바인딩된 큐가 없거나 라우팅 키가 맞지 않으면 큐에 저장되지 않고 메시지가 손실될 수 있다.

이 경우에도 발행자에게 Basic.Return이 전달되지 않는다.

 

그렇기에 실무에서 사용할 시에는 대체 익스체인지와 mandatory:true를 동시에 사용할 때는 메시지 반환 (Basic.Return) 이벤트가 발생하지 않는다는 점을 반드시 인지해야 하고, 대체 익스체인지에 큐가 바인딩되어 있지 않으면 메시지가 손실될 수 있으니, 대체 익스체인지의 큐 바인딩 상채를 항상 확인할 수 있어야 한다.

 

 

### 5단계: HA큐 (Hight Availabilty Queue)

RabbitMQ는 노드 간 클러스터링이 가능하지만, 하나의 노드에 장애가 나면 큐와 메시지도 함께 사라질 수 있다.

이를 방지하려면 HA 큐의 사용을 고려해야 한다.

RabbitMQ 클러스터는 여러 노드로 구성되어 있지만, 기본적으로 큐와 메시지는 생성된 노드에만 저장된다.

즉, 한 노드에 장애가 나면 해당 노드의 큐와 메시지도 함께 사라질 수 있다. 그러하여 서비스 안정성을 위해선 큐와 메시지를 여러 노드에 복제해 두는 HA 큐가 필수라고 할 수 있다.

 

HA 큐의 종류는 Mirrored QueueQuorum Queue 가 존재한다.

큰 차이로는 이전 방식과 최신 방식이라는 점과 알고리즘의 차이가 있다. 현재는 Quorum Queue 방식을 권장하고 있고, 분산 합의 알고리즘인 Raft 알고리즘을 사용한다는 점이다. 각 방식에 대해서는 자세히 다루지는 않겠다. 이후 설명은 Quorum Queue 기반으로 설명한다.

 

메시지는 큐에 도착하는 시점에서 연결된 노드(리더+팔로워)에 동시에 저장된다. 

Raft 분산 합의 알고리즘을 사용한다는 점에서 과반수 노드에 메시지가 저장되어야만 메시지 발행이 성공(ACK)으로 처리된다.

이러한 구조 덕분에 일부 노드 장애가 발생해도, 과반수 노드가 살아 있으면 메시지 유실 없이 복구가 가능하다.

 

실무적으로는 HA 큐를 사용할 때, 고려할 점은 아래와 같을 것 같다.

- 신규 시스템에는 반드시 Quorum Queue를 사용 (큐 생성 시, `x-queue-type: quorum` 옵션 추가)

- 복제 노드 수는 기본 3개, 중요도에 따라 노드 수를 올리되 홀수로 세팅

- 모니터리 도구로 복제 상태와 성능을 주기적으로 확인

 

서비스가 클러스터 환경이라면 HA 큐는 필수로 고려해야 할 보장 수준일 것 같다.

 

참고

https://www.rabbitmq.com/blog/2020/04/20/rabbitmq-gets-an-ha-upgrade

 

### 6단계: 트랜잭션

RabbitMQ는 트랜잭션 모드를 제공한다. 발행자가 `txSelect()` 를 호출한 후 여러 메시지를 보내고, 마지막에 `txCommit()`을 호출하면 모두 한 번에 처리된다.

중간에 문제가 발생하면 `toRollback()`을 통해 메시지를 무효화할 수 있다.

하지만 성능이 매우 나쁘다고 할 수 있다. RabbitMQ 공식 문서에서도 잘 사용하지 말라고 할 정도로 느린 것 같다.

보장 수준은 높지만, 실제로는 거의 사용하지 않는다고 한다.

 

const amqp = require('amqplib');

(async () => {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel();

  const queue = 'tx_queue';

  await channel.assertQueue(queue, { durable: true });

  try {
    // 1. 트랜잭션 시작
    await channel.txSelect();
    console.log('Transaction started');

    // 2. 메시지 발행
    channel.sendToQueue(queue, Buffer.from('msg-1'));
    channel.sendToQueue(queue, Buffer.from('msg-2'));
    // 예: 중간에 에러 발생 시뮬레이션
    if (Math.random() < 0.5) {
      throw new Error('Simulated failure during transaction');
    }

    channel.sendToQueue(queue, Buffer.from('msg-3'));

    // 3. 커밋
    await channel.txCommit();
    console.log('Transaction committed');
  } catch (err) {
    // 4. 롤백
    console.error('Transaction failed, rolling back:', err.message);
    await channel.txRollback();
    console.log('Transaction rolled back');
  }

  await channel.close();
  await conn.close();
})();

 

 

### 7단계: 트랜잭션 HA 큐

트랜잭션 모드에 HA 큐를 더하면, 메시지를 한 번에 처리하면서 복제까지 보장하는 최고의 안정성을 얻게 된다.

이는 매우 극단적이라고 표현하고 있다. 예를 들어, 금융 서비스처럼 단 한 건의 메시지도 놓쳐선 안 되는 환경에서만 하용하면 될 것 같다. 

즉, 일반적인 서비스에서는 과도한 수준이 아닐까 싶다.

 

### 8단계: 메시지 디스크에 저장 (Durable Queue + Delivery-Mode:2)

브로커가 갑자기 죽거나, 서비스가 재시작되는 상황에서도 메시지를 살리고 싶다면, 메시지를 디스크에 저장해야 한다.

이를 위해 큐는 `durable`, 메시지는 `deliveryMode:2(Persistent Message)` 로 설정한다. 이 두 설정이 모두 되어야 메시지는 완전히 디스크에 저장된다.

브로커는 이 메시지를 디스크에 기록하고, 재시작 시 다시 로드할 수 있게 된다.

 

디스크에 저장된 메시지는 큐에 쌓여 있는 동안만 디스크에 보존된다.

"소비자"가 메시지를 수신 후, 확인(ACK)을 보내면 디스크의 메시지가 삭제된다.

 

물론 디스크 I/O 가 추가되므로 성능은 떨어진다.

메시지의 양이 과다하면 운영체제의 디스크 I/O 작업에서 팬딩이 걸릴 수 있기 때문에 하드웨어 성능을 잘 확인해야 한다.

 

"발행자 확인" 방식과 함께 사용한다면 신뢰성을 좀 더 높일 수 있다고 한다.

메시지를 디스크에 저장하는 과정은 비동기적으로 처리된다. 완전한 신뢰성을 위해서는 "발행자 확인" 기능을 함께 사용하며, 브로커가 메시지를 디스크에 안전하게 기록한 후에만 발행자에게 완료(ACK)를 보내므로, 발행자는 메시지의 안전한 저장을 확실히 알 수 있다.

 


 

이처럼 RabbitMQ는 다양한 단계의 메시지 보장 수단을 제공하는 것으로 보인다.

하지만 보장 수준이 높아질수록 성능은 분명히 떨어지기 마련인 듯하다.

우리 서비스에 필요한 최소한의 안정성과 감당 가능한 성능 저하의 군형점, 그 '골디락스 존'을 찾는 것이 진짜 중요하고 이 부분에 대해 많은 고민과 테스트를 해야 할 것 같다.

 

참고

도서: RabbitMQ in Depth / 개빈 로이저자(글) ·홍영택번역 / 4장

'Backend > RabbitMQ' 카테고리의 다른 글

[RabbitMQ] Exchange와 Exchange Types  (1) 2024.11.07