KairosDB inner workings; how does it store data in Cassandra?

KairosDB is a timeseries database built on Cassandra. Even though it is coded in a way to support different database backends, it is known mostly by its usage of Cassandra as a scaleable and performant time series data storage. With KairosDB, you can store timeseries metric data in different types and then query them with aggregations over time units. In this article, we will look into the details of KairosDB and its two different Cassandra schema to understand how the insert, delete and read operations we do, work under the cover. Cassandra is a database that you need to model your data, regarding to your queries. That is why its important to know how your queries are made to the Cassandra according to your KairosDB data model!

KairosDB has just released a new version that uses CQL and an improved Cassandra Schema this month. This version reportedly has some performance improvement over the old one. However to be compatible with the old schema, this new version still includes the schema of the old one. Queries are done on the both new and old tables to stay compatible.

The first thing you need to know before inspecting the schemas, is that most of the data in the old schema has a blob type. Meaning if you were to query the Cassandra database directly, the returned data would be not human-readable. To get around this when inspecting the database with a GUI or by hand, you can run this Python code in an interpreter to see what the data is: bytearray.fromhex('7461675f76616c756573').decode('utf8'). The reason why the data is stored as blobs comes down to custom metric data types and the schema design. We will see this later.

Let’s start with dissecting the old Cassandra Schema and see how it works for our read/insert/delete operations.

Old Cassandra Schema

This Cassandra Schema is used prior to v1.2.0 for the versions of KairosDB that uses Thrift instead of CQL. Check this gist to see the full schema as a gist file.

Tables

This Schema has a single keyspace called kairosdb and 3 tables called string_index, row_key_index and data_points. Each of these tables are explained in detail below.

Metric data for a specific metric and tags are hold in buckets of 3 weeks. This means a new row is created every three weeks to hold metric data for that period of time. The 3 weeks is hardcoded into KairosDB and is a design choice. Finding out which buckets to query for a period of time, or which bucket a specific data point belongs to is done with the help of a simple arithmetic calculation.

string_index

This table has 3 fields key = blob, column = text and value = blob however only the first two columns are used. KairosDB stores every string it sees on inserts into this table to speed up queries. There are 3 possible values for the key field, namely metric_names, tag_names and tag_values(stored as UTF-8 blobs). The other field which is used is the column field, which is used to store seen metric names, tag names and tag values. This table is queried to make sure searching the other tables is not unnecessary for a specific metric/tag. This table is queried only when populating the KairosDB UI’s metric name dropdown menu. tag_names and tag_values are not used and they aren’t populated in the new version anymore.

data_points

This table has 3 fields which are used to hold a metric data for a specific metric name and tags.

The fields for tables are key = blob, column1 = blob and value = blob. The key field is the concatenation of metric name, bucket start timestamp, the data type and the tags. column1 is the offset of this data’s time, relative to the bucket start time, and value is the serialized binary data for the value(this is how KairosDB can have custom data types) for your metric data. How this columns are utilized are explained below in Workflow section.

The data in this rows are ordered by their offset time in ASC order, so you can have sequential access to your timeseries data when querying. Also another thing to keep in mind is, since the key field is used for partioning this data in your Cassandra cluster, each metric data for a specific metric/tags/bucket will be in the same Cassandra node.

row_key_index

This table is used to keep track of which buckets are available in the data_points table for a specific metric/tags. Lets say you have a time range and you want to do aggregates over a metric with a tag. You need a way of finding matching buckets of data(in data_points) for this query first. This tables helps you with this.

It has 3 fields key = blob, column1 = blob, value = blob. The key field is the metric name in utf-8 bytes. And the column1 field is the same with the data_points table’s key field. Meaning metric name + bucket start time + data type + tags concatenated together. The value field is not used. This table essentially is a way of saying(in plain English); for this metric name, we have these buckets of data with these tags.

You can later use this information to make your queries. This is explained in detail below in Workflow section.

This table’s partition key is the key field which holds the metric name. Because of this, all of row_key_index entries for a single metric get stored in the same cassandra node. This is improved in the new schema.

Workflow

Below we try to explain how each insert/read and delete operation is done with this schema so you can better understand the inner workings of KairosDB.

As an example in this workflow, we will assume that we’re holding temperature data for different cities. Lets model it so the metric name is Temperature and the city information is a tag.

Insert

Let’s say you have a metric data with Temperature metric name, and city=Antalya tag. For inserting a single metric to this set, we will use 33 for the value and 1501672887988 for the timestamp.

We first calculate(CassandraDatastore.java Line 287) which bucket this data belongs to. It’s done using the formula timestamp - (timestamp % row_width) where row_width is the hardcoded value(Line 92) in KairosDB. This then translates to: 1501672887988 - (1501672887988 % 1814400000) = 1500508800000. In unix time, for a data dated 08/02/2017 @ 11:21am (UTC) the bucket start time corresponds to 07/20/2017 @ 12:00am (UTC).

We then create(Line 289) a row_key with this information. This row_key is the concatenation of Metric Name(Temperature), the bucket start time we calculated(1500508800000), the data type(kairos_long) and our tags(city=Antalya).

Then we move on to making sure(Line 298) we have a row_key_index entry for the Metric Name(Temperature) and the row_key we calculated. And then we put the data in to the data_points table! We do this by calculating the offset relative to the bucket start time data_time - row_time in our case: 1501672887988 - 1500508800000 = 1164087988. This all in all translates to an insert to data_points table with key = row_key, column1 = offset we calculated and the value = our metric value = 33.

After the data is written, we write metric names, tag names to the string_index table.

Read

After getting familiar with insert, we don’t need to go into details here. It’s apparent that, for a time range and metric/tags query, before querying the data_points table, we need to take a look(Line 425) into row_key_index table. To do this we create a query into the row_key_index table with the start and end time of our query(Line 582). This results in a query with Metric Name(e.g. Temperature) and time range to the row_key_index table. For each returned result, we match them in-memory with our query(Line 712 this requires parsing the binary row_key values aswell) and then we make queries(Line 457) to data_points table for each matched bucket.

This is why its not recommended to have a very populated row_key_index table with the old schema. Because that may result in too many in-memory comparisons and too many different queries to be made to data_points table.

Delete

Deletes goes pretty much same with the read. However instead of doing read queries to the data_points table, we instead make delete queries. A good part to notice here(Line 522) that, if you happen to delete a whole row of data(meaning the time range you gave includes the start and end time of the bucket), its very performant. However if you do a partial deletes and keep querying that row, you may have performance issues because of Cassandra tombstones(may even result in your read queries to not respond!).

Keep in mind!

An important thing to keep in mind when doing reads/deletes is that; Lets say you have 3 metric names and 300.000 cities. To delete 1 week of data for these 3 metrics on Antalya, KairosDB needs to filter all of your row_key_index table in-memory and that may translate to millions of comparisons. See calculating row_key_index size in Optimization section!

New Cassandra Schema

The maintainer of the KairosDB database has been doing some work with CQL instead of using THRIFT. The beta version of KairosDB that uses CQL was released in July 2017.

This version has some new tables in the Schema, however its still compatible with the old version of KairosDB, meaning it queries both tables for backwards compatibility. There is a small difference that you need to keep in mind, the new version of the Cassandra schema creates string_index table with the column1 fields as blob. This field was text with the old schema. However these schemas are created only if the table doesn’t already exists(Schema.java Line 50). So it shouldn’t cause any problem when upgrading KairosDB to the new version.

The data_points and string_index are actively used for the same purpose they were used with the old version.

New Tables

row_key_time_index

This table has 3 fields metric = text, row_time = timestamp and an unused value = text. Essentially it’s job is to hold what buckets we have for a specific metric. It’s queried with the time range and the metric name, and the result is used to query the row_keys_index table.

row_keys_index

This table holds the row_key we calculated in a CQL friendly way. It has 4 fields namely; metric = text, row_time = timestamp, data_type = text and tags = frozen<map<text, text>>. These are used to hold a row_key in Cassandra table, so they can be queried with their metric, timestamp and data type to be parsed and compared in memory with the query.

Workflow

Insert

Insert is pretty simple and almost the same as the old schema and addition is that the CQL commands are run batched. Bucket is calculated, then the row_key and it’s made sure corresponding entries are created in row_key_time(Line 54) and row_keys(Line 65), after this the data is inserted into the data_points(Line 116) and only the metric names are inserted into the string_index.

Read

We still need a way to find which data_points buckets we need to query. To do this, KairosDB needs to query both old schema’s row_key_index table and the new row_key_time_index and row_keys table.

The old schema is queried like the old version of KairosDB but instead of using Thrift, CQL is used.

For the new schema; row_key_time_index is queried with the metric name and the time range(Line 780) to get the buckets available, then multiple queries are made to the row_keys table(Line 788) and the returning entries are filtered in memory(Line 823) according to the query made.

Delete

New version of cassandra doesn’t seem to delete row_key_time_index and row_keys entries for the metric data. It just deletes the old schema’s row_key_index entry, and the data_points entries(Line 595 and Line 614). There are no delete queries for the new tables in Schema.java.

Optimization, what to keep in mind

Calculating your row_key_index size

To do this calculation. Lets say you have 300.000 different cities, and you hold Temperature, Humidity and Wind metrics for these cities. And you decided to model your data in KairosDB so the metric names correspond to the Temperature, Humidity and Wind. And the cities are stored with the city=XXX tag. You will have N = 300.000 * 3 = 900.000 entries in your row_key_index. And you get N more row_key_index entries each 3 week because KairosDB divides your data into buckets! You can see how this could slow your queries with the Read and Delete, because of in-memory comparisons. The situation is different with the new schema.

Gotchas, not many performance improvements… Slow deletes, reads.

KairosDB isn’t the most optimized database when it comes to executing your queries on Cassandra. One of the most apparent feature it lacks, is that when you make a query with same metric name, but different tags, it queries the row_key_index table multiple times. And considering this metric name is populated, this may cause a lot of slow downs on your system. This can happen both with reads and deletes.

Lets keep going with our example where we had 3 different metrics Temperature, Humidity and Wind and we had 1 tag named city, which holds the city name. If we were to run a delete query which deletes all of the metrics for 2 different cities Antalya and Istanbul, this will cause 2 scans on all of our row_key_index. Lets think a scenario where we run this kind of query for deleting a big number of cities… This could cause a serious slowdown with the current schema(s).

Don’t have too many tombstones

Cassandra doesn’t delete records when you execute a delete query. It just marks them as tombstones and deletes them in the next compaction. Until this compaction happens, every query you make to these sstables, will need to filter out tombstones records and this may cause problem such as your read queries not responding or getting slower. You should keep in mind KairosDB stores 3 weeks of metric data in a row for each specific metric/tag combinations. If you delete some data in the row you are actively using, you may get slow downs/halts/aborts. Please see this stackoverflow question, regarding this problem.