Note: a good discussion of this article can be found on Hacker News.
We use stream processing at Librato to provide near-real-time time series analytics and alerting capabilities, so our customers can effectively measure the quality of their services. In June this year, @DaveJosephsen gave a talk at Monitorama PDX about stream processing at Librato and our experiences with Apache Storm. Towards the end of the presentation he talked a little about SuperChief, the new distributed stream processing system I was building to replace Storm.
I’m happy to announce that SuperChief is now in production handling all of Librato’s time series aggregations. SuperChief is currently processing around 200K messages/sec and we expect that number to grow significantly as we replace all instances of Storm across our infrastructure.
Apache Storm has had a successful run at Librato but we ultimately decided to replace it because we found the benefits of its abstractions no longer outweighed their costs. In this post we’ll discuss both the benefits and cost of Storm’s abstractions and take a high level look at the design and performance of SuperChief.
Why We Initially Chose Apache Storm
When Librato was founded, the team was small, so they had to carefully choose what to work on. Time series aggregation was an important feature, but they also understood that building and managing a distributed computation system would have an impact on their ability to work on other features. With this in mind, they began to look at open source platforms for distributed computation, and chose Apache Storm.
Almost everything in Apache Storm is an abstraction. To run a job in Storm you create a topology, which is a directed graph of spouts and bolts. Spouts allow you to retrieve data from some source (e.g., Apache Kafka) and transform it into a stream of tuples (e.g., messages). Bolts are an abstraction that allow you to perform computation on the stream. Spouts and bolts are run as an abstraction called tasks, which are grouped into an abstraction called executors. Executors are run by a few threads and are grouped together and run inside a JVM by another abstraction called a worker. Additionally, bolt code the user provides runs in a single-threaded context, eliminating the need for a user to implement thread-safe code.
Finally, in order to run your Storm topology you need a Nimbus server. The nimbus is a workload manager that packages up all your spouts and bolts and distributes them across a cluster of servers running the Storm worker code. Storm abstracts and handles all network based communication between your components using netty. If a server fails, tasks will be reassigned to different servers by Nimbus, and tasks can move around on subsequent deploys of your topology.
Using Apache Storm, Librato was able to quickly implement features like time series aggregation, alerting and historical importing. Over time, the number of servers required to run these topologies continued to grow until we had a large (relative to the size of our other services) Storm deployment, and found we were over-provisioned with Storm at what was essentially a low efficiency level.
Why We Decided to Replace Apache Storm
Inability to isolate, reason about, or debug performance issues due to the worker/executor/task paradigm. This led to building and configuring clusters specifically designed to attempt to mitigate these problems (i.e., separate clusters per topology, only running a worker per server.), which added additional complexity to development and operations and also led to over-provisioning.
Ability of tasks to move around led to difficult to trace performance problems.
Storm’s work provisioning logic led to some tasks serving more Kafka partitions than others. This in turn created latency and performance issues that were difficult to reason about. The initial solution was to over-provision in an attempt to get a better hashing/balancing of work, but eventually we just replaced the work allocation logic.
Due to Storm’s architecture, it was very difficult to get a stack trace or heap dump because the processes that managed workers (Storm supervisor) would often forcefully kill a Java process while it was being investigated in this way.
The propensity for unexpected and subsequently unhandled exceptions to take down an entire worker led to additional defensive verbose error handling everywhere.
This nasty bug STORM-404 coupled with the aforementioned fact that a single exception can take down a worker led to several cascading failures in production, taking down entire topologies until we upgraded to 0.9.4.
Additionally, we found the performance we were getting from Storm for the amount of money we were spending on infrastructure was not in line with our expectations. Much of this is due to the fact that, depending upon how your topology is designed, a single tuple may make multiple hops across JVMs, and this is very expensive. For example, in our time series aggregation topologies a single tuple may be serialized/deserialized and shipped across the wire 3-4 times as it progresses through the processing pipeline.
The more network calls you make, the greater the likelihood data may get lost, so one nice feature of Apache Storm is tuple tracking. Tuple tracking ensures each tuple makes it all the way through your topology. Unfortunately, this feature is not cheap (incidentally it’s not exactly cheap in SuperChief either, though it’s faster than Storm and we’re working on improving it significantly - but more on that later). Here is a screenshot of CPU utilization of our 1-minute time series aggregation Storm cluster. These nodes take all the raw data that Librato receives and aggregate these measurements into single 1-minute windowed measurements with sum, average, min, max and count attributes. By simply disabling tuple tracking we saw an approximately 50% reduction in CPU utilization.
Perhaps most importantly, as Librato’s data engineering team grew, we started having the bandwidth to reconsider the previous choice we had made. We determined that it was in the best interest of our customers and the business to have ultimate control over the design of this service and any trade-offs that were to be made.
SuperChief vs. Apache Storm in Production
To date we’ve replaced three Storm topologies (running 23 c3.2xl instances) with SuperChief (12 m3.2xl instances), cutting our bill on these workloads in half. We switched from c3.2xl instances to m3.2xl because reducing our instance count required more memory per instance. The m3.2xl ($0.532/hr) and c3.2xl ($0.42/hr) have the same number of cores, but the m3.2xl instances have more memory but slightly slower processors.
Even with SuperChief running half the instances and half the cores of Storm we see a 50% CPU utilization reduction. We expect to realize further gains as we continue to optimize SuperChief.
SuperChief is a Dropwizard service written in Java 8. SuperChief uses many of our internal and open source libraries including disco-java, watchconf and rollout. SuperChief also depends on ZooKeeper and Apache Curator for cluster management and offset management for Apache Kafka.
The core SuperChief package allows you to create a SuperChief instance, supply configuration, register with a cluster and consume some data. It also has a couple of Dropwizard resources for a simple UI. At Librato we use Kafka for durability of our measurements before they are processed, so it’s the only data source SuperChief supports today, but adding a new one is pretty straight forward.
SuperChief and Kafka
SuperChief provides classes for reading and writing to Kafka, offset tracking and cluster coordination when consuming from Kafka. You may programmatically create KafkaReader and KafkaCheckpointer instances or you can create them by passing in configuration.
Each KafkaReader consumes from a single broker/topic/partition, and each broker has its own thread pool. This ensures issues with one broker doesn’t impact reading from another. Messages read from Kafka are added to a bounded BlockingQueue and then serviced by a pool of worker threads.
The KafkaCheckpointer handles writing offset information to ZooKeeper and verifies that all offsets read from Kafka exit a SuperChief topology within some configurable time, similar to tuple tracking and acknowledgement in Storm.
SuperChief and Kafka Cluster Coordination
In order to perform in-memory time series aggregations by metric and source in Storm we used its fields grouping capabilities. This meant a spout on one server read a record from Kafka, sent that record to a fields grouping bolt on another server, and that server processed the record and potentially determined 3rd server to send the record to. With SuperChief we’ve pushed this logic into our API tier, eliminating the need for these additional network calls.
When our API receives measurements, it hashes the measurement to a Kafka partition index and then attempts to send the measurement to a broker. If the broker is unavailable the API will failover to another broker but will still write to the same partition index. We use this logic to create a work unit in SuperChief (i.e., a work unit is a particular partition index across a cluster of Kafka brokers). We then provision these work units to SuperChief instances to ensure that any measurements for an unique metric/source will be processed and aggregated by the same SuperChief instance.
We use Apache Curator to elect a leader for the SuperChief topology/cluster. The leader’s responsibility is to monitor ZooKeeper for any changes in Kafka brokers or SuperChief cluster instances and reallocate work units to the cluster. This is accomplished by writing KafkaReader configuration to a ZooKeeper znode that SuperChief cluster instances monitor with Watchconf.
One Million Messages/Sec on SuperChief
SuperChief is only a few months old but it’s is already several orders of magnitude faster than our existing Apache Storm infrastructure. A single server (depending on number of cores and available memory etc.) can process millions of messages per second.
In order to test the performance of SuperChief we simulated customer traffic in our staging environment to generate millions of metric measurements a second. All measurements were gathered on EC2 with paravirtualized hosts (bare metal numbers are not currently available). We used 10 c3.2xlarge instance to simulate 60 customers sending 200-250 metrics with 1000 unique sources every 10 seconds and performed one minute aggregations on the data, writing the results to Kafka.
The simulated customer data was sent directly to 20 m1.xlarge Kafka nodes (bypassing our API tier). The Kafka nodes averaged 330 - 350MB a minute; iostat showed the Kafka drives mostly untaxed with CPU utilization averaging between 50 - 60% on the Kafka nodes.
Below is a screenshot of 6 node SuperChief cluster of m3.8xlarge (32 cores per node/192 cores total) paravirtual instances with 54GB heaps running the G1 collector. The total in-memory working set size of the cluster is around 160-170GB. The cluster is running Librato’s 1-minute time series aggregation service processing ~1.2 million Kafka messages/sec sustained. One minute average CPU utilization was ~45 - 53%. Total processing time (e.g., latency) was measured as the difference between timestamps from when the measurement was created on the traffic generator to processing was completed in SuperChief, which includes time to write and read from Kafka. Average latency was between 1.5 and 2 seconds with a p999 of 5 seconds.
We estimate we’d need 7 to 10 times the number of resources used in these tests in order to achieve this level of performance with Apache Storm.
Room for Improvement
One of the more resource-intensive functions in SuperChief and Storm is offset tracking. At millions of messages a second, offset tracking significantly increases memory requirements and generates additional garbage, which in turn requires additional GC cycles. For example, with offset tracking disabled in SuperChief we were able to process the same workload on a much smaller cluster of servers 6 m3.2xlarge (8 cores per node / 48 cores total) running 24GB heaps. However, offset tracking is a guarantee we do not wish to give up so we have plans to significantly optimize this function and will follow up in future posts about the techniques we’re investigating.
Apache Storm has been important to Librato’s success. Storm’s abstractions allowed us to quickly deliver important features instead of focusing on the complexity of building a distributed computation system. SuperChief, however, is our future. With SuperChief we’ve stripped away costly abstractions and can now focus on exacting the highest levels of performance from our infrastructure. There’s a lot more to SuperChief than we had time to cover in this article. For example, it provides classes for generic time series aggregation of any data. We look forward to sharing more about SuperChief as we continue to improve its performance and capabilities.
It’s engineering like this that makes Librato awesome. Try us out - it’s free, fun, and full-featured for 30 days.