Databricks recommends running the OPTIMIZE command regularly to compact small files.
OPTIMIZE does not remove the old files it replaces; to remove them, run the VACUUM command.
VACUUM
VACUUM removes all files from the table directory that are not managed by Delta, as well as data files that are no longer in the latest state of the transaction log for the table and are older than a retention threshold. VACUUM will skip all directories that begin with an underscore (_), which includes the _delta_log.
If you run VACUUM on a Delta table, you lose the ability to time travel back to a version older than the specified data retention period.
The default transaction log retention (delta.logRetentionDuration) for a Delta table is 30 days. In Databricks, the default retention threshold for data files removed by VACUUM (delta.deletedFileRetentionDuration) is 7 days.
You can change the retention duration per table:
ALTER TABLE <table-name>
SET TBLPROPERTIES ('delta.logRetentionDuration'='7 days')
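The 7-day data-file threshold is controlled by a separate property, delta.deletedFileRetentionDuration. A minimal PySpark sketch for adjusting it (my_table and the 14-day value are illustrative placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Controls how old a removed data file must be before VACUUM deletes it.
# 'my_table' and '14 days' are placeholder values.
spark.sql("ALTER TABLE my_table SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = '14 days')")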
Note that VACUUM can run for a long time, especially when tables are huge and/or when they are a source for high-frequency input streams.
In SQL:
VACUUM table_name [RETAIN num HOURS]
The optional RETAIN num HOURS clause overrides the default retention threshold, in hours.
In Spark, run the same SQL command through spark.sql().
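For example, a minimal sketch (my_table is a placeholder table name):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Run VACUUM with an explicit 168-hour (7-day) retention threshold.
spark.sql("VACUUM my_table RETAIN 168 HOURS")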
OPTIMIZE
Run OPTIMIZE to compact small files. Combining OPTIMIZE with regular VACUUM runs keeps the number of stale data files (and the associated storage cost) to a minimum.
In SQL:
OPTIMIZE table_name;
OPTIMIZE table_name WHERE date >= '2017-01-01';  -- the WHERE clause may filter only on partition columns
You can also enable the table properties delta.autoOptimize.optimizeWrite and delta.autoOptimize.autoCompact.
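A sketch of enabling both properties on an existing table (my_table is a placeholder name):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Optimized writes produce fewer, larger files at write time;
# auto compaction compacts small files after each write.
spark.sql("""
    ALTER TABLE my_table SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")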
CHECKPOINT
Delta Lake writes checkpoints as an aggregate state of a Delta table at an optimized frequency. These checkpoints serve as the starting point to compute the latest state of the table. Without checkpoints, Delta Lake would have to read a large collection of JSON files (“delta” files) representing commits to the transaction log to compute the state of a table.
Databricks Runtime 11.1 and above set the checkpoint creation interval to 100, instead of 10. As a result, fewer checkpoint files are created, and with fewer checkpoint files to index, listing the transaction log directory is faster.
In SQL, set the default table checkpoint interval:
ALTER TABLE <delta-table-name> SET TBLPROPERTIES ('delta.checkpointInterval' = 100)
Here is an example script that checks whether the log retention and checkpoint interval are set, and runs VACUUM at the end.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

delta_path = '/mnt/your_table'

# Read the current table properties.
df_detail = spark.sql(f"DESCRIBE DETAIL delta.`{delta_path}`")
properties = df_detail.select('properties').first()[0]
print(properties)

# Set the log retention if it is not configured yet.
if 'delta.logRetentionDuration' not in properties:
    spark.sql(f"ALTER TABLE delta.`{delta_path}` SET TBLPROPERTIES ('delta.logRetentionDuration' = '7 days')")
else:
    print('delta.logRetentionDuration is already set')

# Set the checkpoint interval if it is not configured yet.
if 'delta.checkpointInterval' not in properties:
    spark.sql(f"ALTER TABLE delta.`{delta_path}` SET TBLPROPERTIES ('delta.checkpointInterval' = 100)")
else:
    print('delta.checkpointInterval is already set')

# Remove unreferenced data files older than 169 hours (just over 7 days).
spark.sql(f"VACUUM delta.`{delta_path}` RETAIN 169 HOURS")
To set the log retention and checkpoint interval when writing to a Delta table, use writer options:
(
    df.write
    .format('delta')
    .mode('overwrite')
    .option('delta.logRetentionDuration', '7 days')
    .option('delta.checkpointInterval', '100')
    .saveAsTable('target_table')
)
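Alternatively, you can set session-level defaults for new tables by prefixing the property name with spark.databricks.delta.properties.defaults. A minimal sketch, assuming this defaults prefix applies to the delta.* properties used above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# New Delta tables created in this session pick up these defaults
# (assumption: the defaults prefix applies to these properties).
spark.conf.set('spark.databricks.delta.properties.defaults.logRetentionDuration', '7 days')
spark.conf.set('spark.databricks.delta.properties.defaults.checkpointInterval', '100')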