Connect to Kafka inside Kubernetes – access to pods using other transport protocols that require TCP forwarding

Alexandru Ersenie
Categories: Tech

Context

I’m going to keep this as simple as possible, and try not to detail concepts that are already very well documented, like Containers, Pods, Services and Ingress in Kubernetes. In just a few words, and looking at it from right to left:

  • Containers run in Pods
  • You connect to and work with a Pod through the concept of a Service
  • Services are L4 balancers; for complex scenarios like session persistence, cookies and so on, Services will not suffice
  • Ingresses are “paths” into the cluster, configured and managed by a so-called Ingress Controller
  • The Ingress Controller is an L7 balancer, which handles complex load balancing requirements like the ones mentioned above

Considering that we are using Nginx as the edge proxy controlling access into the Kubernetes cluster, a typical HTTP request being balanced to the backend pods looks like this:

Http request routed into the Kubernetes cluster by the Ingress Controller

Here is what happens:

  1. The request hits the cluster; assuming we have configured DNS balancing across all Kubernetes nodes, and the ingress controller is exposed through a Service of type NodePort, each Kubernetes node is guaranteed to proxy the request into the Service.
  2. The Service then load balances the request to one of the Nginx pods (from port 30080 to port 80).
  3. Nginx applies the rules in the Ingress (look at it like a vhost configuration) and balances the request to the endpoints. The Ingress Controller uses Services only for routing, in the sense that it asks the API for the endpoints of a Service and balances to the pods directly. Not going to go into details here.
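As a sketch, an Ingress rule for step 3 might look like this (the host, backend service name, namespace, and port are illustrative, not taken from a real deployment):

```yaml
# Hypothetical Ingress routing myapp.mydomain.com to the "myapp" Service;
# the ingress controller resolves the Service's endpoints and balances
# directly to the pods behind it.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: myapp
  namespace: myapp
spec:
  rules:
  - host: myapp.mydomain.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: myapp
            port:
              number: 8080
```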

This is actually easier to get up and running than many think, but it requires a little bit of understanding of how Kubernetes works. But that’s not the reason for this blog.

Now, what happens if you want to access pods that use a lower-level protocol, such as some databases, or, for the sake of this blog post, Kafka?

Kafka uses a binary protocol over TCP. The protocol defines all APIs as request response message pairs. … TCP is happier if you maintain persistent connections used for many requests to amortize the cost of the TCP handshake, but beyond this penalty connecting is pretty cheap.

https://kafka.apache.org/protocol

And to make it even more complicated, what if you have a firewall in front of your Kubernetes cluster, and you’d like to put another webserver in front of it? You cannot rely on Apache here… let’s see what our options are. I’m going to approach this by considering these two different aspects:

  • Make Kafka accessible from outside
  • Access Kafka from the outside using an L4 load balancer (like Nginx, HAProxy, or Envoy)

Make Kafka accessible from outside

Without going into the details of Kafka, I’m just going to paste in what Robin Moffatt detailed in an excellent manner in his blog post Kafka Listeners – Explained. Do read the entire post for a complete understanding of how this works.

Kafka Listeners

KAFKA_LISTENERS is a comma-separated list of listeners, and the host/IP and port to which Kafka binds for listening. For more complex networking this might be an IP address associated with a given network interface on a machine. The default is 0.0.0.0, which means listening on all interfaces. (listeners)
KAFKA_ADVERTISED_LISTENERS is a comma-separated list of listeners with their host/IP and port. This is the metadata that’s passed back to clients. (advertised.listeners)
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP defines key/value pairs for the security protocol to use, per listener name. (listener.security.protocol.map)
Kafka brokers communicate between themselves, usually on the internal network (e.g. Docker network, AWS VPC, etc). To define which listener to use, specify KAFKA_INTER_BROKER_LISTENER_NAME (inter.broker.listener.name). The host/IP used must be accessible from the broker machine to others.
Kafka clients may well not be local to the broker’s network, and this is where the additional listeners come in.
Each listener will, when connected to, report back the address on which it can be reached. The address on which you reach a broker depends on the network used. If you’re connecting to the broker from an internal network it’s going to be a different host/IP than when connecting externally.
When connecting to a broker, the listener that will be returned to the client will be the listener to which you connected (based on the port).
kafkacat is a useful tool for exploring this. Using -L you can see the metadata for the listener to which you connected.

Let me detail the problem with trying to subscribe to a Kafka cluster. We will use the tool mentioned above (kafkacat).

If you run kafkacat from inside the Kubernetes cluster and access the brokers on the internal port (9092), you will get the following list of brokers back. We have a so-called Kafka Service listening on port 9000, routing requests to internal port 9092 on all Kafka broker pods. So we will connect to this Service:

kafkacat -L -b kafka:9000
Metadata for all topics (from broker -0: kafka-0.kafka:9092/bootstrap):
 3 brokers:
  broker 0 at kafka-0.kafka:9092 (controller)
  broker 2 at kafka-2.kafka:9092
  broker 1 at kafka-1.kafka:9092

The first broker in the list got the hit and responded with its known list of brokers. What we can see here is that the addresses are in the form “pod.service”, which will be NAT-translated by Kubernetes wherever we access it from inside the cluster. This is what Kubernetes will solve for us.

Kafka Internal Communication using Kubernetes Service

What I am trying to say here is that if you plan on running Kafka only internally, and subscribing to it from within the cluster only, then internal listeners will suffice. But what if we want to subscribe to these brokers from the outside?

Imagine I have a subscriber somewhere in the cloud, and the broker address I have to subscribe to is kafka-0.kafka:9092… say what?

Referring to the blog post above, this line is of importance:

When connecting to a broker, the listener that will be returned to the client will be the listener to which you connected (based on the port).

Luckily enough, Kafka thought about this when implementing the concept of listeners, internal and external. So we need an external listener in order to subscribe from the outside world.

Kafka external listeners – using the Kubernetes StatefulSet ordinal functionality

Listeners to publish to ZooKeeper for clients to use, if different than the listeners config property. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for listeners will be used. Unlike listeners it is not valid to advertise the 0.0.0.0 meta-address.

https://docs.confluent.io/current/installation/configuration/broker-configs.html

Summarizing it, this is what we need to make it work:

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:900${HOSTNAME##*-}
KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka-${HOSTNAME##*-}.kafka:9092,EXTERNAL://kafka.mydomain.com:900${HOSTNAME##*-}
KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL

So what is happening here? Let’s take it one by one:

  1. We are configuring Kafka to run with two listeners, listening on all interfaces (0.0.0.0). Because we are running Kafka as a StatefulSet, the pods will be named kafka-0, kafka-1, kafka-2 and so on. Which means the cluster will look like this:
    1. kafka-0:
      KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9000
      KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka-0.kafka:9092,EXTERNAL://kafka.mydomain.com:9000
    2. kafka-1:
      KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9001
      KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka-1.kafka:9092,EXTERNAL://kafka.mydomain.com:9001
    3. kafka-2:
      KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9002
      KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka-2.kafka:9092,EXTERNAL://kafka.mydomain.com:9002
      As you can see, we are using the ordinal functionality of the StatefulSet, incrementing the external port on each Kafka broker.
  2. We are letting Kafka know that for internal communication, it should use the internal listener.
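The ${HOSTNAME##*-} expansion doing the work above is plain POSIX parameter expansion: it removes everything up to and including the last “-” in the pod’s hostname, leaving the StatefulSet ordinal. A quick sketch of the port arithmetic, with the hostname simulated:

```shell
#!/bin/sh
# Simulate the hostname a StatefulSet assigns to its third pod
HOSTNAME=kafka-2

# "##*-" removes the longest prefix ending in "-", leaving the ordinal
ORDINAL=${HOSTNAME##*-}

# The external port is built by string concatenation, exactly as in the config
EXTERNAL_PORT=900${ORDINAL}

echo "ordinal=${ORDINAL} external_port=${EXTERNAL_PORT}"
# prints: ordinal=2 external_port=9002
```

Note that this scheme assumes single-digit ordinals: a pod named kafka-10 would advertise port 90010.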

Now this is where it gets messy 🙂 A Kubernetes Service can only forward requests incoming on one port to one, and only one, internal port. Which means, if we wanted to use a normal Kubernetes Service to balance the external requests, we would actually need the Service to forward requests to all of the ports:

  • 9000
  • 9001
  • 9002
  • 9003

Not possible. We need to take care of the balancing ourselves. Meet the Kubernetes headless service.

Expose Kafka Brokers to the outside world using individual headless services

We will provide each Kafka broker with its own headless service, and we’ll take care of the balancing further west (in the frontend).

Each headless service will listen on port 9000, but forward the requests to the external port matching the ordinal of the backend pod:

  • 9000 -> 9000 for kafka-0
  • 9000 -> 9001 for kafka-1
  • 9000 -> 9002 for kafka-2
  • and so on

Here are the service definitions for the first two headless services, kafka-0 and kafka-1:

apiVersion: v1
kind: Service
metadata:
  name: kafka-0
  namespace: kafka
spec:
  clusterIP: None
  ports:
  - name: broker
    port: 9000
    protocol: TCP
    targetPort: 9000
  selector:
    statefulset.kubernetes.io/pod-name: kafka-0
  sessionAffinity: None
  type: ClusterIP
status:
  loadBalancer: {}
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-1
  namespace: kafka
spec:
  clusterIP: None
  ports:
  - name: broker
    port: 9000
    protocol: TCP
    targetPort: 9001
  selector:
    statefulset.kubernetes.io/pod-name: kafka-1
  sessionAffinity: None
  type: ClusterIP
status:
  loadBalancer: {}
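With more than a couple of brokers, writing these manifests by hand gets tedious. A small generation loop keeps the port/ordinal arithmetic in one place (a sketch; adjust the broker count and namespace to your setup, and pipe the output to kubectl apply -f -):

```shell
#!/bin/sh
# Generate one headless Service per broker; targetPort follows the ordinal.
BROKERS=3
i=0
while [ "$i" -lt "$BROKERS" ]; do
  cat <<EOF
apiVersion: v1
kind: Service
metadata:
  name: kafka-${i}
  namespace: kafka
spec:
  clusterIP: None
  ports:
  - name: broker
    port: 9000
    protocol: TCP
    targetPort: 900${i}
  selector:
    statefulset.kubernetes.io/pod-name: kafka-${i}
---
EOF
  i=$((i + 1))
done
```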

By now we have the following view:

Forwarding TCP (L4 Balancing) into the Cluster using Nginx Ingress Controller

In order to provide L4 balancing for the Kafka communication, we now need to configure Nginx to forward TCP requests.

This is the tcp-services ConfigMap used by Nginx, which we will now configure for the Kafka communication:

apiVersion: v1
data:
  "9000": remotedatabone/kafka-0:9000
  "9001": remotedatabone/kafka-1:9000
  "9002": remotedatabone/kafka-2:9000
  "9003": remotedatabone/kafka-3:9000
  "9004": remotedatabone/kafka-4:9000
kind: ConfigMap
metadata:
  name: tcp-services
  namespace: ingress-nginx

So what is happening here? We are instructing Nginx to:

  • map packets arriving on port 9000 to the headless service kafka-0
  • map packets arriving on port 9001 to the headless service kafka-1
  • and so on

Why do we need this? Let’s take another look at the list of external listeners that we get for a cluster of three brokers:

  1. EXTERNAL://kafka.mydomain.com:9000
  2. EXTERNAL://kafka.mydomain.com:9001
  3. EXTERNAL://kafka.mydomain.com:9002

When we subscribe to the cluster, we will start direct communication with these brokers.

Listening for Kafka packets in the Ingress Controller and forwarding them to the internal ports

When packets reach the cluster, they need to reach an ingress pod. That is the reason we use a NodePort service for the Nginx ingress controller: it guarantees that each node in the cluster will open the port, listen on it, and forward the packets to the internal port.

In addition to the port used for HTTP requests (30080, mapping requests to the Nginx port 80 – the webserver port), we now need to define several other NodePorts:

apiVersion: v1
kind: Service
metadata:
  name: ingress-nginx
  namespace: ingress-nginx
spec:
  externalTrafficPolicy: Local
  ports:
  - name: http
    nodePort: 30080
    port: 80
    protocol: TCP
    targetPort: 80
  - name: proxied-tcp-9000
    nodePort: 30900
    port: 9000
    protocol: TCP
    targetPort: 9000
  - name: proxied-tcp-9001
    nodePort: 30901
    port: 9001
    protocol: TCP
    targetPort: 9001
  - name: proxied-tcp-9002
    nodePort: 30902
    port: 9002
    protocol: TCP
    targetPort: 9002
  - name: proxied-tcp-9003
    nodePort: 30903
    port: 9003
    protocol: TCP
    targetPort: 9003
  selector:
    app: ingress-nginx
  sessionAffinity: None
  type: NodePort
status:
  loadBalancer: {}

We now have the following image:

We have now configured everything we need to provide end-to-end communication, from the edge (Nginx) to the Kafka brokers. We can now make a query directly over the Nginx NodePort to test the functionality. Since we still have no balancing, we need to ask one broker at a time:

kafkacat -L -b $mynode:30900
Metadata for all topics (from broker -0: kafka.mydomain.com:30900/bootstrap):
 3 brokers:
  broker 0 at kafka.mydomain.com:9000 (controller)
  broker 2 at kafka.mydomain.com:9002
  broker 1 at kafka.mydomain.com:9001

Now let’s take care of the balancing in front of the cluster. For that we will use Envoy, which I find to be the next-gen load balancer.

Use Envoy Load Balancer

The use case we would like to implement is the following:

  • Connect to a common URL and query the list of brokers
  • Do not connect to specific brokers; instead, leave the selection of the broker to the balancer

So we’ll need to connect to something like kafka.mydomain.com:9080. Let’s call this the discovery URL. It is of course up to you to configure this properly in your DNS records.

Since we cannot rely on a Kubernetes Service to spread the load (see above, the ports differ from one backend pod to another), we need to take care of the balancing in front of Kubernetes.

We will set up so-called “listeners“, which will:

  • take requests on each of the brokers’ configured ports (9000, 9001, 9002, etc.)
  • balance requests to the specific broker services via the ingress (9000 -> 30900 -> 9000; 9001 -> 30901 -> 9001, etc.)
  • take the discovery request first, and balance it over the listeners configured above

Let’s take a look at the configuration in envoy:

static_resources:
  listeners:
  - address:
      socket_address:
        address: 0.0.0.0
        port_value: 9080
    filter_chains:
    - filters:
      - name: envoy.tcp_proxy
        config:
          stat_prefix: ingress_tcp
          weighted_clusters:
            clusters:
            - name: kafka-0
              weight: 33
            - name: kafka-1
              weight: 33
            - name: kafka-2
              weight: 33
          access_log:
          - name: envoy.file_access_log
            config:
              path: /dev/stdout
          idle_timeout: 0s
          max_connect_attempts: 5
  - address:
      socket_address:
        address: 0.0.0.0
        port_value: 9000
    filter_chains:
    - filters:
      - name: envoy.tcp_proxy
        config:
          stat_prefix: ingress_tcp
          cluster: kafka-0
          access_log:
          - name: envoy.file_access_log
            config:
              path: /dev/stdout
          idle_timeout: 0s
  - address:
      socket_address:
        address: 0.0.0.0
        port_value: 9001
    filter_chains:
    - filters:
      - name: envoy.tcp_proxy
        config:
          stat_prefix: ingress_tcp
          cluster: kafka-1
          access_log:
          - name: envoy.file_access_log
            config:
              path: /dev/stdout
  - address:
      socket_address:
        address: 0.0.0.0
        port_value: 9002
    filter_chains:
    - filters:
      - name: envoy.tcp_proxy
        config:
          stat_prefix: ingress_tcp
          cluster: kafka-2
          access_log:
          - name: envoy.file_access_log
            config:
              path: /dev/stdout
  - address:
      socket_address:
        address: 0.0.0.0
        port_value: 9003
    filter_chains:
    - filters:
      - name: envoy.tcp_proxy
        config:
          stat_prefix: ingress_tcp
          cluster: kafka-3
          access_log:
          - name: envoy.file_access_log
            config:
              path: /dev/stdout
  - address:
      socket_address:
        address: 0.0.0.0
        port_value: 9004
    filter_chains:
    - filters:
      - name: envoy.tcp_proxy
        config:
          stat_prefix: ingress_tcp
          cluster: kafka-4
          access_log:
          - name: envoy.file_access_log
            config:
              path: /dev/stdout
  clusters:
  - name: kafka-0
    connect_timeout: 0.25s
    type: strict_dns
    lb_policy: round_robin
    hosts:
    - socket_address:
        address: 10.49.11.101
        port_value: 30900
    - socket_address:
        address: 10.49.11.102
        port_value: 30900
    - socket_address:
        address: 10.49.11.103
        port_value: 30900
    - socket_address:
        address: 10.49.11.104
        port_value: 30900
    - socket_address:
        address: 10.49.11.105
        port_value: 30900
    - socket_address:
        address: 10.49.11.106
        port_value: 30900
    - socket_address:
        address: 10.49.11.107
        port_value: 30900
    - socket_address:
        address: 10.49.11.108
        port_value: 30900
    - socket_address:
        address: 10.49.11.109
        port_value: 30900
    - socket_address:
        address: 10.49.11.110
        port_value: 30900
    - socket_address:
        address: 10.49.11.111
        port_value: 30900
    - socket_address:
        address: 10.49.11.112
        port_value: 30900
    health_checks:
    - timeout: 1s
      interval: 1s
      unhealthy_threshold: 1
      initial_jitter: 1s
      healthy_threshold: 5
      tcp_health_check: {}
      always_log_health_check_failures: true
      event_log_path: "/dev/stdout"
    close_connections_on_host_health_failure: true
  - name: kafka-1
    connect_timeout: 0.25s
    type: strict_dns
    lb_policy: round_robin
    hosts:
    - socket_address:
        address: 10.49.11.101
        port_value: 30901
    - socket_address:
        address: 10.49.11.102
        port_value: 30901
    - socket_address:
        address: 10.49.11.103
        port_value: 30901
    - socket_address:
        address: 10.49.11.104
        port_value: 30901
    - socket_address:
        address: 10.49.11.105
        port_value: 30901
    - socket_address:
        address: 10.49.11.106
        port_value: 30901
    - socket_address:
        address: 10.49.11.107
        port_value: 30901
    - socket_address:
        address: 10.49.11.108
        port_value: 30901
    - socket_address:
        address: 10.49.11.109
        port_value: 30901
    - socket_address:
        address: 10.49.11.110
        port_value: 30901
    - socket_address:
        address: 10.49.11.111
        port_value: 30901
    - socket_address:
        address: 10.49.11.112
        port_value: 30901
    health_checks:
    - timeout: 1s
      interval: 1s
      unhealthy_threshold: 1
      initial_jitter: 1s
      healthy_threshold: 5
      tcp_health_check: {}
      always_log_health_check_failures: true
      event_log_path: "/dev/stdout"
    close_connections_on_host_health_failure: true
  - name: kafka-2
    connect_timeout: 0.25s
    type: strict_dns
    lb_policy: round_robin
    hosts:
    - socket_address:
        address: 10.49.11.101
        port_value: 30902
    - socket_address:
        address: 10.49.11.102
        port_value: 30902
    - socket_address:
        address: 10.49.11.103
        port_value: 30902
    - socket_address:
        address: 10.49.11.104
        port_value: 30902
    - socket_address:
        address: 10.49.11.105
        port_value: 30902
    - socket_address:
        address: 10.49.11.106
        port_value: 30902
    - socket_address:
        address: 10.49.11.107
        port_value: 30902
    - socket_address:
        address: 10.49.11.108
        port_value: 30902
    - socket_address:
        address: 10.49.11.109
        port_value: 30902
    - socket_address:
        address: 10.49.11.110
        port_value: 30902
    - socket_address:
        address: 10.49.11.111
        port_value: 30902
    - socket_address:
        address: 10.49.11.112
        port_value: 30902
    health_checks:
    - timeout: 1s
      interval: 1s
      unhealthy_threshold: 1
      initial_jitter: 1s
      healthy_threshold: 5
      tcp_health_check: {}
      always_log_health_check_failures: true
      event_log_path: "/dev/stdout"
    close_connections_on_host_health_failure: true
admin:
  access_log_path: "/dev/null"
  address:
    socket_address:
      address: 0.0.0.0
      port_value: 9901 # the admin port is missing in the original; 9901 is the conventional choice

The only thing left to do is to configure your DNS to point to the server running Envoy, run Envoy, and then test.

Start Envoy standalone (I am using CentOS here; follow these steps to install it: https://www.getenvoy.io/install/envoy/centos/):

envoy --config-path envoy-config.yaml

Test the connection with kafkacat:

kafkacat -L -b kafka.mydomain.com:9080
Metadata for all topics (from broker -1: kafka.mydomain.com:9080/bootstrap):
 3 brokers:
  broker 0 at kafka.mydomain.com:9000 (controller)
  broker 2 at kafka.mydomain.com:9002
  broker 1 at kafka.mydomain.com:9001
 20 topics:
.....

That’s it. And now, everything looks like this:

Envoy / Kubernetes / Nginx / Kafka Connectivity : Connect to Kafka inside Kubernetes Cluster
Envoy / Kubernetes Nginx Ingress / Kafka Headless Services : Access to Kafka in Kubernetes from outside

Bind other services inside Kubernetes to the outside world

I have shown how you can use the concept of headless services for accessing Kafka containers running inside Kubernetes. The concept is the same for any other service using a lower-level protocol than L7, which requires special TCP forwarding of packets.
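For example, exposing a hypothetical single PostgreSQL pod the same way would need only a headless service plus one extra tcp-services entry (the names, namespace, and port below are illustrative, not from a real deployment):

```yaml
# Headless service pinned to a single pod, as with the Kafka brokers above.
apiVersion: v1
kind: Service
metadata:
  name: postgres-0
  namespace: databases
spec:
  clusterIP: None
  ports:
  - name: postgres
    port: 5432
    protocol: TCP
    targetPort: 5432
  selector:
    statefulset.kubernetes.io/pod-name: postgres-0
---
# Corresponding entry in the Nginx tcp-services ConfigMap.
apiVersion: v1
kind: ConfigMap
metadata:
  name: tcp-services
  namespace: ingress-nginx
data:
  "5432": databases/postgres-0:5432
```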

Conclusion:

  • Configure Nginx as a TCP forwarder; Nginx needs to know the backend service to forward the requests to
  • Add the TCP ports as NodePorts to the Nginx service, so each node can act as a TCP listener for Nginx
  • Configure Envoy as a TCP listener, balancing requests to the Kubernetes nodes on the NodePorts
  • Configure headless services so that each one targets exactly one backend pod

That is it. Let me know if you need any further help.

Best,

Alex
