2020/07/15

[Python]RabbitMQにメッセージをProduce/Consumeしてみた

python, rabbitmq

概要

RabbitMQはceleryのBrokerとして普段利用しています。
その用途では、自前のソースコードでメッセージのPub/Subを実装する必要は特にないのですが…
今回、システム間連携で利用する必要が出てきたので、Tutorialなども参考にしながらコーディングしました。

実装

MessageのProduce

  • Producer Classを定義
import pika
import json


class RabbitMQMessageProducer:
    """Publish JSON-encoded, persistent messages through a blocking pika connection.

    The connection is opened in the constructor and released by ``close()``,
    which is also invoked from ``__exit__`` and, as a last resort, ``__del__``.
    """

    # NOTE(review): the default URL below looks mangled ("[email protected]" is a
    # typical e-mail-obfuscation artifact) -- confirm the intended credentials
    # and host, or always pass ``message_broker_url`` explicitly.
    def __init__(self, message_broker_url="amqp://guest:[email protected]:5672/vhost_name", exchange_name=""):
        """Connect to the broker and declare the target exchange.

        Args:
            message_broker_url: AMQP URL handed to ``pika.URLParameters``.
            exchange_name: Name of the durable direct exchange to declare.
                An empty string selects the broker's default exchange, which
                is never declared.
        """
        # Assigned before any I/O so close()/__del__ stay safe when
        # connecting fails part-way through.
        self._connection = None
        self._channel = None

        params = pika.URLParameters(message_broker_url)
        self._connection = pika.BlockingConnection(params)
        self._channel = self._connection.channel()

        # RabbitMQ rejects exchange.declare on the default ("") exchange with
        # ACCESS_REFUSED, so only named exchanges are declared.
        if exchange_name:
            self._channel.exchange_declare(exchange=exchange_name, exchange_type="direct", durable=True)

    def __enter__(self):
        """Return the producer itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def __del__(self):
        # Safety net only; prefer close() or a ``with`` block.
        self.close()

    def close(self):
        """Close the broker connection if it is open. Safe to call repeatedly."""
        # getattr guards instances whose __init__ never ran to completion.
        connection = getattr(self, "_connection", None)
        if connection is not None and connection.is_open:
            connection.close()

    def publish_message(self, message, exchange_name: str = "", routing_key: str = ""):
        """Serialize ``message`` to JSON and publish it as a persistent message.

        Args:
            message: Any object accepted by ``json.dumps``.
            exchange_name: Exchange to publish to; empty string means the
                default exchange.
            routing_key: Routing key attached to the message.

        Raises:
            TypeError: If ``message`` is not JSON serializable.
        """
        self._channel.basic_publish(
            exchange=exchange_name,
            routing_key=routing_key,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                content_type="application/json",
                delivery_mode=2,  # make message persistent
            ),
        )
  • Producer Instanceからメッセージを送信
# Connect with the default broker URL and exchange, then publish one JSON message.
message_producer = RabbitMQMessageProducer()
payload = {"test": True}
message_producer.publish_message(message=payload)

MessageのConsume

  • Consumer Classを定義
import pika
from app.core.config import settings


class RabbitMQMessageConsumer:
    """Consume messages from a RabbitMQ queue through a blocking pika connection.

    The constructor opens the connection, declares the queue and, for a named
    exchange, declares the exchange and binds the queue to it. ``close()``
    releases the connection and is also invoked from ``__exit__`` and, as a
    last resort, ``__del__``.
    """

    # NOTE(review): the default URL below looks mangled ("[email protected]" is a
    # typical e-mail-obfuscation artifact) -- confirm the intended credentials
    # and host, or always pass ``message_broker_url`` explicitly.
    def __init__(self, message_broker_url="amqp://guest:[email protected]:5672/vhost_name", exchange_name: str = "", routing_key: str = ""):
        """Connect to the broker and prepare the queue to consume from.

        Args:
            message_broker_url: AMQP URL handed to ``pika.URLParameters``.
            exchange_name: Name of the durable direct exchange to declare and
                bind to. An empty string selects the broker's default
                exchange, which is neither declared nor bound.
            routing_key: Used both as the queue name and as the binding key.
        """
        # Assigned before any I/O so close()/__del__ stay safe when
        # connecting fails part-way through.
        self._connection = None
        self._channel = None
        self._queue_name = None

        params = pika.URLParameters(message_broker_url)
        self._connection = pika.BlockingConnection(params)
        self._channel = self._connection.channel()

        # RabbitMQ rejects exchange.declare on the default ("") exchange with
        # ACCESS_REFUSED, so only named exchanges are declared.
        if exchange_name:
            self._channel.exchange_declare(exchange=exchange_name, exchange_type="direct", durable=True)

        # NOTE(review): the queue is both exclusive and durable; exclusive
        # queues are presumably dropped with the connection -- confirm that
        # this combination is intended.
        result = self._channel.queue_declare(queue=routing_key, exclusive=True, durable=True)
        # Use the name reported back by the broker rather than routing_key.
        self._queue_name = result.method.queue
        print(self._queue_name)

        # Binding to the default exchange is refused as well; the broker
        # already routes to every queue by its name on that exchange.
        if exchange_name:
            self._channel.queue_bind(
                exchange=exchange_name, queue=self._queue_name, routing_key=routing_key)

    def __enter__(self):
        """Return the consumer itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def __del__(self):
        # Safety net only; prefer close() or a ``with`` block.
        self.close()

    def close(self):
        """Close the broker connection if it is open. Safe to call repeatedly."""
        # getattr guards instances whose __init__ never ran to completion.
        connection = getattr(self, "_connection", None)
        if connection is not None and connection.is_open:
            connection.close()

    def start(self, callback, auto_ack=True):
        """Register ``callback`` on the queue and block while consuming.

        Args:
            callback: Passed to ``basic_consume`` as ``on_message_callback``.
            auto_ack: Forwarded to ``basic_consume``. Pass ``False`` when the
                callback acknowledges messages itself.
        """
        self._channel.basic_consume(queue=self._queue_name, on_message_callback=callback, auto_ack=auto_ack)
        self._channel.start_consuming()
  • Consumer Instanceを使って、メッセージを受信
def callback(ch, method, properties, body):
    """Print the routing key and body of a delivery, then acknowledge it.

    Args:
        ch: Channel the message arrived on; used to send the ack.
        method: Delivery metadata providing ``routing_key`` and ``delivery_tag``.
        properties: Message properties (unused).
        body: Raw message payload.
    """
    routing_key = method.routing_key
    tag = method.delivery_tag
    line = " [x] %r:%r" % (routing_key, body)
    print(line)
    # Manual ack: the consumer must be started with auto_ack=False.
    ch.basic_ack(delivery_tag=tag)

# Build a consumer with the default settings and block on incoming messages.
# auto_ack is disabled because callback() acknowledges each delivery itself.
consumer = RabbitMQMessageConsumer()
consumer.start(auto_ack=False, callback=callback)

以上になります