How I log-pressured our Elasticsearch in Kubernetes using td-agent, and a series of unfortunate configurations one should never make

Alexandru Ersenie
Categories: Tech

Kibana is down again!

This was coming up almost daily. Right in the morning, when we were having our stand-ups. Right at the beginning of the day. Right in my face.

Use case

We are using self-hosted Kubernetes. It has grown bigger and bigger lately: while we started with a couple of nodes, we are now operating more than 20 Kubernetes workers per cluster.

The reason we’re growing is that we need more: more CPU, more memory, more deployments, more stateful sets, more technology stacks, more customers.

Another reason we’re growing is that our developers need to build fast. And when I say fast, I mean “now, speedy, and with resource guarantees”. So we are growing. And while we are growing, so is our logging backend.

Pretty much every new deployment, be it a customer, a pipeline, or an operator, will log. And we want to see it. We’re growing fast!

History

When I started implementing the logging solution, we were still running Elasticsearch 3 on bare metal, using Logstash for processing and good old Grok patterns. But then something happened, and out of nowhere there was this awesome container concept, and even better, there was a management tool to handle all of it.

I needed to adapt. Say hello to td-agent. Or Fluentd. Even I cannot tell the difference anymore, for they are pretty much the same thing.

I implemented a straightforward solution, having td-agent as a log forwarder running as a daemon-set and pushing data directly into Elasticsearch.

The concept is pretty simple:

  1. Run td-agent as a daemon-set, so that one pod runs on each Kubernetes node
  2. Mount the node’s container logging path into the pod, so td-agent can access the logs
  3. Tail the logs
  4. Process the logs (using filters)
  5. Flush the logs (using matches)

There are lots of articles out there about how to do this, what is what, and what each part means. I am here to tell another story. The keywords I want you to keep in mind are: routing, tags, retags, flushing threads.

Log Event lifecycle

I do recommend looking into the excellent Fluent Bit documentation, which explains how logs should be treated as streams and what the lifecycle of an event looks like: https://docs.fluentbit.io/manual/getting_started

Let me describe it briefly, and I hope in a way that is understandable for everybody (a minimal configuration sketch follows the list):

  1. The input defines the logging stream, in other words where the log events come from
  2. The parser tries to give structure to unstructured events (JSON parser, Grok parser, regex parser, etc.)
  3. The filter processes the log event: it may enrich it, delete some fields, update some fields, or label it, and by doing so route it to some other track than the main log street
  4. The buffer is where the processing ends and the log event becomes a “package”. Think of this like a conveyor belt. The package is now ready to be routed
  5. The route is the address the package should be sent to. It may be a file, a database, a log ingestion tool, etc.
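
To make these stages concrete, here is a minimal, illustrative td-agent pipeline with one directive per stage. The paths, tags, hostnames and the extra field are placeholders chosen for the sake of the example; the real configuration follows later in this post.

# 1. Input: where the log events come from
<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /var/lib/tdagent/example.log.pos
  tag example.*
  # 2. Parser: give structure to unstructured events
  <parse>
    @type json
  </parse>
</source>

# 3. Filter: enrich, trim or relabel the event
<filter example.**>
  @type record_transformer
  <record>
    # add an illustrative static field to every event
    cluster my-cluster
  </record>
</filter>

# 4. Buffer and 5. Route: package the events and send them to their destination
<match example.**>
  @type elasticsearch
  host elasticsearch.example
  port 9200
  <buffer tag>
    @type file
    path /var/log/td-agent/buffer/example
    flush_interval 15s
  </buffer>
</match>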

Running in Kubernetes

Containers run in pods. Pods run on nodes.

A pod will always be running on one, and only one, node at a time. And a pod may have one, two, or multiple containers. Each of these containers generates a unique log file, which is stored and rotated in the node’s filesystem with a filename looking something like this:

backend-844468db56-g8cxc_area19_backend-services-578bfd34df699ebfbfb04597fde1d5ffeb4378e48857183f501a7e695b9e1227.log

So what does all this tell us, and why is it relevant?

Well, it is relevant because we run in a dynamic ecosystem like Kubernetes, where everything goes up and down, over and over again. Deleted pods will be started with a new name on another node. We need to be able to track back in time what happened and where. So we need some more information about the “running backbone”. We need to know things like:

  • what was the name of the container?
  • what was the pod for this container?
  • what was the namespace for this pod?
  • on what node was this pod running?
  • what are the labels and annotations of this pod?
  • other additional information

Let’s take the filename given as an example above and dissect it (a sketch of a matching regular expression follows the list):

  • backend-844468db56-g8cxc : this is the pod name
  • area19 : this is the namespace where the pod was running
  • backend-services: this is the name of the container in the pod. Multiple containers will of course each have their own name
  • 578bfd34df699ebfbfb04597fde1d5ffeb4378e48857183f501a7e695b9e1227 : this is the Docker container ID, which makes the file unique across pods, namespaces, and nodes
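
For illustration only, a regular expression along these lines could dissect such a file name into its parts (this is a sketch I am assuming here, not necessarily the exact pattern the metadata filter described below uses internally):

# pod name, namespace, container name, 64-hex-character Docker container id
/^(?<pod_name>[^_]+)_(?<namespace>[^_]+)_(?<container_name>.+)-(?<docker_id>[a-f0-9]{64})\.log$/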

As you may see, there isn’t anything about labels, annotations and so on. So how do we get all that stuff added to the logs? We’ll get there in a minute.

Container logs are files; log files are filled and rotated; so let’s tail and process them

Tail the logs

To use a log, you need to define a log source. Getting back to step 1 in the log event lifecycle, we’ll use tailed files as the source for our logs. Let’s assume that our “backend-services” container logs in JSON, and that we have two pods running the same container type, “backend-services”, in different namespaces (although they may run in the same namespace as well). Let’s define the source and explain what happens:

<source>
  @type tail
  path /var/log/containers/*backend-services*.log
  pos_file /var/lib/tdagent/{{ .Release.Namespace }}/position/backend-services.log.pos
  tag backend.*

  # Parse log statement as json structure
  @include parser.json
</source>

So we’re pretty much saying:

  1. I want to tail each and every file under /var/log/containers that matches the wildcard “*backend-services*”
  2. Please tag this as backend.* (this is the most wicked part of this post)
  3. Parse the logs as structured JSON logs (a sketch of what parser.json might contain follows below)
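
The parser.json include itself is not reproduced in this post. A minimal version, assuming the Docker json-file log format shown a bit further below, might look like this:

# parser.json (assumed content): parse the Docker json-file log format
<parse>
  @type json
  # the Docker log driver writes the timestamp into the "time" field
  time_key time
  time_format %Y-%m-%dT%H:%M:%S.%N%z
</parse>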

Let us now assume we have the following log files, and see how their logs flow through tdagent.

  • backend-7944787ff8-wlpzj_ns1_backend-services-a430a88f939a0b2c55f0ee7033fdc0932b514f242119cd232ba872c46f78be09.log
  • backend-7944787gg7-wlpzj_ns2_backend-services-e7798ec2a19603d69a4e7aa24e39fab8bcf1187f555c3cd333c8e9080ae950e1.log
  • backend-796c9b48d9-m8xvg_ns3_backend-services-1f4767a804adbcd3789f1879541bd47c2af2a43ad120d0470f875d13bb2a88e4.log

Tag the logs

We are using tags so we can later route the events and handle them on “different tracks”. With the logs above, and the rule defined in the source, td-agent will therefore create the following logging tracks (flows, or workflows):

  • backend.backend-7944787ff8-wlpzj_ns1_backend-services-a430a88f939a0b2c55f0ee7033fdc0932b514f242119cd232ba872c46f78be09.log
  • backend.backend-7944787gg7-wlpzj_ns2_backend-services-e7798ec2a19603d69a4e7aa24e39fab8bcf1187f555c3cd333c8e9080ae950e1.log
  • backend.backend-796c9b48d9-m8xvg_ns3_backend-services-1f4767a804adbcd3789f1879541bd47c2af2a43ad120d0470f875d13bb2a88e4.log

Funny, we know they are the same kind of logs, but we treat them separately? Why would we do that? Hold your horses, we are getting there. We can’t just retag and group them at the moment, because we’d be losing the one essential piece of information that we need in the next step for getting the Kubernetes metadata, and that is that big, fat log prefix.

By now, a typical log record going through the td-agent highway would look like this:

{"log":"{"Timestamp":"2019-11-17T11:21:44.478+0000","Level":"INFO","Version":"Payara 5.192","LoggerName":"backend.app","ThreadID":"36","ThreadName":"http-thread-pool::http-listener(2)","TimeMillis":"1573989704478","LevelValue":"800","LogMessage":" Cannot produce root URL"}n","stream":"stdout","time":"2019-11-17T11:21:57.27285518Z"}

As you can see, there is nothing there telling us things like pod name, container name and so on. So let’s get that information.

Enrich the logs with Kubernetes Metadata

In order to enrich the log events with Kubernetes metadata, td-agent needs to know things like pod name, container name, namespace, and container ID (pretty much what makes a container so unique), so it can go to the Kubernetes API and say:

Hey Kube API? What do you know about this container, in this pod, in this namespace, running here? Lemme know so I can add some info to this log

So after tailing the logs, we are now processing them and enriching them using filters. The filter used for the operation above is called the “kubernetes metadata filter”, and it works pretty much like this:

<filter backend**>
  @type kubernetes_metadata
  skip_namespace_metadata true
  skip_master_url true
</filter>

This will enrich every log statement coming from the files above with Kubernetes metadata, so our logs have now been parsed, processed, and enriched.

The log statement now looks like this:

{
  "_source": {
    "stream": "stdout",
    "docker": {
      "container_id": "a430a88f939a0b2c55f0ee7033fdc0932b514f242119cd232ba872c46f78be09"
    },
    "kubernetes": {
      "container_name": "backend-services",
      "namespace_name": "ns1",
      "pod_name": "backend-7944787ff8-wlpzj",
      "container_image": "myregistry:5000/docker-backend-services:5.0",
      "container_image_id": "docker-pullable://myregistry.e-dict.net:5000/docker-backend-services@sha256:081843841f9662ecb75a4ed8054f1d21f49aa6f01df1677ac3cf8765d0754060",
      "pod_id": "f91a9880-0928-11ea-9ddb-005056aa17f3",
      "host": "kbw01",
      "labels": {
        "pod-template-hash": "79ff8c8cf6",
        "app_kubernetes_io/component": "backend",
        "app_kubernetes_io/instance": "ns1"
      },
      "master_url": "https://10.96.0.1:443/api",
      "namespace_id": "4de44959-057f-11ea-b5d1-005056aa1a79",
      "namespace_labels": {
        "stage": "SystemTest"
      }
    },
    "Level": "INFO",
    "LoggerName": "backend.app",
    "LevelValue": "800",
    "ThreadID": "45",
    "ThreadName": "http-thread-pool::http-listener(11)",
    "TimeMillis": "1573991197212",
    "@timestamp": "2019-11-17T11:21:57.272+0000",
    "LogMessage": "Cannot produce root URL "
  },
  "fields": {
    "@timestamp": [
      "2019-11-17T11:21:57.272Z"
    ]
  }
}

It is time to package them and send them to their destinations.

Buffering and flushing to destination

Td-agent uses buffers, and queues for flushing those buffers. Destinations are managed either by td-agent’s core (such as “forward to another td-agent”, “log to console”, or “copy to file”), or by plugins, such as Elasticsearch, Kafka, Redis, etc.

Depending on the implementation of the destination plugin, the buffer may or may not be created based on the tag.

Elasticsearch’s output plugin organises buffers based on tags. That means that, for the files above, the Elasticsearch plugin will create three buffer chunks, one for each tag. These chunks will be sent to the flush queue whenever the flush_interval is reached. Assuming we have a flush interval of 15 seconds (we want near-live data), we’d have three flushes every 15 seconds into Elasticsearch. Sounds good, doesn’t it? Aaaaaahhamm… Say hello to the growth.
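
For reference, the relevant part of the Elasticsearch output at this stage would look roughly like this (a simplified sketch; the full configuration with the production settings is shown further down):

<match backend.**>
  @type elasticsearch
  host elasticsearch.{{ .Release.Namespace }}
  port 9200
  <buffer tag>
    # chunk by tag: one chunk per tag, i.e. one chunk per tailed log file at this point
    @type file
    path /var/log/td-agent/buffer/td
    flush_interval 15s
  </buffer>
</match>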

By now, the folder structure of the buffer looks something like this (for three files):

/var/log/td-agent/buffer/td
buffer.b59455f38d2e4cce05a6dd5f984e57bfd.log
buffer.b59455f38d2e4cce05a6dd5f984e57bfd.log.meta
buffer.b59455f5f137761d44201930b4af9eebf.log
buffer.b59455f5f137761d44201930b4af9eebf.log.meta
buffer.b59455f854fd63f9da3f2577e3541a24c.log
buffer.b59455f854fd63f9da3f2577e3541a24c.log.meta

Now, this may work fine for three, 10, 20 chunks. But how does it look when you have:

  • 20+ namespaces
  • 5-10 pods / namespace
  • 2-3 containers / pod

If we multiply all of these, we’d have about 20*10*3 log files, which makes about 600 log files. Never mind the logs produced by Kubernetes itself, or other logs at system level (kernel, kubelet, Docker, containerd, etcd, etc.)

Your file structure suddenly looks like this:

Queued chunks in tdagent buffer

Suddenly we have a lot of chunks, and every 15 seconds we will try to flush them. From different td-agent instances running on different nodes. Almost at the same time. No wonder Elasticsearch refuses bulk inserts:

Caused by: org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 500)

To have a better understanding of the problem in Kubernetes, have a look at this image

TDAgent running in Kubernetes - Complexity of log file tags

Alright. We have a problem. And this will have no end, for this is a vicious circle. And suddenly the td-agent pods running in Kubernetes start crashing and starting over again.

We’re seeing duplicates again! I thought we were generating unique IDs ourselves, so why do we see duplicates? Again!?

This is because of an option used when tailing, called read_from_head, which basically says: if td-agent starts over, read the file from the head, not from the tail. Meaning td-agent will read the same data again, and again, and again, thereby not only generating dupes, but adding to the already existing pressure. We’ll solve this one by setting the option to false.
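
The change is a single line in the tail source shown earlier; sketched here for completeness:

<source>
  @type tail
  path /var/log/containers/*backend-services*.log
  pos_file /var/lib/tdagent/{{ .Release.Namespace }}/position/backend-services.log.pos
  tag backend.*
  # do not re-read files from the beginning when td-agent starts over
  read_from_head false
  # Parse log statement as json structure
  @include parser.json
</source>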

Still, what do we do about all these files queued in the buffer?

Retag to the rescue

By now we know that all of the backend-services containers log in JSON format. Let’s look at this from the outside: all containers of this type, regardless of where they run, will produce the same log structure.

So the question is: how do we group them under one hood now, so we can optimize writing into a single chunk instead of 600?

Well, we’re done with processing the data, enriching it, modifying it and so on. So let’s retag all of those backend.** logs into one single tag:

<match backend.**>
  @type rewrite_tag_filter
  @log_level info
  <rule>
    key $['kubernetes']['namespace_name']
    pattern ^(.+)$
    tag jsonlogs
  </rule>
</match>

What we did here was to look at each individual log record tagged backend.**, and retag it if it has a namespace assigned, which it does, thanks to the kubernetes_metadata filter applied earlier.

All logs produced by backend.** containers will now be tagged as jsonlogs. One tag, one chunk! Amazing, we are now able to put all our toys into one package and send it to its destination. Oh wait, isn’t that pretty much what Amazon is doing? 🙂

Suddenly the buffer looks like this:

sh-4.2# ls
buffer.b597902f5aa8440e4d70d9940a459bd5c.log	   buffer.b597903114213eb72b932d60cfbef7403.log
buffer.b597902f5aa8440e4d70d9940a459bd5c.log.meta  buffer.b597903114213eb72b932d60cfbef7403.log.meta

Let’s take a look at the output configuration now:

<match jsonlogs.** dbmigrate.** payaralogs.** errors.**>
  @type elasticsearch
  @log_level info
  id_key _hash # specify the same key name as specified in hash_id_key
  remove_keys _hash # Elasticsearch doesn't like keys that start with _
  host elasticsearch.{{ .Release.Namespace }}
  user "#{ENV['FLUENT_ELASTICSEARCH_USER']}"
  password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD']}"
  logstash_format true
  reload_connections true
  reload_on_failure true
  reconnect_on_error true
  request_timeout 15s # defaults to 5s
  port 9200
  scheme http
  ssl_verify false
  <buffer tag>
    @type file
    path /buffers/cluster-logstash.buffer

    # Buffering options controlling how the buffer is flushed. We reuse these for every match
    # directive that flushes logs into the log aggregators.

    # The buffer queue will be flushed at this rate, unless a chunk becomes full (chunk_limit_size) first
    flush_interval 30s
    # Maximum size of one chunk of data; td-agent will write data up to this value into a chunk,
    # after which it will start writing into another chunk
    chunk_limit_size 8M
    # Maximum total size of the buffer (this results in a maximum number of chunks of total_limit_size/chunk_limit_size)
    total_limit_size 1000M
    # If the total limit size has been reached, the oldest chunks will be removed (yes, data will be lost).
    # Nevertheless, we use persistence for the log forwarders; the maximum size for the buffers is set
    # in the match directive
    overflow_action drop_oldest_chunk
    # The output plugin will retry periodically; the initial wait is configured via retry_wait
    retry_type exponential_backoff
    # Constant factor of the exponential backoff
    retry_wait 1s
    # Wait intervals are tweaked by multiplying by a randomly chosen number between 0.875 and 1.125
    retry_randomize true
    # Increase the wait time exponentially with each retry attempt (1, 2, 4, 8, 16, 32, 64, etc.)
    retry_exponential_backoff_base 2
    # This is the maximum value the exponential backoff is allowed to reach (so we avoid retries set to
    # large periods - hours or days). This means that if the exponential backoff would set the next retry
    # to 256s, it will be capped by this parameter to 130s
    retry_max_interval 130s
    # The number of threads used to flush the buffer (parallelization of multiple chunk flushes)
    flush_thread_count 20
    # Seconds to sleep between checks for buffer flushes in flush threads
    flush_thread_interval 5
    # Seconds to sleep between flushes when many buffer chunks are queued
    flush_thread_burst_interval 15
    # When shutting down, all buffers would be flushed to the output. We do not need this, since we use persistence.
    # By setting it to false we avoid concurrent flushes from multiple forwarders in case they get restarted
    flush_at_shutdown false
  </buffer>
</match>
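
A side note on the id_key _hash / remove_keys _hash pair above: it relies on an upstream filter generating that hash (the unique IDs mentioned earlier). With fluent-plugin-elasticsearch this is typically done with the elasticsearch_genid filter, roughly like this (a sketch of what I assume sits earlier in the pipeline):

<filter **>
  @type elasticsearch_genid
  # write the generated hash into the field referenced by id_key in the output
  hash_id_key _hash
</filter>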

Wrap up – td-agent in Kubernetes

It is essential that you retag logs of the same type. Otherwise your Elasticsearch Cluster (or whatever other output) will eventually explode, and you will start missing logs.

You do that by using the rewrite_tag_filter plugin: https://docs.fluentd.org/output/rewrite_tag_filter

Good luck with it. I know it solved my problems!
