개발일기

RabbitMQ - Queue에 메시지 전달 및 처리하기 본문

Messaging Broker/RabbitMQ

RabbitMQ - Queue에 메시지 전달 및 처리하기

Flashback 2023. 4. 2. 00:20
728x90
반응형

1. Queue 생성

import dotenv from 'dotenv';
import amqplib from 'amqplib/callback_api.js'

dotenv.config();

amqplib.connect({
    'hostname': process.env.RABBITMQ_HOST,
    'username': process.env.RABBITMQ_ID,
    'password': process.env.RABBITMQ_PASSWORD,
    'port': process.env.RABBITMQ_PORT,
    'vhost': process.env.RABBITMQ_VHOST,
}, (err, conn) => {
    if(err) {
        console.log("Err: ", err);
        return false;
    }

    conn.createChannel((err, ch) => {
        if(err) {
            console.log("Err: ", err);
            return false;
        }
        
        ch.assertQueue("큐 이름", {옵션}); // queue를 생성하려면 assertQueue를 사용한다.
    });
})
  • assertQueue() : 큐를 생성한다.
    • durable : true로 설정시, RabbitMQ를 재시작해도 해당 큐가 존재한다. 기본값은 true이다.
    • messageTtl : 큐에 도착한 메시지가 지정한 시간 이후에 삭제된다. ms단위로 지정한다.(1000ms = 1초)
    • expires : 큐에 대한 작업이 일정시간 없을 경우 큐를 자동으로 삭제한다. 큐가 존재하는지 확인하는 checkQueue(), 큐의 메시지를 확인하는 get()등을 호출할 경우, 그 시점 이후로 expires 시간이 작동한다. 큐를 생성할 때, expires를 5초로 설정, 생성한지 3초 후에 checkQueue로 큐 존재 확인을 하는 간단한 로직을 실행하면 생성된 큐는 8초 후에 삭제가 된다. checkQueue로 인해 큐 삭제 시간이 늦춰졌기 때문이다. 또한 expires 옵션을 설정할 때는 ms단위로 지정한다.(1000ms = 1초)
    • maxLength : 큐에 보관할 수 있는 최대 메시지 개수를 제한한다. 지정된 개수를 초과하는 메시지가 큐에 publish되면 해당 메시지를 버린다.

1-1. 큐에 exchange 바인딩

conn.createChannel((err, ch) => {
    ch.assertExchange('fruit.direct', 'direct');
    ch.assertQueue('orange-pub',{durable: true});
    ch.bindQueue('orange-pub', 'fruit.direct', 'yummy.orange');
    // 큐 이름, exchange이름, 라우팅키
});
  1. assertExchange() :  fruit.direct라는 exchange를 생성하고 타입은 direct로 설정한다.
  2. assertQueue() : orange-pub라는 큐를 생성하고 durable옵션을 true로 설정한다.
  3. bindQueue() : orange-pub 큐에 fruit.direct라는 exchange를 바인딩하고 routing key는 yummy.orange로 설정한다.

큐에 exchange를 바인딩한 후 메시지를 전달할 때, 라우팅 키가 일치하는 메시지는 orange-pub라는 큐에 전달된다.

 

2. 큐에 메시지 publish하기

conn.createChannel((err, ch) => {
    ch.publish('fruit.direct', 'yummy.orange', Buffer.from('맛있는 오렌지'), {옵션});
    // exchange이름, 라우팅 키, 메시지 내용, 옵션
});
  • publish() : exchange에 따라 일치하는 라우팅 키를 가진 큐에 메시지를 전달한다. 메시지를 전달할 때는 Buffer타입으로 전송해야한다.

 

  • headers : 메시지에 헤더값을 지정하는 옵션이다. key, value 형식으로 값을 추가한다. exchange의 타입이 headers일 경우, 이 헤더값을 통해 전달될 큐가 정해진다.
ch.publish('fruit.direct', 'yummy.orange', Buffer.from('맛있는 오렌지'), {
    headers: {
        'price': 1000,
        'count': 3
    } // header에 key, value 형식으로 값을 추가한다.
})
  • contentType : 메시지의 MIME타입을 지정한다.
  • timestamp : 메시지의 시간값을 지정한다. 정수형으로 입력해야한다.
  • expiration : 보통 메시지 큐는 consume()을 통해 받은 메시지를 소비하는데 expiration을 사용하면 일정 시간이 지난 메시지는 자동으로 제거한다. ms단위로 지정한다.(1000ms = 1초)
ch.publish('fruit.direct', 'yummy.orange', Buffer.from('맛있는 오렌지'), {
    headers: {
        'price': 1000,
        'count': 3
    }, // header에 key, value 형식으로 값을 추가한다.
    contentType: 'text/plain', // MIME 타입 지정
    timestamp: 1680349939463, // 시간
    expiration: 5000, // 메시지 만료 시간(ms)
})

 

2-1. 특정 큐에 메시지 전달

sendToQueue를 사용하면 지정된 큐에 메시지를 전달할 수 있다.

conn.createChannel((err, ch) => {
    ch.sendToQueue("큐 이름", "메시지 내용", {옵션});
});

지정된 큐에 메시지를 전달한다. publish() 할 때와 마찬가지로 메시지는 Buffer 형식으로 전달되야 한다. sendToQueue는 exchange의 direct와 비슷하다. 둘다 이름이 일치해야 전달되기 때문이다.

하지만 exchange는 routing key를, sendToQueue는 큐 이름의 일치 여부에 따라 전달된다는 차이점이 존재한다.

 

3. 메시지 확인

큐에 전달된 메시지는 consume()을 사용하거나 get()을 사용하여 확인할 수 있다.

  • consume() : 큐에 전달된 메시지를 비동기식으로 처리하며 여러개의 메시지를 한번에 consume 할 수 있다. get()보다 성능적으로 우수하기에 메시지를 소비하기에 가장 이상적인 방법이다.
ch.prefetch(1); // 한번에 consume할 메시지 수를 정의한다.

ch.consume("orange-pub", (msg) => {
    console.log("Fields: ", msg.fields); // exchange, routing key 등의 정보
    console.log("Properties: ", msg.properties); // cotentType, headers 등의 정보
    console.log("Content: ", (msg.content).toString()); // 메시지 내용
}, {
    'noAck': true // 옵션
})
  • prefetch : 한번에 consume될 메시지의 수를 정의한다. prefetch(5)로 설정하면 5개의 메시지가 한번에 consume처리된다.
  • fields : exchange, routing key, 지정된 tag 등의 정보를 출력한다.
  • properties : 메시지를 publish할 때 같이 전송된 설정값들을 출력한다. headers, timestamp, contentType등을 출력한다.
  • content : 메시지 내용을 출력한다. toString()없이 출력하면 Buffer 형식으로 출력된다. 메시지 내용이 문자면 toString()을 추가하여 출력한다. 만약 메시지 내용이 JSON 형식이면 JSON.parse(msg.content)로 출력한다.
  • noAck : consume된 메시지가 처리 완료되었는지의 유무를 나타낸다. true로 설정시, 전달된 메시지가 승인처리되어 큐에서 사라지게 된다. false로 설정시, 아직 완료작업이 이루어지지 않았다는 것을 의미하며 해당 메시지를 대기열에 추가하여 다음 consume때 처리한다.
  • get : consume과 비슷하게 큐에 전달된 메시지를 처리하는 메서드이지만 동기식으로 처리하고 메시지를 한개씩만 처리하기에 권장되지 않는 방법이다.
ch.get("orange-pub", {
    'noAck': true
}, (err, msg) => {
    console.log("Fields: ", msg.fields);
    console.log("Properties: ", msg.properties);
    console.log("Content: ", JSON.parse(msg.content));
})

 


참고 사이트 : 

https://amqp-node.github.io/amqplib/

 

amqplib | AMQP 0-9-1 library and client for Node.JS

AMQP 0-9-1 library and client for Node.JS amqplib implements the machinery needed to make clients for AMQP 0-9-1, and includes such a client. Why phrase it that way around? Because AMQP is complicated enough that there are a few different ways of presentin

amqp-node.github.io

 

https://livebook.manning.com/book/rabbitmq-in-depth/chapter-5/11

 

Chapter 5. Don’t get messages; consume them · RabbitMQ in Depth

Consuming messages · Tuning consumer throughput · When consumers and queues are exclusive · Specifying a quality of service for your consumers

livebook.manning.com

 

https://stackoverflow.com/questions/31915773/rabbitmq-what-are-ready-and-unacked-types-of-messages

 

RabbitMQ" What are "Ready" and "Unacked" types of messages?

I'm getting confused between these two types of messages in RabbitMQ. I've seen that some of my queues have 0 "Unacked" and 1000 "Ready" messages, while some have 1000 "Unacked" and 0 "Ready" mess...

stackoverflow.com

 

https://stackoverflow.com/questions/50323848/what-is-the-difference-between-assertqueue-and-send-to-queue-in-rabbitmq

 

what is the difference between assertQueue and send To Queue in RabbitMq

I have found Example for Sending String in rabittMq and Receiving from the queue but I am not clear about about these methods - assertQueue , sendToQueue send.js var amqp = require('amqplib/

stackoverflow.com

 

https://www.cloudamqp.com/blog/rabbitmq-basic-consume-vs-rabbitmq-basic-get.html

 

FAQ: RabbitMQ Basic Consumer vs. RabbitMQ Basic Get - CloudAMQP

RabbitMQ offers two ways to receive messages which are the polling-based basic.get and the push-based basic.consume. How to decide between using a consumer or a get request depends on the workload that each creates.

www.cloudamqp.com

 

728x90
반응형
Comments