2020/06/08

BitcoindのZMQを使って、新しいblockの検知をする

pythonbitcoinzeromq

概要

bitcoinサービスまわりのリファクタリング中です。
bitcoinのblock生成を検知して、処理をキックする部分を書き換えました。
zeromqとpythonを使用しています。

実施事項

bitcoin.confの編集

以下のような設定を bitcoin.conf に記載しておきます。

# zmqpubrawtx=tcp://127.0.0.1:28332
# zmqpubrawblock=tcp://127.0.0.1:28332
# zmqpubhashtx=tcp://127.0.0.1:28332
zmqpubhashblock=tcp://127.0.0.1:28332

こうすることで、hashblockのイベントをzmqにpublishすることができます。

python script

基本的には、bitcoin/zmq_sub.pyのファイルを参考にして検知していきます。

ZMQHandler classを作成

class ZMQHandler:
    def __init__(self, host='127.0.0.1', port=28332):
        self.loop = asyncio.get_event_loop()
        self.zmqContext = zmq.asyncio.Context()
        self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
        # subscribe "hashblock" event
        self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
        self.zmqSubSocket.connect(f"tcp://{host}:{port}")
        logger.info(f"connected to tcp://{host}:{port}")

    async def handle(self):
        msg = await self.zmqSubSocket.recv_multipart()
        topic = msg[0]
        body = msg[1]
        sequence = "Unknown"
        if len(msg[-1]) == 4:
            msg_sequence = struct.unpack('<I', msg[-1])[-1]
            sequence = str(msg_sequence)
        if topic == b"hashblock":
            block_hash = binascii.hexlify(body)
            logger.info(f"- HASH BLOCK ({sequence}) block_hash: \"{block_hash}\" -")
            # TODO ここで、webhookしたり、celery taskを呼んだりすればOK
        asyncio.ensure_future(self.handle())

    def start(self):
        self.loop.add_signal_handler(signal.SIGINT, self.stop)
        self.loop.create_task(self.handle())
        self.loop.run_forever()

    def stop(self):
        self.loop.stop()
        self.zmqContext.destroy()

さらに、zmqにつながるかチェックして、つながるまで待機するscriptを書いておきます。(dockerやk8sなどで起動まで待つみたいなケースは結構あるので便利です。)

def wait_for(host='127.0.0.1', port=28332, timeout=15.0):
    start_time = time.perf_counter()
    while True:
        try:
            logger.info("check connection tcp://%s:%d", host, port)
            with socket.create_connection((host, port), timeout=timeout):
                break
        except OSError as ex:
            time.sleep(1.0)
            if time.perf_counter() - start_time >= timeout:
                raise TimeoutError(
                    f"Waited too long for the port {port} on host {host} to start accepting connections.") from ex

あとはこれを呼び出すような感じです。

if __name__ == "__main__":
    wait_for(host="127.0.0.1", port=28332)
    daemon = ZMQHandler(host="127.0.0.1", port=28332)
    daemon.start()

対象デフォルメ(エラー処理の部分やconfig部分を省略)していますが、以上になります。