Streaming late arriving data

Spark Structured Streaming can handle late-arriving data. Here is a demo of how to use it.

Firstly, if the input data is stateless, i.e. a row does not depend on any other row, then the stream can simply insert every row into the target, whether it is late or not. This is the 'append' output mode of Structured Streaming.
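
As a minimal sketch of that stateless case (the table names and checkpoint path here are illustrative, not the ones used later in this demo):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Stateless pass-through: every incoming row is appended to the target,
# late or not, and no state is kept between micro batches.
(
    spark.readStream
         .format('delta')
         .table('default.some_stateless_source')      # illustrative source table
         .writeStream
         .format('delta')
         .outputMode('append')                        # insert-only output mode
         .option('checkpointLocation', '/tmp/some_stateless_checkpoint')
         .toTable('default.some_stateless_target')    # illustrative target table
)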

If the input data is stateful, e.g. an aggregation (count, previous value, avg, etc.) is applied on the stream, the streaming gets more complex. The aggregation value can change over time as late-arriving data comes through. Structured Streaming needs to keep a record of all aggregation buckets and update the aggregation value as soon as a new row is added to a bucket. E.g. the count of dog is 20 as at 10:00am, but when another dog record arrives at 10:05 (the record itself can be late or new), the stream needs to emit an updated count of 21 for dog.

As the aggregation keeps being updated, the downstream target table needs to update the counts accordingly, or simply append the new counts to the table.

Keeping a record of all aggregation buckets costs a lot of memory, so we can apply a watermark to specify how late data is still considered. Anything earlier than that is simply ignored. The watermark is relative to the latest event time seen on the stream, e.g. accept data up to 10 hours earlier than the maximum timestamp seen so far.
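
A sketch of setting such a watermark (assuming an event-time column named ts and an illustrative table name):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Rows more than 10 hours behind the latest ts seen on the stream are treated as too late;
# state for aggregation buckets older than that can be dropped.
events = (
    spark.readStream
         .format('delta')
         .table('default.some_source')        # illustrative table name
         .withWatermark('ts', '10 hours')
)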

Note that the aggregation can use a time window as well, e.g. a dog count for every 5-minute interval. The time windows can also overlap, which gives a sliding window.
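
For example, F.window can express both a tumbling (non-overlapping) and a sliding (overlapping) window; here events stands for a streaming dataframe with name and ts columns, such as the source table created below:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
events = spark.readStream.format('delta').table('default.ben_test_late_data_source')

# Tumbling window: non-overlapping 5-minute buckets
tumbling = events.groupBy('name', F.window('ts', '5 minutes')).count()

# Sliding window: 10-minute buckets that start every 5 minutes, so consecutive windows overlap
sliding = events.groupBy('name', F.window('ts', '10 minutes', '5 minutes')).count()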

OK, here is the example:

Create the source and target Delta tables. Note that Databricks uses Delta tables by default.

%sql
CREATE TABLE IF NOT EXISTS default.ben_test_late_data_source (
  id INT,
  name STRING,
  ts TIMESTAMP
) USING DELTA;

CREATE TABLE IF NOT EXISTS default.ben_test_late_data_target (
  name STRING,
  time_window STRUCT<start: TIMESTAMP, end: TIMESTAMP>, -- struct data type for the aggregation window
  count INT
) USING DELTA;

Note: if you need to restart the stream from scratch, truncate the tables and delete the checkpoint files used by the stream.
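
In a Databricks notebook that reset could look roughly like this (spark and dbutils are provided by the notebook; the checkpoint path matches the one used by the writeStream further down):

# stop the streaming query first, then clear the tables and the checkpoint
spark.sql('TRUNCATE TABLE default.ben_test_late_data_source')
spark.sql('TRUNCATE TABLE default.ben_test_late_data_target')
dbutils.fs.rm('/tmp/default.ben_test_late_data', True)  # recursively remove the checkpoint folder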

Now set up the streaming pipeline. As we want to update existing rows in the target table, we use a merge function to update the counts. The input stream emits changed rows continuously (in micro batches), and here we merge each micro batch into the target table.


from pyspark.sql import SparkSession
from pyspark.sql import functions as F


def merge_delta(changes, target):
    # the de-duplication is just in case the stream emits duplicate rows
    (changes.dropDuplicates(['name', 'time_window'])
            .createOrReplaceTempView('tmp_changes'))

    # A merge operation can fail if multiple rows of the source dataset match
    # and the merge attempts to update the same rows of the target Delta table.
    # A merge operation can produce incorrect results if the source dataset is non-deterministic,
    #   e.g. reading from non-Delta tables / using non-deterministic operations such as Dataset.filter()
    merge_sql = '''MERGE INTO {} t
                   USING tmp_changes c
                   ON c.name = t.name AND c.time_window = t.time_window
                   WHEN MATCHED THEN UPDATE SET *
                   WHEN NOT MATCHED THEN INSERT * '''.format(target)

    # NOTE: you have to use the SparkSession that was used to define the micro-batch dataframe
    changes._jdf.sparkSession().sql(merge_sql)


spark = SparkSession.builder.getOrCreate()

stream = (
    spark.readStream
         .format('delta')
         .option('ignoreChanges', 'true')  # let changes flow through instead of throwing exceptions
         .table('default.ben_test_late_data_source')
         .withWatermark('ts', '30 minutes')  # accept data up to 30 minutes behind the latest event time seen
         .groupBy('name',
                  F.window(timeColumn='ts', windowDuration='5 minutes',
                           slideDuration=None, startTime=None).alias('time_window'))  # the time window for aggregation
         .agg(F.count('name').alias('count'))
)

# Currently, streaming writes to a Delta table only support 'append' and 'complete' output modes,
# so we do a merge for each micro batch here to update existing rows instead.
# Each micro batch from the readStream has already been aggregated, so every group key has only
# one row to output; don't worry about merging two counts for the same key in one go.
output = (
    stream.writeStream
          .foreachBatch(lambda batch, batch_id: merge_delta(batch, 'default.ben_test_late_data_target'))
          .outputMode('update')
          .option('checkpointLocation', '/tmp/default.ben_test_late_data')
          .start()
)


While the pipeline is running, we can test it by inserting and deleting records in the source table.

%sql
-- initial record, should see this come through to the target after a few seconds
-- insert into default.ben_test_late_data_source
-- values (1, 'dog', '2021-01-01 1:01:00');


-- insert another dog, should see the target row updated to count 2, still one row though
-- insert into default.ben_test_late_data_source
-- values (2, 'dog', '2021-01-01 1:04:00');


-- insert a cat, now the max event time moves to 40 minutes later,
-- which means anything earlier than (max event time - watermark) won't be loaded
-- insert into default.ben_test_late_data_source
-- values (3, 'cat', '2021-01-01 1:40:00');


-- insert a late-arriving dog, this should not be picked up as it is outside the watermark window
-- insert into default.ben_test_late_data_source
-- values (4, 'dog', '2021-01-01 1:06:00');


-- another late-arriving cat, this goes into a different aggregation window because the window is 5 minutes
-- insert into default.ben_test_late_data_source
-- values (5, 'cat', '2021-01-01 1:30:00');


-- a third late-arriving cat, this goes into an existing window
-- insert into default.ben_test_late_data_source
-- values (6, 'cat', '2021-01-01 1:31:00');


-- delete a dog, this should not impact anything because the delete is ignored
delete from default.ben_test_late_data_source
where id = 1;


Observe the results in the target table:

%sql
select * from default.ben_test_late_data_target



To improve MERGE performance, a few things can be done.