2021/12/08

Kafka StrimziをGKEにデプロイしてみる

kafkak8sgke

Kafkaをk8sにデプロイする必要があり、良さそうなOperatorを探していたところStrimziというのがとても良さそうだったので導入することにしました。

以下手順を備忘録がてら記載しておきます。

strimzi kafka operatorのデプロイ

  1. Releases · strimzi/strimzi-kafka-operator · GitHubからstrimzi-0.26.0.tar.gzをダウンロードします。
  2. 解凍して、install/cluster-operatorに対して作業をします。
  3. operatorデプロイ用のnamespaceを作成します。

今回はわかりやすくkafka-operatorという名前でデプロイします。(この名前は任意です。)

# namespace.yamlを作成します。
% cat <<EOF > namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: kafka-operator
EOF
  1. install/cluster-operatorのファイル全てにnamespace: kafka-operatorを付与します。
  2. RoleBindingのnamespaceを変更します。
% sed -i '' 's/namespace: .*/namespace: kafka-operator/' install/cluster-operator/*RoleBinding*.yaml
# あらかじめgit addしておくと差分がわかって良いです。以下のような感じで、namespaceが適切に書き変わっていることを確認します。
% git diff install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml
 subjects:
   - kind: ServiceAccount
     name: strimzi-cluster-operator
-    namespace: myproject
+    namespace: kafka-operator
 roleRef:
   kind: ClusterRole
   name: strimzi-cluster-operator-namespaced
  1. 次にclusterを構築したいnamespaceを指定します。(今回は複数のnamespaceでclusterを構築したいので、複数指定します。)(仮にapp1app2というnamespaceで構築するとすると、カンマ区切りで以下のように指定します。)

060-Deployment-strimzi-cluster-operator.yamlを以下のように編集します。

% git diff install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml 
               mountPath: /opt/strimzi/custom-config/
           env:
             - name: STRIMZI_NAMESPACE
-              valueFrom:
-                fieldRef:
-                  fieldPath: metadata.namespace
+              value: app1,app2
             - name: STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
               value: "120000"
             - name: STRIMZI_OPERATION_TIMEOUT_MS
  1. 各namespace毎にRoleBindingを作成します。
% mkdir install/cluster-operator/020-RoleBinding-strimzi-cluster-operator
% mv install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/kafka-operator.yaml
% cp install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/kafka-operator.yaml install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/app1.yaml
% vi install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/app1.yaml
# namespace: app1 を追記します。
% cp install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/app1.yaml install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/app2.yaml
% vi install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/app2.yaml
# namespace: app2 に変更します。
  1. (おまけ)operatorが他のプロセスにあまり影響を与えないようにするために、your-node-nameというnode poolのインスタンスのみにプロセスが常駐されるようにしておきます。
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: "cloud.google.com/gke-nodepool"
                    operator: In
                    values:
                      - your-node-name
  1. 順次 kubernetesにdeployしていきます。
% kubectl apply -f namespace.yaml
% kubectl apply -f install/cluster-operator/010-ServiceAccount-strimzi-cluster-operator.yaml
% kubectl apply -f install/cluster-operator/020-ClusterRole-strimzi-cluster-operator-role.yaml
% kubectl apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator/
% kubectl apply -f install/cluster-operator/021-ClusterRole-strimzi-cluster-operator-role.yaml
% kubectl apply -f install/cluster-operator/021-ClusterRoleBinding-strimzi-cluster-operator.yaml
% kubectl apply -f install/cluster-operator/030-ClusterRole-strimzi-kafka-broker.yaml
% kubectl apply -f install/cluster-operator/030-ClusterRoleBinding-strimzi-cluster-operator-kafka-broker-delegation.yaml
% kubectl apply -f install/cluster-operator/031-ClusterRole-strimzi-entity-operator.yaml
% kubectl apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml
% kubectl apply -f install/cluster-operator/033-ClusterRole-strimzi-kafka-client.yaml
% kubectl apply -f install/cluster-operator/033-ClusterRoleBinding-strimzi-cluster-operator-kafka-client-delegation.yaml
% kubectl apply -f install/cluster-operator/040-Crd-kafka.yaml
% kubectl apply -f install/cluster-operator/041-Crd-kafkaconnect.yaml
% kubectl apply -f install/cluster-operator/043-Crd-kafkatopic.yaml
% kubectl apply -f install/cluster-operator/044-Crd-kafkauser.yaml
% kubectl apply -f install/cluster-operator/045-Crd-kafkamirrormaker.yaml
% kubectl apply -f install/cluster-operator/046-Crd-kafkabridge.yaml
% kubectl apply -f install/cluster-operator/047-Crd-kafkaconnector.yaml
% kubectl apply -f install/cluster-operator/048-Crd-kafkamirrormaker2.yaml
% kubectl apply -f install/cluster-operator/049-Crd-kafkarebalance.yaml
% kubectl apply -f install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml
% kubectl apply -f install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml

最後に operatorがちゃんと起動するか確認します。(31秒で起動しました。結構速いです。)

% kubectl get pods -n kafka-operator -w
NAME                                        READY   STATUS              RESTARTS   AGE
strimzi-cluster-operator-5bdd66456d-hvm2j   0/1     ContainerCreating   0          8s
strimzi-cluster-operator-5bdd66456d-hvm2j   0/1     Running             0          17s
strimzi-cluster-operator-5bdd66456d-hvm2j   1/1     Running             0          31s

kafka clusterのデプロイ

先ほどデプロイしたoperatorを使って、kafkaのclusterをデプロイします。

最初のstepでダウンロードしたstrimzi-0.26.0examples/kafka/kafka-persistent-single.yamlを編集して使っていきます。

これは、データをちゃんと保持して、zookeeperとkafka nodeを1つづつの構成です。

※ 負荷やアプリケーションの可用性を考慮して、いろんな構成が考えられますが、ひとまずはシンプルなものを利用します。

準備

事前にSSDを利用するstorage classを作成しておきます。

% cat <<EOF > storageclass-pd-ssd.yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: pd-ssd
provisioner: kubernetes.io/gce-pd
parameters:
  type: pd-ssd
  fstype: ext4
  replication-type: none
allowVolumeExpansion: true
EOF
% kubectl apply -f storageclass-pd-ssd.yaml

clusterのデプロイ

% cat <<EOF > kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
  namespace: app1
spec:
  kafka:
    version: 3.0.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
#     - name: tls
#       port: 9093
#       type: internal
#       tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "3.0"
      inter.broker.protocol.version: "3.0"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        class: pd-ssd
        size: 20Gi
        deleteClaim: false
    # 以下の設定はオプションです。
    template:
      pod:
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: "cloud.google.com/gke-nodepool"
                      operator: In
                      values:
                        - your-node-name
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      class: pd-ssd
      size: 20Gi
      deleteClaim: false
    # 以下の設定はオプションです。
    template:
      pod:
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: "cloud.google.com/gke-nodepool"
                      operator: In
                      values:
                        - your-node-name
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF
% kubectl apply -f kafka-cluster.yaml

確認します。

% kubectl get pods -n app1
NAME                        READY   STATUS    RESTARTS   AGE
kafka-cluster-kafka-0       1/1     Running   0          2m5s
kafka-cluster-zookeeper-0   1/1     Running   0          2m43s

Topicの定義の作成

% cat <<EOF > topic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: job
  namespace: app1
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 86400000     # 1day
    segment.bytes: 1073741824  # 1GB
EOF
% kubectl apply -f topic.yaml

動作確認

事前にserviceを確認します。(endpointとして利用します。)

 % kubectl get svc -n app1
NAME                             TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
kafka-cluster-kafka-bootstrap    ClusterIP   XXX.XXX.XXX.XX   <none>        9091/TCP,9092/TCP            66m
kafka-cluster-kafka-brokers      ClusterIP   None             <none>        9090/TCP,9091/TCP,9092/TCP   66m
kafka-cluster-zookeeper-client   ClusterIP   XXX.XXX.XXX.XX   <none>        2181/TCP                     67m
kafka-cluster-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   67m

ログインして、producerを試します。

 % kubectl get pods -n app1
NAME                        READY   STATUS    RESTARTS   AGE
kafka-cluster-kafka-0       1/1     Running   0          64m
kafka-cluster-zookeeper-0   1/1     Running   0          65m
% kubectl exec -it kafka-cluster-kafka-0 -n app1 -- bash
# ログイン後
$ ./bin/kafka-console-producer.sh --broker-list kafka-cluster-kafka-bootstrap:9092 --topic job
>test

別のターミナルを開いて、同じpodにログインして、consumerを試します。

% kubectl exec -it kafka-cluster-kafka-0 -n app1 -- bash
# ログイン後
$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic job --from-beginning
test

# 今度はtoipcを一覧表示してみます。
$ ./bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list
__consumer_offsets
job

感想

kafka自体が結構configだらけなので、どうしてもoperator自体の定義も長くなることをさけられない感じではあるかなと思いますが・・

比較的楽に運用ができそうでした。kafkaを使う際は積極的に使っていこうと思います。

以上です。