Synapse Spark streaming
A Synapse notebook can use Spark structured streaming to load data from one data lake storage account to another. It's pretty much the same as in Databricks. Just PySpark stuff.
First, mount the source. This requires the source data lake to have been added as a linked service in Synapse, authenticated with an account key, a service principal, etc.
from notebookutils import mssparkutils

container_name = 'your container'
account_name = 'your storage account'

# mount the source container under /mount using the linked service credentials
mssparkutils.fs.mount(
    f"abfss://{container_name}@{account_name}.dfs.core.windows.net",
    "/mount",
    {"linkedService": "your linked service name", "fileCacheTimeout": 5, "timeout": 5}
)
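As a quick sanity check, mssparkutils can list the mount points on the pool, and unmount one when it is no longer needed:
# list all mount points on this Spark pool
mssparkutils.fs.mounts()

# unmount when no longer needed
# mssparkutils.fs.unmount('/mount')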
Then get the mount path
path = mssparkutils.fs.getMountPath("/mount/")
This returns something like
# '/synfs/10/mount/' (the number in the path is the Spark job ID)
Reformat the path with the synfs protocol
mount_path = path.replace('/synfs', 'synfs:')
#'synfs:/10/mount/'
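Before wiring this path into the stream, it's worth listing the mounted source to confirm it resolves:
# verify the synfs path points at the mounted container
mssparkutils.fs.ls(mount_path)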
Make sure the destination folder has been created, and that the checkpoint folder has been created / cleared as well.
# from notebookutils import mssparkutils
# mssparkutils.fs.rm('/raw/history/', True)      # clear any previous output
# mssparkutils.fs.rm('/temp/check_point/', True) # clear old checkpoints
# mssparkutils.fs.mkdirs('/raw/history')
# mssparkutils.fs.mkdirs('/temp/check_point')
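If you re-run this often, it can help to wrap the clear-and-recreate step into a small helper. A minimal sketch (reset_folder is my own name, not a Synapse API):
def reset_folder(folder):
    """Delete a folder if it exists, then recreate it empty."""
    try:
        mssparkutils.fs.rm(folder, True)  # recursive delete
    except Exception:
        pass  # the folder may not exist yet
    mssparkutils.fs.mkdirs(folder)

reset_folder('/temp/check_point')
reset_folder('/raw/history')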
Once ready:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
history_source = mount_path + "your source folder/subfolder/"  # reading from the mounted location
history_target = '/raw/history'  # the destination storage location
history_schema = 'Tag String, TimeStamp String, value Double, IsValueGood Boolean, Error String'  # the source schema; TimeStamp arrives as a string
your_tags = ['your tag']  # a list of tags to filter on
checkpoint_folder = '/temp/check_point'  # checkpoint folder, so the stream can restart if needed
df = (
    spark.readStream
    .schema(history_schema)
    .parquet(history_source)
    .filter(F.col('Tag').isin(your_tags))  # load only the tags required
    .withColumnRenamed('Tag', 'Name')
    .withColumn('TS', F.to_timestamp('TimeStamp', 'dd-MMM-yy HH:mm:ss.S'))  # the TS string format: 10-MAR-20 10:53:57.1
    .withColumn('Date', F.to_date(F.col('TS')))  # extra date column for partitioning
    .withColumn('Value', F.col('value').cast('double'))  # normalise the column name and type
    .select('Name', 'TS', 'Value', 'IsValueGood', 'Error', 'Date')
)
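The streaming DataFrame can be sanity-checked before the write starts; isStreaming and printSchema both work on an unstarted stream:
df.isStreaming    # True for a streaming DataFrame
df.printSchema()  # confirm column names and types after the transforms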
query = (
    df.writeStream
    .partitionBy('Date')
    .outputMode("append")
    .option("checkpointLocation", checkpoint_folder)
    .format("parquet")
    .option("path", history_target)
    .start()
)
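By default this runs as a continuous micro-batch stream. If the goal is a one-off backfill that drains whatever files currently exist and then stops, structured streaming supports one-shot triggers. A sketch, assuming Spark 3.3+ for availableNow (older runtimes can use .trigger(once=True)):
query = (
    df.writeStream
    .partitionBy('Date')
    .outputMode("append")
    .option("checkpointLocation", checkpoint_folder)
    .format("parquet")
    .option("path", history_target)
    .trigger(availableNow=True)  # process everything available, then stop
    .start()
)
query.awaitTermination()  # block the cell until the backfill finishes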
The weird thing is that the streaming query doesn't block the cell. start() returns immediately, so the cell completes and it isn't obvious whether the query is still running in the background.
One way to check is to use the query handle returned by writeStream.start(). It also exposes the query's id, name, etc.
query.recentProgress
query.lastProgress
query.status
# {'message': 'Getting offsets from FileStreamSource[synfs:/10/mountODL/raw/data/osipi/af/aquila/archive/2023/01]',
# 'isDataAvailable': False,
# 'isTriggerActive': True}
These show the progress of the query. Spark can also list which streaming queries are still active
spark.streams.active
spark.streams.active[0].lastProgress
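When finished, stop the query explicitly rather than leaving it running in the session:
query.stop()  # stop this query

# or stop everything still active on the session
for q in spark.streams.active:
    q.stop()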
Now check the data in the destination folder
mssparkutils.fs.ls('/raw/history/')
Read the output into a Spark DataFrame
df = spark.read.schema(history_schema).parquet(history_target)
df.limit(10).show()
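A row count per partition is a quick way to confirm the stream actually landed data:
df.groupBy('Date').count().orderBy('Date').show()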