Deploying a Kafka Cluster on Kubernetes

This mainly draws on two projects, https://stackoverflow.com/questions/44651219/kafka-deployment-on-minikube and https://github.com/ramhiser/kafka-kubernetes. Both of them deploy only a single-node Kafka, however, so here I try to extend the single-node setup into a multi-node Kafka cluster.

1. Single-Node Kafka

To build a Kafka cluster, it is still best to start from a single node.

Step 1: Create the Zookeeper service with zookeeper-svc.yaml and zookeeper-deployment.yaml, applied via kubectl create -f:

apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
spec:
  ports:
  - name: zookeeper-port
    port: 2181
    targetPort: 2181
  selector:
    app: zookeeper


apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
      - image: wurstmeister/zookeeper
        imagePullPolicy: IfNotPresent
        name: zookeeper
        ports:
        - containerPort: 2181
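
Both files are created with standard kubectl commands; the checks below (plain kubectl, using the resource names defined above) confirm the pod is running and the service's endpoint is populated:

kubectl create -f zookeeper-svc.yaml
kubectl create -f zookeeper-deployment.yaml

# Wait for the pod to reach Running, then confirm the endpoint exists
kubectl get pods -l app=zookeeper
kubectl get endpoints zookeeper-service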

Step 2: Once the pod is running and the service's endpoint has been populated, you can go on to create kafka-svc.yaml and kafka-deployment.yaml for Kafka:

apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  labels:
    app: kafka
spec:
  type: NodePort
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    nodePort: 30092
    protocol: TCP
  selector:
    app: kafka


kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: kafka-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka
  template:
    metadata:
      labels:
        name: kafka
        app: kafka
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: "[kafka的service的clusterIP]"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: [clusterIP of the zookeeper service]:2181
        - name: KAFKA_BROKER_ID
          value: "1"

The clusterIP can be looked up with kubectl get svc. The value of KAFKA_ZOOKEEPER_CONNECT can also be set to zookeeper-service:2181.
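
If you prefer to fetch just the IP, jsonpath output works as well (standard kubectl; service names as defined above):

# Print only the clusterIP of each service
kubectl get svc zookeeper-service -o jsonpath='{.spec.clusterIP}'
kubectl get svc kafka-service -o jsonpath='{.spec.clusterIP}'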

Step 3: After creation, the service needs to be tested. I followed the approach from http://www.javashuo.com/article/p-pyycgahe-b.html.

Before that, since this Kafka runs inside a container, first execute the following command to enter it:

kubectl exec -it [kafka pod name] -- /bin/bash

Inside the container, the Kafka command-line tools are stored under /opt/kafka/bin; cd into that directory:

cd /opt/kafka/bin

The remaining steps are similar to what the blog post above describes. For single-node Kafka, the same node acts as both producer and consumer. Run the following command:

kafka-console-producer.sh --broker-list [clusterIP of the kafka service]:9092 --topic test

If it runs correctly, a > prompt appears below, waiting for message input. This terminal has now become the producer.

Open another Linux terminal and run the same command to enter the container; this time the terminal will serve as the consumer. Note that the way the blog post above creates a consumer has changed in newer versions of Kafka; run the following command instead:

kafka-console-consumer.sh --bootstrap-server [clusterIP of the kafka service]:9092 --topic test --from-beginning

Then type a message on the producer side and check whether the consumer receives it. If it arrives, the setup works.

Finally, you can also run the following command to list all topics:

kafka-topics.sh --list --zookeeper [clusterIP of the zookeeper service]:2181

Note that some commands take Kafka's port while others take Zookeeper's port; be careful to distinguish them, as in the sketch below.
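
As a rule of thumb from the commands above: topic administration goes through Zookeeper on 2181, while producing and consuming go through the broker on 9092. A describe call is a handy cross-check (same kafka-topics.sh tool; --describe is a standard flag):

# Topic administration talks to Zookeeper (port 2181)
kafka-topics.sh --describe --zookeeper [clusterIP of the zookeeper service]:2181 --topic test

# Producing and consuming talk to the broker (port 9092), as shown above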

2. Multi-Node Kafka Cluster

Once the single-node setup works, we can try adding Kafka nodes to form a cluster. My Kubernetes cluster has 3 nodes, so the Kafka cluster I build also has 3 nodes, one running on each machine.

Here I use 3 Deployments to run Kafka and Zookeeper; a more elegant approach would be a StatefulSet. The official Kubernetes documentation has an example of building a Zookeeper cluster with a StatefulSet.

However, when building Zookeeper and Kafka with a StatefulSet, Zookeeper's myid and Kafka's broker ID can no longer be set in advance; the logic to derive them has to be added when the image is built, and the vast majority of images on Docker Hub do not include it. A Deployment is less elegant, but each node can be configured up front and it is relatively simple to run, so each approach has its strengths; a rough sketch of the StatefulSet route follows below.
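
For reference, here is a minimal, untested sketch of how a StatefulSet could derive the broker ID at startup instead of baking it into the image. It assumes a headless Service named kafka and the wurstmeister/kafka image, whose entrypoint script is start-kafka.sh:

# Sketch only: derive KAFKA_BROKER_ID from the pod ordinal.
# StatefulSet pods are named kafka-0, kafka-1, ..., so the ordinal
# can be stripped from the hostname.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka
        command:
        - sh
        - -c
        # ${HOSTNAME##*-} keeps only the text after the last '-': 0, 1, 2, ...
        - "export KAFKA_BROKER_ID=${HOSTNAME##*-} && start-kafka.sh"
        env:
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181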

Step 1: Build the Zookeeper cluster

Create the Zookeeper YAML files zookeeper-svc2.yaml and zookeeper-deployment2.yaml as follows:

apiVersion: v1
kind: Service
metadata:
  name: zoo1
  labels:
    app: zookeeper-1
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-1
---
apiVersion: v1
kind: Service
metadata:
  name: zoo2
  labels:
    app: zookeeper-2
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-2
---
apiVersion: v1
kind: Service
metadata:
  name: zoo3
  labels:
    app: zookeeper-3
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-3


kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: zookeeper-deployment-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-1
      name: zookeeper-1
  template:
    metadata:
      labels:
        app: zookeeper-1
        name: zookeeper-1
    spec:
      containers:
      - name: zoo1
        image: digitalwonderland/zookeeper
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 2181
        env:
        - name: ZOOKEEPER_ID
          value: "1"
        - name: ZOOKEEPER_SERVER_1
          value: zoo1
        - name: ZOOKEEPER_SERVER_2
          value: zoo2
        - name: ZOOKEEPER_SERVER_3
          value: zoo3
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: zookeeper-deployment-2
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-2
      name: zookeeper-2
  template:
    metadata:
      labels:
        app: zookeeper-2
        name: zookeeper-2
    spec:
      containers:
      - name: zoo2
        image: digitalwonderland/zookeeper
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 2181
        env:
        - name: ZOOKEEPER_ID
          value: "2"
        - name: ZOOKEEPER_SERVER_1
          value: zoo1
        - name: ZOOKEEPER_SERVER_2
          value: zoo2
        - name: ZOOKEEPER_SERVER_3
          value: zoo3
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: zookeeper-deployment-3
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-3
      name: zookeeper-3
  template:
    metadata:
      labels:
        app: zookeeper-3
        name: zookeeper-3
    spec:
      containers:
      - name: zoo3
        image: digitalwonderland/zookeeper
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 2181
        env:
        - name: ZOOKEEPER_ID
          value: "3"
        - name: ZOOKEEPER_SERVER_1
          value: zoo1
        - name: ZOOKEEPER_SERVER_2
          value: zoo2
        - name: ZOOKEEPER_SERVER_3
          value: zoo3

This creates 3 deployments and 3 services in one-to-one correspondence, so that all three instances can serve clients.
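
Both files can be applied in one go; checking pod placement afterwards (standard kubectl) shows whether the scheduler actually spread the three instances across the three machines:

kubectl create -f zookeeper-svc2.yaml
kubectl create -f zookeeper-deployment2.yaml

# -o wide adds the node column, so spreading across machines is visible
kubectl get pods -o wide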

After creation, check the logs of the three Zookeeper pods with kubectl logs and make sure no errors occurred. If the logs of the 3 nodes contain a line like the one below, the Zookeeper cluster has been built successfully.

2016-10-06 14:04:05,904 [myid:2] - INFO [QuorumPeer[myid=2]/0:0:0:0:0:0:0:0:2181:Leader@358] - LEADING - 
LEADER ELECTION TOOK - 2613
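
A quick way to run those checks is to grep each deployment's log for the election result (standard kubectl; kubectl logs accepts a deployment/ prefix and picks one of its pods):

kubectl logs deployment/zookeeper-deployment-1 | grep "LEADER ELECTION"
kubectl logs deployment/zookeeper-deployment-2 | grep "LEADER ELECTION"
kubectl logs deployment/zookeeper-deployment-3 | grep "LEADER ELECTION"

# Optionally ask each instance for its role (leader/follower) via
# Zookeeper's standard 'stat' command; this assumes nc is available
# inside the digitalwonderland/zookeeper image
kubectl exec -it [zookeeper pod name] -- sh -c "echo stat | nc localhost 2181"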


Step 2: Build the Kafka cluster

Likewise, create 3 deployments and 3 services, writing kafka-svc2.yaml and kafka-deployment2.yaml as follows:

apiVersion: v1
kind: Service
metadata:
  name: kafka-service-1
  labels:
    app: kafka-service-1
spec:
  type: NodePort
  ports:
  - port: 9092
    name: kafka-service-1
    targetPort: 9092
    nodePort: 30901
    protocol: TCP
  selector:
    app: kafka-service-1
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-2
  labels:
    app: kafka-service-2
spec:
  type: NodePort
  ports:
  - port: 9092
    name: kafka-service-2
    targetPort: 9092
    nodePort: 30902
    protocol: TCP
  selector:
    app: kafka-service-2
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-3
  labels:
    app: kafka-service-3
spec:
  type: NodePort
  ports:
  - port: 9092
    name: kafka-service-3
    targetPort: 9092
    nodePort: 30903
    protocol: TCP
  selector:
    app: kafka-service-3


kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: kafka-deployment-1
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka-service-1
  template:
    metadata:
      labels:
        name: kafka-service-1
        app: kafka-service-1
    spec:
      containers:
      - name: kafka-1
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: [clusterIP of kafka-service-1]
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_CREATE_TOPICS
          value: mytopic:2:1
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: kafka-deployment-2
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka-service-2
  template:
    metadata:
      labels:
        name: kafka-service-2
        app: kafka-service-2
    spec:
      containers:
      - name: kafka-2
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: [clusterIP of kafka-service-2]
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
        - name: KAFKA_BROKER_ID
          value: "2"
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: kafka-deployment-3
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka-service-3
  template:
    metadata:
      labels:
        name: kafka-service-3
        app: kafka-service-3
    spec:
      containers:
      - name: kafka-3
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: [clusterIP of kafka-service-3]
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
        - name: KAFKA_BROKER_ID
          value: "3"

kafka-deployment-1 also creates a new topic via KAFKA_CREATE_TOPICS: the value mytopic:2:1 means a topic named mytopic with 2 partitions and a replication factor of 1.
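
To confirm the topic actually came up with those settings, a describe call from inside any Kafka pod should report 2 partitions with replication factor 1 (same kafka-topics.sh tool as in the single-node test):

kafka-topics.sh --describe --zookeeper zoo1:2181 --topic mytopic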

Step 3: Test

The test procedure is basically the same as in the single-node case, so I won't repeat it. The difference is that this time different nodes can act as producer and consumer; see the sketch below.
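
For example (a sketch; the clusterIP placeholders are the same ones used in the YAML above), produce through broker 1 and consume through broker 2:

# Inside the kafka-deployment-1 pod: produce to mytopic
kafka-console-producer.sh --broker-list [clusterIP of kafka-service-1]:9092 --topic mytopic

# Inside the kafka-deployment-2 pod: consume the same topic from another broker
kafka-console-consumer.sh --bootstrap-server [clusterIP of kafka-service-2]:9092 --topic mytopic --from-beginning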


With that, the Kafka cluster on Kubernetes is complete!
