2021/12/13

[Rust]BitcoindのZMQを使って、新しいblockの検知をする

bitcoinrustzeromq

概要

bitcoinのblock生成を検知して、処理をキックする部分のRustでの実装になります。

実施事項

bitcoin.confの編集

以下のような設定を bitcoin.conf に記載しておきます。

# zmqpubrawtx=tcp://0.0.0.0:28332
# zmqpubrawblock=tcp://0.0.0.0:28332
# zmqpubhashtx=tcp://0.0.0.0:28332
zmqpubhashblock=tcp://0.0.0.0:28332

こうすることで、hashblockのイベントをzeromqにpublishすることができます。

rust implementations

以前実装したpython版のRust版です。

Cargo.toml

blocking = "1.1.0"
zeromq = "0.3.1"
hex = "0.4.3"
tokio = { version = "1.12", features = ["full"] }
futures = "0.3"

rust codes

use std::thread;
use std::time::Duration;
use tokio::signal::unix::{signal, SignalKind};
use zeromq::{Socket, SocketRecv};

const TOPIC_NAME: &str = "hashblock";

#[tokio::main]
async fn main() {
    let mut sigint = signal(SignalKind::interrupt()).unwrap();
    let mut sigterm = signal(SignalKind::terminate()).unwrap();
    let mut socket = zeromq::SubSocket::new();
    let zmq_endpoint = "tcp://127.0.0.1:28332";
    loop {
        let tick = async {
            blocking::unblock(move || {
                thread::sleep(Duration::from_secs(5));
            })
            .await
        };
        tokio::select! {
            _ = sigint.recv() => {
                println!("sigint detected");
                return;
            }
            _ = sigterm.recv() => {
                println!("sigterm detected");
                return;
            }
            _ = tick => {
                println!("next tick");
                let connect_result = socket.connect(zmq_endpoint).await;
                if let Err(err) = connect_result {
                    println!("zmq connection error: {:?}", err);
                    continue;
                } else {
                    break;
                }
            }
        }
    }
    socket
        .subscribe(TOPIC_NAME)
        .await
        .expect("must subscribe hashblock topic...");
    println!("{} topic subscription started!", TOPIC_NAME);
    loop {
        tokio::select! {
            _ = sigint.recv() => {
                println!("sigint detected");
                break
            }
            _ = sigterm.recv() => {
                println!("sigterm detected");
                break
            }
            msg = socket.recv() => {
                if let Ok(msg) = msg {
                    let topic: String = String::from_utf8(msg.get(0).unwrap().to_vec()).expect("topic name bytes must be utf8");
                    let hex_bytes: Vec<u8> = msg.get(1).unwrap().to_vec();
                    let block_hash = hex::encode(hex_bytes);
                    println!("{}: {}", topic.as_str(), block_hash.as_str());

                    /*
                     * TODO ここで、block_hashに対して処理を行います。
                     * 私は、独自のmessage queue(kafkaとかrabbitmqとか)にこのblockに対して処理をするようなtaskを登録するような処理を記述しています。
                     */
                }
            }
        }
    }
    let _ = socket.close().await;
}

上記のコードはかなりデフォルメしており、エラーハンドリングなどは端折っています。。
接続先等はconfigファイルに記載しているものを利用できるようにしたり、
そもそもmainではなくuse case structを作って、use caseとして登録して呼び出すようにしていますので、参考にされる際はご注意ください。

以上です。