2021/07/11

NATS JetStreamサーバーをローカルで立てて、Rustからアクセスしてみる

natsrust

前回の記事ではnats-streaming-serverを利用していましたが、
よくみたら非推奨(deprecated)だったようで、nats jetstreamを使うようにしました。

nats jetstreamはat least onceを実現するための仕組みが備わっており、publishされたメッセージをstorage(fileとかmemoryとか)に一時的に保存できるようになっています。

  • Stream
  • Consumer

という2つの機能をうまく使って、様々な要求のデータストリーミングパイプラインを構築することができるみたいです。

StreamとConsumerの概要図

concept of jetstream

今回は、JetStreamサーバーをdocker-composeで立ち上げ、Rustを使ってテストしてみましたのでそれの紹介になります。

docker-compose

version: "3.7"
services:
  nats-jetstream:
    image: "nats:2.3.2-alpine"
    # see https://docs.nats.io/jetstream/getting_started/using_docker
    command:
      - "--port"
      - "4222"
      - "--http_port"
      - "8222"
      - "--jetstream"  # このフラグを立てると、jetstream機能が動きます
      - "--store_dir"
      - "/data"
      - "--debug"
    ports:
      - "4222:4222"
      - "8222:8222"
    volumes:
      - ".docker-data/nats-jet-stream:/data"

コマンドラインツールを使ってstreamとconsumerを作成する

natsのインストール

% brew tap nats-io/nats-tools
% brew install nats-io/nats-tools/nats
% nats --help

Streamの作成

% nats str ls
No Streams defined

# ORDERSという名前で、Streamを作成します。(subjectはORDERS.*となるようにします)
% nats str add ORDERS \
    --subjects "ORDERS.*" \
    --ack \
    --max-msgs=-1 \
    --max-bytes=-1 \
    --max-age=1y \
    --storage file \
    --retention limits \
    --max-msg-size=-1 \
    --discard old \
    --dupe-window="0s" \
    --replicas 1

# ORDERSが作られていることを確認
% nats str ls
Streams:
        ORDERS

Consumerの作成

% nats con ls ORDERS
No Consumers defined

# NEWという名前でORDERS StreamのConsumerを作成します
# ORDERS.addという件名をwatchするようにします
% nats con add ORDERS NEW \
    --filter ORDERS.add \
    --ack explicit \
    --pull \
    --deliver all \
    --sample 100 \
    --max-deliver 20

# NEWという名前でORDERS StreamのConsumerが作成されていることを確認
% nats con info ORDERS NEW
Information for Consumer ORDERS > NEW created 2021-07-10T20:01:07+09:00
-- 以下略 --

Pub/Sub

コマンドラインからもメッセージのpub/subができるみたいです。

# pub
nats pub ORDERS.add "your message..."
# sub
nats con next ORDERS NEW

Rustからアクセス

Cargo.tomlにdependenciesを追加

[dependencies]
nats = { version = "0.9.18", features = ["jetstream"] }  # jetstream featureが必要でした
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.8.0", features = ["full"] }
blocking = "1.0.2"
dotenv = "0.15.0"

main.rsを作成

以下対応してみました。

  • Gracefulシャットダウン(Ctrl+Cを押してもすぐに処理を終了せず、現在処理しているメッセージの処理完了をまつ)
    • tokio::select!を使って実現しています
  • jetstream consumerを使って、メッセージの読み込み
use dotenv::dotenv;
use std::env;
use serde::{Deserialize, Serialize};
use tokio::signal::unix::{signal, SignalKind};
use nats::jetstream::{Consumer, ConsumerConfig};
use blocking::unblock;

#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct AddParams {
    a: i32,
    b: i32,
}

/// cargo run --package nats-workaround --bin nats-workaround
#[tokio::main]
async fn main() -> std::io::Result<()> {
    dotenv().ok();
    consume().await
}

async fn consume()  -> std::io::Result<()> {
    let mut sigint = signal(SignalKind::interrupt())?;
    let mut sigterm = signal(SignalKind::terminate())?;
    let nats_url = env::var("NATS_URL").unwrap_or("127.0.0.1".into());
    let nc = nats::connect(&nats_url)?;
    loop {
        let nc = nc.clone();
        let next_msg = async {
            let mut consumer = Consumer::existing(nc, "ORDERS", ConsumerConfig {
                durable_name: Some("NEW".to_string()),
                deliver_subject: None,  // pull based message
                ..Default::default()
            }).expect("consumer must be initialized...");
            let msg = unblock(move || {
                let msg = consumer.pull();
                if let Ok(msg) = msg {
                    let next_id = msg.jetstream_message_info().unwrap().stream_seq;
                    if consumer.dedupe_window.already_processed(next_id) {
                        let _dont_care = msg.ack();
                        return None
                    }
                    Some(msg)
                } else {
                    None
                }
            }).await;
            msg
        };
        tokio::select! {
            _ = sigint.recv() => {
                println!("sigint");
                break
            }
            _ = sigterm.recv() => {
                println!("sigterm");
                break
            }
            msg = next_msg => {
                if let Some(msg) = msg {
                    println!("received {:?}", String::from_utf8(msg.data.clone()).unwrap());
                    let params: AddParams = serde_json::from_slice(&msg.data).unwrap();
                    println!("let's sleep");
                    for n in 1..10 {
                        std::thread::sleep(std::time::Duration::from_millis(1000));
                        println!("{}", n);
                    }
                    println!("finished!");
                    println!("parsed {:?}", params);
                    let _dont_care = msg.ack();
                }
            }
        }
    }
    Ok(())
}

こちらのコードをベースにして、スケーラブルな非同期タスク実行システムを構築したいなと思います。
(エラーハンドリングなどはまだ未実装なので、こちらをベースに肉付けしていこうと思います。)

以上です。