Insert Only
First, note that Spark Structured Streaming is built around insert-only (append-only) data sources. In streaming scenarios, an insert-only source is a data stream where new records are continuously appended, without updates or deletions of existing data.
If the data is not insert-only but you still want to use streaming to load changes, some form of change data capture / change data feed needs to be implemented, and the resulting stream of changes (insert, update, delete) has to be handled properly at the destination.
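As a rough sketch of what "handled properly" can mean: a common pattern is a foreachBatch sink that merges each micro-batch of changes into the target Delta table. The table names, the id/value columns and the change-type column below are placeholders, not something prescribed by Spark:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()

def apply_changes(batch_df, batch_id):
    # merge one micro-batch of change records into the target Delta table
    target = DeltaTable.forName(spark, "targetTable")
    (target.alias("t")
        .merge(batch_df.alias("s"), "t.id = s.id")
        .whenMatchedDelete(condition="s._change_type = 'delete'")
        .whenMatchedUpdate(set={"value": "s.value"})
        .whenNotMatchedInsert(condition="s._change_type != 'delete'",
                              values={"id": "s.id", "value": "s.value"})
        .execute())

(spark.readStream
.table("changeSource")
.writeStream
.foreachBatch(apply_changes)
.option("checkpointLocation", "path/to/checkpoint/dir")
.start()
)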
This is not a comprehensive guide, but only covers a few common topics.
A simple example:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# stream from one table into another table
spark.readStream \
.table("myTable") \
.writeStream \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.format("parquet") \
.toTable("newTable")

# stream from a table into a directory of parquet files
spark.readStream \
.table("myTable") \
.select("value") \
.writeStream \
.format("parquet") \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.option("path", "path/to/destination/dir") \
.start()
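A quick way to check what the streams above have written is a normal batch read:
# check results with batch reads
spark.read.table("newTable").show()
spark.read.parquet("path/to/destination/dir").show()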
Triggers
By default, streaming runs in micro-batch mode, where each micro-batch starts as soon as the previous one finishes. The other trigger options are:
Fixed interval: micro-batches run only at the specified interval, e.g. every 2 hours.
One-time: the query processes all data available up to that point in a single batch and stops. This is useful if you don't want to keep a cluster running all the time, but only spin one up on a schedule; it is essentially not streaming, just the convenience of the streaming API combined with a job schedule. Note this trigger is deprecated and replaced by the available-now trigger.
Available-now: similar to the one-time trigger, it processes all data available so far and stops. The difference is that it can split the new data into multiple micro-batches, instead of the single, possibly massive batch produced by the one-time trigger (see the rate-limited sketch after the examples below).
Continuous: an experimental low-latency mode with a fixed checkpoint interval.
# Default trigger (runs micro-batch as soon as it can)
df.writeStream \
.format("console") \
.start()
# ProcessingTime trigger with a two-second micro-batch interval
df.writeStream \
.format("console") \
.trigger(processingTime='2 seconds') \
.start()
# One-time trigger (deprecated; use the available-now trigger instead)
df.writeStream \
.format("console") \
.trigger(once=True) \
.start()
# Available-now trigger
df.writeStream \
.format("console") \
.trigger(availableNow=True) \
.start()
# Continuous trigger with one-second checkpointing interval
df.writeStream \
.format("console") \
.trigger(continuous='1 second') \
.start()
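As a sketch of the available-now behaviour described above: with a Delta source, rate-limit options such as maxFilesPerTrigger bound the size of each micro-batch, so an available-now run gets split into several smaller batches (this assumes a Delta source on a version that honours rate limits with availableNow; table names are placeholders):
# Available-now trigger combined with a source-side rate limit
(spark.readStream.format("delta")
.option("maxFilesPerTrigger", 1000)
.table("myTable")
.writeStream
.option("checkpointLocation", "path/to/checkpoint/dir")
.trigger(availableNow=True)
.toTable("newTable")
)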
Change Data Feed
Change Data Feed (CDF) allows a Delta table to record row-level change events (insert, update, delete), similar to Change Data Capture (CDC) in SQL Server.
Structured Streaming can then be used to load those changes seamlessly.
Enable change data feed on a Delta table:
new table: CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
existing table: ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
all new tables: set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
new tables created with DataFrameWriterV2 (Spark 3.0+): use writeTo(...).tableProperty(property, value) at creation time, as sketched below
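A minimal PySpark sketch of the DataFrameWriterV2 route (the DataFrame contents and table name are placeholders):
# create a new Delta table with change data feed enabled via DataFrameWriterV2
(spark.createDataFrame([(1, 'abc')], "id_num INT, description STRING")
.writeTo("myNewTable")
.using("delta")
.tableProperty("delta.enableChangeDataFeed", "true")
.create()
)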
Once enabled, change records are stored in the _change_data folder under the table directory, which incurs extra storage. Reading the change data should go through Delta Lake APIs, such as Structured Streaming or SQL.
Example:
use catalog test;
create table cdf.xyz (
  id_num int,
  description string
) TBLPROPERTIES (delta.enableChangeDataFeed = true);

insert into cdf.xyz
values
  (1, 'abc'),
  (2, 'hello');
The insert and any later changes are recorded. The table_changes() function retrieves the change records, starting from a given commit version.
SELECT * FROM table_changes('test.cdf.xyz', 0)
The change records carry the change type, commit version and commit timestamp, so you can replay the history of the table.
id_num  description  _change_type  _commit_version  _commit_timestamp
1       abc          insert        1                2024-05-23T04:36:11.000+00:00
2       hello        insert        1                2024-05-23T04:36:11.000+00:00
There are different parameters for querying the range of changes.
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
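The same ranges can also be read as a batch query from PySpark, using the Delta options for the change feed:
# batch read of the change feed, starting from commit version 0
(spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("test.cdf.xyz")
.show()
)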
Streaming the Change Data Feed
Note that the change data is not persisted forever; the files under _change_data are cleaned up according to the table's retention policy (for example, when VACUUM runs). A good way to persist the changes is to plug the change feed into Structured Streaming and save the records to a separate table.
If you can afford a 24/7 cluster to stream the changes, there is no need to set a trigger; the query will keep running with the default micro-batch trigger.
(spark.readStream.format("delta")
.option("readChangeFeed", "true")
.table("test.cdf.xyz")
.writeStream
.option("checkpointLocation", '/tmp/test_cdf_history')
.toTable("test.cdf.xyz_history")
)
Otherwise, if running on a schedule, trigger(availableNow=True) makes the query process everything available and then stop. The checkpoint lets the next scheduled run resume from where the previous one left off.
(spark.readStream.format("delta")
.option("readChangeFeed", "true")
.table("test.cdf.xyz")
.writeStream
.option("checkpointLocation", '/tmp/test_cdf_history')
.trigger(availableNow=True)
.toTable("test.cdf.xyz_history")
)
The .option("readChangeFeed", "true") tells the Delta source to read the change feed instead of the table's current data.
The history table accumulates the change data feed records. For update transactions, it keeps the row values both before and after the change (the update_preimage and update_postimage change types).
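For example, after an update on the source table (a sketch continuing the tables above; the history table only picks it up on the next streaming run):
update test.cdf.xyz set description = 'hi' where id_num = 2;

-- after the next streaming run, the before/after rows appear in the history table
select * from test.cdf.xyz_history
where _change_type in ('update_preimage', 'update_postimage');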