This is the story of “Husky”, a new event storage system we built at Datadog.
Building a new storage system is a fun and exciting undertaking—and one that shouldn’t be taken lightly. Most importantly, it does not happen in a vacuum. To understand the trade-offs that a new system makes, you need to understand the context: what came before it, and why we decided to build something new.
From Metrics to Logs
A few years ago, Datadog announced the general availability of its Log Management product. This was a significant addition to our platform. Before that, the company’s most widely known product was Infrastructure Monitoring (colloquially known as “Metrics”). Metrics systems are based on the idea of storing pre-aggregated timeseries data as tuples of <timestamp, float64>.
However, these metrics systems are not suitable for storing log data because they achieve efficiency by pre-aggregating many similar events into a single aggregated datapoint. For example, to get web hit rates, it’s a lot more effective (cost- and energy-wise) to capture the count per second on the web server than to read the click stream to derive the same information.
This optimization is highly desirable to produce compressed timeseries for a metrics system, but disastrous for a logs product. At a granularity of one second, a million “events” that occur within the same second can be compressed into a single 16-byte <timestamp, float64> tuple. In addition, most modern metrics databases leverage delta-of-delta encoding, such that the actual storage cost of each 16-byte tuple is less than two bytes in most cases.
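To make the pre-aggregation concrete, here is a minimal sketch in Go of a web server rolling raw hits up into one count-per-second datapoint. The names and structure are ours for illustration, not an actual metrics pipeline; the point is that each datapoint is just an 8-byte timestamp plus an 8-byte float, no matter how many events it summarizes.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Datapoint is the 16-byte tuple a metrics system stores:
// an 8-byte Unix timestamp and an 8-byte float64 value.
type Datapoint struct {
	Timestamp int64
	Value     float64
}

// aggregateHits rolls raw hit timestamps up into one datapoint per second.
// A million hits within the same second collapse into a single 16-byte tuple.
func aggregateHits(hits []time.Time) []Datapoint {
	counts := map[int64]float64{}
	for _, h := range hits {
		counts[h.Unix()]++
	}
	points := make([]Datapoint, 0, len(counts))
	for ts, c := range counts {
		points = append(points, Datapoint{Timestamp: ts, Value: c})
	}
	sort.Slice(points, func(i, j int) bool { return points[i].Timestamp < points[j].Timestamp })
	return points
}

func main() {
	now := time.Now()
	hits := []time.Time{now, now, now, now.Add(time.Second)}
	for _, p := range aggregateHits(hits) {
		fmt.Printf("ts=%d count=%.0f\n", p.Timestamp, p.Value)
	}
}
```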
In exchange for the efficiency, metrics systems have limits on their capacity for storing context in the form of tags. You can filter and group metrics data in powerful ways, but there is a trade-off between unbounded tag cardinality and the time a query takes to return.
Hence, the practical recommendation to tag metrics by long-lived dimensions, such as datacenter, service, or pod_name, while pre-aggregating short-lived, fast-churning dimensions such as transaction_id or packet_id.
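A rough back-of-the-envelope sketch of why that recommendation matters (the numbers below are invented): the number of distinct timeseries a metric can produce is bounded by the product of its tags’ cardinalities, so a single request-scoped tag can dwarf every other dimension combined.

```go
package main

import "fmt"

// timeseriesCount gives a loose upper bound on how many distinct timeseries
// one metric produces: the product of the cardinalities of its tags.
func timeseriesCount(tagCardinalities map[string]int64) int64 {
	var total int64 = 1
	for _, n := range tagCardinalities {
		total *= n
	}
	return total
}

func main() {
	// Long-lived dimensions: bounded cardinality, cheap to index.
	longLived := map[string]int64{"datacenter": 5, "service": 200, "pod_name": 3000}

	// Adding one fast-churning dimension (say, a transaction_id with
	// millions of unique values per day) multiplies the series count.
	withTransactionID := map[string]int64{
		"datacenter": 5, "service": 200, "pod_name": 3000,
		"transaction_id": 10_000_000,
	}

	fmt.Println("long-lived tags only:", timeseriesCount(longLived))
	fmt.Println("plus transaction_id: ", timeseriesCount(withTransactionID))
}
```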
On the other hand, a logging product has some very different requirements:
- Log sizes tend to be measured in kilobytes, not bytes. This has a dramatic effect on what’s needed to store and query those logs efficiently.
- A logging product that can’t support high cardinality data, like stack traces and UUIDs, isn’t very useful.
In other words, the appeal of metrics is in the ability to very efficiently compute timeseries, with enough context, out of lots of events. The appeal of logs is in the ability to retain lots of granular events with all their context—and to produce arbitrary dimensional aggregates at query time.
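As a contrast, here is a minimal, hypothetical sketch of what “arbitrary dimensional aggregates at query time” means for a log store: every event keeps its full attribute map, high-cardinality fields included, and the grouping dimension is chosen when the query runs rather than when the data is written.

```go
package main

import "fmt"

// LogEvent keeps the full context of one event, high-cardinality fields included.
type LogEvent struct {
	Timestamp int64
	Attrs     map[string]string // e.g. service, status, request_id, ...
}

// countBy computes a grouped count over raw events at query time.
// The grouping dimension is picked by the query, not at ingestion time.
func countBy(events []LogEvent, dimension string) map[string]int {
	out := map[string]int{}
	for _, e := range events {
		out[e.Attrs[dimension]]++
	}
	return out
}

func main() {
	events := []LogEvent{
		{Timestamp: 1, Attrs: map[string]string{"service": "web", "status": "500", "request_id": "a1b2"}},
		{Timestamp: 2, Attrs: map[string]string{"service": "web", "status": "200", "request_id": "c3d4"}},
		{Timestamp: 3, Attrs: map[string]string{"service": "api", "status": "200", "request_id": "e5f6"}},
	}
	fmt.Println("by status: ", countBy(events, "status"))
	fmt.Println("by service:", countBy(events, "service"))
}
```

The same stored events can answer a count-by-status query today and a count-by-service query tomorrow, with no aggregation decision made at write time.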
The initial version of Datadog Logs
This is what Datadog’s first Logs system looked like.
It worked quite well in the beginning, but it didn’t take long for the cracks to show. The primary problem was that within a multi-tenant cluster, a single misbehaving node could end up disrupting the experience of all tenants—and in the worst case, make the entire cluster unavailable.
Whenever this occurred, mitigating the issue was difficult. Scaling up or scaling out an overloaded cluster often made things worse instead of better. The nodes that were already overwhelmed by writes or reads would suddenly start streaming data to each other, in addition to all the work that they were already trying to perform.
Building our own clustering
The second iteration of our Logs system decoupled storage from clustering. We would handle all of the clustering separately, allowing us to tightly integrate the new clustering system with the rest of our multi-tenant technology. We retained the same single-node storage engine, and focused on getting fine-grained control over data distribution and topology changes. The new setup looked something like this:
There are a few key differences between this diagram and the previous one:
- None of the individual nodes know about each other. Each one behaves as if it’s a “cluster” of one. This means that a single misbehaving or unhealthy node can only ever disrupt as many tenants as we’ve assigned to the shard it owns, and there is no way for it to cause cascading failures to the rest of the “cluster”.
- We introduced a new service called “Shard Router” that reads from Kafka and writes back to a new Kafka cluster, but this time with the data organized into “shards” (groups of partitions). Tenants are automatically split into an appropriate number of shards based on their data volume over the last five minutes (see the sketch after this list).
- Each shard is consumed by two storage node replicas, each reading only the events from its shard’s partitions.
- We added a custom query engine that knows which shards each tenant’s data is spread across—and whose responsibility it is to query all of the relevant shards/replicas, merge the partial aggregates, and generate the final query result.
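To tie the list above together, here is a rough sketch of two of these ideas in miniature: picking a volume-dependent shard count for a tenant, routing its events across those shards, and merging the partial aggregates that each shard returns. The thresholds, hash choice, and function names are assumptions for illustration, not the actual Shard Router or query engine.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardCount picks how many shards a tenant gets from its recent data volume.
// The thresholds here are invented; the real system bases the split on the
// last five minutes of traffic.
func shardCount(bytesLast5Min int64) int {
	switch {
	case bytesLast5Min > 10_000_000_000:
		return 16
	case bytesLast5Min > 1_000_000_000:
		return 4
	default:
		return 1
	}
}

// routeEvent deterministically picks one of the tenant's shards for an event.
func routeEvent(tenant, eventKey string, shards int) int {
	h := fnv.New32a()
	h.Write([]byte(tenant + "/" + eventKey))
	return int(h.Sum32()) % shards
}

// mergeCounts is the query engine's job in miniature: combine the partial
// aggregates returned by each shard/replica into the final result.
func mergeCounts(partials []map[string]int) map[string]int {
	total := map[string]int{}
	for _, p := range partials {
		for k, v := range p {
			total[k] += v
		}
	}
	return total
}

func main() {
	shards := shardCount(2_500_000_000) // a tenant sending ~2.5 GB in 5 minutes
	fmt.Println("shards:", shards)
	fmt.Println("event -> shard:", routeEvent("tenant-42", "event-abc", shards))

	// Partial results from two shards, merged into the final answer.
	fmt.Println(mergeCounts([]map[string]int{
		{"status:200": 10, "status:500": 1},
		{"status:200": 7},
	}))
}
```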
Platform hypergrowth
The migration to this new architecture represented a marked reliability improvement, meaning that our on-call rotations became a lot more bearable and engineers got more sleep. Just in time for the scope of our platform (internally called “Event Platform”) to grow at an even faster clip!
Other teams launched new products like Network Performance Monitoring (NPM), Real User Monitoring (RUM), and Datadog Continuous Profiler. Many of these new products had storage requirements similar to those of the Log Management product: they needed to store and index multi-kilobyte timeseries “events”. Recognizing that the e