RabbitMQ - Queue에 메시지 전달 및 처리하기 본문
1. Queue 생성
import dotenv from 'dotenv';
import amqplib from 'amqplib/callback_api.js'
'hostname': process.env.RABBITMQ_HOST,
'username': process.env.RABBITMQ_ID,
'password': process.env.RABBITMQ_PASSWORD,
'port': process.env.RABBITMQ_PORT,
'vhost': process.env.RABBITMQ_VHOST,
}, (err, conn) => {
if(err) {
console.log("Err: ", err);
return false;
conn.createChannel((err, ch) => {
if(err) {
console.log("Err: ", err);
return false;
ch.assertQueue("큐 이름", {옵션}); // queue를 생성하려면 assertQueue를 사용한다.
- assertQueue() : 큐를 생성한다.
- durable : true로 설정시, RabbitMQ를 재시작해도 해당 큐가 존재한다. 기본값은 true이다.
- messageTtl : 큐에 도착한 메시지가 지정한 시간 이후에 삭제된다. ms단위로 지정한다.(1000ms = 1초)
- expires : 큐에 대한 작업이 일정시간 없을 경우 큐를 자동으로 삭제한다. 큐가 존재하는지 확인하는 checkQueue(), 큐의 메시지를 확인하는 get()등을 호출할 경우, 그 시점 이후로 expires 시간이 작동한다. 큐를 생성할 때, expires를 5초로 설정, 생성한지 3초 후에 checkQueue로 큐 존재 확인을 하는 간단한 로직을 실행하면 생성된 큐는 8초 후에 삭제가 된다. checkQueue로 인해 큐 삭제 시간이 늦춰졌기 때문이다. 또한 expires 옵션을 설정할 때는 ms단위로 지정한다.(1000ms = 1초)
- maxLength : 큐에 보관할 수 있는 최대 메시지 개수를 제한한다. 지정된 개수를 초과하는 메시지가 큐에 publish되면 해당 메시지를 버린다.
1-1. 큐에 exchange 바인딩
conn.createChannel((err, ch) => {
ch.assertExchange('fruit.direct', 'direct');
ch.assertQueue('orange-pub',{durable: true});
ch.bindQueue('orange-pub', 'fruit.direct', 'yummy.orange');
// 큐 이름, exchange이름, 라우팅키
- assertExchange() : fruit.direct라는 exchange를 생성하고 타입은 direct로 설정한다.
- assertQueue() : orange-pub라는 큐를 생성하고 durable옵션을 true로 설정한다.
- bindQueue() : orange-pub 큐에 fruit.direct라는 exchange를 바인딩하고 routing key는 yummy.orange로 설정한다.
큐에 exchange를 바인딩한 후 메시지를 전달할 때, 라우팅 키가 일치하는 메시지는 orange-pub라는 큐에 전달된다.
2. 큐에 메시지 publish하기
conn.createChannel((err, ch) => {
ch.publish('fruit.direct', 'yummy.orange', Buffer.from('맛있는 오렌지'), {옵션});
// exchange이름, 라우팅 키, 메시지 내용, 옵션
- publish() : exchange에 따라 일치하는 라우팅 키를 가진 큐에 메시지를 전달한다. 메시지를 전달할 때는 Buffer타입으로 전송해야한다.
- headers : 메시지에 헤더값을 지정하는 옵션이다. key, value 형식으로 값을 추가한다. exchange의 타입이 headers일 경우, 이 헤더값을 통해 전달될 큐가 정해진다.
ch.publish('fruit.direct', 'yummy.orange', Buffer.from('맛있는 오렌지'), {
headers: {
'price': 1000,
'count': 3
} // header에 key, value 형식으로 값을 추가한다.
- contentType : 메시지의 MIME타입을 지정한다.
- timestamp : 메시지의 시간값을 지정한다. 정수형으로 입력해야한다.
- expiration : 보통 메시지 큐는 consume()을 통해 받은 메시지를 소비하는데 expiration을 사용하면 일정 시간이 지난 메시지는 자동으로 제거한다. ms단위로 지정한다.(1000ms = 1초)
ch.publish('fruit.direct', 'yummy.orange', Buffer.from('맛있는 오렌지'), {
headers: {
'price': 1000,
'count': 3
}, // header에 key, value 형식으로 값을 추가한다.
contentType: 'text/plain', // MIME 타입 지정
timestamp: 1680349939463, // 시간
expiration: 5000, // 메시지 만료 시간(ms)
2-1. 특정 큐에 메시지 전달
sendToQueue를 사용하면 지정된 큐에 메시지를 전달할 수 있다.
conn.createChannel((err, ch) => {
ch.sendToQueue("큐 이름", "메시지 내용", {옵션});
지정된 큐에 메시지를 전달한다. publish() 할 때와 마찬가지로 메시지는 Buffer 형식으로 전달되야 한다. sendToQueue는 exchange의 direct와 비슷하다. 둘다 이름이 일치해야 전달되기 때문이다.
하지만 exchange는 routing key를, sendToQueue는 큐 이름의 일치 여부에 따라 전달된다는 차이점이 존재한다.
3. 메시지 확인
큐에 전달된 메시지는 consume()을 사용하거나 get()을 사용하여 확인할 수 있다.
- consume() : 큐에 전달된 메시지를 비동기식으로 처리하며 여러개의 메시지를 한번에 consume 할 수 있다. get()보다 성능적으로 우수하기에 메시지를 소비하기에 가장 이상적인 방법이다.
ch.prefetch(1); // 한번에 consume할 메시지 수를 정의한다.
ch.consume("orange-pub", (msg) => {
console.log("Fields: ", msg.fields); // exchange, routing key 등의 정보
console.log("Properties: ", msg.properties); // cotentType, headers 등의 정보
console.log("Content: ", (msg.content).toString()); // 메시지 내용
}, {
'noAck': true // 옵션
- prefetch : 한번에 consume될 메시지의 수를 정의한다. prefetch(5)로 설정하면 5개의 메시지가 한번에 consume처리된다.
- fields : exchange, routing key, 지정된 tag 등의 정보를 출력한다.
- properties : 메시지를 publish할 때 같이 전송된 설정값들을 출력한다. headers, timestamp, contentType등을 출력한다.
- content : 메시지 내용을 출력한다. toString()없이 출력하면 Buffer 형식으로 출력된다. 메시지 내용이 문자면 toString()을 추가하여 출력한다. 만약 메시지 내용이 JSON 형식이면 JSON.parse(msg.content)로 출력한다.
- noAck : consume된 메시지가 처리 완료되었는지의 유무를 나타낸다. true로 설정시, 전달된 메시지가 승인처리되어 큐에서 사라지게 된다. false로 설정시, 아직 완료작업이 이루어지지 않았다는 것을 의미하며 해당 메시지를 대기열에 추가하여 다음 consume때 처리한다.
- get : consume과 비슷하게 큐에 전달된 메시지를 처리하는 메서드이지만 동기식으로 처리하고 메시지를 한개씩만 처리하기에 권장되지 않는 방법이다.
ch.get("orange-pub", {
'noAck': true
}, (err, msg) => {
console.log("Fields: ", msg.fields);
console.log("Properties: ", msg.properties);
console.log("Content: ", JSON.parse(msg.content));
참고 사이트 :
amqplib | AMQP 0-9-1 library and client for Node.JS
AMQP 0-9-1 library and client for Node.JS amqplib implements the machinery needed to make clients for AMQP 0-9-1, and includes such a client. Why phrase it that way around? Because AMQP is complicated enough that there are a few different ways of presentin
Chapter 5. Don’t get messages; consume them · RabbitMQ in Depth
Consuming messages · Tuning consumer throughput · When consumers and queues are exclusive · Specifying a quality of service for your consumers
RabbitMQ" What are "Ready" and "Unacked" types of messages?
I'm getting confused between these two types of messages in RabbitMQ. I've seen that some of my queues have 0 "Unacked" and 1000 "Ready" messages, while some have 1000 "Unacked" and 0 "Ready" mess...
what is the difference between assertQueue and send To Queue in RabbitMq
I have found Example for Sending String in rabittMq and Receiving from the queue but I am not clear about about these methods - assertQueue , sendToQueue send.js var amqp = require('amqplib/
FAQ: RabbitMQ Basic Consumer vs. RabbitMQ Basic Get - CloudAMQP
RabbitMQ offers two ways to receive messages which are the polling-based basic.get and the push-based basic.consume. How to decide between using a consumer or a get request depends on the workload that each creates.
