Running Kafka in Kubernetes, Part 1: Why we migrated our Kafka clusters to Kubernetes
At Wise, we chose to migrate our Apache Kafka clusters, previously running on Amazon Web Services (AWS) EC2 instances, into a multi-cluster Kubernetes setup. This article is the first part of a two-part series aiming to outline the motivations behind this choice and the challenges we faced.
The Realtime Data Platform team at Wise provides realtime messaging and stream processing capabilities as a platform to 500+ services at Wise. Product teams use this platform to develop features that Wise’s customers have come to depend on today. These include instant transfers, fast verification checks, payment processing and much more. Our mission statement to the users of our platform is:
“To enable Wise to process data in realtime, in a scalable, reliable, secure, cost effective, easy and convenient way. We aim to build a self-service platform and tools, anticipating the needs of our product teams.”
A pivotal component of our platform is Apache Kafka. We currently maintain 6 Kafka clusters, consisting of 30+ brokers, processing billions of messages every day.
To improve the reliability and scalability of our clusters, we decided to move our Kafka clusters into our Kubernetes infrastructure. The rest of this article outlines where we started, our motivations and the challenges we faced.
Where we started
We have dedicated Kafka clusters for different functions at Wise. Three of these clusters are:
- Service Kafka Cluster: Used mainly for messaging between our application services. This cluster facilitates publishing/consuming of domain events that power business/domain logic of our applications. As this cluster is mainly used for messaging, most topics in this cluster have time-based retention. This means the service cluster has moderate requirements with respect to disk size. This cluster serves more applications than any other Kafka cluster at Wise.
- Stream Processing Cluster: At Wise, product teams leverage our stream processing platform to create pipelines that perform stateful realtime aggregations. This is made possible by our Streaming Engine Platform, which abstracts the use of stream processing frameworks such as Kafka Streams and Apache Flink to provide a common DSL through which product teams can build stream processing applications. The stream processing cluster is dedicated to the operations of these streaming applications. All messages published from the Service Kafka cluster are collected into compacted topics with no time-based retention in the stream processing cluster. In effect, this cluster stores the entire state of domains across Wise within its topics. This allows streaming applications to perform historical aggregations over any period of time needed. New streaming applications can decide to start aggregations from a certain point in the past up to the present day. This cluster has much higher disk space requirements than the others, and also high compute requirements.
- Logging Cluster: This cluster collects application logs from application services and provides this data to our observability stack (currently an ELK stack).
Before we migrated, we managed our Kafka and backing Zookeeper clusters in AWS EC2 instances. We managed them with the following tools:
- Terraform: to provision the AWS EC2 instances and other AWS resources for networking, storage etc.
- Ansible Playbooks/Packer: to install and configure Kafka/Zookeeper on the provisioned instances.
- Cruise Control: to automate data rebalancing and detection/repair of cluster anomalies (e.g. disk distribution skews, topic replication factor anomalies etc.) in the Kafka clusters.
- DNS: to configure static addresses through which application services connect to Kafka brokers.
These clusters were mostly static and instances always retained the same IP addresses. The IP addresses were registered in our DNS and applications used the defined domain names to access the clusters. An update to an instance’s IP address required updates to the corresponding DNS records.
Adding/removing a broker was slow and involved a fair bit of toil. Rolling out new broker versions or configurations, OS patching and other changes could take hours to complete, especially as the number of brokers grew. Updating the clusters involved:
- Updating and applying our Terraform modules,
- Running Ansible playbooks on new/existing instances,
- Updating DNS records,
- On some occasions, updating client-side configurations of all our applications to reflect address/DNS changes.
Our Kafka and Zookeeper instances were run across 3 availability zones to ensure we were resilient against availability zone failures. Kafka partition replicas were distributed across availability zones using Kafka’s rack aware replica assignment feature. We set the following configurations across all our Kafka clusters:
- Default replication factor of topics was set to 3 (hence we had one replica per availability zone).
- The min.insync.replicas setting on the brokers was set to 2. This ensured every message was written to brokers across at least 2 availability zones.
- The data handled by most application services is critical, so most (if not all) producers had
Application services at Wise are (unless in exceptional cases) run in a Kubernetes multi-cluster, managed by our central infrastructure team. These services are capable of accessing our Kafka brokers directly.
Connections to our Kafka clusters are secured with mutual transport layer security (mTLS). A client application communicates with our Kafka brokers through envoy sidecars. Applications initiate plain-text connections to their envoy sidecar, and the envoy sidecar upgrades these plain-text connections to mTLS connections. The envoy sidecar then proxies these requests directly to secure ports on the brokers. We simplify certificate management with the SPIFFE protocol. SPIFFE defines a protocol to generate short-lived certificates for both clients (in their envoy sidecars) and brokers. Watch this video if you want to learn more about how we secured client connections to our Kafka clusters.
Application envoy sidecars were set up with static configurations. An application needed to be rolled out to update how Kafka requests were proxied. Adding a new broker required adding new mappings to these configurations so that requests could be proxied to the new broker.
Problems with the previous setup
We had a couple of difficulties with this setup:
- Our clusters had quite a bit of toil associated with them, reducing the amount of time we could put into building features and functionality to help product teams address their use-cases. It took a significant amount of time to scale our Kafka clusters, replace faulty machines, apply OS patches, roll out new versions of Kafka and perform other admin tasks. We also had to keep up with recommendations and changes required for other supporting systems (observability, security etc.) when they were needed.
- The networking between clients and brokers was mostly static, and changes to it sometimes required clients to change how they connected to Kafka. This meant rolling out changes to all clients connecting to Kafka, which was time-consuming across hundreds of services. Whenever a new broker was added to a Kafka cluster, we also needed to roll out changes to all application services before they could access the new broker.
- We had no self-healing or automated remedial actions for our clusters. Failures of broker instances always required manual intervention.
- We needed to manage multiple sources of configuration to manage a Kafka cluster (e.g. Terraform, Ansible, service envoy config, DNS). A new joiner to the team would take a bit more time to onboard and understand where these moving parts were and how to manage them.
- In the event of a disaster/failure, with a replication factor of 3, we could tolerate one failure at most (or more if all failures were localised to a single availability zone). A failure of two brokers in two separate availability zones would lead to unavailable partitions for most producers at Wise. This was a bigger risk during an availability zone failure, which could last several hours: with one zone already down, a single broker failure in another zone would be enough to make partitions unavailable. At such times, our systems would be at high risk.
- Our clusters were not immutable. Using Ansible playbooks opened some of our clusters to the possibility of configuration drift between brokers.
- Any form of automation we built would not be cloud agnostic, and we would need to build tools/scripts specifically for AWS.
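The failure tolerance described in the list above can be illustrated with a small sketch. It assumes a hypothetical six-broker layout with two brokers per availability zone, a partition with one replica per zone, and producers that require min.insync.replicas acknowledgements (in Kafka this applies to producers using acks=all). The broker ids, zone names and function names are made up for illustration and do not describe our actual clusters.

```python
from typing import Dict, Iterable, List

# Hypothetical layout: six brokers, two per availability zone.
BROKER_ZONE: Dict[int, str] = {
    0: "az-a", 1: "az-a",
    2: "az-b", 3: "az-b",
    4: "az-c", 5: "az-c",
}


def brokers_in_zone(zone: str) -> List[int]:
    """Return the ids of all brokers located in the given availability zone."""
    return [broker for broker, z in BROKER_ZONE.items() if z == zone]


def accepts_writes(
    replicas: Iterable[int],
    failed: Iterable[int],
    min_insync_replicas: int = 2,
) -> bool:
    """Return True if enough replicas survive to satisfy min.insync.replicas."""
    failed_set = set(failed)
    surviving = [broker for broker in replicas if broker not in failed_set]
    return len(surviving) >= min_insync_replicas


# A partition with replication factor 3: one replica in each zone.
replicas = [0, 2, 4]

print(accepts_writes(replicas, failed=[0]))                        # True: one failure
print(accepts_writes(replicas, failed=brokers_in_zone("az-a")))    # True: whole zone lost
print(accepts_writes(replicas, failed=[0, 2]))                     # False: two zones affected
```

Losing every broker in one zone removes only one replica of the partition, so writes continue. Losing one replica in each of two zones leaves a single replica, which is below the minimum of 2.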
Why run Kafka in Kubernetes?
Our major motivations for moving Kafka to Kubernetes were:
- Leveraging Kubernetes to improve scalability of our Kafka clusters. This will allow us to easily satisfy growing usage of the platform by our product teams.
- Increasing availability and reliability of the clusters using Kubernetes’ native self-healing functionality.
- Setting up better (and cloud agnostic) automation around administration of our Kafka clusters, through the use of liveness/readiness probes and operators.
- Reducing the time the Realtime Data Platform team spends maintaining our Kafka infrastructure, since Kubernetes is managed by our central infrastructure team. This gives us more time to focus on building a self-service platform and tools to improve the productivity of our teams and enable them to ship features to market faster.
The table below shows the difference between both setups in some common operations:
We decided that we would migrate only our Kafka clusters but not our Zookeeper clusters. Considering that Zookeeper was set to be deprecated in the near future, we decided to let it remain in AWS EC2 instances until then. Once an updated Kafka version that doesn’t rely on Zookeeper is available, we could upgrade our Kafka clusters and then decommission our Zookeeper instances. However, some of the methods used to migrate our Kafka clusters to Kubernetes could be extended to migrating Zookeeper if desired.
We also considered two alternatives to Kubernetes:
- Using auto-scaling groups for scaling, network load balancers for discovery and automation scripts for rollout and other automations.
- Running Kafka in containers within a container OS.
A major reason for choosing Kubernetes was that it is the primary way of hosting applications at Wise. Our central platform team has a lot of experience running applications in Kubernetes. They were also able to provide us with the necessary support required to successfully run Kafka in Kubernetes. We wanted to be able to leverage that support to further reduce the toil in our team. Both alternatives we considered would still have required significant effort from the Realtime Data Platform team as we would have had to maintain them, defeating the objective of reducing toil.
Migrating a production Kafka cluster to Kubernetes is not without its challenges. Kubernetes (and a multi-cluster Kubernetes at that!) provides a very dynamic environment. Hosting a stateful service like Kafka required a lot of design and planning with co-operation from other infrastructure teams within Wise to achieve success. Some of these challenges were:
1. Zero Downtime Migration.
As mentioned earlier, our platform is used by mission-critical applications providing services to Wise’s customers all around the world. It was very important that the migration caused no downtime; otherwise our customers would experience degraded performance of our services.
2. Kafka in a Multi-Cluster Kubernetes Setup.
We needed to plan for how Kafka would work in a multi-cluster Kubernetes setup. Some of these challenges included:
- Kafka brokers are stateful and need to be able to maintain identity. Their identity and the data they store are not interchangeable with those of other brokers.
- Kubernetes liveness and readiness probes have to be configured based on Kafka health metrics. This ensures broker pods are not killed unnecessarily during operation and that rollouts are paced correctly to avoid shrinking the number of in-sync replicas of partitions, possibly to a level that might cause unavailability.
- In a multi-cluster setup, each Kubernetes cluster operates independently. In our case, we run two Kubernetes clusters. The Kafka cluster needs to be designed in such a way that it can sustain operations that happen simultaneously on both Kubernetes clusters, e.g. broker pods being rescheduled at the same time on both Kubernetes clusters.
- The Kafka cluster needs to be resilient against both availability zone and Kubernetes cluster failures.
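One way to picture the pacing concern in the list above is a pre-restart check that refuses to take a broker down if doing so would push any partition below min.insync.replicas. The sketch below is illustrative only: `safe_to_restart` and the in-memory `isr_snapshot` mapping are hypothetical names, and a real probe or operator would have to obtain the in-sync replica state from Kafka itself.

```python
from typing import Dict, Set


def safe_to_restart(
    broker_id: int,
    partition_isr: Dict[str, Set[int]],
    min_insync_replicas: int = 2,
) -> bool:
    """Return True if removing broker_id keeps every partition it is
    in sync for at or above min_insync_replicas."""
    for isr in partition_isr.values():
        if broker_id in isr and len(isr) - 1 < min_insync_replicas:
            return False
    return True


# Hypothetical snapshot: partition name -> ids of brokers currently in sync.
isr_snapshot = {
    "payments-0": {1, 2, 3},
    "payments-1": {1, 2},  # broker 3 has fallen out of sync for this partition
}

print(safe_to_restart(3, isr_snapshot))  # True
print(safe_to_restart(1, isr_snapshot))  # False
```

Restarting broker 3 is safe because the only partition it is in sync for would still have two in-sync replicas. Restarting broker 1 would leave payments-1 with a single in-sync replica, so a rollout should wait until broker 3 has caught up.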
3. Resilient Replica Distribution.
Our Kubernetes clusters are set up such that Kubernetes workers in each cluster are distributed between availability zones. We need to ensure an even (or as even as possible) distribution of Kafka brokers across both availability zones and Kubernetes clusters. This means that a six-broker Kafka cluster spread over 3 availability zones and 2 Kubernetes clusters would have 2 brokers in each availability zone and 3 brokers in each Kubernetes cluster.
The challenge here is distributing replicas such that a partition is resilient to availability zone or Kubernetes cluster failures. Kafka’s native rack aware replica assignment is not sufficient in this case as it only considers a single dimension, i.e. the broker.rack (previously set to the availability zone). To solve this problem, we need to be able to distribute replicas evenly across two dimensions, i.e. both availability zones and Kubernetes clusters.
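To make the two-dimensional problem concrete, the sketch below models the six-broker layout described above and reports how many replicas of a partition would be lost in the worst single availability zone failure and the worst single Kubernetes cluster failure. The broker ids, zone names, cluster names and the `worst_case_loss` helper are all hypothetical, and the sketch only evaluates a given assignment; it is not the assignment strategy we ended up using.

```python
from collections import Counter
from typing import Dict, List, NamedTuple


class Broker(NamedTuple):
    broker_id: int
    zone: str
    k8s_cluster: str


# Hypothetical layout: 2 brokers per availability zone, 3 per Kubernetes cluster.
BROKERS = [
    Broker(0, "az-a", "k8s-1"), Broker(1, "az-b", "k8s-1"), Broker(2, "az-c", "k8s-1"),
    Broker(3, "az-a", "k8s-2"), Broker(4, "az-b", "k8s-2"), Broker(5, "az-c", "k8s-2"),
]


def worst_case_loss(replica_ids: List[int]) -> Dict[str, int]:
    """Return the largest number of replicas lost if one zone, or one
    Kubernetes cluster, fails completely."""
    by_id = {broker.broker_id: broker for broker in BROKERS}
    replicas = [by_id[i] for i in replica_ids]
    zone_counts = Counter(broker.zone for broker in replicas)
    cluster_counts = Counter(broker.k8s_cluster for broker in replicas)
    return {
        "zone": max(zone_counts.values()),
        "k8s_cluster": max(cluster_counts.values()),
    }


# Zone-aware only: one replica per zone, but all in the same Kubernetes cluster.
print(worst_case_loss([0, 1, 2]))  # {'zone': 1, 'k8s_cluster': 3}

# Spread across both dimensions: one replica per zone, split over both clusters.
print(worst_case_loss([0, 1, 5]))  # {'zone': 1, 'k8s_cluster': 2}
```

Both assignments look identical to an assigner that only sees availability zones, yet the first one loses every replica if a single Kubernetes cluster fails.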
4. Resource Allocation.
Kafka needs dedicated resources. Since we were going to host Kafka in the same Kubernetes clusters as other services, it was important to ensure that the workers in our Kubernetes clusters could satisfy the resource requirements (CPU, memory etc.) of our Kafka clusters and could easily scale when needed. We also needed to ensure that we didn’t find ourselves in a position where Kafka was starved of resources (CPU, memory etc.) by other services, and vice versa.
5. Broker and Client Communication.
Communication between brokers, and between brokers and clients also posed the following challenges:
- Broker pods will have dynamic IP addresses. Clients need to be able to discover these new broker addresses without any disruptions. This needs to be possible across Kubernetes clusters.
- By design, Kafka clients need to be able to connect to specific brokers in the cluster. Client requests cannot be load balanced between brokers (except for metadata requests). Kubernetes’ design caters better for load-balanced HTTP services. This problem is made a little more difficult by the fact that a client in one Kubernetes cluster might have to connect to a broker in another Kubernetes cluster. This removes the possibility of relying on any of Kubernetes’ native networking features.
In Part 2, we will talk about how we overcame the challenges outlined in Part 1 so we could successfully migrate our Kafka clusters to Kubernetes.