2021/07/10
NATS Streamingサーバーをローカルで立てて、Rustからアクセスしてみる
普段メッセージブローカーとしてはrabbitmq/redis/kafkaを使ってきましたが、natsが良さそうなので入門してみようと思います。
以下試してみました。
- docker-composeを使って、nats streaming serverを立てる
- rustのnatscrateを利用して接続をテスト
docker-compose
version: "3.7"
services:
nats-streaming-stan:
image: "nats-streaming:0.22.0"
# see https://docs.nats.io/nats-streaming-server/configuring/cmdline
command:
- "--cluster_id"
- "stan"
- "--http_port"
- "8222"
- "--port"
- "4222"
- "--store"
- "file"
- "--dir"
- "/data/stan/store"
- "-D" # Enable debugging output
- "-SD" # Enable STAN debugging output
# The actual total wait is: (fail count + 1) * (hb interval + hb timeout)
- "--hb_interval" # Interval at which the server sends an heartbeat to a client
- "10s"
- "--hb_timeout" # How long the server waits for a heartbeat response from the client before considering it a failed heartbeat
- "10s"
- "--hb_fail_count" # Count of failed heartbeats before server closes the client connection.
- "2"
ports:
- "4222:4222"
- "8222:8222"
volumes:
- ".docker-data/nats-streaming:/data"
参考
Rustからアクセス
ライブラリのインストール
[dependencies]
nats = "0.9.18" # <- nats・driverライブラリ
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
[dev-dependencies]
dotenv = "0.15.0"
テストコード
src/lib.rs
#[cfg(test)]
mod test {
use dotenv::dotenv;
use std::env;
use serde::{Deserialize, Serialize};
#[test]
fn test_simple_pubsub() -> std::io::Result<()> {
dotenv().ok();
let nats_url = env::var("NATS_URL").unwrap_or("localhost".into());
let nc = nats::connect(&nats_url)?;
let sub = nc.subscribe("my.subject")?;
nc.publish("my.subject", "Hello World!")?;
nc.publish("my.subject", "my message")?;
if let Some(msg) = sub.next() {
println!("received {:?}", String::from_utf8(msg.data.clone()).unwrap());
assert_eq!(msg.data, "Hello World!".as_bytes())
}
if let Some(msg) = sub.next() {
println!("received {:?}", String::from_utf8(msg.data.clone()).unwrap());
assert_eq!(msg.data, "my message".as_bytes())
}
Ok(())
}
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct Person {
first_name: String,
last_name: String,
age: u8,
}
#[test]
fn test_serde() -> std::io::Result<()> {
dotenv().ok();
let nats_url = env::var("NATS_URL").unwrap_or("localhost".into());
let nc = nats::connect(&nats_url)?;
let subj = nc.new_inbox();
let p = Person {
first_name: "derek".to_owned(),
last_name: "collison".to_owned(),
age: 22,
};
let sub = nc.subscribe(&subj)?;
nc.publish(&subj, serde_json::to_vec(&p)?)?;
let mut p2 = sub.iter().map(move |msg| {
let p: Person = serde_json::from_slice(&msg.data).unwrap();
p
});
let p2 = p2.next().unwrap();
println!("received {:?}", p2);
assert_eq!(p2, p);
Ok(())
}
}
ローカルの開発環境ではこちらの設定を利用して色々とテストしてみようと思います。
開発環境などを作るときには、クラスター化を試してみようと思います。
関連する記事
Concordiumノードをローカルで動かしてみた
Concordiumの調査のために、ローカルでソースコードをビルドしてノードを動かしてみました
NATS JetStream Controllerを使ってNATSをGKEにデプロイする
helm chartのnackを使って、NATS JetStreamサーバーをデプロイして、Stream/Consumerをk8sリソースとして管理する
[Rust]axumとdragonflyを使ったWebsocket Chatのサンプル実装
redis互換のdragonflyをPUBSUBとして利用して、Websocket Chatアプリのサンプル実装を行いました。
[Rust]TiDBを使ったサンプルアプリケーションの実装
RustからTiDBを使ったアプリケーションの実装を行いました。