Synapse Spark streaming

A Synapse notebook can use Spark Structured Streaming to load data from one data lake storage account to another. It's pretty much the same as in Databricks; it's just PySpark.

First, mount the source. This requires the source data lake to have been added as a linked service in Synapse, authenticated with an account key, a service principal, and so on.

from notebookutils import mssparkutils


container_name = 'your container'

account_name = 'your storage account'


mssparkutils.fs.mount(

   f"abfss://{container_name}@{account_name}.dfs.core.windows.net",

   "/mount",

   {"linkedService":"your linked service name", "fileCacheTimeout": 5, "timeout": 5}

)
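If a mount point is left over from a previous run, the mount call can complain, so unmounting first keeps the cell re-runnable. A minimal sketch, assuming the same '/mount' point as above:

# unmount any leftover mount point before mounting again; ignore the error if nothing was mounted
try:
    mssparkutils.fs.unmount("/mount")
except Exception:
    pass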

Then get the mount path

path = mssparkutils.fs.getMountPath("/mount/")

This returns something like

#'/synfs/10/mount/'

Reformat the path to use the synfs protocol

mount_path = path.replace('/synfs', 'synfs:')

#'synfs:/10/mount/'
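To confirm the mount is actually usable, listing it through the synfs path is a quick check. A sketch, assuming the mount above succeeded:

# list the mounted container root via the synfs scheme
for f in mssparkutils.fs.ls(mount_path):
    print(f.path)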

Make sure the destination folder has been created, and that the checkpoint folder has been created or cleared as well.

# from notebookutils import mssparkutils

# mssparkutils.fs.mkdirs('/temp/check_point')

# mssparkutils.fs.mkdirs('/raw/history')

# mssparkutils.fs.rm('/raw/history/', True)

# mssparkutils.fs.rm('/temp/check_point/', True)

Once ready:

from pyspark.sql import SparkSession

from pyspark.sql import functions as F


spark = SparkSession.builder.getOrCreate()


history_source = mount_path + "your source folder/subfolder/" #this is accessing a mounted location

history_target = '/raw/history'  #the destination storage location

history_schema = 'Tag String, TimeStamp Timestamp, value Double, IsValueGood Boolean, Error String' #the data schema

your_tags = ['your tag'] #a list of tags to filter on

checkpoint_folder = '/temp/check_point' #checkpoint folder, so the stream can restart if needed


df = (

     spark.readStream

          .schema(history_schema)

          .parquet(history_source)

          .filter(F.col('Tag').isin(your_tags)) #load only the tags required

          .withColumnRenamed('Tag', 'Name')

          .withColumn('TS', F.to_timestamp('TimeStamp', 'dd-MMM-yy HH:mm:ss.S')) #the TS string format 10-MAR-20 10:53:57.1

          .withColumn('Date', F.to_date(F.col('TS'), 'yyyy-MM-dd')) #extra date column for partitioning

          .withColumn('Value', F.col('value').cast('double'))  #casting data type

          .select('Name', 'TS', 'Value', 'IsValueGood', 'Error', 'Date')

     )


query = (

        df.writeStream

          .partitionBy('Date')

          .outputMode("append")

          .option("checkpointLocation",checkpoint_folder )

          .format("parquet")

          .option("path", history_target )

          .start()

      )
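Side note: for a one-off backfill like this, a trigger can be added before start() so the stream processes whatever already sits in the source folder and then stops by itself. A sketch of the same writeStream with that change (backfill_query is just an illustrative name; use it instead of the open-ended query above, not alongside it, since they share the checkpoint). trigger(availableNow=True) needs Spark 3.3+, older pools can use trigger(once=True):

backfill_query = (
        df.writeStream
          .partitionBy('Date')
          .outputMode("append")
          .option("checkpointLocation", checkpoint_folder)
          .format("parquet")
          .option("path", history_target)
          .trigger(availableNow=True)  #process everything currently in the source, then stop
          .start()
      )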

One odd thing is that the streaming query doesn't block the cell. The cell just completes, and it isn't obvious whether the query is still running in the background as a separate thread or not.

One way to check is to use the 'query' handle returned by writeStream.start(). It also exposes the query's id, name, etc.

query.recentProgress

query.lastProgress

query.status

# {'message': 'Getting offsets from FileStreamSource[synfs:/10/mountODL/raw/data/osipi/af/aquila/archive/2023/01]',

#  'isDataAvailable': False,

#  'isTriggerActive': True}

This shows the progress of the query. Spark can also list which streaming queries are still active.

spark.streams.active

spark.streams.active[0].lastProgress
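When the notebook needs to wait for the stream, or the backfill has caught up and the query should be shut down, the standard StreamingQuery controls apply. A short sketch using the query handle from above:

query.awaitTermination(600)  #block this cell until the stream stops, or for up to 600 seconds
print(query.isActive)        #True while the stream is still running
query.stop()                 #shut the stream down once the backfill is done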


Now check the data in the destination folder

mssparkutils.fs.ls('/raw/history/')

Read it into a Spark dataframe

df = spark.read.parquet(history_target) #columns were renamed on write (Name, TS, Value, IsValueGood, Error, Date), so let Spark infer the schema instead of reusing history_schema

df.limit(10).show()
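A couple of aggregates make a quick sanity check of what landed. A sketch against the columns written above:

# rows per date partition and per tag, to spot gaps in the backfill
df.groupBy('Date').count().orderBy('Date').show()
df.groupBy('Name').count().show()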