2020/06/08
BitcoindのZMQを使って、新しいblockの検知をする
概要
bitcoinサービスまわりのリファクタリング中です。
bitcoinのblock生成を検知して、処理をキックする部分を書き換えました。
zeromqとpythonを使用しています。
実施事項
bitcoin.confの編集
以下のような設定を bitcoin.conf
に記載しておきます。
# zmqpubrawtx=tcp://127.0.0.1:28332
# zmqpubrawblock=tcp://127.0.0.1:28332
# zmqpubhashtx=tcp://127.0.0.1:28332
zmqpubhashblock=tcp://127.0.0.1:28332
こうすることで、hashblockのイベントをzmqにpublishすることができます。
python script
基本的には、bitcoin/zmq_sub.pyのファイルを参考にして検知していきます。
ZMQHandler
classを作成
class ZMQHandler:
def __init__(self, host='127.0.0.1', port=28332):
self.loop = asyncio.get_event_loop()
self.zmqContext = zmq.asyncio.Context()
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
# subscribe "hashblock" event
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.connect(f"tcp://{host}:{port}")
logger.info(f"connected to tcp://{host}:{port}")
async def handle(self):
msg = await self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
sequence = "Unknown"
if len(msg[-1]) == 4:
msg_sequence = struct.unpack('<I', msg[-1])[-1]
sequence = str(msg_sequence)
if topic == b"hashblock":
block_hash = binascii.hexlify(body)
logger.info(f"- HASH BLOCK ({sequence}) block_hash: \"{block_hash}\" -")
# TODO ここで、webhookしたり、celery taskを呼んだりすればOK
asyncio.ensure_future(self.handle())
def start(self):
self.loop.add_signal_handler(signal.SIGINT, self.stop)
self.loop.create_task(self.handle())
self.loop.run_forever()
def stop(self):
self.loop.stop()
self.zmqContext.destroy()
さらに、zmqにつながるかチェックして、つながるまで待機するscriptを書いておきます。(dockerやk8sなどで起動まで待つみたいなケースは結構あるので便利です。)
def wait_for(host='127.0.0.1', port=28332, timeout=15.0):
start_time = time.perf_counter()
while True:
try:
logger.info("check connection tcp://%s:%d", host, port)
with socket.create_connection((host, port), timeout=timeout):
break
except OSError as ex:
time.sleep(1.0)
if time.perf_counter() - start_time >= timeout:
raise TimeoutError(
f"Waited too long for the port {port} on host {host} to start accepting connections.") from ex
あとはこれを呼び出すような感じです。
if __name__ == "__main__":
wait_for(host="127.0.0.1", port=28332)
daemon = ZMQHandler(host="127.0.0.1", port=28332)
daemon.start()
対象デフォルメ(エラー処理の部分やconfig部分を省略)していますが、以上になります。
関連する記事
[Python]ハイフンなし電話番号からハイフン付きに復元
Pythonでハイフンなしの日本の電話番号をハイフン付きのものに変換する
[Rust]BitcoindのZMQを使って、新しいblockの検知をする
Rustにてbitcoinのblock生成を検知して、処理をキックする部分を実装しました。
[22.0]Local環境でbitcoind@Regtestを動かしてみる
version22.0のbitcoind・Regtestネットワークをdocker-composeを使って動作確認をしました
[Python]BeautifulSoup4でhtmlの解析
BeautifulSoup4というPythonのライブラリを使って、特定のURLのコンテンツを取得し、タイトルや説明文を取得できるようにしました。