2021/12/08
Kafka StrimziをGKEにデプロイしてみる
Kafkaをk8sにデプロイする必要があり、良さそうなOperatorを探していたところStrimziというのがとても良さそうだったので導入することにしました。
以下手順を備忘録がてら記載しておきます。
strimzi kafka operatorのデプロイ
- Releases · strimzi/strimzi-kafka-operator · GitHubから
strimzi-0.26.0.tar.gz
をダウンロードします。 - 解凍して、
install/cluster-operator
に対して作業をします。 - operatorデプロイ用のnamespaceを作成します。
今回はわかりやすくkafka-operator
という名前でデプロイします。(この名前は任意です。)
# namespace.yamlを作成します。
% cat <<EOF > namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
name: kafka-operator
EOF
install/cluster-operator
のファイル全てにnamespace: kafka-operator
を付与します。- RoleBindingのnamespaceを変更します。
% sed -i '' 's/namespace: .*/namespace: kafka-operator/' install/cluster-operator/*RoleBinding*.yaml
# あらかじめgit addしておくと差分がわかって良いです。以下のような感じで、namespaceが適切に書き変わっていることを確認します。
% git diff install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml
subjects:
- kind: ServiceAccount
name: strimzi-cluster-operator
- namespace: myproject
+ namespace: kafka-operator
roleRef:
kind: ClusterRole
name: strimzi-cluster-operator-namespaced
- 次にclusterを構築したいnamespaceを指定します。(今回は複数のnamespaceでclusterを構築したいので、複数指定します。)(仮に
app1
とapp2
というnamespaceで構築するとすると、カンマ区切りで以下のように指定します。)
060-Deployment-strimzi-cluster-operator.yaml
を以下のように編集します。
% git diff install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
mountPath: /opt/strimzi/custom-config/
env:
- name: STRIMZI_NAMESPACE
- valueFrom:
- fieldRef:
- fieldPath: metadata.namespace
+ value: app1,app2
- name: STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
value: "120000"
- name: STRIMZI_OPERATION_TIMEOUT_MS
- 各namespace毎にRoleBindingを作成します。
% mkdir install/cluster-operator/020-RoleBinding-strimzi-cluster-operator
% mv install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/kafka-operator.yaml
% cp install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/kafka-operator.yaml install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/app1.yaml
% vi install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/app1.yaml
# namespace: app1 を追記します。
% cp install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/app1.yaml install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/app2.yaml
% vi install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/app2.yaml
# namespace: app2 に変更します。
- (おまけ)operatorが他のプロセスにあまり影響を与えないようにするために、
your-node-name
というnode poolのインスタンスのみにプロセスが常駐されるようにしておきます。
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: "cloud.google.com/gke-nodepool"
operator: In
values:
- your-node-name
- 順次 kubernetesにdeployしていきます。
% kubectl apply -f namespace.yaml
% kubectl apply -f install/cluster-operator/010-ServiceAccount-strimzi-cluster-operator.yaml
% kubectl apply -f install/cluster-operator/020-ClusterRole-strimzi-cluster-operator-role.yaml
% kubectl apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/
% kubectl apply -f install/cluster-operator/021-ClusterRole-strimzi-cluster-operator-role.yaml
% kubectl apply -f install/cluster-operator/021-ClusterRoleBinding-strimzi-cluster-operator.yaml
% kubectl apply -f install/cluster-operator/030-ClusterRole-strimzi-kafka-broker.yaml
% kubectl apply -f install/cluster-operator/030-ClusterRoleBinding-strimzi-cluster-operator-kafka-broker-delegation.yaml
% kubectl apply -f install/cluster-operator/031-ClusterRole-strimzi-entity-operator.yaml
% kubectl apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml
% kubectl apply -f install/cluster-operator/033-ClusterRole-strimzi-kafka-client.yaml
% kubectl apply -f install/cluster-operator/033-ClusterRoleBinding-strimzi-cluster-operator-kafka-client-delegation.yaml
% kubectl apply -f install/cluster-operator/040-Crd-kafka.yaml
% kubectl apply -f install/cluster-operator/041-Crd-kafkaconnect.yaml
% kubectl apply -f install/cluster-operator/043-Crd-kafkatopic.yaml
% kubectl apply -f install/cluster-operator/044-Crd-kafkauser.yaml
% kubectl apply -f install/cluster-operator/045-Crd-kafkamirrormaker.yaml
% kubectl apply -f install/cluster-operator/046-Crd-kafkabridge.yaml
% kubectl apply -f install/cluster-operator/047-Crd-kafkaconnector.yaml
% kubectl apply -f install/cluster-operator/048-Crd-kafkamirrormaker2.yaml
% kubectl apply -f install/cluster-operator/049-Crd-kafkarebalance.yaml
% kubectl apply -f install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml
% kubectl apply -f install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
最後に operatorがちゃんと起動するか確認します。(31秒で起動しました。結構速いです。)
% kubectl get pods -n kafka-operator -w
NAME READY STATUS RESTARTS AGE
strimzi-cluster-operator-5bdd66456d-hvm2j 0/1 ContainerCreating 0 8s
strimzi-cluster-operator-5bdd66456d-hvm2j 0/1 Running 0 17s
strimzi-cluster-operator-5bdd66456d-hvm2j 1/1 Running 0 31s
kafka clusterのデプロイ
先ほどデプロイしたoperatorを使って、kafkaのclusterをデプロイします。
最初のstepでダウンロードしたstrimzi-0.26.0
のexamples/kafka/kafka-persistent-single.yaml
を編集して使っていきます。
これは、データをちゃんと保持して、zookeeperとkafka nodeを1つづつの構成です。
※ 負荷やアプリケーションの可用性を考慮して、いろんな構成が考えられますが、ひとまずはシンプルなものを利用します。
準備
事前にSSDを利用するstorage classを作成しておきます。
% cat <<EOF > storageclass-pd-ssd.yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: pd-ssd
provisioner: kubernetes.io/gce-pd
parameters:
type: pd-ssd
fstype: ext4
replication-type: none
allowVolumeExpansion: true
EOF
% kubectl apply -f storageclass-pd-ssd.yaml
clusterのデプロイ
% cat <<EOF > kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: kafka-cluster
namespace: app1
spec:
kafka:
version: 3.0.0
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
# - name: tls
# port: 9093
# type: internal
# tls: true
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
log.message.format.version: "3.0"
inter.broker.protocol.version: "3.0"
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
class: pd-ssd
size: 20Gi
deleteClaim: false
# 以下の設定はオプションです。
template:
pod:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: "cloud.google.com/gke-nodepool"
operator: In
values:
- your-node-name
zookeeper:
replicas: 1
storage:
type: persistent-claim
class: pd-ssd
size: 20Gi
deleteClaim: false
# 以下の設定はオプションです。
template:
pod:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: "cloud.google.com/gke-nodepool"
operator: In
values:
- your-node-name
entityOperator:
topicOperator: {}
userOperator: {}
EOF
% kubectl apply -f kafka-cluster.yaml
確認します。
% kubectl get pods -n app1
NAME READY STATUS RESTARTS AGE
kafka-cluster-kafka-0 1/1 Running 0 2m5s
kafka-cluster-zookeeper-0 1/1 Running 0 2m43s
Topicの定義の作成
% cat <<EOF > topic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: job
namespace: app1
labels:
strimzi.io/cluster: kafka-cluster
spec:
partitions: 1
replicas: 1
config:
retention.ms: 86400000 # 1day
segment.bytes: 1073741824 # 1GB
EOF
% kubectl apply -f topic.yaml
動作確認
事前にserviceを確認します。(endpointとして利用します。)
% kubectl get svc -n app1
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kafka-cluster-kafka-bootstrap ClusterIP XXX.XXX.XXX.XX <none> 9091/TCP,9092/TCP 66m
kafka-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP 66m
kafka-cluster-zookeeper-client ClusterIP XXX.XXX.XXX.XX <none> 2181/TCP 67m
kafka-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 67m
ログインして、producerを試します。
% kubectl get pods -n app1
NAME READY STATUS RESTARTS AGE
kafka-cluster-kafka-0 1/1 Running 0 64m
kafka-cluster-zookeeper-0 1/1 Running 0 65m
% kubectl exec -it kafka-cluster-kafka-0 -n app1 -- bash
# ログイン後
$ ./bin/kafka-console-producer.sh --broker-list kafka-cluster-kafka-bootstrap:9092 --topic job
>test
別のターミナルを開いて、同じpodにログインして、consumerを試します。
% kubectl exec -it kafka-cluster-kafka-0 -n app1 -- bash
# ログイン後
$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic job --from-beginning
test
# 今度はtoipcを一覧表示してみます。
$ ./bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list
__consumer_offsets
job
感想
kafka自体が結構configだらけなので、どうしてもoperator自体の定義も長くなることをさけられない感じではあるかなと思いますが・・
比較的楽に運用ができそうでした。kafkaを使う際は積極的に使っていこうと思います。
以上です。
関連する記事
[小〜中規模向け]GKEにTiDBをデプロイする
MySQL互換のNewSQLであるTiDBをGKEにデプロイしてみました。
NATS JetStream Controllerを使ってNATSをGKEにデプロイする
helm chartのnackを使って、NATS JetStreamサーバーをデプロイして、Stream/Consumerをk8sリソースとして管理する
GKEにDragonflydbをデプロイする
redis互換のdragonflydbをGKEにデプロイしました
[GKE]Kafka Strimziをアップグレードする
GKEにデプロイしているKafka Strimzi 0.26.0を0.30.0にアップグレードする