Streaming late-arriving data
Spark Structured Streaming can handle late-arriving data. Here is a demo of how to use it.
First, if the processing is stateless, i.e. no row relies on any other row, the stream can simply insert every row, whether it is late or not. This is the 'append' output mode of Structured Streaming.
If the processing is stateful, e.g. an aggregation (count, previous value, average, etc.) is applied to the stream, things are more complex. The aggregated value can change over time as late-arriving data comes through, so Structured Streaming has to keep a record of every aggregation bucket and update its value as soon as a new row is added to that bucket. For example, the count of dog is 20 as at 10:00am; when another dog record arrives at 10:05 (the record itself may be late or new), the stream needs to emit a new count of 21 for dog.
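To make the update behaviour concrete, here is a pure-Python sketch (no Spark involved) of what a stateful count does per micro-batch: it keeps a running count per key and emits only the keys whose count changed. The function name `process_micro_batch` is hypothetical, and the `Counter` merely stands in for Spark's state store.

```python
from collections import Counter
from typing import Dict, Iterable


def process_micro_batch(state: Counter, batch: Iterable[str]) -> Dict[str, int]:
    """Add one micro-batch of keys to the running counts in `state` and return
    only the keys whose count changed (roughly what 'update' mode emits)."""
    changed: Dict[str, int] = {}
    for name in batch:
        state[name] += 1
        changed[name] = state[name]
    return changed


# The count of dog is 20 as at 10:00am; then another dog record arrives.
state = Counter({"dog": 20, "cat": 3})
print(process_micro_batch(state, ["dog"]))  # {'dog': 21}; cat did not change, so it is not emitted
```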
As the aggregates keep being updated, the downstream target table needs to either update the existing counts in place or simply append each new count as another row.
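If the target is append-only, readers have to collapse the history to the latest count per key themselves. A minimal pure-Python sketch, assuming each appended row carries the micro-batch id that produced it (the `(batch_id, name, count)` layout and the `latest_counts` helper are hypothetical):

```python
from typing import Dict, List, Tuple


def latest_counts(appended: List[Tuple[int, str, int]]) -> Dict[str, int]:
    """Collapse an append-only history of (batch_id, name, count) rows to the
    latest count per name, i.e. the row from the highest batch_id."""
    latest: Dict[str, Tuple[int, int]] = {}
    for batch_id, name, count in appended:
        if name not in latest or batch_id > latest[name][0]:
            latest[name] = (batch_id, count)
    return {name: count for name, (_, count) in latest.items()}


history = [(0, "dog", 20), (0, "cat", 3), (1, "dog", 21)]
print(latest_counts(history))  # {'dog': 21, 'cat': 3}
```

Updating in place (the MERGE used later in this post) avoids this read-time step at the cost of a more expensive write.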
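Keeping a record of every aggregation bucket costs a lot of memory, so we can apply a watermark, which specifies a period of time within which late-arriving data is still considered. The watermark is based on event time, not wall-clock time: it is the latest event timestamp seen so far minus the allowed delay (e.g. 10 hours). Data older than the watermark may be ignored (Spark only guarantees that data within the delay is not dropped), and the state for buckets that fall behind it can be discarded.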
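The watermark rule can be sketched in plain Python. This is a simplified model, not Spark's implementation. It assumes the watermark applied to a micro-batch is computed from the max event time of *earlier* batches, and that rows at or before the watermark are dropped. The `Watermark` class is hypothetical.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

Row = Tuple[str, datetime]  # (name, event time)


class Watermark:
    """Simplified event-time watermark: (max event time seen so far) - delay."""

    def __init__(self, delay: timedelta) -> None:
        self.delay = delay
        self.max_event_time: Optional[datetime] = None

    @property
    def current(self) -> Optional[datetime]:
        if self.max_event_time is None:
            return None  # no data seen yet, so nothing counts as late
        return self.max_event_time - self.delay

    def filter_batch(self, batch: List[Row]) -> List[Row]:
        wm = self.current
        # Assumption: rows at or before the watermark are dropped.
        kept = [row for row in batch if wm is None or row[1] > wm]
        # The watermark only advances after the batch has been processed.
        for _, ts in batch:
            if self.max_event_time is None or ts > self.max_event_time:
                self.max_event_time = ts
        return kept


wm = Watermark(timedelta(minutes=30))
wm.filter_batch([("cat", datetime(2021, 1, 1, 1, 40))])
print(wm.current)                                              # 2021-01-01 01:10:00
print(wm.filter_batch([("dog", datetime(2021, 1, 1, 1, 6))]))  # [] (too late, dropped)
```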
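Note that the aggregation can also use a time window, e.g. the dog count for every 5-minute interval. Windows can overlap, in which case they are called sliding windows (e.g. 10-minute windows starting every 5 minutes); non-overlapping windows are called tumbling windows.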
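Here is a small pure-Python sketch of how an event time maps to windows. It assumes windows are aligned to the Unix epoch with no start offset, and that each window is `[start, end)`, with the start inclusive and the end exclusive as in Spark's `window` function. The `windows_for` helper is hypothetical.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

EPOCH = datetime(1970, 1, 1)
Window = Tuple[datetime, datetime]


def windows_for(ts: datetime, length: timedelta,
                slide: Optional[timedelta] = None) -> List[Window]:
    """Return every [start, end) window that contains ts.
    slide=None means tumbling windows (slide == length)."""
    slide = slide or length
    start = ts - (ts - EPOCH) % slide  # latest window start at or before ts
    windows = []
    while start + length > ts:         # end is exclusive, so it must be after ts
        windows.append((start, start + length))
        start -= slide
    return sorted(windows)


def t(hour: int, minute: int) -> datetime:
    return datetime(2021, 1, 1, hour, minute)


print(windows_for(t(1, 5), timedelta(minutes=5)))                        # tumbling: only [1:05, 1:10)
print(windows_for(t(1, 7), timedelta(minutes=10), timedelta(minutes=5)))  # sliding: [1:00, 1:10) and [1:05, 1:15)
```

With a sliding window, each row is counted once in every window that covers it, so per-window counts add up to more than the number of rows.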
OK, here is the example:
Create the source and target Delta tables. Note that Databricks uses Delta tables by default.
%sql
CREATE TABLE if not exists default.ben_test_late_data_source (
id INT,
name STRING,
ts timestamp
) USING DELTA;
CREATE TABLE if not exists default.ben_test_late_data_target (
name STRING,
time_window struct<start:timestamp,end:timestamp>, --struct data type
count bigint -- F.count returns a bigint (LongType)
) USING DELTA;
Note: if you need to restart the stream from scratch, truncate both tables and delete the checkpoint files used by the stream.
Now set up the streaming pipeline. Because we want to update existing rows in the target table, we use a MERGE to update the counts. The aggregated stream emits changed rows continuously (in micro-batches), and each micro-batch is merged into the target table.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def merge_delta(changes, target):
    # Deduplicate on the merge key just in case: a MERGE fails if multiple source rows
    # match and try to update the same row of the target Delta table.
    # A MERGE can also produce incorrect results if the source is non-deterministic,
    # e.g. read from a non-Delta table or filtered on the current timestamp.
    (changes
        .dropDuplicates(['name', 'time_window'])
        .createOrReplaceTempView('tmp_changes'))
    merge_sql = '''MERGE INTO {} t
        USING tmp_changes c
        ON c.name = t.name AND c.time_window = t.time_window
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *'''.format(target)
    # NOTE: the temp view only exists in the SparkSession that defined the micro-batch
    # DataFrame, so the MERGE must run through that same session
    # (on PySpark 3.3+ it is also exposed as changes.sparkSession).
    changes._jdf.sparkSession().sql(merge_sql)

spark = SparkSession.builder.getOrCreate()

stream = (
    spark.readStream
    .format('delta')
    # Don't fail when the source table is updated or deleted from. Rows in rewritten
    # files are re-emitted, and deletes are not propagated downstream.
    .option('ignoreChanges', 'true')
    .table('default.ben_test_late_data_source')
    # Accept rows up to 30 minutes behind the latest event time seen so far.
    .withWatermark('ts', '30 minutes')
    # 5-minute tumbling windows (no slideDuration means windows do not overlap).
    .groupBy('name', F.window(timeColumn='ts', windowDuration='5 minutes').alias('time_window'))
    .agg(F.count('name').alias('count'))
)

# Currently the Delta sink only supports 'append' and 'complete' output modes,
# so use 'update' mode with foreachBatch and MERGE each micro-batch into the target.
# Each micro-batch of the aggregation has already been aggregated, so it contains at most
# one row per (name, time_window) key: one MERGE never has to combine two counts for the same cat.
output = (
    stream.writeStream
    .foreachBatch(lambda batch, batch_id: merge_delta(batch, 'default.ben_test_late_data_target'))
    .outputMode('update')
    .option('checkpointLocation', '/tmp/default.ben_test_late_data')
    .start()
)
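The MERGE in `merge_delta` behaves as an upsert keyed on `(name, time_window)`. A pure-Python sketch of those semantics, with a dict standing in for the target table (the `merge_into` helper is hypothetical, and unlike Spark's `dropDuplicates`, which keeps an arbitrary row, the dedup here keeps the last one):

```python
from datetime import datetime
from typing import Dict, Iterable, Tuple

Key = Tuple[str, Tuple[datetime, datetime]]  # (name, (window start, window end))


def merge_into(target: Dict[Key, int],
               changes: Iterable[Tuple[Key, int]]) -> Tuple[int, int]:
    """Upsert changed counts into target. Returns (rows updated, rows inserted)."""
    deduped = dict(changes)  # one row per key, so no target row is matched twice
    updated = inserted = 0
    for key, count in deduped.items():
        if key in target:
            updated += 1   # WHEN MATCHED THEN UPDATE SET *
        else:
            inserted += 1  # WHEN NOT MATCHED THEN INSERT *
        target[key] = count
    return updated, inserted


w = (datetime(2021, 1, 1, 1, 0), datetime(2021, 1, 1, 1, 5))
target = {("dog", w): 1}
print(merge_into(target, [(("dog", w), 2), (("cat", w), 1)]))  # (1, 1)
print(target[("dog", w)])                                      # 2
```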
While the pipeline is running, we can test it by inserting and deleting records in the source table.
%sql
-- Run these statements one at a time: uncomment one, run the cell, then check the target table.
-- initial record: it should come through to the target after a few seconds
-- insert into default.ben_test_late_data_source
-- values (1, 'dog', '2021-01-01 1:01:00');
-- insert another dog in the same 5-minute window [1:00, 1:05): the target count should become 2, still in one row
-- (a dog at exactly 1:05:00 would land in a new window, because window starts are inclusive and ends are exclusive)
-- insert into default.ben_test_late_data_source
-- values (2, 'dog', '2021-01-01 1:04:00');
-- insert a cat: the latest event time moves forward to 1:40, so after this micro-batch the watermark becomes
-- 1:40 - 30 minutes = 1:10, and rows with an event time earlier than that won't be loaded
-- insert into default.ben_test_late_data_source
-- values (3, 'cat', '2021-01-01 1:40:00');
-- insert a late-arriving dog: this should not be picked up, as 1:06 is behind the 1:10 watermark
-- insert into default.ben_test_late_data_source
-- values (4, 'dog', '2021-01-01 1:06:00');
-- another late-arriving cat, still within the watermark: it goes into a different window, [1:30, 1:35), as windows are 5 minutes
-- insert into default.ben_test_late_data_source
-- values (5, 'cat', '2021-01-01 1:30:00');
-- a third late-arriving cat, which lands in the existing [1:30, 1:35) window, so that window's count becomes 2
-- insert into default.ben_test_late_data_source
-- values (6, 'cat', '2021-01-01 1:31:00');
-- delete the first dog: this should not affect anything, because with ignoreChanges the delete is not propagated to the stream
delete from default.ben_test_late_data_source
where id = 1
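To predict what the target table should contain, the test inserts can be replayed in a simplified pure-Python model of the pipeline, assuming one micro-batch per INSERT, a watermark derived from earlier batches, and 5-minute tumbling windows. The replay puts the second dog at 1:04 so that both dogs share the [1:00, 1:05) window; a dog at exactly 1:05:00 would open the next window. State eviction and the ignored delete are not modelled, and all names here are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

WINDOW = timedelta(minutes=5)
DELAY = timedelta(minutes=30)
EPOCH = datetime(1970, 1, 1)

Window = Tuple[datetime, datetime]
Key = Tuple[str, Window]


def tumbling_window(ts: datetime) -> Window:
    start = ts - (ts - EPOCH) % WINDOW
    return start, start + WINDOW


def replay(batches: List[List[Tuple[str, datetime]]]) -> Dict[Key, int]:
    """Count per (name, window) and upsert changed counts into a target dict."""
    state: Dict[Key, int] = defaultdict(int)  # running counts (Spark's state store)
    target: Dict[Key, int] = {}                # stands in for the Delta target table
    max_ts: Optional[datetime] = None
    for batch in batches:
        watermark = max_ts - DELAY if max_ts is not None else None
        changed = set()
        for name, ts in batch:
            if watermark is not None and ts <= watermark:
                continue  # older than the watermark: dropped
            key = (name, tumbling_window(ts))
            state[key] += 1
            changed.add(key)
        for key in changed:  # 'update' mode output, then MERGE
            target[key] = state[key]
        for _, ts in batch:  # the watermark advances after the batch
            if max_ts is None or ts > max_ts:
                max_ts = ts
    return target


def t(hour: int, minute: int) -> datetime:
    return datetime(2021, 1, 1, hour, minute)


inserts = [("dog", t(1, 1)), ("dog", t(1, 4)), ("cat", t(1, 40)),
           ("dog", t(1, 6)), ("cat", t(1, 30)), ("cat", t(1, 31))]
result = replay([[row] for row in inserts])  # one micro-batch per INSERT
for (name, (start, end)), count in sorted(result.items()):
    print(name, start.time(), end.time(), count)
# cat 01:30:00 01:35:00 2
# cat 01:40:00 01:45:00 1
# dog 01:00:00 01:05:00 2
```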
Observe the results in the target table.
%sql
select * from default.ben_test_late_data_target
A few things can be done to improve MERGE performance.
Improve the match condition. If the target table is partitioned by 'date', for example, and you are merging only changes from the last few days, adding a 'date' predicate to the match condition reduces the search space in the target table and thus speeds up the merge.
Compact files. Use the OPTIMIZE command to compact a Delta table; do this periodically for tables that accumulate many small files.
Control the shuffle partitions for writes. The merge operation shuffles data multiple times to compute and write the updated data. The number of tasks used to shuffle is controlled by the Spark session configuration spark.sql.shuffle.partitions. This setting controls not only the parallelism but also the number of output files: increasing it increases parallelism but also produces more, smaller data files, so there is a trade-off.
etc.
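The first tip (narrowing the match condition) can be sketched as a helper that builds the MERGE statement with an optional extra predicate on the target's partition column. The helper `build_merge_sql`, the table `default.events`, and its `date` partition column are all hypothetical; the predicate is appended to the ON clause so Delta can skip target files outside the recent partitions.

```python
from typing import Optional, Sequence


def build_merge_sql(target: str, source_view: str, keys: Sequence[str],
                    partition_predicate: Optional[str] = None) -> str:
    """Build a MERGE matching on `keys`, optionally narrowed by a predicate
    on the target table's partition column."""
    conditions = [f"s.{k} = t.{k}" for k in keys]
    if partition_predicate:
        conditions.append(partition_predicate)
    return (
        f"MERGE INTO {target} t\n"
        f"USING {source_view} s\n"
        f"ON {' AND '.join(conditions)}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )


print(build_merge_sql("default.events", "tmp_changes", ["id"],
                      "t.date >= current_date() - INTERVAL 3 DAYS"))
```

The predicate is only safe if no incoming change can match a target row outside that date range; otherwise such a row would be inserted as a duplicate instead of being updated.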