2021/07/10

NATS Streamingサーバーをローカルで立てて、Rustからアクセスしてみる

natsrust

普段メッセージブローカーとしてはrabbitmq/redis/kafkaを使ってきましたが、natsが良さそうなので入門してみようと思います。

以下試してみました。

  1. docker-composeを使って、nats streaming serverを立てる
  2. rustのnatscrateを利用して接続をテスト

docker-compose

version: "3.7"
services:
  nats-streaming-stan:
    image: "nats-streaming:0.22.0"
    # see https://docs.nats.io/nats-streaming-server/configuring/cmdline
    command:
      - "--cluster_id"
      - "stan"
      - "--http_port"
      - "8222"
      - "--port"
      - "4222"
      - "--store"
      - "file"
      - "--dir"
      - "/data/stan/store"
      - "-D"  # Enable debugging output
      - "-SD" # Enable STAN debugging output
      # The actual total wait is: (fail count + 1) * (hb interval + hb timeout)
      - "--hb_interval"   # Interval at which the server sends an heartbeat to a client
      - "10s"
      - "--hb_timeout"    # How long the server waits for a heartbeat response from the client before considering it a failed heartbeat
      - "10s"
      - "--hb_fail_count" # Count of failed heartbeats before server closes the client connection.
      - "2"
    ports:
      - "4222:4222"
      - "8222:8222"
    volumes:
      - ".docker-data/nats-streaming:/data"

参考

Rustからアクセス

ライブラリのインストール

[dependencies]
nats = "0.9.18"  # <- nats・driverライブラリ
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

[dev-dependencies]
dotenv = "0.15.0"

テストコード

src/lib.rs

#[cfg(test)]
mod test {
    use dotenv::dotenv;
    use std::env;
    use serde::{Deserialize, Serialize};

    #[test]
    fn test_simple_pubsub() -> std::io::Result<()> {
        dotenv().ok();
        let nats_url = env::var("NATS_URL").unwrap_or("localhost".into());
        let nc = nats::connect(&nats_url)?;
        let sub = nc.subscribe("my.subject")?;
        nc.publish("my.subject", "Hello World!")?;
        nc.publish("my.subject", "my message")?;
        if let Some(msg) = sub.next() {
            println!("received {:?}", String::from_utf8(msg.data.clone()).unwrap());
            assert_eq!(msg.data, "Hello World!".as_bytes())
        }
        if let Some(msg) = sub.next() {
            println!("received {:?}", String::from_utf8(msg.data.clone()).unwrap());
            assert_eq!(msg.data, "my message".as_bytes())
        }
        Ok(())
    }

    #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
    struct Person {
        first_name: String,
        last_name: String,
        age: u8,
    }

    #[test]
    fn test_serde() -> std::io::Result<()> {
        dotenv().ok();
        let nats_url = env::var("NATS_URL").unwrap_or("localhost".into());
        let nc = nats::connect(&nats_url)?;
        let subj = nc.new_inbox();
        let p = Person {
            first_name: "derek".to_owned(),
            last_name: "collison".to_owned(),
            age: 22,
        };
        let sub = nc.subscribe(&subj)?;
        nc.publish(&subj, serde_json::to_vec(&p)?)?;
        let mut p2 = sub.iter().map(move |msg| {
            let p: Person = serde_json::from_slice(&msg.data).unwrap();
            p
        });
        let p2 = p2.next().unwrap();
        println!("received {:?}", p2);
        assert_eq!(p2, p);
        Ok(())
    }
}

ローカルの開発環境ではこちらの設定を利用して色々とテストしてみようと思います。

開発環境などを作るときには、クラスター化を試してみようと思います。