Building a scalable ELK stack

An ELK stack is a combination of three components, Elasticsearch, Logstash and Kibana, that together form a Log Aggregation system. It is one of the most popular ways to make log files from various services and servers easily visible and searchable. While there are many great SaaS solutions available, many companies still choose to build their own.

When we set about building a log aggregation system, these were the requirements we had:

  • Durability: we need to persist logs long term with a very high level of confidence.
  • Integrity: we need to ensure logs cannot be tampered with in the event of a security breach.
  • Maintainability: we have a small team with limited resources to operate and manage the platform.
  • Scalability: we need to be able to deal with a very large number of events that may spike during peak periods, outages and when bugs are introduced on the producer end.

ELK stacks are not known for their Durability or Integrity, so we had to think outside the box to solve these problems. They are, however, fairly easy to maintain and scale when designed and implemented thoughtfully. Logstash in particular can become unruly if not implemented carefully.

Structured Logs

“You want structured data generated at the client side. It isn’t any harder to generate, and it’s far more efficient to pass around.” - Charity Majors

We decided early on to push the complexity of making logs easy to consume onto the application or server that produced them. Each server has an agent that collects logs from a file or network socket. If the logs are already structured the agent forwards them on as-is; if not, it parses them into a structured form first. This greatly simplifies the Logstash configuration: instead of dozens of rules covering a variety of applications (nginx, HAProxy) and frameworks (Rails, Phoenix, NodeJS), we have a single JSON format.
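As a purely hypothetical illustration, a structured event in this format might look like the following (the field names are ours for this example, not a prescribed schema):

{
    "timestamp": "2017-06-01T10:15:30Z",
    "host": "web-01",
    "service": "nginx",
    "level": "info",
    "message": "GET /healthcheck HTTP/1.1 200",
    "duration_ms": 3
}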

Queuing

“(A Queue) is imperative to include in any ELK reference architecture because Logstash might overutilize Elasticsearch, which will then slow down Logstash until the small internal queue bursts and data will be lost. In addition, without a queuing system it becomes almost impossible to upgrade the Elasticsearch cluster because there is no way to store data during critical cluster upgrades.” - logz.io

Since building our modified ELK stack we have experienced incidents where logs were being created faster than we could process them. Having a queue in place was invaluable for buffering the logs whilst we caught up. It also allows us to easily take down any part of our stack for maintenance without losing logs. For the queue we chose AWS Kinesis, because it scales well beyond what we will need and we don’t have to manage it ourselves.

Logstash now has Persistent Queues, which are a step in the right direction, but they are not turned on by default.
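For reference, enabling them is a small change in logstash.yml (queue.type is the documented setting; the size here is just an example):

# logstash.yml
queue.type: persisted    # the default is "memory", i.e. no persistence
queue.max_bytes: 4gb     # disk budget for the on-disk queue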

Persistent Queues also have an important side effect: without them Logstash is a stateless service that can be treated like the rest of your infrastructure, built up and torn down whenever you want. With persistence, your Logstash instances become stateful and harder to manage. They have to be treated like a database: backed up, disk space carefully monitored, and data somehow replicated between multiple data-centres to make them highly available. Many people instead use Kafka or Kinesis to form a queue in front of Logstash, both of which come with built-in replication. Logstash may be more manageable than Kafka, particularly at a smaller scale, but Kafka and Kinesis are a lot more robust and have been developed with durability as a primary concern.

Durability

While having a queue improves the durability of our logs on their way into Elasticsearch, there’s still a risk we could lose them once they are there. We can replicate our shards across multiple availability zones, but there’s still the risk, through human error or catastrophic failure, that logs could be lost. So we configure all our Log Agents to forward logs to both a Kinesis Stream and a Kinesis Firehose Delivery Stream. The Delivery Stream persists the logs to an S3 bucket for long term archival. In the event of a failure we can always retrieve logs from S3.
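To make the dual-write concrete, here is a minimal Go sketch using the AWS SDK for Go (v1). The stream names and region are placeholders, and our real agents are off-the-shelf log shippers rather than this program:

// dualwrite.go: a sketch of forwarding a single log event to both a Kinesis
// Stream (consumed by Logstash) and a Kinesis Firehose Delivery Stream
// (archived to S3). All names are placeholders.
package main

import (
    "log"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/firehose"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
    sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))
    kin := kinesis.New(sess)
    fh := firehose.New(sess)

    event := []byte(`{"timestamp":"2017-06-01T10:15:30Z","host":"web-01","message":"GET /healthcheck 200"}`)

    // Real-time path: Logstash reads this stream via logstash-input-kinesis.
    if _, err := kin.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("example-log-stream"),
        PartitionKey: aws.String("web-01"),
        Data:         event,
    }); err != nil {
        log.Fatal(err)
    }

    // Archival path: Firehose buffers the same event and delivers it to S3.
    if _, err := fh.PutRecord(&firehose.PutRecordInput{
        DeliveryStreamName: aws.String("example-log-archive"),
        Record:             &firehose.Record{Data: event},
    }); err != nil {
        log.Fatal(err)
    }
}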

Integrity

One of the many benefits of Centralised Logs is providing an audit trail of actions taken by users in any given system. During a security breach they form an essential piece of evidence for establishing what was breached, how, and potentially by whom. But if the log servers themselves are breached the audit trail could be modified, rendering it useless. This is the basis of PCI DSS requirement 10.5.2:

“Protect audit trail files from unauthorized modifications”

Therefore all the S3 buckets we use for long term archival are stored in a dedicated AWS Account with much tighter security controls. Amazon provide more details on how this works.

A side note about Logstash Plugins

While Logstash core is a robust service, there are many community-maintained plugins with varying levels of maturity. In an early implementation of this platform we attempted to load some logs from S3 using the logstash-input-s3 plugin, but ran into a number of issues with it.

We also tried to use a number of codec plugins to parse CloudTrail and CloudFront logs, but had many issues with those too, including a lack of compatibility with Logstash 5. In the end we dumped all plugins except logstash-input-kinesis.
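For reference, the surviving input looks roughly like this in a pipeline configuration (a sketch only; the option names are our reading of the logstash-input-kinesis documentation, and the stream name is a placeholder):

input {
  kinesis {
    kinesis_stream_name => "example-log-stream"
    application_name    => "logstash"      # also used as the KCL checkpoint table name
    region              => "us-east-1"
    codec               => json
  }
}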

The curious case of the blocked pipeline

Another issue we faced was that Logstash seemed unable to keep up with CloudTrail logs. This manifested as all logs being delayed by several hours at regular intervals. We tried to report metrics from the S3 input plugin, but this proved difficult to get working. To understand why, we need to look at the structure of a CloudTrail payload as delivered to S3:

{
    "Records": [
    ...
    ]
}


Inside the Records array there can be hundreds of events. This means Logstash has to deserialise a large (several megabyte) JSON file before passing it down the pipeline. In testing we found that Ruby, even the JRuby used by Logstash, would take dozens of seconds to load such a large JSON string. Ultimately Logstash is designed to deal with streams of logs, not large serialised payloads. Our theory is that the CloudTrail payloads were tying up all the worker threads, causing every log to be delayed. The number of Logstash instances, with their RAM and CPU requirements, needed to ingest all our CloudTrail logs was cost prohibitive.

To solve this we looked at ways to pre-process the events before they were consumed by Logstash. Writing a small program to chunk the JSON into individual events and feed them to the Kinesis Stream also simplified our architecture, since all events now come from Kinesis, and reduced the responsibility and complexity of the Logstash implementation. We decided to write this program in Golang, as benchmarks showed it was six times faster at deserialising large JSON strings than Ruby or JRuby.
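The sketch below shows the shape of that pre-processor, not our production code; the bucket, key, stream name and region are placeholders, and it assumes the AWS SDK for Go (v1):

// cloudtrail_chunker.go: pull a CloudTrail object from S3, split its large
// "Records" array into individual events and push them to a Kinesis stream,
// so Logstash only ever sees small, stream-sized JSON documents.
package main

import (
    "compress/gzip"
    "encoding/json"
    "log"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/kinesis"
    "github.com/aws/aws-sdk-go/service/s3"
)

// cloudTrailPayload matches the outer structure of a CloudTrail log file.
// json.RawMessage keeps each event as raw bytes so we never re-serialise it.
type cloudTrailPayload struct {
    Records []json.RawMessage `json:"Records"`
}

func main() {
    sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))
    s3c := s3.New(sess)
    kin := kinesis.New(sess)

    // CloudTrail delivers gzipped JSON objects to S3; bucket and key are placeholders.
    obj, err := s3c.GetObject(&s3.GetObjectInput{
        Bucket: aws.String("example-cloudtrail-bucket"),
        Key:    aws.String("example/cloudtrail.json.gz"),
    })
    if err != nil {
        log.Fatal(err)
    }
    defer obj.Body.Close()

    gz, err := gzip.NewReader(obj.Body)
    if err != nil {
        log.Fatal(err)
    }

    var payload cloudTrailPayload
    if err := json.NewDecoder(gz).Decode(&payload); err != nil {
        log.Fatal(err)
    }

    // Push events to Kinesis in batches; PutRecords accepts up to 500 entries.
    const batchSize = 500
    for start := 0; start < len(payload.Records); start += batchSize {
        end := start + batchSize
        if end > len(payload.Records) {
            end = len(payload.Records)
        }
        entries := make([]*kinesis.PutRecordsRequestEntry, 0, end-start)
        for _, rec := range payload.Records[start:end] {
            entries = append(entries, &kinesis.PutRecordsRequestEntry{
                Data:         rec,
                PartitionKey: aws.String("cloudtrail"), // a constant key is fine for a sketch
            })
        }
        if _, err := kin.PutRecords(&kinesis.PutRecordsInput{
            StreamName: aws.String("example-log-stream"),
            Records:    entries,
        }); err != nil {
            log.Fatal(err)
        }
    }
}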

Tuning Logstash for performance

The issue with CloudTrail was just one of the performance problems we have experienced with Logstash. Over time we developed a better understanding of how Logstash performs and how to troubleshoot it. The Elastic guide provides some insight into how you can scale worker threads and batch sizes to get better utilisation and throughput. However these settings are fairly one-dimensional: they generally affect only the filter and output stages, not performance issues in an input plugin. For instance, the CloudTrail issue above was exacerbated by the fact that the S3 input plugin is single threaded; adding more worker threads or increasing the batch size does not help.

Given a Logstash Pipeline, consisting of input, filter and output plugins, how do we find the bottleneck? A simple way is to start with the guide provided by Elastic and see if this improves performance.

The pipeline.workers setting determines how many threads to run for filter and output processing.
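These settings live in logstash.yml; the values below are purely illustrative, not a recommendation:

# logstash.yml -- illustrative values only
pipeline.workers: 4        # threads running the filter and output stages
pipeline.batch.size: 250   # events each worker takes off the queue per batch
pipeline.batch.delay: 50   # milliseconds to wait before dispatching an undersized batch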

If increasing the pipeline.workers setting improves performance then great! If not, though, the issue could still be an input, filter or output plugin. To determine which, we can dive into the Logstash API:

$ curl localhost:9600/_node/stats/pipeline?pretty=true

This API, available in Logstash 5.0 and greater, gives a breakdown of the number of events that have passed through the pipeline and how long each step has taken. To understand what these numbers mean, we first need to understand a bit more about how the Logstash pipeline works:

Each input stage in the Logstash pipeline runs in its own thread. Inputs write events to a common Java SynchronousQueue. This queue holds no events, instead transferring each pushed event to a free worker, blocking if all workers are busy. Each pipeline worker thread takes a batch of events off this queue, creating a buffer per worker, runs the batch of events through the configured filters, then runs the filtered events through any outputs.

With this in mind, and some basic queuing theory, we can see how increasing pipeline.batch.size and pipeline.workers may improve throughput. If we process more events at a time we can reduce the number of network calls made to the output we are feeding. If we have more workers, we can process more events at a time. However, if the input or output plugin is single threaded, or a filter plugin takes a long time to process each event, then more CPU and RAM have to be dedicated to Logstash (either by creating more instances, or by adding more resources to the existing instances), which becomes cost prohibitive.

A blocked pipeline, in which the input worker is waiting for a free thread to handle a new batch of events, can be discovered by looking at the queue_push_duration_in_millis statistic from the node pipeline stats. For this example we’ll look at a Logstash instance we use that takes inputs from syslog and forwards them to a Kinesis Stream and a Firehose Delivery Stream:

  "pipeline" : {
    "events" : {
      "duration_in_millis" : 3764814011,
      "in" : 28471136,
      "out" : 28471136,
      "filtered" : 28471136,
      "queue_push_duration_in_millis" : 3758074993
    },

By dividing the queue_push_duration_in_millis by the duration_in_millis we can determine the percentage of time that Logstash spent waiting for a free worker:

3758074993 / 3764814011 = 0.9982

This means Logstash spent 99.82% of its time waiting for a free worker. To see which pipeline step is taking the most time, we can extract the duration_in_millis of each step:

"plugins" : {
      "filters" : [ {
        "events" : {
          "duration_in_millis" : 1767002,
          "in" : 28471136,
          "out" : 28471136
        },
        "matches" : 28471136,
        "name" : "date"
      } ],
      "outputs" : [ {
        "events" : {
          "duration_in_millis" : 1735364,
          "in" : 28471136,
          "out" : 28471136
        },
        "name" : "kinesis"
      }, {
        "events" : {
          "duration_in_millis" : 1459221995,
          "in" : 28471136,
          "out" : 28471136
        },
        "name" : "firehose"
      } ]

Here we can see the “firehose” output plugin is taking a lot longer than the others: over 99% of the time spent in plugins is spent in the firehose output. As it turns out, the Firehose output plugin we were using sent each event one at a time in a single thread, limiting it to about 20 events per second.
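As an aside, the blocked-worker check above is easy to automate. Here is a minimal Go sketch, assuming only a Logstash 5.0+ instance exposing the node stats API on localhost:9600 (an illustration, not a tool from our stack):

// blocked_ratio.go: report how much of its time Logstash spends waiting for
// a free pipeline worker, using the node stats API.
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
)

// nodeStats maps just the fields we need from /_node/stats/pipeline.
type nodeStats struct {
    Pipeline struct {
        Events struct {
            DurationInMillis          float64 `json:"duration_in_millis"`
            QueuePushDurationInMillis float64 `json:"queue_push_duration_in_millis"`
        } `json:"events"`
    } `json:"pipeline"`
}

func main() {
    resp, err := http.Get("http://localhost:9600/_node/stats/pipeline")
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    var stats nodeStats
    if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
        log.Fatal(err)
    }

    ratio := stats.Pipeline.Events.QueuePushDurationInMillis /
        stats.Pipeline.Events.DurationInMillis
    fmt.Printf("inputs blocked waiting for workers %.2f%% of the time\n", ratio*100)
}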

Summary

Building any kind of event processing pipeline is not an easy task and Log Aggregation is no exception. But as with any pipeline, decoupling components to reduce their responsibilities greatly simplifies the architecture and makes it easier to scale and troubleshoot issues. By taking this approach to the problem of Log Aggregation we have been able to scale to consistently process 1000 events per second, with regular spikes exceeding 1300 events per second.