2022/12/06

NATS JetStream Controllerを使ってNATSをGKEにデプロイする

natsk8sgke

概要

NATSはクラウドネイティブでハイパフォーマンスのメッセージングミドルウェアです。

類似の製品はその他にも、Apache Kafka、RabbitMQ、Redisなどが有名です。

弊社では、RabbotMQをPythonのceleryで、KafkaをRust製のタスクキューで利用していましたが・・今では、NATSを積極的に利用しています。

今回は、NATSのhelm chartを使って、GKEにデプロイする方法を備忘録として記載します。

特に今回は、nackを利用させていただきました。

nackはNATSのStreamやConsumerの定義をk8sのリソースして定義できるところが魅力的で採用しました。

参考

基本的には、以下のREADMEを読んでデプロイしました。

デプロイ

CRDs

nackのCustom Resource Definition(CRD)達をデプロイします。

# CRDの定義ファイルをダウンロード
% wget https://raw.githubusercontent.com/nats-io/nack/v0.6.0/deploy/crds.yml -O crds.yaml

# CRDをデプロイ
% kubectl apply -f crds.yaml
customresourcedefinition.apiextensions.k8s.io/streams.jetstream.nats.io created
customresourcedefinition.apiextensions.k8s.io/consumers.jetstream.nats.io created
customresourcedefinition.apiextensions.k8s.io/streamtemplates.jetstream.nats.io created
customresourcedefinition.apiextensions.k8s.io/accounts.jetstream.nats.io created

namespace

natsというnamespaceを作って、そこにデプロイすることにしました。

# 定義ファイルを作成
% cat <<'EOF' > namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: nats
EOF
# namespaceを作成
% kubectl create -f namespace.yaml
namespace/nats created

storageclass

次に、nats jetstream clusterで利用するdiskの種類を定義します。 (SSDを利用します。)

# 定義ファイルを作成
% cat <<'EOF' > storageclass.yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: pd-custom
provisioner: kubernetes.io/gce-pd
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true
parameters:
  type: pd-ssd
mountOptions:
  - nodelalloc,noatime
EOF

# デプロイ
$ kubectl apply -f storageclass.yaml

# 確認(pd-customという名前のレコードが出てくればOK / そのほかにもデフォルトで作成されているpremium-rwoやstandard-rwoなども一緒に出てきます。)
$ kubectl get storageclass
NAME                     PROVISIONER             RECLAIMPOLICY   VOLUMEBINDINGMODE      ALLOWVOLUMEEXPANSION   AGE
pd-custom                kubernetes.io/gce-pd    Delete          WaitForFirstConsumer   true                   XX
premium-rwo              pd.csi.storage.gke.io   Delete          WaitForFirstConsumer   true                   XX
standard                 kubernetes.io/gce-pd    Delete          Immediate              true                   XX
standard-rwo (default)   pd.csi.storage.gke.io   Delete          WaitForFirstConsumer   true                   XX

nats jetstream server

helm3を使って、jetstream serverをデプロイします。

# 定義ファイルを作成
% cat <<'EOF' > nats-cluster.yaml
nats:
  jetstream:
    enabled: true

    memStorage:
      enabled: true
      size: 2Gi

    fileStorage:
      enabled: true
      storageDirectory: /data/jetstream
      size: 10Gi
      storageClassName: pd-custom  # 先ほど定義したstorage classを指定します。

natsbox:
  enabled: false

cluster:
  enabled: true
  name: "nats"
  replicas: 2

# affinity: <省略> ... affinityは適時設定すると良いです。

exporter:
  enabled: false  # prometheusの設定は今回は見送り
EOF

# deploy nats jetstream server with helm3
% helm repo add nats https://nats-io.github.io/k8s/helm/charts/
% helm install -n nats nats nats/nats -f nats-cluster.yaml
# in case you need to patch/recreate nats cluster due to improper settings...
#   helm upgrade -f nats-cluster.yaml nats nats/nats -n nats
#   helm uninstall nats -n nats

# 確認(うまくデプロイできると、以下のようにnatsのサービスが立ち上がります。)
% kubectl get svc -n nats
NAME   TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                 AGE
nats   ClusterIP   None         <none>        4222/TCP,6222/TCP,8222/TCP,7777/TCP,7422/TCP,7522/TCP   XX

nack

helm3を使って、nackをインストールします。

# 定義ファイルを作成
% cat <<'EOF' > nack.yaml
jetstream:
  enabled: true
  nats:
    url: nats://nats:4222  # nats clusterのserviceを指定します。
EOF

# deploy nack
% helm install nack nats/nack -f nack.yaml -n nats

stream

email_task_streamというメール送信のバックグラウンドタスク用のstreamを作成しました。

# 定義ファイルを作成
% cat <<'EOF' > stream-email-task.yaml
apiVersion: jetstream.nats.io/v1beta2
kind: Stream
metadata:
  name: email-task
  namespace: nats
spec:
  name: email_task_stream
  subjects: ["emails"]
  storage: file
  maxBytes: 5368709120  # 5 * 1024 * 1024 * 1024, // 5GB
  replicas: 1
EOF

# deploy stream
% kubectl apply -f stream-email-task.yaml

# 確認
% kubectl get stream -n nats
NAME         STATE     STREAM NAME         SUBJECTS
email-task   Created   email_task_stream   ["emails"]

consumer

email_task_streamから1件1件タスクを受信するPull basedなConsumerを定義してデプロイしました。

# 定義ファイルを作成
% cat <<'EOF' > consumer-email.yaml
apiVersion: jetstream.nats.io/v1beta2
kind: Consumer
metadata:
  name: emails-consumer
  namespace: nats
spec:
  streamName: email_task_stream
  durableName: emails
  deliverPolicy: all
  maxDeliver: 1
  ackPolicy: explicit
EOF

# deploy pull based consumer
% kubectl apply -f consumer-email.yaml

# 確認
% kubectl get consumer -n nats
NAME              STATE     STREAM              CONSUMER   ACK POLICY
emails-consumer   Created   email_task_stream   emails     explicit

動作確認

# 動作確認用のpodを作成
% cat <<'EOF' > pod-nats-box.yaml
apiVersion: v1
kind: Pod
metadata:
  name: nats-box
  namespace: nats
  labels:
    app: nats-box
spec:
  containers:
    - name: nats-box
      image: synadia/nats-box:latest
      imagePullPolicy: Always
      env:
        - name: NATS_URL
          value: nats
        - name: STAN_CLUSTER
          value: stan
      command:
        - "tail"
        - "-f"
        - "/dev/null"
EOF
# podをデプロイ
% kubectl apply -f pod-nats-box.yaml

# podにログイン
% kubectl exec -it nats-box -n nats -- /bin/sh -l
# コンテナの中で、`nats`コマンドを叩いて、ちゃんと動作するかチェックします。
$ nats stream info email_task_stream
$ nats pub emails "email 1"
$ nats pub emails "email 2"
$ nats consumer next email_task_stream emails
$ exit

# 動作確認が終わったら、動作確認用のpodを削除
% kubectl delete -f pod-nats-box.yaml

以上になります。