I have recently been working with Russ Miles on coding microservices that follow the principles he lays out in his Antifragile Software book. As a simple demo, we decided to use Kafka as an event store to implement an event sourcing pattern.
The main focus of this article is how to deploy a Kafka cluster and manage its lifecycle via Docker containers and Kubernetes on AWS.
Initially, I wanted to quickly see how to get one instance of Kafka available from outside the AWS world so that I could interact with it. My first move is always to look at the main Docker image repository for official or popular images. Interestingly, as of this writing, there is no official Kafka image. The most popular is wurstmeister/kafka, which is what I decided to use.
However, this was not enough: Kafka relies on ZooKeeper to work. Spotify offers an image that runs both services in a single image, but I don't think that's a good idea in production, so I decided to forego it. For ZooKeeper, I didn't use the most popular image because its documentation didn't indicate any way to pass parameters to the service. Instead, I went with digitalwonderland/zookeeper, which supports basic parameters like setting the instance id.
Setting up one instance of both services is rather straightforward and can be managed by a simple Kubernetes replication controller like this:
---
apiVersion: v1
kind: ReplicationController
metadata:
  name: kafka-controller
spec:
  replicas: 1
  selector:
    app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: [AWS_LB_DNS_or_YOUR_DNS_POINTING_AT_IT]
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zook:2181
      - name: zookeeper
        image: digitalwonderland/zookeeper
        ports:
        - containerPort: 2181
These could be exposed using the following Kubernetes services:
---
apiVersion: v1
kind: Service
metadata:
  name: zook
  labels:
    app: kafka
spec:
  ports:
  - port: 2181
    name: zookeeper-port
    targetPort: 2181
    protocol: TCP
  selector:
    app: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  labels:
    app: kafka
spec:
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka
  type: LoadBalancer
Notice that the LoadBalancer type is used here because we need to create an AWS load balancer to access those services from the outside world. Kubernetes is clever enough to achieve this for us.
In the replication controller specification, we can see the requirement to let Kafka advertise its hostname. For this to work, the value must be the actual domain name of the AWS load balancer. This means you must create the Kubernetes service first (which is good practice anyway) and then, once it is up, write its domain name into the replication controller spec as the value of the KAFKA_ADVERTISED_HOST_NAME environment variable.
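Once the service exists, you can read the load balancer's hostname straight from the service description; a quick sketch (the LoadBalancer Ingress field only appears once AWS has actually provisioned the ELB):

$ kubectl describe service kafka-service | grep "LoadBalancer Ingress"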
This is all good, but it is not a cluster; it is merely a single instance for development purposes. Even though Kubernetes promises to look after your pods, it's not a bad idea to run a cluster of both ZooKeeper and Kafka services. This wasn't as trivial as I expected.
The reason is mostly the way the clusters are configured. In ZooKeeper's case, each instance must be statically identified within the cluster. This means you cannot simply increase the number of pod replicas in the replication controller, because they would all end up with the same server identifier. This will change in ZooKeeper 3.5. Kafka doesn't have this limitation anymore: it will happily generate a broker id for you if none is provided explicitly (though this requires Kafka 0.9+).
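For context, here is a sketch of what that static identification amounts to in plain ZooKeeper terms; I am assuming the digitalwonderland image derives these settings from the ZOOKEEPER_ID and ZOOKEEPER_SERVER_N variables used in the specs below:

# standard zoo.cfg ensemble entries (2888 = follower port, 3888 = leader-election port)
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
# plus a myid file on each instance containing only its own id (1, 2 or 3)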
What this means is that we now have two specifications: one for ZooKeeper and one for Kafka.
Let's start with the simpler one, Kafka:
---
apiVersion: v1
kind: ReplicationController
metadata:
  name: kafka-controller
spec:
  replicas: 1
  selector:
    app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: [AWS_LB_DNS_or_YOUR_DNS_POINTING_AT_IT]
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
        - name: KAFKA_CREATE_TOPICS
          value: mytopic:2:1
Nothing really odd here: we create a single Kafka broker, connected to our ZooKeeper cluster, which is defined below:
---
apiVersion: v1
kind: ReplicationController
metadata:
  name: zookeeper-controller-1
spec:
  replicas: 1
  selector:
    app: zookeeper-1
  template:
    metadata:
      labels:
        app: zookeeper-1
    spec:
      containers:
      - name: zoo1
        image: digitalwonderland/zookeeper
        ports:
        - containerPort: 2181
        env:
        - name: ZOOKEEPER_ID
          value: "1"
        - name: ZOOKEEPER_SERVER_1
          value: zoo1
        - name: ZOOKEEPER_SERVER_2
          value: zoo2
        - name: ZOOKEEPER_SERVER_3
          value: zoo3
---
apiVersion: v1
kind: ReplicationController
metadata:
  name: zookeeper-controller-2
spec:
  replicas: 1
  selector:
    app: zookeeper-2
  template:
    metadata:
      labels:
        app: zookeeper-2
    spec:
      containers:
      - name: zoo2
        image: digitalwonderland/zookeeper
        ports:
        - containerPort: 2181
        env:
        - name: ZOOKEEPER_ID
          value: "2"
        - name: ZOOKEEPER_SERVER_1
          value: zoo1
        - name: ZOOKEEPER_SERVER_2
          value: zoo2
        - name: ZOOKEEPER_SERVER_3
          value: zoo3
---
apiVersion: v1
kind: ReplicationController
metadata:
  name: zookeeper-controller-3
spec:
  replicas: 1
  selector:
    app: zookeeper-3
  template:
    metadata:
      labels:
        app: zookeeper-3
    spec:
      containers:
      - name: zoo3
        image: digitalwonderland/zookeeper
        ports:
        - containerPort: 2181
        env:
        - name: ZOOKEEPER_ID
          value: "3"
        - name: ZOOKEEPER_SERVER_1
          value: zoo1
        - name: ZOOKEEPER_SERVER_2
          value: zoo2
        - name: ZOOKEEPER_SERVER_3
          value: zoo3
As you can see, we unfortunately cannot rely on a single replication controller with three replicas, as we could for Kafka brokers. Instead, we run three distinct replication controllers so that we can specify the ZooKeeper id of each instance, as well as the list of all servers in the ensemble.
This is a bit of an annoyance because we therefore rely on three distinct services too:
---
apiVersion: v1
kind: Service
metadata:
  name: zoo1
  labels:
    app: zookeeper-1
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-1
---
apiVersion: v1
kind: Service
metadata:
  name: zoo2
  labels:
    app: zookeeper-2
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-2
---
apiVersion: v1
kind: Service
metadata:
  name: zoo3
  labels:
    app: zookeeper-3
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-3
Doing so means traffic is routed to each ZooKeeper instance via its service name (within the Kubernetes network, that is).
Finally, we have our Kafka service:
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  labels:
    app: kafka
spec:
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka
  type: LoadBalancer
That one is simple because we only have a single kafka application to expose.
Now running these in order is the easy part. First, let’s start with the zookeeper cluster:
$ kubectl create -f zookeeper-services.yaml
$ kubectl create -f zookeeper-cluster.yaml
Once the cluster is up, you can check they are all happy bunnies via:
$ kubectl get pods
zookeeper-controller-1-reeww   1/1   Running   0   2h
zookeeper-controller-2-t4zzx   1/1   Running   0   2h
zookeeper-controller-3-e4zmo   1/1   Running   0   2h
$ kubectl logs zookeeper-controller-1-reeww
...
One of them should be LEADING, the other two ought to be FOLLOWING.
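A quick way to check this (a sketch; substitute the generated pod names from your own run, and note that the second command assumes nc is available inside the container, which may not be the case):

$ kubectl logs zookeeper-controller-1-reeww | grep -E "LEADING|FOLLOWING"
$ kubectl exec zookeeper-controller-1-reeww -- sh -c 'echo stat | nc localhost 2181' | grep Mode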
Now you can start your Kafka broker, beginning with its service:
$ kubectl create -f kafka-service.yaml
On AWS, you will need to wait for the actual EC2 load balancer to be created. Once that's done, take its DNS name and edit the kafka-cluster.yaml spec to set it as the value of KAFKA_ADVERTISED_HOST_NAME.
Obviously, if you have set up a DNS record pointing at your load balancer, simply use that domain instead of the load balancer's. In that case, you can set the value once and for all in the spec.
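If you prefer not to edit the file by hand, something like this does the substitution (a sketch, assuming the spec still contains the placeholder used above):

$ LB_DNS=$(kubectl get service kafka-service -o jsonpath='{.status.loadBalancer.ingress[0].hostname}')
$ sed -i "s/\[AWS_LB_DNS_or_YOUR_DNS_POINTING_AT_IT\]/$LB_DNS/" kafka-cluster.yaml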
Then, run the following command:
$ kubectl create -f kafka-cluster.yaml
This will start the broker and automatically create the topic “mytopic” with two partitions and one replica, as requested by the KAFKA_CREATE_TOPICS value mytopic:2:1.
At this stage, you should be able to connect to the broker and produce and consume messages. You might want to try kafkacat as a simple tool to play with your broker.
For instance, listing the topic on your broker:
$ kafkacat -b [AWS_LB_DNS_or_YOUR_DNS_POINTING_AT_IT]:9092 -L
You can also produce messages:
$ kafkacat -b [AWS_LB_DNS_or_YOUR_DNS_POINTING_AT_IT]:9092 -P -t mytopic
Consuming messages is as simple as:
$ kafkacat -b [AWS_LB_DNS_or_YOUR_DNS_POINTING_AT_IT]:9092 -C -t mytopic
...
At this stage, we still don’t have a kafka cluster. One might expect that running something like this should be enough:
$ kubectl scale --replicas=3 rc/kafka-controller
But unfortunately, this will only create new Kafka instances; it will not automatically start replicating data to the new brokers. This has to be done out of band, as I will explain in a follow-up article.
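Without pre-empting that article, the usual route is Kafka's own partition reassignment tool. A rough sketch of the workflow (file names and broker ids are illustrative):

# describe which topics should move
$ cat topics-to-move.json
{"version": 1, "topics": [{"topic": "mytopic"}]}

# generate a candidate assignment spanning the new broker ids
$ kafka-reassign-partitions.sh --zookeeper zoo1:2181 \
    --topics-to-move-json-file topics-to-move.json \
    --broker-list "1001,1002,1003" --generate

# save the proposed assignment to a file, then execute and verify it
$ kafka-reassign-partitions.sh --zookeeper zoo1:2181 \
    --reassignment-json-file reassignment.json --execute
$ kafka-reassign-partitions.sh --zookeeper zoo1:2181 \
    --reassignment-json-file reassignment.json --verify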
In conclusion, I would say that using existing images is not ideal because they don't always provide the level of integration you'd hope for. What I would rather do is build specific images that initially converse with an external configuration server to retrieve the information they need to run. This would likely make things a little smoother. In ZooKeeper's case, though, I am looking forward to its next release, which should support dynamic cluster scaling.
Great article! Were you able to achieve replication with the Kafka controller?
Thanks
I followed your instructions, but cannot consume messages. This is the error when running
kafkacat -b localhost:9092 -C -t mytopic
% ERROR: Topic test error: Broker: Leader not available
Question:
http://stackoverflow.com/questions/37761476/kafka-on-kubernetes-cannot-produce-consume-topics-closedchannelexception-error
This normally means your advertised hostname is not set correctly. Once I got that right, it all came together.
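One way to double-check what the broker is actually advertising is to look at the metadata kafkacat returns; the broker lines in the -L output reflect the advertised host and port (a sketch, with the same placeholder as above):

$ kafkacat -b [AWS_LB_DNS_or_YOUR_DNS_POINTING_AT_IT]:9092 -L | grep broker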
I face problem executing this command “$ kafkacat -b [AWS_LB_DNS_or_YOUR_DNS_POINTING_AT_IT]:9092 -L”.
I used the LoadBalancer Ingress url from the kubectl service description, for AWS_LB_DNS. I am running k8s on AWS.
But when I try using kafkacat, I get the following error:
%3|1467839779.839|FAIL|rdkafka#producer-1| ae8384fb643bc11e68f090299ad60946-2095064155.us-west-2.elb.amazonaws.com:9092/bootstrap: Connect to ipv4#52.39.207.9:9092 failed: Connection refused
%3|1467839780.855|FAIL|rdkafka#producer-1| ae8384fb643bc11e68f090299ad60946-2095064155.us-west-2.elb.amazonaws.com:9092/bootstrap: Connect to ipv4#52.39.207.9:9092 failed: Connection refused
%3|1467839781.873|FAIL|rdkafka#producer-1| ae8384fb643bc11e68f090299ad60946-2095064155.us-west-2.elb.amazonaws.com:9092/bootstrap: Connect to ipv4#52.39.240.92:9092 failed: Connection refused
%3|1467839782.889|FAIL|rdkafka#producer-1| ae8384fb643bc11e68f090299ad60946-2095064155.us-west-2.elb.amazonaws.com:9092/bootstrap: Connect to ipv4#52.39.207.9:9092 failed: Connection refused
%3|1467839783.913|FAIL|rdkafka#producer-1| ae8384fb643bc11e68f090299ad60946-2095064155.us-west-2.elb.amazonaws.com:9092/bootstrap: Connect to ipv4#52.39.240.92:9092 failed: Connection refused
% ERROR: Failed to acquire metadata: Local: Broker transport failure
Please advise how I can resolve this.
Alternative kafka and zookeeper packages:
https://github.com/rawmind0/alpine-kafka
https://github.com/rawmind0/alpine-zk
Deployable in Rancher and Kubernetes. 🙂
Can we use an AWS load balancer for Kafka? AFAIK, both producers and consumers need to access all Kafka brokers.
You mentioned “But unfortunately, this will only create new kafka instances, it will not automatically start replicating data to the new brokers. This has to be done out of band as I will explain in a follow-up article.”
Any progress on the follow-up article? If not, do you mind giving an outline of how you did this?
Thanks!
Re-stating what John Ramey said, it would be good to know how to dynamically scale a Kafka cluster with data replication.
Thanks,
Chaitanya
Jumping on the wagon as well, any advice on how to dynamically scale the Kafka cluster?
Thank you,
Alex
Got it working for 3 Kafka brokers and 3 zookeepers!!
(Actually, ‘m’ zookeepers & ‘n’ Kafka brokers – any numbers of m & n).
The trick is to use a single service for all zookeeper instances & use “one service for each Kafka broker”.
I have used LoadBalancer type services for both of them.
Here is how it works for me:
I have a private kubernetes cluster.
Below I have pasted my yaml files.
For ‘m’ zookeepers and ‘n’ Kafka brokers, write a shell script that replaces “instanceCnt” in the following files (“*-template.yaml”) with a counter (1, 2, 3, 4, …) in a loop and appends the result to a .yaml file.
For example, for zookeeper-controller-template.yaml, if you need to create 3 zookeeper instances, create a shell script like this:
1. initialize myCnt=1
2. A while/for loop
3. Replace instanceCnt with the value of myCnt in the template and append the result to a new file, like this: sed s/instanceCnt/$myCnt/g zookeeper-controller-template.yaml >> zookeeper-controller.yaml
4. increase myCnt
5. End loop
So, finally, you will get a yaml file with ‘m’ zookeeper instances.
The exact same script works for the Kafka services and the Kafka cluster; a rough sketch follows.
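For example (a sketch, using the template and output file names above; adjust m as needed):

m=3
myCnt=1
while [ $myCnt -le $m ]; do
  # each template starts with "---", so the appended docs form one multi-document yaml
  sed "s/instanceCnt/$myCnt/g" zookeeper-controller-template.yaml >> zookeeper-controller.yaml
  myCnt=$((myCnt + 1))
done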
Here’s all my files:
1. zookeeper-service.yaml
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-svc
  labels:
    app: zookeeper-svc
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper
  type: LoadBalancer
  externalTrafficPolicy: Local
2. zookeeper-controller-template.yaml (create as many instances as you want by replacing instanceCnt via a shell script)
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: zookeeper-deployment-instanceCnt
spec:
  replicas: 1
  strategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: zookeeper
        id: "instanceCnt"
    spec:
      containers:
      - name: zoo-cont-instanceCnt
        image: digitalwonderland/zookeeper
        ports:
        - containerPort: 2181
        env:
        - name: ZOOKEEPER_ID
          value: "instanceCnt"
3. kafka-service-template.yaml
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc-instanceCnt
  labels:
    name: kafka-svc-instanceCnt
spec:
  ports:
  - port: 9092
    name: kafka-port
    protocol: TCP
  selector:
    app: kafka
    id: "instanceCnt"
  type: LoadBalancer
4. kafka-cluster-template.yaml
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: kafka-broker-instanceCnt
spec:
  replicas: 1
  strategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: kafka
        id: "instanceCnt"
    spec:
      volumes:
      - name: kafka-log
        persistentVolumeClaim:
          claimName: kafka-pvc
      containers:
      - name: kafka-cont-instanceCnt
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
        volumeMounts:
        - name: kafka-log
          mountPath: "/var/log/kafka"
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: "broker-ip"   # external IP from your kafka-svc-instanceCnt
        - name: KAFKA_BROKER_ID
          value: "instanceCnt"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: "zookeeper-svc:2181"
        - name: KAFKA_LOG_DIRS
          value: "/var/log/kafka/broker-instanceCnt"
        - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
          value: "true"
      terminationGracePeriodSeconds: 8
Please let me know here if you need more help with it.
Hi,
I am running a Kafka cluster in Kubernetes, but not on AWS or any cloud. I am not able to access Kafka from outside of the Kubernetes environment. Can you please advise how I can get it to work? Here is the question I posted on StackOverflow:
http://stackoverflow.com/questions/41868161/kafka-in-kubernetes-cluster-how-to-publish-consume-messages-from-outside-of-kub
Thanks
Change the kafka service to type NodePort, and force Kafka to listen on that port.
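Roughly something like this (a sketch; the follow-up tweaks to the advertised host and port are assumptions about what the wurstmeister image needs):

$ kubectl patch service kafka-service -p '{"spec": {"type": "NodePort"}}'
$ kubectl get service kafka-service
# note the assigned nodePort, then set KAFKA_ADVERTISED_HOST_NAME to a node address
# and KAFKA_ADVERTISED_PORT to that nodePort in kafka-cluster.yaml, and recreate the controller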
What about the data volumes? Any suggestions for production?
Hi,
first I read the article and implemented your solution, only to soon realise that it is completely wrong.
Kafka cannot run behind a load balancer! Zookeeper, however, can. I see that you only run one broker and have multiple brokers on the roadmap. That can't be done with an ELB, because as a Kafka consumer or producer you need to connect to the leader of each partition. It doesn't work unless you set up an ELB for each broker to expose it individually. I solved that issue with a k8s StatefulSet on the Kafka layer and exposed the brokers via a headless service, aka broker-1.svcname.whatever, broker-2.svcname.whatever, etc. But again, accessing them from outside the cluster is not trivial, as it requires either one ELB per pod (not a good solution) or port mapping in Kubernetes to the EC2 instance, exposed via Route53. Again, not a very nice solution.
I hope that if you figure out an elegant way, you will let us know. Looking forward to seeing a follow-up post.
we just hit the same issue (“accessing them out of the cluster”) — we were all excited to move Kafka to Kubernetes but I’m not sure how we’ll get around this…. any ideas?
Have you found a solution? I still have the same issue (“accessing them out of the cluster”).
I built a zookeeper and kafka setup with
bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
When I shut down 127.0.0.1:9092, I get an error:
connect to ipv4.# 127.0.0.1:9092 failed.
How to fix it?
I am following the single-instance method for development purposes, but my kafka pod is not running because it is not able to connect to zookeeper.
This is my kafka-service.yaml
apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.1.0 (36652f6)
  creationTimestamp: null
  labels:
    app: kafka
  name: zook
spec:
  ports:
  - name: zookeeper-port
    port: 2181
    targetPort: 2181
    protocol: TCP
  selector:
    app: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  labels:
    app: kafka
spec:
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka
  type: LoadBalancer
And below is my kafka-replicationController.yaml
apiVersion: v1
kind: ReplicationController
metadata:
  name: kafka-controller
spec:
  replicas: 1
  selector:
    app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_HOST_NAME
          value:
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zook:2181
      - name: zookeeper
        image: digitalwonderland/zookeeper
        ports:
        - containerPort: 2181
I have filed the same issue on StackOverflow here:
https://stackoverflow.com/questions/52146001/kafka-not-able-to-connect-with-zookeeper-with-error-timed-out-waiting-for-conne
Please let me know what the issue is and how to solve it.