在 Kubernetes 上部署 Kafka 集群

Kafka 是目前最流行的分布式消息發(fā)布訂閱系統(tǒng),Kafka 功能非常強(qiáng)大,但它同樣也很復(fù)雜,需要一個高可用的強(qiáng)大平臺來運(yùn)行,在微服務(wù)盛行,大多數(shù)公司都采用分布式計算的今天,將 Kafka 作為核心的消息系統(tǒng)使用還是非常有優(yōu)勢的。
如果你在 Kubernetes 集群中運(yùn)行你的微服務(wù),那么在 Kubernetes 中運(yùn)行 Kafka 集群也是很有意義的,這樣可以利用其內(nèi)置的彈性和高可用特性,我們可以使用內(nèi)置的 Kubernetes 服務(wù)發(fā)現(xiàn)輕松地與集群內(nèi)的 Kafka Pods 進(jìn)行交互。
下面我們將來介紹下如何在 Kubernetes 上構(gòu)建分布式的 Kafka 集群,這里我們將使用 Helm Chart 和 StatefulSet 來進(jìn)行部署,當(dāng)然如果想要動態(tài)生成持久化數(shù)據(jù)卷,還需要提前配置一個 StorageClass 資源,比如基于 Ceph RBD 的,如果你集群中沒有配置動態(tài)卷,則需要提前創(chuàng)建3個未綁定的 PV 用于數(shù)據(jù)持久化。
當(dāng)前基于 Helm 官方倉庫的 chartincubator/kafka 在 Kubernetes 上部署的 Kafka,使用的鏡像是 confluentinc/cp-kafka:5.0.1,即部署的是Confluent 公司提供的 Kafka 版本,Confluent Platform Kafka(簡稱CP Kafka)提供了一些 Apache Kafka 沒有的高級特性,例如跨數(shù)據(jù)中心備份、Schema 注冊中心以及集群監(jiān)控工具等。
安裝
使用 Helm Chart 安裝當(dāng)然前提要安裝 Helm,直接使用最新版本的 Helm v3 版本即可:
>?wget?https://get.helm.sh/helm-v3.4.0-linux-amd64.tar.gz
>?tar?-zxvf?helm-v3.4.0-linux-amd64.tar.gz
>?sudo?cp?-a?linux-amd64/helm?/usr/local/bin/helm
>?chmod?+x?/usr/local/bin/helm
然后添加 Kafka 的 Chart 倉庫:
>?helm?repo?add?incubator?http://mirror.azure.cn/kubernetes/charts-incubator/
>?helm?repo?update
Hang?tight?while?we?grab?the?latest?from?your?chart?repositories...
...Successfully?got?an?update?from?the?"incubator"?chart?repository
...Successfully?got?an?update?from?the?"stable"?chart?repository
Update?Complete.??Happy?Helming!?
接著我們就可以配置需要安裝的 Values 文件了,可以直接使用默認(rèn)的 values.yaml 文件,然后可以用它來進(jìn)行定制,比如指定我們自己的 StorageClass:
>?curl?https://raw.githubusercontent.com/helm/charts/master/incubator/kafka/values.yaml?>?kfk-values.yaml
這里我直接使用默認(rèn)的進(jìn)行安裝:
>?helm?install?kafka?incubator/kafka?-f?kfk-values.yaml
NAME:?kafka
LAST?DEPLOYED:?Sun?Nov??1?09:36:44?2020
NAMESPACE:?default
STATUS:?deployed
REVISION:?1
NOTES:
###?Connecting?to?Kafka?from?inside?Kubernetes
You?can?connect?to?Kafka?by?running?a?simple?pod?in?the?K8s?cluster?like?this?with?a?configuration?like?this:
??apiVersion:?v1
??kind:?Pod
??metadata:
????name:?testclient
????namespace:?default
??spec:
????containers:
????-?name:?kafka
??????image:?confluentinc/cp-kafka:5.0.1
??????command:
????????-?sh
????????-?-c
????????-?"exec?tail?-f?/dev/null"
Once?you?have?the?testclient?pod?above?running,?you?can?list?all?kafka
topics?with:
??kubectl?-n?default?exec?testclient?--?./bin/kafka-topics.sh?--zookeeper?kafka-zookeeper:2181?--list
To?create?a?new?topic:
??kubectl?-n?default?exec?testclient?--?./bin/kafka-topics.sh?--zookeeper?kafka-zookeeper:2181?--topic?test1?--create?--partitions?1?--replication-factor?1
To?listen?for?messages?on?a?topic:
??kubectl?-n?default?exec?-ti?testclient?--?./bin/kafka-console-consumer.sh?--bootstrap-server?kafka:9092?--topic?test1?--from-beginning
To?stop?the?listener?session?above?press:?Ctrl+C
To?start?an?interactive?message?producer?session:
??kubectl?-n?default?exec?-ti?testclient?--?./bin/kafka-console-producer.sh?--broker-list?kafka-headless:9092?--topic?test1
To?create?a?message?in?the?above?session,?simply?type?the?message?and?press?"enter"
To?end?the?producer?session?try:?Ctrl+C
If?you?specify?"zookeeper.connect"?in?configurationOverrides,?please?replace?"kafka-zookeeper:2181"?with?the?value?of?"zookeeper.connect",?or?you?will?get?error.
如果你沒配置 StorageClass 或者可用的 PV,安裝的時候 kafka 的 Pod 會處于 Pending 狀態(tài),所以一定要提前配置好數(shù)據(jù)卷。
正常情況隔一會兒 Kafka 就可以安裝成功了:
>?kubectl?get?pods
NAME????????????????READY???STATUS????RESTARTS???AGE
kafka-0?????????????1/1?????Running???0??????????25m
kafka-1?????????????1/1?????Running???0??????????11m
kafka-2?????????????1/1?????Running???0??????????2m
kafka-zookeeper-0???1/1?????Running???0??????????25m
kafka-zookeeper-1???1/1?????Running???0??????????22m
kafka-zookeeper-2???1/1?????Running???0??????????18m
默認(rèn)會安裝3個 ZK Pods 和3個 Kafka Pods,這樣可以保證應(yīng)用的高可用,也可以看下我配置的持久卷信息:
>?kubectl?get?pvc
NAME??????????????STATUS???VOLUME???CAPACITY???ACCESS?MODES???STORAGECLASS???AGE
datadir-kafka-0???Bound????kfk0?????1Gi????????RWO???????????????????????????28m
datadir-kafka-1???Bound????kfk1?????1Gi????????RWO???????????????????????????13m
datadir-kafka-2???Bound????kfk2?????1Gi????????RWO???????????????????????????4m9s
>?kubectl?get?pv
NAME???CAPACITY???ACCESS?MODES???RECLAIM?POLICY???STATUS???CLAIM?????????????????????STORAGECLASS???REASON???AGE
kfk0???1Gi????????RWO????????????Retain???????????Bound????default/datadir-kafka-0???????????????????????????23m
kfk1???1Gi????????RWO????????????Retain???????????Bound????default/datadir-kafka-1???????????????????????????22m
kfk2???1Gi????????RWO????????????Retain???????????Bound????default/datadir-kafka-2???????????????????????????10m
如果我們配置一個 default 的 StorageClass,則會動態(tài)去申請持久化卷,如果你的集群沒有啟用動態(tài)卷,可以修改 values.yaml 來使用靜態(tài)卷。
然后查看下對應(yīng)的 Service 對象:
>?kubectl?get?svc
NAME???????????????????????TYPE????????CLUSTER-IP???????EXTERNAL-IP???PORT(S)??????????????????????AGE
kafka??????????????????????ClusterIP???10.100.205.187???????????9092/TCP?????????????????????31m
kafka-headless?????????????ClusterIP???None?????????????????????9092/TCP?????????????????????31m
kafka-zookeeper????????????ClusterIP???10.100.230.255???????????2181/TCP?????????????????????31m
kafka-zookeeper-headless???ClusterIP???None?????????????????????2181/TCP,3888/TCP,2888/TCP???31m
kubernetes?????????????????ClusterIP???10.96.0.1????????????????443/TCP??????????????????????14d
可以看到又一個叫 kafka-zookeeper 的 zookeeper 服務(wù)和一個叫 kafka 的 Kafka 服務(wù),對于 Kafka 集群的管理,我們將與 kafka-zookeeper 服務(wù)進(jìn)行交互,對于集群消息的收發(fā),我們將使用 kafka 服務(wù)。
客戶端測試
現(xiàn)在 Kafka 集群已經(jīng)搭建好了,接下來我們來安裝一個 Kafka 客戶端,用它來幫助我們產(chǎn)生和獲取 topics 消息。
直接使用下面的命令創(chuàng)建客戶端:
>?cat?<apiVersion:?v1
kind:?Pod
metadata:
??name:?testclient
??namespace:?default
spec:
??containers:
??-?name:?kafka
????image:?confluentinc/cp-kafka:5.0.1
????command:
??????-?sh
??????-?-c
??????-?"exec?tail?-f?/dev/null"
EOF
>?kubectl?get?pod?testclient
NAME?????????READY???STATUS????RESTARTS???AGE
testclient???1/1?????Running???0??????????23s
客戶端 Pod 創(chuàng)建成功后我們就可以開始進(jìn)行一些簡單的測試了。首先讓我們創(chuàng)建一個名為 test1 的有一個分區(qū)和復(fù)制因子'1'的 topic:
>?kubectl?exec?-it?testclient?--?/usr/bin/kafka-topics?--zookeeper?kafka-zookeeper:2181?--topic?test1?--create?--partitions?1?--replication-factor?1
Created?topic?"test1".
然后創(chuàng)建一個生產(chǎn)者,將消息發(fā)布到這個 topic 主題上:
>?kubectl??exec?-ti?testclient?--?/usr/bin/kafka-console-producer?--broker-list?kafka:9092?--topic?test1
>
然后重新打一個終端頁面,讓我們打開一個消費(fèi)者會話,這樣我們就可以看到我們發(fā)送的消息了。
>?kubectl?exec?-ti?testclient?--?/usr/bin/kafka-console-consumer?--bootstrap-server?kafka:9092?--topic?test1
現(xiàn)在我們在生產(chǎn)者的窗口發(fā)送消息,在上面的消費(fèi)者會話窗口中就可以看到對應(yīng)的消息了:
到這里證明 Kafka 集群就正常工作了。比如需要注意 zk 集群我們并沒有做持久化,如果是生產(chǎn)環(huán)境一定記得做下數(shù)據(jù)持久化,在 values.yaml 文件中根據(jù)需求進(jìn)行定制即可,當(dāng)然對于生產(chǎn)環(huán)境還是推薦使用 Operator 來搭建 Kafka 集群,比如 strimzi-kafka-operator。
訓(xùn)練營推薦


?點(diǎn)擊屏末?|?閱讀原文?|?即刻學(xué)習(xí)
