Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
Tags
- docker
- linux
- 기초 수학
- CentOS
- webpack
- Backbone.js
- phpredis
- Switch
- python
- deep learning
- rabbitmq
- nodejs
- nginx
- NCP
- fastapi
- Machine Learning
- For
- Babel
- Go
- 블레이드 템플릿
- Redis
- laravel
- AWS
- php
- mariadb
- SQL
- Redux
- javascript
- Node
- React
Archives
- Today
- Total
개발일기
RabbitMQ - Queue에 메시지 전달 및 처리하기 본문
728x90
반응형
1. Queue 생성
import dotenv from 'dotenv';
import amqplib from 'amqplib/callback_api.js'
dotenv.config();
amqplib.connect({
'hostname': process.env.RABBITMQ_HOST,
'username': process.env.RABBITMQ_ID,
'password': process.env.RABBITMQ_PASSWORD,
'port': process.env.RABBITMQ_PORT,
'vhost': process.env.RABBITMQ_VHOST,
}, (err, conn) => {
if(err) {
console.log("Err: ", err);
return false;
}
conn.createChannel((err, ch) => {
if(err) {
console.log("Err: ", err);
return false;
}
ch.assertQueue("큐 이름", {옵션}); // queue를 생성하려면 assertQueue를 사용한다.
});
})
- assertQueue() : 큐를 생성한다.
- durable : true로 설정시, RabbitMQ를 재시작해도 해당 큐가 존재한다. 기본값은 true이다.
- messageTtl : 큐에 도착한 메시지가 지정한 시간 이후에 삭제된다. ms단위로 지정한다.(1000ms = 1초)
- expires : 큐에 대한 작업이 일정시간 없을 경우 큐를 자동으로 삭제한다. 큐가 존재하는지 확인하는 checkQueue(), 큐의 메시지를 확인하는 get()등을 호출할 경우, 그 시점 이후로 expires 시간이 작동한다. 큐를 생성할 때, expires를 5초로 설정, 생성한지 3초 후에 checkQueue로 큐 존재 확인을 하는 간단한 로직을 실행하면 생성된 큐는 8초 후에 삭제가 된다. checkQueue로 인해 큐 삭제 시간이 늦춰졌기 때문이다. 또한 expires 옵션을 설정할 때는 ms단위로 지정한다.(1000ms = 1초)
- maxLength : 큐에 보관할 수 있는 최대 메시지 개수를 제한한다. 지정된 개수를 초과하는 메시지가 큐에 publish되면 해당 메시지를 버린다.
1-1. 큐에 exchange 바인딩
conn.createChannel((err, ch) => {
ch.assertExchange('fruit.direct', 'direct');
ch.assertQueue('orange-pub',{durable: true});
ch.bindQueue('orange-pub', 'fruit.direct', 'yummy.orange');
// 큐 이름, exchange이름, 라우팅키
});
- assertExchange() : fruit.direct라는 exchange를 생성하고 타입은 direct로 설정한다.
- assertQueue() : orange-pub라는 큐를 생성하고 durable옵션을 true로 설정한다.
- bindQueue() : orange-pub 큐에 fruit.direct라는 exchange를 바인딩하고 routing key는 yummy.orange로 설정한다.
큐에 exchange를 바인딩한 후 메시지를 전달할 때, 라우팅 키가 일치하는 메시지는 orange-pub라는 큐에 전달된다.
2. 큐에 메시지 publish하기
conn.createChannel((err, ch) => {
ch.publish('fruit.direct', 'yummy.orange', Buffer.from('맛있는 오렌지'), {옵션});
// exchange이름, 라우팅 키, 메시지 내용, 옵션
});
- publish() : exchange에 따라 일치하는 라우팅 키를 가진 큐에 메시지를 전달한다. 메시지를 전달할 때는 Buffer타입으로 전송해야한다.
- headers : 메시지에 헤더값을 지정하는 옵션이다. key, value 형식으로 값을 추가한다. exchange의 타입이 headers일 경우, 이 헤더값을 통해 전달될 큐가 정해진다.
ch.publish('fruit.direct', 'yummy.orange', Buffer.from('맛있는 오렌지'), {
headers: {
'price': 1000,
'count': 3
} // header에 key, value 형식으로 값을 추가한다.
})
- contentType : 메시지의 MIME타입을 지정한다.
- timestamp : 메시지의 시간값을 지정한다. 정수형으로 입력해야한다.
- expiration : 보통 메시지 큐는 consume()을 통해 받은 메시지를 소비하는데 expiration을 사용하면 일정 시간이 지난 메시지는 자동으로 제거한다. ms단위로 지정한다.(1000ms = 1초)
ch.publish('fruit.direct', 'yummy.orange', Buffer.from('맛있는 오렌지'), {
headers: {
'price': 1000,
'count': 3
}, // header에 key, value 형식으로 값을 추가한다.
contentType: 'text/plain', // MIME 타입 지정
timestamp: 1680349939463, // 시간
expiration: 5000, // 메시지 만료 시간(ms)
})
2-1. 특정 큐에 메시지 전달
sendToQueue를 사용하면 지정된 큐에 메시지를 전달할 수 있다.
conn.createChannel((err, ch) => {
ch.sendToQueue("큐 이름", "메시지 내용", {옵션});
});
지정된 큐에 메시지를 전달한다. publish() 할 때와 마찬가지로 메시지는 Buffer 형식으로 전달되야 한다. sendToQueue는 exchange의 direct와 비슷하다. 둘다 이름이 일치해야 전달되기 때문이다.
하지만 exchange는 routing key를, sendToQueue는 큐 이름의 일치 여부에 따라 전달된다는 차이점이 존재한다.
3. 메시지 확인
큐에 전달된 메시지는 consume()을 사용하거나 get()을 사용하여 확인할 수 있다.
- consume() : 큐에 전달된 메시지를 비동기식으로 처리하며 여러개의 메시지를 한번에 consume 할 수 있다. get()보다 성능적으로 우수하기에 메시지를 소비하기에 가장 이상적인 방법이다.
ch.prefetch(1); // 한번에 consume할 메시지 수를 정의한다.
ch.consume("orange-pub", (msg) => {
console.log("Fields: ", msg.fields); // exchange, routing key 등의 정보
console.log("Properties: ", msg.properties); // cotentType, headers 등의 정보
console.log("Content: ", (msg.content).toString()); // 메시지 내용
}, {
'noAck': true // 옵션
})
- prefetch : 한번에 consume될 메시지의 수를 정의한다. prefetch(5)로 설정하면 5개의 메시지가 한번에 consume처리된다.
- fields : exchange, routing key, 지정된 tag 등의 정보를 출력한다.
- properties : 메시지를 publish할 때 같이 전송된 설정값들을 출력한다. headers, timestamp, contentType등을 출력한다.
- content : 메시지 내용을 출력한다. toString()없이 출력하면 Buffer 형식으로 출력된다. 메시지 내용이 문자면 toString()을 추가하여 출력한다. 만약 메시지 내용이 JSON 형식이면 JSON.parse(msg.content)로 출력한다.
- noAck : consume된 메시지가 처리 완료되었는지의 유무를 나타낸다. true로 설정시, 전달된 메시지가 승인처리되어 큐에서 사라지게 된다. false로 설정시, 아직 완료작업이 이루어지지 않았다는 것을 의미하며 해당 메시지를 대기열에 추가하여 다음 consume때 처리한다.
- get : consume과 비슷하게 큐에 전달된 메시지를 처리하는 메서드이지만 동기식으로 처리하고 메시지를 한개씩만 처리하기에 권장되지 않는 방법이다.
ch.get("orange-pub", {
'noAck': true
}, (err, msg) => {
console.log("Fields: ", msg.fields);
console.log("Properties: ", msg.properties);
console.log("Content: ", JSON.parse(msg.content));
})
참고 사이트 :
https://amqp-node.github.io/amqplib/
https://livebook.manning.com/book/rabbitmq-in-depth/chapter-5/11
https://stackoverflow.com/questions/31915773/rabbitmq-what-are-ready-and-unacked-types-of-messages
https://www.cloudamqp.com/blog/rabbitmq-basic-consume-vs-rabbitmq-basic-get.html
728x90
반응형
'Messaging Broker > RabbitMQ' 카테고리의 다른 글
RabbitMQ - Exchange 생성 및 Type 정리 (0) | 2023.03.26 |
---|---|
RabbitMQ - Node.js로 RabbitMQ 연결하기 (0) | 2023.03.22 |
RabbitMQ - 설치 및 계정 사용법 (0) | 2023.03.01 |
RabbitMQ - Stats in management UI are disabled on this node (0) | 2023.02.13 |
Comments