2021/12/13
[Rust]BitcoindのZMQを使って、新しいblockの検知をする
概要
bitcoinのblock生成を検知して、処理をキックする部分のRustでの実装になります。
実施事項
bitcoin.confの編集
以下のような設定を bitcoin.conf
に記載しておきます。
# zmqpubrawtx=tcp://0.0.0.0:28332
# zmqpubrawblock=tcp://0.0.0.0:28332
# zmqpubhashtx=tcp://0.0.0.0:28332
zmqpubhashblock=tcp://0.0.0.0:28332
こうすることで、hashblockのイベントをzeromqにpublishすることができます。
rust implementations
以前実装したpython版のRust版です。
Cargo.toml
blocking = "1.1.0"
zeromq = "0.3.1"
hex = "0.4.3"
tokio = { version = "1.12", features = ["full"] }
futures = "0.3"
rust codes
use std::thread;
use std::time::Duration;
use tokio::signal::unix::{signal, SignalKind};
use zeromq::{Socket, SocketRecv};
const TOPIC_NAME: &str = "hashblock";
#[tokio::main]
async fn main() {
let mut sigint = signal(SignalKind::interrupt()).unwrap();
let mut sigterm = signal(SignalKind::terminate()).unwrap();
let mut socket = zeromq::SubSocket::new();
let zmq_endpoint = "tcp://127.0.0.1:28332";
loop {
let tick = async {
blocking::unblock(move || {
thread::sleep(Duration::from_secs(5));
})
.await
};
tokio::select! {
_ = sigint.recv() => {
println!("sigint detected");
return;
}
_ = sigterm.recv() => {
println!("sigterm detected");
return;
}
_ = tick => {
println!("next tick");
let connect_result = socket.connect(zmq_endpoint).await;
if let Err(err) = connect_result {
println!("zmq connection error: {:?}", err);
continue;
} else {
break;
}
}
}
}
socket
.subscribe(TOPIC_NAME)
.await
.expect("must subscribe hashblock topic...");
println!("{} topic subscription started!", TOPIC_NAME);
loop {
tokio::select! {
_ = sigint.recv() => {
println!("sigint detected");
break
}
_ = sigterm.recv() => {
println!("sigterm detected");
break
}
msg = socket.recv() => {
if let Ok(msg) = msg {
let topic: String = String::from_utf8(msg.get(0).unwrap().to_vec()).expect("topic name bytes must be utf8");
let hex_bytes: Vec<u8> = msg.get(1).unwrap().to_vec();
let block_hash = hex::encode(hex_bytes);
println!("{}: {}", topic.as_str(), block_hash.as_str());
/*
* TODO ここで、block_hashに対して処理を行います。
* 私は、独自のmessage queue(kafkaとかrabbitmqとか)にこのblockに対して処理をするようなtaskを登録するような処理を記述しています。
*/
}
}
}
}
let _ = socket.close().await;
}
上記のコードはかなりデフォルメしており、エラーハンドリングなどは端折っています。。
接続先等はconfigファイルに記載しているものを利用できるようにしたり、
そもそもmainではなくuse case structを作って、use caseとして登録して呼び出すようにしていますので、参考にされる際はご注意ください。
以上です。
関連する記事
Concordiumノードをローカルで動かしてみた
Concordiumの調査のために、ローカルでソースコードをビルドしてノードを動かしてみました
[Rust]axumとdragonflyを使ったWebsocket Chatのサンプル実装
redis互換のdragonflyをPUBSUBとして利用して、Websocket Chatアプリのサンプル実装を行いました。
[Rust]TiDBを使ったサンプルアプリケーションの実装
RustからTiDBを使ったアプリケーションの実装を行いました。
[Rust]Google Cloud Storageを利用する
GCSやNFSのファイルを扱えるpackageをRustで実装しました。