2021/07/11
NATS JetStreamサーバーをローカルで立てて、Rustからアクセスしてみる
前回の記事ではnats-streaming-serverを利用していましたが、
よくみたら非推奨(deprecated)だったようで、nats jetstreamを使うようにしました。
nats jetstreamはat least onceを実現するための仕組みが備わっており、publishされたメッセージをstorage(fileとかmemoryとか)に一時的に保存できるようになっています。
- Stream
- Consumer
という2つの機能をうまく使って、様々な要求のデータストリーミングパイプラインを構築することができるみたいです。
StreamとConsumerの概要図
今回は、JetStreamサーバーをdocker-composeで立ち上げ、Rustを使ってテストしてみましたのでそれの紹介になります。
docker-compose
version: "3.7"
services:
nats-jetstream:
image: "nats:2.3.2-alpine"
# see https://docs.nats.io/jetstream/getting_started/using_docker
command:
- "--port"
- "4222"
- "--http_port"
- "8222"
- "--jetstream" # このフラグを立てると、jetstream機能が動きます
- "--store_dir"
- "/data"
- "--debug"
ports:
- "4222:4222"
- "8222:8222"
volumes:
- ".docker-data/nats-jet-stream:/data"
コマンドラインツールを使ってstreamとconsumerを作成する
natsのインストール
% brew tap nats-io/nats-tools
% brew install nats-io/nats-tools/nats
% nats --help
Streamの作成
% nats str ls
No Streams defined
# ORDERSという名前で、Streamを作成します。(subjectはORDERS.*となるようにします)
% nats str add ORDERS \
--subjects "ORDERS.*" \
--ack \
--max-msgs=-1 \
--max-bytes=-1 \
--max-age=1y \
--storage file \
--retention limits \
--max-msg-size=-1 \
--discard old \
--dupe-window="0s" \
--replicas 1
# ORDERSが作られていることを確認
% nats str ls
Streams:
ORDERS
Consumerの作成
% nats con ls ORDERS
No Consumers defined
# NEWという名前でORDERS StreamのConsumerを作成します
# ORDERS.addという件名をwatchするようにします
% nats con add ORDERS NEW \
--filter ORDERS.add \
--ack explicit \
--pull \
--deliver all \
--sample 100 \
--max-deliver 20
# NEWという名前でORDERS StreamのConsumerが作成されていることを確認
% nats con info ORDERS NEW
Information for Consumer ORDERS > NEW created 2021-07-10T20:01:07+09:00
-- 以下略 --
Pub/Sub
コマンドラインからもメッセージのpub/subができるみたいです。
# pub
nats pub ORDERS.add "your message..."
# sub
nats con next ORDERS NEW
Rustからアクセス
Cargo.tomlにdependenciesを追加
[dependencies]
nats = { version = "0.9.18", features = ["jetstream"] } # jetstream featureが必要でした
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.8.0", features = ["full"] }
blocking = "1.0.2"
dotenv = "0.15.0"
main.rsを作成
以下対応してみました。
- Gracefulシャットダウン(Ctrl+Cを押してもすぐに処理を終了せず、現在処理しているメッセージの処理完了をまつ)
tokio::select!
を使って実現しています
- jetstream consumerを使って、メッセージの読み込み
use dotenv::dotenv;
use std::env;
use serde::{Deserialize, Serialize};
use tokio::signal::unix::{signal, SignalKind};
use nats::jetstream::{Consumer, ConsumerConfig};
use blocking::unblock;
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct AddParams {
a: i32,
b: i32,
}
/// cargo run --package nats-workaround --bin nats-workaround
#[tokio::main]
async fn main() -> std::io::Result<()> {
dotenv().ok();
consume().await
}
async fn consume() -> std::io::Result<()> {
let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;
let nats_url = env::var("NATS_URL").unwrap_or("127.0.0.1".into());
let nc = nats::connect(&nats_url)?;
loop {
let nc = nc.clone();
let next_msg = async {
let mut consumer = Consumer::existing(nc, "ORDERS", ConsumerConfig {
durable_name: Some("NEW".to_string()),
deliver_subject: None, // pull based message
..Default::default()
}).expect("consumer must be initialized...");
let msg = unblock(move || {
let msg = consumer.pull();
if let Ok(msg) = msg {
let next_id = msg.jetstream_message_info().unwrap().stream_seq;
if consumer.dedupe_window.already_processed(next_id) {
let _dont_care = msg.ack();
return None
}
Some(msg)
} else {
None
}
}).await;
msg
};
tokio::select! {
_ = sigint.recv() => {
println!("sigint");
break
}
_ = sigterm.recv() => {
println!("sigterm");
break
}
msg = next_msg => {
if let Some(msg) = msg {
println!("received {:?}", String::from_utf8(msg.data.clone()).unwrap());
let params: AddParams = serde_json::from_slice(&msg.data).unwrap();
println!("let's sleep");
for n in 1..10 {
std::thread::sleep(std::time::Duration::from_millis(1000));
println!("{}", n);
}
println!("finished!");
println!("parsed {:?}", params);
let _dont_care = msg.ack();
}
}
}
}
Ok(())
}
こちらのコードをベースにして、スケーラブルな非同期タスク実行システムを構築したいなと思います。
(エラーハンドリングなどはまだ未実装なので、こちらをベースに肉付けしていこうと思います。)
以上です。
関連する記事
Concordiumノードをローカルで動かしてみた
Concordiumの調査のために、ローカルでソースコードをビルドしてノードを動かしてみました
NATS JetStream Controllerを使ってNATSをGKEにデプロイする
helm chartのnackを使って、NATS JetStreamサーバーをデプロイして、Stream/Consumerをk8sリソースとして管理する
[Rust]axumとdragonflyを使ったWebsocket Chatのサンプル実装
redis互換のdragonflyをPUBSUBとして利用して、Websocket Chatアプリのサンプル実装を行いました。
[Rust]TiDBを使ったサンプルアプリケーションの実装
RustからTiDBを使ったアプリケーションの実装を行いました。