Data Factory - incremental load - high watermark

Incremental data load from an ODBC source to data lake CSV files


In the ODBC data source there is a MODIFIED column, which serves as the timestamp for the incremental load.

First, it needs an ODBC linked service and a dataset pointing to the source table.

The ODBC dataset doesn't seem to support dynamic parameters / queries, so it has to point to exactly one table.

For multiple tables, multiple datasets are needed.


The target folder is in a container on data lake gen2.

It needs two datasets pointing to the same target folder.

One dataset is for the 'Sink', which points to the folder with a dynamic file name.

i.e. the File Path of the dataset has "@concat('prefix-', utcnow(), '.csv')" as the file name.

Every time a new incremental CSV is loaded, the file name gets a current-time suffix.
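If a tidier file name is preferred, utcnow() can be formatted first; the 'prefix-' part is just an example name, so adjust to suit:

    @concat('prefix-', formatDateTime(utcnow(), 'yyyyMMddHHmmss'), '.csv')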

The second dataset is for the 'Source', which points to the folder but with the file name empty.

In this way, the dataset picks up all the CSV files in the folder, so the max MODIFIED can be queried across the files.


For incremental data load using a timestamp (i.e. a high watermark),

the key is to store and query the max of that timestamp.

Either use a separate table/file to store the max timestamp from the last run,

or simply query the max timestamp from the files that have already been loaded, which is the approach used here (a sketch of the watermark-table variant follows for reference).
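A minimal sketch of the watermark-table variant, for comparison only; the dbo.watermark table and its columns are hypothetical and not part of this pipeline:

    -- read the last watermark before copying (e.g. in a Lookup task)
    select WatermarkValue from dbo.watermark where TableName = 'source_table'

    -- advance the watermark after a successful copy
    update dbo.watermark set WatermarkValue = '<new max MODIFIED>' where TableName = 'source_table'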


Dataflow

It uses a dataflow to query the max timestamp from the files.

The dataflow cannot run on a self-hosted integration runtime, so when debugging it needs to run on an Azure integration runtime.


The dataflow uses the source dataset pointing to the target folder, with the Wildcard Paths set to "/folder/*".


As the MODIFIED field is loaded as a string from the CSV files, the next step in the dataflow is to convert it to a timestamp.

Use a Derived Column transformation, and add the MODIFIED column with an expression to do the conversion: toTimestamp(Modified, 'yyyy-MM-dd HH:mm:ss.SSS').

NOTE that the format string must match the data exactly. If there are extra digits in the fractional-seconds field (longer than the 3 digits of SSS), the extra digits need to be dropped first, e.g. dropRight(Modified, 4).
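Putting the two together, a sketch of the Derived Column expression, assuming the source values carry 7 fractional digits (so the last 4 characters are dropped):

    toTimestamp(dropRight(Modified, 4), 'yyyy-MM-dd HH:mm:ss.SSS')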


The next step is an Aggregate transformation to get the max timestamp.

In the Aggregate transformation, leave Group by empty and use the max function.

In the aggregate expression, put toString(max(Modified)).

Why not just max(Modified)? Because the result would come out in integer format (epoch milliseconds), e.g. 1649412494000.

Later in the pipeline, the string format is needed, e.g. 2022-04-05 08:40:59.755, to construct a query.
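For example, the aggregate output column can be named MaxModified (matching the column name that shows up in the activity output below), with the conversion as its expression:

    column name: MaxModified
    expression:  toString(max(Modified))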


The last thing in the dataflow is the Sink component. There are 3 types of sinks: dataset, inline and cache.

Here it uses Cache, as the result will be read later from within the pipeline.

Make sure 'Write to activity output' is ticked, because the next pipeline task will read it.


Now create a new pipeline

Add the dataflow into it as the first task.

Clicking Debug runs the dataflow. On the Output tab, refresh the run log.

The little icons next to the dataflow name link to dataflow input, output and details / error.

The output icon shows the output from running the dataflow, which is in JSON format, e.g.

data flow1

{
    "runStatus": {
        ...
        "output": {
            "CacheMaxTimestamp": {
                "value": [
                    {
                        "MaxModified": "2022-04-08 10:08:14.569"
                    }
                ],
                "count": 1
            }
        }
    },
    ...
}

Obviously, the MaxModified is returned as a column value. To access it, we need the full path of the JSON element:

runStatus.output.CacheMaxTimestamp.value[0].MaxModified

And append the full path to the dataflow activity output which is: 

activity('data flow name').Output

So in the dynamic content expression,

activity('data flow name').Output.runStatus.output.CacheMaxTimestamp.value[0].MaxModified 

will give "2022-04-08 10:08:14.569"
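When the value needs to be embedded inside a larger literal string rather than built with concat, the pipeline's string interpolation syntax can also be used with the same path (assuming the activity is named 'data flow name' as above):

    @{activity('data flow name').output.runStatus.output.CacheMaxTimestamp.value[0].MaxModified}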


After the dataflow task, put in a Lookup task to check whether there are any newer records in the source.

The lookup uses a query:

@concat('select count(*) as RowCount
           from (
              select top 1 *
                from source_table where modified > '''
         , activity('data flow name').Output.runStatus.output.CacheMaxTimestamp.value[0].MaxModified
         , ''') sub')

The query is a string concatenated from the select statement and the max modified time.

It checks whether any record has a modified value greater than the max modified time.

Note @concat and activity functions are used.
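With the example value above, the concatenated query resolves to something like:

    select count(*) as RowCount
      from (
         select top 1 *
           from source_table where modified > '2022-04-08 10:08:14.569'
      ) sub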


The lookup returns the count, which can be accessed through the lookup's output json.

e.g.

lookup

{
    "firstRow": {
        "RowCount": 0
    },
    ...
}

which can be accessed through the full path "activity('lookup name').output.firstRow.RowCount"


Finally, if the lookup count is greater than zero, we copy the new data over to the target;

otherwise do nothing. This is done through a conditional (If Condition) task.

The conditional expression is:

@greater(activity('lookup name').output.firstRow.RowCount, 0)

Note it uses @greater to compare the row count with 0, instead of rowcount > 0; the pipeline expression language uses functions rather than infix comparison operators.


The edit icon for the 'True' / 'False' activities on the conditional task opens a new pipeline design canvas

for adding pipeline tasks. (Not handy; it would be better to show those tasks within the same (parent) canvas.)

Anyway, on the True activities canvas, add a Copy data task.

Simply put in the ODBC source with a filtering query:

@concat('select ID,Name,Created,Modified
           from source_table where modified > '''
         , activity('data flow name').Output.runStatus.output.CacheMaxTimestamp.value[0].MaxModified
         , '''')

It selects only records newer than the max modified time.

The data then flows into the data lake sink, which will create a new CSV file with the current timestamp suffix.
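With the same example value, the source query resolves to something like:

    select ID,Name,Created,Modified
      from source_table where modified > '2022-04-08 10:08:14.569'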


A few more notes:

If using a self-hosted integration runtime, make sure dependency libraries are installed on the host server as well.

E.g. for writing Parquet files, make sure the Java Runtime Environment has been installed on the self-hosted integration runtime.


The data lake dataset pointing to the CSV file folder already specifies the subfolder.

But in the dataflow source, the Wildcard Paths still need to specify the subfolder again, all the way from the root to the files.

e.g. /folder1/subfolder1/*


In a dataflow, the cache sink's data is by default only available within the dataflow,

but in the sink settings you can tick 'Write to activity output' and the cached result will be written to

the activity output, so it can be accessed from outside the dataflow.

But if cache output is enabled, the dataflow's logging level must be set to None; this can be done from the Settings tab of the dataflow task.

Also on the dataflow task's Settings tab, expand Sink properties; there is by default a 'First row only' option for the output. However, if you expect multiple rows, you need to untick this manually (see the example below).
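For example, with 'First row only' unticked and the same sink and column names as above, further rows could be read by indexing the value array:

    activity('data flow name').output.runStatus.output.CacheMaxTimestamp.value[1].MaxModified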


The 'Parse' transformation is only for parsing a JSON/CSV/XML text block (in one column) into separate columns,

not for converting a column's data from one format to another.


Stupid Azure expression builder: it says toTimestamp(Modified, 'yyyy-MM-dd HH:mm:ss.SSS') is for converting a timestamp string with milliseconds.

toTimestamp('2019-02-03 05:19:28.871', 'yyyy-MM-dd HH:mm:ss.SSS') works.

toTimestamp('2019-02-03 05:19:28.8710000', 'yyyy-MM-dd HH:mm:ss.SSS') does NOT work, because the value has more precision than the millisecond field, so the 'yyyy-MM-dd HH:mm:ss.SSS' pattern does not match it.
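The workaround from earlier applies here too: trim the extra fractional digits before parsing, e.g.

    toTimestamp(dropRight('2019-02-03 05:19:28.8710000', 4), 'yyyy-MM-dd HH:mm:ss.SSS')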