2020/07/15
[Python]RabbitMQにメッセージをProduce/Consumeしてみた
概要
RabbitMQはceleryのBrokerとして普段利用しています。
この時は、特段自前のソースコードで、メッセージのPub/Subをする必要がないのですが…
今回、システム間連携で利用する必要が出てきたので、Tutorialなども参考にしながらコーディングしました。
実装
MessageのProduce
- Producer Classを定義
import pika
import json
class RabbitMQMessageProducer:
def __init__(self, message_broker_url="amqp://guest:[email protected]:5672/vhost_name", exchange_name=""):
params = pika.URLParameters(message_broker_url)
self._connection = pika.BlockingConnection(params)
self._channel = self._connection.channel()
self._channel.exchange_declare(exchange=exchange_name, exchange_type="direct", durable=True)
def __del__(self):
if self._connection and self._connection.is_open:
self._connection.close()
def publish_message(self, message, exchange_name: str = "", routing_key: str = ""):
self._channel.basic_publish(
exchange=exchange_name,
routing_key=routing_key,
body=json.dumps(message),
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2, # make message persistent
),
)
- Producer Instanceからメッセージを送信
message_producer = RabbitMQMessageProducer()
message_producer.publish_message(message={"test": True})
MessageのConsume
- Consumer Classを定義
import pika
from app.core.config import settings
class RabbitMQMessageConsumer:
def __init__(self, message_broker_url="amqp://guest:[email protected]:5672/vhost_name", exchange_name: str = "", routing_key: str = ""):
params = pika.URLParameters(message_broker_url)
self._connection = pika.BlockingConnection(params)
self._channel = self._connection.channel()
self._channel.exchange_declare(exchange=exchange_name, exchange_type="direct", durable=True)
result = self._channel.queue_declare(queue=routing_key, exclusive=True, durable=True)
self._queue_name = result.method.queue
print(self._queue_name)
self._channel.queue_bind(
exchange=exchange_name, queue=self._queue_name, routing_key=routing_key)
def __del__(self):
if self._connection and self._connection.is_open:
self._connection.close()
def start(self, callback, auto_ack=True):
self._channel.basic_consume(queue=self._queue_name, on_message_callback=callback, auto_ack=auto_ack)
self._channel.start_consuming()
- Consumer Instanceを使って、メッセージを受信
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
ch.basic_ack(delivery_tag=method.delivery_tag)
consumer = RabbitMQMessageConsumer()
consumer.start(callback=callback, auto_ack=False)
以上になります
関連する記事
[Python]ハイフンなし電話番号からハイフン付きに復元
Pythonでハイフンなしの日本の電話番号をハイフン付きのものに変換する
[Python]BeautifulSoup4でhtmlの解析
BeautifulSoup4というPythonのライブラリを使って、特定のURLのコンテンツを取得し、タイトルや説明文を取得できるようにしました。
[Python]銀行コードと支店コードの取扱
Pythonで銀行コード、支店コードデータを取り扱う便利なライブラリzengin-codeを導入しました。
Sendgridを使ってメールの受信を行う
Inbound Email Parse Webhookという機能を利用してメールを受信したらWebhookを呼び出すようにしました