This example uses the merge operation of DeltaTable to merge (upsert) changes into a Delta table.
Rows are inserted, updated, or deleted according to the source data.
from pyspark.sql import SparkSession
from delta import DeltaTable
spark = SparkSession.builder.getOrCreate()
# Create a test Delta table; the 'deleted' column indicates whether the row is deleted.
test_table_path = 'yourcatalog.schema.test_delta'
df = spark.createDataFrame(
    [[1, 'a', False],   # for testing update
     [2, 'b', False]],  # for testing delete
    schema=['id', 'value', 'deleted'])
df.write.mode('overwrite').option("overwriteSchema", True).saveAsTable(test_table_path)
# Merge changes into the test Delta table.
target_table = DeltaTable.forName(spark, test_table_path)
df_source = spark.createDataFrame(
    [[1, 'aa', False],    # update the existing row
     [2, 'b', True],      # delete the existing row
     [3, 'new', False]],  # insert a new row
    schema=['id', 'value', 'deleted'])
(
    target_table.alias('t')
    .merge(df_source.alias('s'), condition='t.id = s.id')
    .whenMatchedDelete(condition='s.deleted is true')  # remove the deleted rows
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll(condition='s.deleted is false')  # insert only non-deleted rows
    .execute()
)
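To make the expected outcome concrete, here is a minimal plain-Python sketch of what the three clauses above do to the test rows. It is only a model: simulate_merge is a hypothetical helper, it never touches Spark or Delta Lake, and it assumes each id occurs at most once in the source.

```python
# Plain-Python model of the merge above; it does not call Spark or Delta Lake.
# Assumption: every id appears at most once in the source rows.
def simulate_merge(target_rows, source_rows):
    """Return the target rows after applying the three merge clauses."""
    result = {row["id"]: dict(row) for row in target_rows}
    for s in source_rows:
        if s["id"] in result:          # matched on t.id = s.id
            if s["deleted"]:           # whenMatchedDelete(condition='s.deleted is true')
                del result[s["id"]]
            else:                      # whenMatchedUpdateAll()
                result[s["id"]] = dict(s)
        elif not s["deleted"]:         # whenNotMatchedInsertAll(condition='s.deleted is false')
            result[s["id"]] = dict(s)
        # Unmatched source rows with deleted=True fall through and are ignored.
    return sorted(result.values(), key=lambda r: r["id"])


target = [
    {"id": 1, "value": "a", "deleted": False},
    {"id": 2, "value": "b", "deleted": False},
]
source = [
    {"id": 1, "value": "aa", "deleted": False},   # update
    {"id": 2, "value": "b", "deleted": True},     # delete
    {"id": 3, "value": "new", "deleted": False},  # insert
]

merged = simulate_merge(target, source)
for row in merged:
    print(row)
```

Under those assumptions the table should end up with row 1 (value 'aa') and row 3 (value 'new'), while row 2 is removed.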
Operation semantics
A merge matches rows of the target table against rows of the source based on the match condition. The target is a DeltaTable and the source is a Spark DataFrame.
The basic structure of a merge is:
target.alias('t').merge(source.alias('s'), condition='t.id = s.id ...')
    .whenMatchedXXX(optional_condition)
    .whenNotMatchedXXX(optional_condition)
    ...
    .execute()
There can be any number of whenMatchedXXX and whenNotMatchedXXX clauses. Among the whenMatchedXXX clauses there can be at most one update action and one delete action; a whenNotMatchedXXX clause can only perform an insert.
Each whenMatchedXXX or whenNotMatchedXXX clause can also take an optional condition. The clause then applies to a row only when the row is matched (or not matched) by the merge condition and the clause's own condition also holds.
When there are multiple whenMatchedXXX clauses, they are evaluated in the order in which they are specified. Every whenMatchedXXX clause except the last one must have a condition.
This works like if / else if / else: each earlier clause handles the subset of rows that satisfies its condition, and the last clause catches whatever remains.
The same applies to multiple whenNotMatchedXXX clauses.
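The if / else if / else behaviour can be sketched in plain Python. This is an illustrative model only, not how Delta Lake implements merge; validate_clauses and pick_action are hypothetical helpers, and a clause is represented here as a (name, condition) pair where condition may be None.

```python
from typing import Callable, List, Optional, Tuple

# A clause is modelled as (action name, optional condition on (target_row, source_row)).
Clause = Tuple[str, Optional[Callable[[dict, dict], bool]]]


def validate_clauses(clauses: List[Clause]) -> None:
    """Enforce the rule that only the last clause may omit its condition."""
    for name, condition in clauses[:-1]:
        if condition is None:
            raise ValueError(f"clause '{name}' is not last, so it needs a condition")


def pick_action(clauses: List[Clause], t: dict, s: dict) -> Optional[str]:
    """Return the first clause whose condition holds, like if / elif / else."""
    for name, condition in clauses:
        if condition is None or condition(t, s):
            return name
    return None  # no clause applies, so the row is left untouched


# Matched clauses in the same order as the example merge: delete first, then update.
matched_clauses: List[Clause] = [
    ("delete", lambda t, s: s["deleted"]),
    ("update", None),
]
validate_clauses(matched_clauses)

t_row = {"id": 2, "value": "b", "deleted": False}
print(pick_action(matched_clauses, t_row, {"id": 2, "value": "b", "deleted": True}))
print(pick_action(matched_clauses, t_row, {"id": 2, "value": "bb", "deleted": False}))
```

In this model, reversing the two clauses makes validate_clauses raise an error, because the unconditional update would come first and the delete clause could never be reached.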