일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- docker
- fastapi
- python
- SQL
- React
- Backbone.js
- deep learning
- Redis
- 기초 수학
- For
- linux
- 블레이드 템플릿
- NCP
- webpack
- Redux
- php
- Go
- javascript
- phpredis
- rabbitmq
- Node
- CentOS
- Machine Learning
- laravel
- nginx
- Switch
- Babel
- nodejs
- mariadb
- AWS
- Today
- Total
개발일기
RabbitMQ - Queue에 메시지 전달 및 처리하기 본문
1. Queue 생성
import dotenv from 'dotenv';
import amqplib from 'amqplib/callback_api.js'
dotenv.config();
amqplib.connect({
'hostname': process.env.RABBITMQ_HOST,
'username': process.env.RABBITMQ_ID,
'password': process.env.RABBITMQ_PASSWORD,
'port': process.env.RABBITMQ_PORT,
'vhost': process.env.RABBITMQ_VHOST,
}, (err, conn) => {
if(err) {
console.log("Err: ", err);
return false;
}
conn.createChannel((err, ch) => {
if(err) {
console.log("Err: ", err);
return false;
}
ch.assertQueue("큐 이름", {옵션}); // queue를 생성하려면 assertQueue를 사용한다.
});
})
- assertQueue() : 큐를 생성한다.
- durable : true로 설정시, RabbitMQ를 재시작해도 해당 큐가 존재한다. 기본값은 true이다.
- messageTtl : 큐에 도착한 메시지가 지정한 시간 이후에 삭제된다. ms단위로 지정한다.(1000ms = 1초)
- expires : 큐에 대한 작업이 일정시간 없을 경우 큐를 자동으로 삭제한다. 큐가 존재하는지 확인하는 checkQueue(), 큐의 메시지를 확인하는 get()등을 호출할 경우, 그 시점 이후로 expires 시간이 작동한다. 큐를 생성할 때, expires를 5초로 설정, 생성한지 3초 후에 checkQueue로 큐 존재 확인을 하는 간단한 로직을 실행하면 생성된 큐는 8초 후에 삭제가 된다. checkQueue로 인해 큐 삭제 시간이 늦춰졌기 때문이다. 또한 expires 옵션을 설정할 때는 ms단위로 지정한다.(1000ms = 1초)
- maxLength : 큐에 보관할 수 있는 최대 메시지 개수를 제한한다. 지정된 개수를 초과하는 메시지가 큐에 publish되면 해당 메시지를 버린다.
1-1. 큐에 exchange 바인딩
conn.createChannel((err, ch) => {
ch.assertExchange('fruit.direct', 'direct');
ch.assertQueue('orange-pub',{durable: true});
ch.bindQueue('orange-pub', 'fruit.direct', 'yummy.orange');
// 큐 이름, exchange이름, 라우팅키
});
- assertExchange() : fruit.direct라는 exchange를 생성하고 타입은 direct로 설정한다.
- assertQueue() : orange-pub라는 큐를 생성하고 durable옵션을 true로 설정한다.
- bindQueue() : orange-pub 큐에 fruit.direct라는 exchange를 바인딩하고 routing key는 yummy.orange로 설정한다.
큐에 exchange를 바인딩한 후 메시지를 전달할 때, 라우팅 키가 일치하는 메시지는 orange-pub라는 큐에 전달된다.
2. 큐에 메시지 publish하기
conn.createChannel((err, ch) => {
ch.publish('fruit.direct', 'yummy.orange', Buffer.from('맛있는 오렌지'), {옵션});
// exchange이름, 라우팅 키, 메시지 내용, 옵션
});
- publish() : exchange에 따라 일치하는 라우팅 키를 가진 큐에 메시지를 전달한다. 메시지를 전달할 때는 Buffer타입으로 전송해야한다.
- headers : 메시지에 헤더값을 지정하는 옵션이다. key, value 형식으로 값을 추가한다. exchange의 타입이 headers일 경우, 이 헤더값을 통해 전달될 큐가 정해진다.
ch.publish('fruit.direct', 'yummy.orange', Buffer.from('맛있는 오렌지'), {
headers: {
'price': 1000,
'count': 3
} // header에 key, value 형식으로 값을 추가한다.
})
- contentType : 메시지의 MIME타입을 지정한다.
- timestamp : 메시지의 시간값을 지정한다. 정수형으로 입력해야한다.
- expiration : 보통 메시지 큐는 consume()을 통해 받은 메시지를 소비하는데 expiration을 사용하면 일정 시간이 지난 메시지는 자동으로 제거한다. ms단위로 지정한다.(1000ms = 1초)
ch.publish('fruit.direct', 'yummy.orange', Buffer.from('맛있는 오렌지'), {
headers: {
'price': 1000,
'count': 3
}, // header에 key, value 형식으로 값을 추가한다.
contentType: 'text/plain', // MIME 타입 지정
timestamp: 1680349939463, // 시간
expiration: 5000, // 메시지 만료 시간(ms)
})
2-1. 특정 큐에 메시지 전달
sendToQueue를 사용하면 지정된 큐에 메시지를 전달할 수 있다.
conn.createChannel((err, ch) => {
ch.sendToQueue("큐 이름", "메시지 내용", {옵션});
});
지정된 큐에 메시지를 전달한다. publish() 할 때와 마찬가지로 메시지는 Buffer 형식으로 전달되야 한다. sendToQueue는 exchange의 direct와 비슷하다. 둘다 이름이 일치해야 전달되기 때문이다.
하지만 exchange는 routing key를, sendToQueue는 큐 이름의 일치 여부에 따라 전달된다는 차이점이 존재한다.
3. 메시지 확인
큐에 전달된 메시지는 consume()을 사용하거나 get()을 사용하여 확인할 수 있다.
- consume() : 큐에 전달된 메시지를 비동기식으로 처리하며 여러개의 메시지를 한번에 consume 할 수 있다. get()보다 성능적으로 우수하기에 메시지를 소비하기에 가장 이상적인 방법이다.
ch.prefetch(1); // 한번에 consume할 메시지 수를 정의한다.
ch.consume("orange-pub", (msg) => {
console.log("Fields: ", msg.fields); // exchange, routing key 등의 정보
console.log("Properties: ", msg.properties); // cotentType, headers 등의 정보
console.log("Content: ", (msg.content).toString()); // 메시지 내용
}, {
'noAck': true // 옵션
})
- prefetch : 한번에 consume될 메시지의 수를 정의한다. prefetch(5)로 설정하면 5개의 메시지가 한번에 consume처리된다.
- fields : exchange, routing key, 지정된 tag 등의 정보를 출력한다.
- properties : 메시지를 publish할 때 같이 전송된 설정값들을 출력한다. headers, timestamp, contentType등을 출력한다.
- content : 메시지 내용을 출력한다. toString()없이 출력하면 Buffer 형식으로 출력된다. 메시지 내용이 문자면 toString()을 추가하여 출력한다. 만약 메시지 내용이 JSON 형식이면 JSON.parse(msg.content)로 출력한다.
- noAck : consume된 메시지가 처리 완료되었는지의 유무를 나타낸다. true로 설정시, 전달된 메시지가 승인처리되어 큐에서 사라지게 된다. false로 설정시, 아직 완료작업이 이루어지지 않았다는 것을 의미하며 해당 메시지를 대기열에 추가하여 다음 consume때 처리한다.
- get : consume과 비슷하게 큐에 전달된 메시지를 처리하는 메서드이지만 동기식으로 처리하고 메시지를 한개씩만 처리하기에 권장되지 않는 방법이다.
ch.get("orange-pub", {
'noAck': true
}, (err, msg) => {
console.log("Fields: ", msg.fields);
console.log("Properties: ", msg.properties);
console.log("Content: ", JSON.parse(msg.content));
})
참고 사이트 :
https://amqp-node.github.io/amqplib/
amqplib | AMQP 0-9-1 library and client for Node.JS
AMQP 0-9-1 library and client for Node.JS amqplib implements the machinery needed to make clients for AMQP 0-9-1, and includes such a client. Why phrase it that way around? Because AMQP is complicated enough that there are a few different ways of presentin
amqp-node.github.io
https://livebook.manning.com/book/rabbitmq-in-depth/chapter-5/11
Chapter 5. Don’t get messages; consume them · RabbitMQ in Depth
Consuming messages · Tuning consumer throughput · When consumers and queues are exclusive · Specifying a quality of service for your consumers
livebook.manning.com
https://stackoverflow.com/questions/31915773/rabbitmq-what-are-ready-and-unacked-types-of-messages
RabbitMQ" What are "Ready" and "Unacked" types of messages?
I'm getting confused between these two types of messages in RabbitMQ. I've seen that some of my queues have 0 "Unacked" and 1000 "Ready" messages, while some have 1000 "Unacked" and 0 "Ready" mess...
stackoverflow.com
what is the difference between assertQueue and send To Queue in RabbitMq
I have found Example for Sending String in rabittMq and Receiving from the queue but I am not clear about about these methods - assertQueue , sendToQueue send.js var amqp = require('amqplib/
stackoverflow.com
https://www.cloudamqp.com/blog/rabbitmq-basic-consume-vs-rabbitmq-basic-get.html
FAQ: RabbitMQ Basic Consumer vs. RabbitMQ Basic Get - CloudAMQP
RabbitMQ offers two ways to receive messages which are the polling-based basic.get and the push-based basic.consume. How to decide between using a consumer or a get request depends on the workload that each creates.
www.cloudamqp.com
'Messaging Broker > RabbitMQ' 카테고리의 다른 글
RabbitMQ - Exchange 생성 및 Type 정리 (0) | 2023.03.26 |
---|---|
RabbitMQ - Node.js로 RabbitMQ 연결하기 (0) | 2023.03.22 |
RabbitMQ - 설치 및 계정 사용법 (0) | 2023.03.01 |
RabbitMQ - Stats in management UI are disabled on this node (0) | 2023.02.13 |