In this, the fourth article in our Collector Highlight Series, we take a look at Heka, a composable collector developed at Mozilla.
What is it?
Heka is the quintessential "Swiss Army Knife" tool for processing streams of data and routing them to one or more destinations. Written in Go and released as open source by engineers at Mozilla, Heka is built for high performance. Its primary focus is on data gathering, analysis, monitoring, and reporting, and it supports a wide variety of tasks and workflows, such as:
- Loading and parsing log files from a file system.
- Accepting statsd-style metrics data for aggregation and forwarding to upstream time series data stores such as Graphite or InfluxDB.
- Launching external processes to gather operational data from the local system.
- Performing real-time analysis, graphing, and anomaly detection on any data flowing through the Heka pipeline.
- Shipping data from one location to another via the use of an external transport (such as AMQP) or directly (via TCP).
- Delivering processed data to one or more persistent data stores.
How do I install it?
$ wget https://github.com/mozilla-services/heka/releases/download/v0.8.2/heka_0.8.2_amd64.deb
$ sudo dpkg -i heka_0.8.2_amd64.deb
When your installation is complete you should have a number of new files: Lua plugins (mostly encoders and decoders), some dashboard assets, and a handful of binaries. The single most important file is the hekad daemon, which loads and orchestrates the various plugins specified in your configuration.
Heka uses the TOML format for its configuration syntax, which is functionally similar to the INI format. By default, hekad looks for a configuration stored at /etc/hekad.toml, although you can override this with the --config parameter. You can get started by creating a new empty TOML file and adding a configuration block for each plugin needed to process your stream data. We'll look at this in more detail in the following sections.
How does it work?
Heka uses a plugin system to load and process streams of data at various stages of its "pipeline". Inputs provide an ingress point for data to enter the system. Decoders convert the native data into Heka's internal message data structure so that downstream handlers will understand how to interact with the data. Filters are the core processing units within Heka, performing useful analytics or aggregation on the data before routing it forward. Encoders act as the inverse of decoders, serializing the internal data structures into the preferred format for the intended recipient. Finally, Outputs handle delivery of the payload to its final destination.
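To make the five stages concrete, here is a minimal Python sketch of the same flow. It is only a conceptual model: the function names, the "demo.access" type, and the sample records are invented for illustration and are not part of Heka's API.

```python
import json
from collections import Counter

def read_input(lines):
    """Input: yield raw records from a source (an in-memory list here)."""
    for line in lines:
        yield line

def decode(raw):
    """Decoder: turn a raw record into a structured message (a plain dict here)."""
    method, path, status = raw.split()
    return {"Type": "demo.access",
            "Fields": {"method": method, "path": path, "status": int(status)}}

def filter_status_counts(messages):
    """Filter: aggregate messages by counting responses per status class."""
    counts = Counter()
    for msg in messages:
        counts[f"{msg['Fields']['status'] // 100}xx"] += 1
    return dict(counts)

def encode(aggregate):
    """Encoder: serialize the aggregate for the recipient (JSON in this sketch)."""
    return json.dumps(aggregate, sort_keys=True)

def write_output(payload, sink):
    """Output: deliver the payload (appended to a list instead of a network call)."""
    sink.append(payload)

raw_records = ["GET / 200", "GET /missing 404", "POST /login 200"]
delivered = []
messages = (decode(raw) for raw in read_input(raw_records))
write_output(encode(filter_status_counts(messages)), delivered)
print(delivered[0])  # prints {"2xx": 2, "4xx": 1}
```

In Heka itself each stage is a separately configured plugin, and the router decides which filters and outputs see a message; the straight-line chaining above is a simplification.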
In the sections below we're going to preview what it might look like to process some data that we want to store in Librato. Specifically, we'll look to track HTTP requests through an NGINX webserver. For this exercise we'll need to be able to read from the NGINX logs, translate the log entries into something usable as a metric, and then transmit it over to our Librato account.
Input plugins receive data from external systems so they can begin processing within the Heka pipeline. Heka supports a variety of input mechanisms such as: reading from files, listening on a TCP socket, accepting log streams, pulling messages from AMQP, launching external processes, or even connecting to remote HTTP services. Virtually anything can serve as a Heka input, although the plugin must be written in Go.
The following configuration snippet instructs hekad to use the LogstreamerInput plugin and passes a number of directives to the plugin. In particular, it directs the plugin to load NGINX's access.log files and specifies the decoder ("nginx-access-decoder") where the stream should be routed.
[nginx-access-logs]
type = "LogstreamerInput"
log_directory = "/var/log/nginx"
file_match = 'access\.log\.?(?P<Index>\d+)?(.gz)?'
priority = ["^Index"]
decoder = "nginx-access-decoder"
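The file_match expression is easier to read once you see what it accepts. Python's re module understands the same named-group syntax, so the sketch below tries the pattern against a few file names. The file names and the match_rotated helper are made up for this illustration, and whole-name matching (fullmatch) is an assumption about how the pattern is anchored, not a statement about Heka's internals.

```python
import re

# Pattern copied verbatim from the file_match directive.
FILE_MATCH = re.compile(r'access\.log\.?(?P<Index>\d+)?(.gz)?')

def match_rotated(filename):
    """Return the named captures if the file name matches, else None."""
    m = FILE_MATCH.fullmatch(filename)
    return m.groupdict() if m else None

for name in ["access.log", "access.log.1", "access.log.2.gz", "error.log"]:
    print(name, "->", match_rotated(name))
```

The live access.log matches with no Index, rotated files capture their rotation number, and error.log is ignored. Note that the dot in (.gz) is unescaped in the original pattern, so it matches any single character before "gz".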
Decoders convert the source data (as imported by the Input plugin) into Heka's internal message data structure so that Filters and other downstream handlers can work with it. These plugins can be written in Go or as sandboxed Lua code. Because Lua plugins don't require recompiling Heka, they are easy to prototype, and many of the community-contributed plugins are written this way.
In this snippet we tell Heka to load the SandboxDecoder plugin and the Nginx Access Log Decoder. There is also a nested configuration block that is passed to the Lua sandbox, specifying the log format and identifying each of the fields in the stream. Once the data has been decoded it is effectively "in the system" and ready for processing by any of the various Heka Filter plugins.
[nginx-access-decoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_access.lua"

    [nginx-access-decoder.config]
    log_format = '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"'
    type = "nginx.access"
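To show what "identifying each of the fields" means in practice, the following Python sketch turns the same log_format string into a regular expression and parses one access log line into named fields. This is not how the Lua decoder is implemented; the format_to_regex and parse_line helpers and the sample log line are assumptions made for illustration.

```python
import re

# log_format copied from the decoder configuration.
LOG_FORMAT = ('$remote_addr - $remote_user [$time_local] "$request" '
              '$status $body_bytes_sent "$http_referer" "$http_user_agent"')

def format_to_regex(log_format):
    """Replace each $variable with a named group; escape everything else."""
    pattern, pos = "", 0
    for var in re.finditer(r"\$(\w+)", log_format):
        pattern += re.escape(log_format[pos:var.start()])
        pattern += f"(?P<{var.group(1)}>.*?)"
        pos = var.end()
    pattern += re.escape(log_format[pos:])
    return re.compile(pattern)

def parse_line(line, regex):
    """Return a dict of fields for a matching line, or None."""
    m = regex.fullmatch(line)
    if m is None:
        return None
    fields = m.groupdict()
    fields["status"] = int(fields["status"])
    fields["body_bytes_sent"] = int(fields["body_bytes_sent"])
    return fields

# Hypothetical access log entry in the format above.
SAMPLE = ('127.0.0.1 - - [29/Dec/2014:15:56:24 +0000] '
          '"GET /index.html HTTP/1.1" 200 612 "-" "curl/7.35.0"')

fields = parse_line(SAMPLE, format_to_regex(LOG_FORMAT))
print(fields["status"], fields["request"])
```

The status field extracted here corresponds to the $status variable that the filter stage aggregates later in the pipeline.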
Filters do most of the heavy lifting within Heka. They're useful for performing aggregation, monitoring, or even producing new metrics to be reinjected into the Heka pipeline. Like decoders, filters can be written in Go or as sandboxed Lua modules.
For this example, we want to operate on the NGINX log streams that have been processed by our Input and Decoder plugins. We're going to use the HTTP Status sandbox filter to match any messages with a Type of "nginx.access" (as set in our decoder). This will aggregate the $status field that we identified in the decoder configuration and flush all status counts every 5 seconds.
[http_status]
type = "SandboxFilter"
filename = "lua_filters/http_status.lua"
ticker_interval = 5
preserve_data = true
message_matcher = "Type == 'nginx.access'"

    [http_status.config]
    sec_per_row = 5
    rows = 17280
    preservation_version = 0
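The sec_per_row and rows values determine how much history the circular buffer holds: 5 seconds × 17280 rows = 86,400 seconds, or 24 hours. The Python sketch below works through that arithmetic and shows one way of bucketing status codes into 5-second rows. It is an illustration of the idea only; the "2xx"-style labels and the helper names are assumptions, not the column names or logic used by the Lua filter.

```python
from collections import defaultdict

SEC_PER_ROW = 5    # from [http_status.config]
ROWS = 17280       # from [http_status.config]

def retention_hours(sec_per_row=SEC_PER_ROW, rows=ROWS):
    """Total time span covered by the buffer, in hours."""
    return sec_per_row * rows / 3600

def row_index(timestamp, sec_per_row=SEC_PER_ROW, rows=ROWS):
    """Row a timestamp lands in; indices wrap around once the buffer is full."""
    return (timestamp // sec_per_row) % rows

def aggregate(events, sec_per_row=SEC_PER_ROW):
    """Count status classes per row. events: iterable of (unix_ts, status)."""
    buckets = defaultdict(lambda: defaultdict(int))
    for ts, status in events:
        row_start = ts - ts % sec_per_row
        buckets[row_start][f"{status // 100}xx"] += 1
    return {start: dict(counts) for start, counts in buckets.items()}

print(retention_hours())  # 24.0
print(aggregate([(100, 200), (103, 404), (104, 200), (105, 500)]))
```

Because the row index wraps, a timestamp exactly 24 hours later reuses the same row, which is what makes the buffer "circular".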
Encoder plugins serialize Heka's internal data structures back into a format appropriate for the designated output. These can also be written in either native Go code or as sandboxed Lua.
Here we want to choose the CBUF Librato Encoder, which will extract our metrics from the circular buffer and serialize them into JSON appropriate for the Librato API. There isn't much involved in the configuration here other than loading the proper encoder and making sure preserve_data is enabled, which will help to avoid duplicate messages in case Heka is restarted.
[cbuf_librato_encoder]
type = "SandboxEncoder"
filename = "lua_encoders/cbuf_librato.lua"
preserve_data = true
Last but not least, output plugins are responsible for the final delivery of our byte stream to its intended destination. We can use them to write data out to files, send it to remote network sockets, or, as in our example, send our NGINX status code measurements to Librato. These plugins must be written in Go.
We'll need to use the HttpOutput plugin here. Our configuration specifies which messages we want to match (Type == 'heka.sandbox-output' and Fields[payload_type] == 'cbuf') and names the "cbuf_librato_encoder" encoder that will serialize them. The configuration also provides the remote address (URL) of the HTTP service, as well as our username and password (token) credentials. Lastly, we also need to specify the Content-Type header as application/json.
[librato]
type = "HttpOutput"
message_matcher = "Type == 'heka.sandbox-output' && Fields[payload_type] == 'cbuf'"
encoder = "cbuf_librato_encoder"
address = "https://metrics-api.librato.com/v1/metrics"
username = "firstname.lastname@example.org"
password = "f48c781b49f43b8fe991e004978b9717abd8e065b9656abeb476dc3ac44480fe"

    [librato.headers]
    Content-Type = ["application/json"]
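For a sense of what ends up on the wire, here is a Python sketch that builds (but does not send) a comparable HTTP request: a JSON body, a Content-Type header, and basic authentication from the username and token. The payload shape (a "gauges" list with name, value, source, and measure_time) reflects my understanding of Librato's metrics API and should be checked against their documentation; the metric name, source, timestamp, and credentials are placeholders, and POST is assumed as the request method.

```python
import base64
import json
import urllib.request

def build_request(url, username, token, gauges):
    """Assemble a JSON POST with HTTP basic auth; nothing is sent here."""
    body = json.dumps({"gauges": gauges}).encode("utf-8")
    credentials = base64.b64encode(f"{username}:{token}".encode("utf-8"))
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": "Basic " + credentials.decode("ascii"),
        },
    )

# Hypothetical measurement: 42 responses in the 2xx class for one interval.
gauges = [{"name": "nginx.http_2xx", "value": 42,
           "source": "web01", "measure_time": 1419868580}]

request = build_request(
    "https://metrics-api.librato.com/v1/metrics",  # address from the config
    "user@example.com",                            # placeholder username
    "YOUR_API_TOKEN",                              # placeholder token
    gauges,
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```

Passing the request to urllib.request.urlopen would actually submit it, which is the part HttpOutput handles for you.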
Putting it Together
All of the previous snippets should be concatenated into a single file. Our final configuration would look something like this:
[nginx-access-logs]
type = "LogstreamerInput"
log_directory = "/var/log/nginx"
file_match = 'access\.log\.?(?P<Index>\d+)?(.gz)?'
priority = ["^Index"]
decoder = "nginx-access-decoder"

[nginx-access-decoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_access.lua"

    [nginx-access-decoder.config]
    log_format = '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"'
    type = "nginx.access"

[http_status]
type = "SandboxFilter"
filename = "lua_filters/http_status.lua"
ticker_interval = 5
preserve_data = true
message_matcher = "Type == 'nginx.access'"

    [http_status.config]
    sec_per_row = 5
    rows = 17280
    preservation_version = 0

[cbuf_librato_encoder]
type = "SandboxEncoder"
filename = "lua_encoders/cbuf_librato.lua"
preserve_data = true

[librato]
type = "HttpOutput"
message_matcher = "Type == 'heka.sandbox-output' && Fields[payload_type] == 'cbuf'"
encoder = "cbuf_librato_encoder"
address = "https://metrics-api.librato.com/v1/metrics"
username = "firstname.lastname@example.org"
password = "f48c781b49f43b8fe991e004978b9717abd8e065b9656abeb476dc3ac44480fe"

    [librato.headers]
    Content-Type = ["application/json"]
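The plugins in this file refer to each other by section name (the input names its decoder, the output names its encoder), so a typo in one of those names is an easy mistake to make. The Python sketch below is a rough sanity check for that, run against a trimmed copy of the configuration. It uses simple regular expressions rather than a real TOML parser, and the missing_references helper is hypothetical, not a Heka tool.

```python
import re

# Trimmed copy of the configuration: only section names and cross-references.
CONFIG = '''
[nginx-access-logs]
type = "LogstreamerInput"
decoder = "nginx-access-decoder"

[nginx-access-decoder]
type = "SandboxDecoder"

[http_status]
type = "SandboxFilter"

[cbuf_librato_encoder]
type = "SandboxEncoder"

[librato]
type = "HttpOutput"
encoder = "cbuf_librato_encoder"
'''

def missing_references(text):
    """Return decoder/encoder names that have no matching [section]."""
    sections = set(re.findall(r'^\s*\[([^\]]+)\]\s*$', text, flags=re.M))
    refs = re.findall(r'^\s*(?:decoder|encoder)\s*=\s*"([^"]+)"', text, flags=re.M)
    return sorted(set(refs) - sections)

print(missing_references(CONFIG))  # prints []
```

One limitation of this check: a reference to a plugin that Heka provides without an explicit section would be reported as missing, so treat the result as a hint rather than a verdict.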
After saving the file, Heka can be started manually and you should see output for each of the plugins.
$ sudo hekad --config=/etc/heka/hekad.toml
2014/12/29 15:56:24 Pre-loading: [nginx-access-logs]
2014/12/29 15:56:24 Pre-loading: [nginx-access-decoder]
2014/12/29 15:56:24 Pre-loading: [http_status]
2014/12/29 15:56:24 Pre-loading: [cbuf_librato_encoder]
2014/12/29 15:56:24 Pre-loading: [librato]
2014/12/29 15:56:24 Pre-loading: [ProtobufDecoder]
2014/12/29 15:56:24 Pre-loading: [ProtobufEncoder]
2014/12/29 15:56:24 Loading: [nginx-access-decoder]
2014/12/29 15:56:24 Loading: [ProtobufDecoder]
2014/12/29 15:56:24 Loading: [cbuf_librato_encoder]
2014/12/29 15:56:24 Loading: [ProtobufEncoder]
2014/12/29 15:56:24 Loading: [nginx-access-logs]
2014/12/29 15:56:24 Loading: [http_status]
2014/12/29 15:56:24 Loading: [librato]
2014/12/29 15:56:24 Starting hekad...
2014/12/29 15:56:24 Output started:librato
2014/12/29 15:56:24 Filter started:http_status
2014/12/29 15:56:24 MessageRouter started.
2014/12/29 15:56:24 Input started: nginx-access-logs
If you're using the same five-second Filter settings as above, you'll want to set the Period in your Librato Metric Attributes to match.
Assuming you have live data hitting your NGINX server, you should begin seeing metrics in your Librato account. Here is an instrument showing all of the HTTP 2xx and 4xx metrics from my own NGINX server (with some test traffic thrown in for good measure).
Hints, Trade-offs, and Gotchas
Heka's Librato encoder only supports gauge metrics, so if you're trying to capture counter values, make sure to use the integrate() function with our Composite Metrics.
As I mentioned earlier, the ticker_interval in your filter defines how often Heka flushes data from the circular buffer. It's important that this matches up with your metric's Period attribute or you'll end up with gaps in your data.
Whether you're running ephemeral cloud instances or physical hardware, Heka and Librato are a simple, scalable, and powerful pipeline for systems and application monitoring. Sign up for a free trial today, and we'll have you correlating systems metrics across your infrastructure in minutes.