2022/12/06
NATS JetStream Controllerを使ってNATSをGKEにデプロイする
概要
NATSはクラウドネイティブでハイパフォーマンスのメッセージングミドルウェアです。
類似の製品はその他にも、Apache Kafka、RabbitMQ、Redisなどが有名です。
弊社では、RabbotMQをPythonのceleryで、KafkaをRust製のタスクキューで利用していましたが・・今では、NATSを積極的に利用しています。
今回は、NATSのhelm chartを使って、GKEにデプロイする方法を備忘録として記載します。
特に今回は、nackを利用させていただきました。
nackはNATSのStreamやConsumerの定義をk8sのリソースして定義できるところが魅力的で採用しました。
参考
基本的には、以下のREADMEを読んでデプロイしました。
デプロイ
CRDs
nackのCustom Resource Definition(CRD)達をデプロイします。
# CRDの定義ファイルをダウンロード
% wget https://raw.githubusercontent.com/nats-io/nack/v0.6.0/deploy/crds.yml -O crds.yaml
# CRDをデプロイ
% kubectl apply -f crds.yaml
customresourcedefinition.apiextensions.k8s.io/streams.jetstream.nats.io created
customresourcedefinition.apiextensions.k8s.io/consumers.jetstream.nats.io created
customresourcedefinition.apiextensions.k8s.io/streamtemplates.jetstream.nats.io created
customresourcedefinition.apiextensions.k8s.io/accounts.jetstream.nats.io created
namespace
nats
というnamespaceを作って、そこにデプロイすることにしました。
# 定義ファイルを作成
% cat <<'EOF' > namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
name: nats
EOF
# namespaceを作成
% kubectl create -f namespace.yaml
namespace/nats created
storageclass
次に、nats jetstream clusterで利用するdiskの種類を定義します。 (SSDを利用します。)
# 定義ファイルを作成
% cat <<'EOF' > storageclass.yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
name: pd-custom
provisioner: kubernetes.io/gce-pd
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true
parameters:
type: pd-ssd
mountOptions:
- nodelalloc,noatime
EOF
# デプロイ
$ kubectl apply -f storageclass.yaml
# 確認(pd-customという名前のレコードが出てくればOK / そのほかにもデフォルトで作成されているpremium-rwoやstandard-rwoなども一緒に出てきます。)
$ kubectl get storageclass
NAME PROVISIONER RECLAIMPOLICY VOLUMEBINDINGMODE ALLOWVOLUMEEXPANSION AGE
pd-custom kubernetes.io/gce-pd Delete WaitForFirstConsumer true XX
premium-rwo pd.csi.storage.gke.io Delete WaitForFirstConsumer true XX
standard kubernetes.io/gce-pd Delete Immediate true XX
standard-rwo (default) pd.csi.storage.gke.io Delete WaitForFirstConsumer true XX
nats jetstream server
helm3を使って、jetstream serverをデプロイします。
# 定義ファイルを作成
% cat <<'EOF' > nats-cluster.yaml
nats:
jetstream:
enabled: true
memStorage:
enabled: true
size: 2Gi
fileStorage:
enabled: true
storageDirectory: /data/jetstream
size: 10Gi
storageClassName: pd-custom # 先ほど定義したstorage classを指定します。
natsbox:
enabled: false
cluster:
enabled: true
name: "nats"
replicas: 2
# affinity: <省略> ... affinityは適時設定すると良いです。
exporter:
enabled: false # prometheusの設定は今回は見送り
EOF
# deploy nats jetstream server with helm3
% helm repo add nats https://nats-io.github.io/k8s/helm/charts/
% helm install -n nats nats nats/nats -f nats-cluster.yaml
# in case you need to patch/recreate nats cluster due to improper settings...
# helm upgrade -f nats-cluster.yaml nats nats/nats -n nats
# helm uninstall nats -n nats
# 確認(うまくデプロイできると、以下のようにnatsのサービスが立ち上がります。)
% kubectl get svc -n nats
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
nats ClusterIP None <none> 4222/TCP,6222/TCP,8222/TCP,7777/TCP,7422/TCP,7522/TCP XX
nack
helm3を使って、nackをインストールします。
# 定義ファイルを作成
% cat <<'EOF' > nack.yaml
jetstream:
enabled: true
nats:
url: nats://nats:4222 # nats clusterのserviceを指定します。
EOF
# deploy nack
% helm install nack nats/nack -f nack.yaml -n nats
stream
email_task_stream
というメール送信のバックグラウンドタスク用のstreamを作成しました。
# 定義ファイルを作成
% cat <<'EOF' > stream-email-task.yaml
apiVersion: jetstream.nats.io/v1beta2
kind: Stream
metadata:
name: email-task
namespace: nats
spec:
name: email_task_stream
subjects: ["emails"]
storage: file
maxBytes: 5368709120 # 5 * 1024 * 1024 * 1024, // 5GB
replicas: 1
EOF
# deploy stream
% kubectl apply -f stream-email-task.yaml
# 確認
% kubectl get stream -n nats
NAME STATE STREAM NAME SUBJECTS
email-task Created email_task_stream ["emails"]
consumer
email_task_stream
から1件1件タスクを受信するPull basedなConsumerを定義してデプロイしました。
# 定義ファイルを作成
% cat <<'EOF' > consumer-email.yaml
apiVersion: jetstream.nats.io/v1beta2
kind: Consumer
metadata:
name: emails-consumer
namespace: nats
spec:
streamName: email_task_stream
durableName: emails
deliverPolicy: all
maxDeliver: 1
ackPolicy: explicit
EOF
# deploy pull based consumer
% kubectl apply -f consumer-email.yaml
# 確認
% kubectl get consumer -n nats
NAME STATE STREAM CONSUMER ACK POLICY
emails-consumer Created email_task_stream emails explicit
動作確認
# 動作確認用のpodを作成
% cat <<'EOF' > pod-nats-box.yaml
apiVersion: v1
kind: Pod
metadata:
name: nats-box
namespace: nats
labels:
app: nats-box
spec:
containers:
- name: nats-box
image: synadia/nats-box:latest
imagePullPolicy: Always
env:
- name: NATS_URL
value: nats
- name: STAN_CLUSTER
value: stan
command:
- "tail"
- "-f"
- "/dev/null"
EOF
# podをデプロイ
% kubectl apply -f pod-nats-box.yaml
# podにログイン
% kubectl exec -it nats-box -n nats -- /bin/sh -l
# コンテナの中で、`nats`コマンドを叩いて、ちゃんと動作するかチェックします。
$ nats stream info email_task_stream
$ nats pub emails "email 1"
$ nats pub emails "email 2"
$ nats consumer next email_task_stream emails
$ exit
# 動作確認が終わったら、動作確認用のpodを削除
% kubectl delete -f pod-nats-box.yaml
以上になります。
関連する記事
[小〜中規模向け]GKEにTiDBをデプロイする
MySQL互換のNewSQLであるTiDBをGKEにデプロイしてみました。
NATS JetStream Controllerを使ってNATSをGKEにデプロイする
helm chartのnackを使って、NATS JetStreamサーバーをデプロイして、Stream/Consumerをk8sリソースとして管理する
GKEにDragonflydbをデプロイする
redis互換のdragonflydbをGKEにデプロイしました
[GKE]Kafka Strimziをアップグレードする
GKEにデプロイしているKafka Strimzi 0.26.0を0.30.0にアップグレードする