We are building an IoT platform that receives data from millions of devices every minute. We need to run analytics over this time-series data using Spark, with the data stored in Cassandra. We are benchmarking Cassandra and Spark to find the best Cassandra table model along with the best querying method in Spark. Modeling a Cassandra table for query optimization while also following Cassandra best practices seems contradictory: the table models that suit querying best cause either a large-partition problem or a hotspot issue in the Cassandra cluster.
We are using a virtual machine with 8 vCPUs and 30 GB of memory running Ubuntu Server 16.04. Both Cassandra and Spark run on the same machine in a single-node deployment. We are using Cassandra 3.11.2, Spark 2.2.0 and Spark-Cassandra-Connector 2.0.7-s_2.11.
The dataset contains data from 1000 devices, each sending one packet per minute over a total duration of two months (July and August 2017), or 62 days. Each table therefore contains 89.28 million entries. We created 5 tables in Cassandra holding the same data but with different PRIMARY KEYs, as shown below:
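As a sanity check, the 89.28 million figure follows directly from the ingest rate:

```python
# Sanity check: rows generated by 1000 devices sending one packet
# per minute for 62 days (July + August 2017).
devices = 1000
packets_per_day = 24 * 60   # one packet per minute
days = 62

total_rows = devices * packets_per_day * days
print(total_rows)  # 89280000, i.e. 89.28 million entries per table
```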
#### C* table1 ####
CREATE TABLE test_timeseries.timeseries_with_date1 (
    dev_id text,
    day date,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY ((dev_id, rec_time), day)
) WITH CLUSTERING ORDER BY (day ASC);
#### C* table2 ####
CREATE TABLE test_timeseries.timeseries_with_date2 (
    dev_id text,
    day date,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY (dev_id, day, rec_time)
) WITH CLUSTERING ORDER BY (day ASC, rec_time ASC);
#### C* table3 ####
CREATE TABLE test_timeseries.timeseries_with_date3 (
    day date,
    dev_id text,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY (day, dev_id, rec_time)
) WITH CLUSTERING ORDER BY (dev_id ASC, rec_time ASC);
#### C* table4 ####
CREATE TABLE test_timeseries.timeseries_with_date4 (
    day date,
    dev_id text,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY ((day, dev_id), rec_time)
) WITH CLUSTERING ORDER BY (rec_time ASC);
#### C* table5 ####
CREATE TABLE test_timeseries.timeseries_with_date5 (
    dev_id text,
    day date,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY ((dev_id, day), rec_time)
) WITH CLUSTERING ORDER BY (rec_time ASC);
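Assuming the uniform ingest rate above (one row per device per minute), the rows-per-partition each partition key implies can be sketched with a quick back-of-the-envelope calculation. This counts rows only; the real partition size also depends on column widths:

```python
# Rough rows-per-partition estimate for each schema, assuming 1000
# devices, one row per device per minute, over 62 days.
ROWS_PER_DEVICE_PER_DAY = 24 * 60   # 1440
DEVICES, DAYS = 1000, 62

partitions = {
    # table1: ((dev_id, rec_time), day) -> one row per partition
    "((dev_id, rec_time), day)": 1,
    # table2: (dev_id, ...) -> a device's entire history in one partition,
    # growing without bound as more days arrive
    "(dev_id, day, rec_time)": ROWS_PER_DEVICE_PER_DAY * DAYS,
    # table3: (day, ...) -> all devices for a day in one partition,
    # so all writes for the current day hit one node (hotspot)
    "(day, dev_id, rec_time)": ROWS_PER_DEVICE_PER_DAY * DEVICES,
    # table4/5: ((day, dev_id), ...) / ((dev_id, day), ...) -> one device-day
    "((day, dev_id), rec_time)": ROWS_PER_DEVICE_PER_DAY,
}
for key, rows in partitions.items():
    print(f"{key:30s} ~{rows:>9,d} rows/partition")
```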
Initially, we benchmarked these 5 tables with three different querying methods available in Spark. We ran a simple count query on each table for different numbers of devices over a variable duration in days. We will discuss each method one by one:
1. where() function of the DataFrame API: Conditions on day and dev_id are given through the where() function, followed by the count() function on the DataFrame. The query is performed as follows:
#### Loading Table from Cassandra ####
from pyspark.sql.functions import col

table1_df = spark.read.format("org.apache.spark.sql.cassandra")\
    .options(keyspace = 'test_timeseries', table = 'timeseries_with_date1').load()
#### Querying table, where devices is a Python list of dev_id's ####
query_table1_df = table1_df.where((col("day") >= "2017-07-01") & (col("day") <= "2017-07-04") & \
    (col("dev_id").isin(devices))).count()
2. Spark SQL query using a range: Here we run the query through the Spark SQL API and give the duration as a range condition on day. This keeps the query simple to write and improves readability. The query is performed as follows:
#### Loading Table from Cassandra and Registering Temp View ####
spark.read.format("org.apache.spark.sql.cassandra")\
    .options(keyspace = 'test_timeseries', table = 'timeseries_with_date1')\
    .load().createOrReplaceTempView("table1")
#### Querying table, where devices is a string holding the list of all dev_id's ####
query_table1 = spark.sql("SELECT COUNT(1) FROM table1 WHERE day >= cast('2017-07-01' as date) AND \
    day <= cast('2017-07-15' as date) AND dev_id IN (" + devices + ")")
3. Spark SQL query using IN(): Here we run the query through the Spark SQL API and give the dates as a list through IN(). This is to check whether a range performs better than passing the days through IN(). The query is performed as follows:
query1 = "SELECT COUNT(1) FROM table1 WHERE day IN " + dates + " AND dev_id IN " + devices
## dates is a string containing all days; devices is a string containing all dev_id's
query_table1 = spark.sql(query1)
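For reference, the dates and devices strings used in the queries above can be built as in the sketch below; cql_in_list and the dev_* identifiers are our own illustrative names, not part of the benchmark code:

```python
from datetime import date, timedelta

def cql_in_list(values):
    """Render an iterable of values as a quoted SQL IN (...) body."""
    return "(" + ", ".join(f"'{v}'" for v in values) + ")"

# dev_id strings for the first 3 devices (hypothetical naming scheme)
devices = cql_in_list(f"dev_{i}" for i in range(1, 4))

# every day in a range, for the "day IN (...)" variant
start, n_days = date(2017, 7, 1), 4
dates = cql_in_list((start + timedelta(days=i)).isoformat() for i in range(n_days))

print(devices)  # ('dev_1', 'dev_2', 'dev_3')
print(dates)    # ('2017-07-01', '2017-07-02', '2017-07-03', '2017-07-04')
```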
Results of these queries are shown in the Without Year sheet in the Results section. The time format is minutes:seconds.milliseconds (MM:SS.sss).
After reading Jon Haddad's article “Cassandra Time Series Data Modeling For Massive Scale”, we added an extra year field to the partition keys of four of the above PRIMARY KEYs. This dataset contains data from 1000 devices, each sending one packet per minute over a total duration of four months (Nov 2017 to Feb 2018), or 120 days. We now have four tables with the following schemas and primary keys:
#### C* table with year 1 ####
CREATE TABLE test_timeseries.timeseries_table_with_year1 (
    year int,
    dev_id text,
    day date,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY ((year, dev_id), day, rec_time)
) WITH CLUSTERING ORDER BY (day ASC, rec_time ASC);
#### C* table with year 2 ####
CREATE TABLE test_timeseries.timeseries_table_with_year2 (
    year int,
    day date,
    dev_id text,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY ((year, day), dev_id, rec_time)
) WITH CLUSTERING ORDER BY (dev_id ASC, rec_time ASC);
#### C* table with year 3 ####
CREATE TABLE test_timeseries.timeseries_table_with_year3 (
    year int,
    dev_id text,
    day date,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY ((year, dev_id, day), rec_time)
) WITH CLUSTERING ORDER BY (rec_time ASC);
#### C* table with year 4 ####
CREATE TABLE test_timeseries.timeseries_table_with_year4 (
    year int,
    day date,
    dev_id text,
    rec_time timestamp,
    voltage float,
    PRIMARY KEY ((year, day, dev_id), rec_time)
) WITH CLUSTERING ORDER BY (rec_time ASC);
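Bucketing by year caps partition growth at one year of data. Assuming the same one-packet-per-minute rate, the worst-case row counts per partition for these four keys work out roughly as:

```python
ROWS_PER_DEVICE_PER_DAY = 24 * 60     # one packet per minute
DEVICES = 1000

# ((year, dev_id), ...): at most one device-year per partition,
# bounded instead of growing forever
per_device_year = ROWS_PER_DEVICE_PER_DAY * 365
# ((year, day), ...): still all devices for one day in a single
# partition, so the write hotspot remains
per_day = ROWS_PER_DEVICE_PER_DAY * DEVICES
# ((year, dev_id, day), ...) and ((year, day, dev_id), ...):
# one device-day per partition
per_device_day = ROWS_PER_DEVICE_PER_DAY

print(per_device_year, per_day, per_device_day)  # 525600 1440000 1440
```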
Results of querying these tables are available in the With Year sheet in the Results section.
Here we will briefly discuss the behavior of each PRIMARY KEY with respect to query time and its effect on the C* cluster.
We observed that a multi-field partition key allows fast querying only if "=" conditions are applied to its fields from left to right. Once an IN() (e.g. to specify a range of days or a list of devices) is used in that order, any further usage of IN() removes the benefit and degenerates into a near full table scan.
Another useful observation was that querying days through IN() is less effective than expressing the same duration as a range query.
The main takeaway from this benchmarking is that no single schema answers our IoT use case without drawbacks. While ((day, dev_id), rec_time) gives a constant response time, that time depends entirely on the total data size, since the query amounts to a full scan. On the other hand, (dev_id, day, rec_time) and its counterpart (day, dev_id, rec_time) provide acceptable query times, but the first suffers from very large partitions (a device's entire history lives in one partition) and the latter from a write hotspot (all devices write to the same day partition).