Before starting this topic, be aware that it is quite long and dense, and that it contains a fair amount of procedural database code. Please bear with me until the end of the topic. This is an almost must-have technique for maintaining and updating data warehouses with any significant volume of data, and you will see it all come together in the end!
When we periodically update our Data Warehouse with new data, we would ideally avoid rereading the entire source tables, which might have millions of rows, and instead capture only the data that was added, changed or deleted.
This is where Change Data Capture comes in. Below we will examine an example of how to implement this in PostgreSQL.
This is a bit dense and at first not directly related to KNIME. But later on we will see how this is useful to incrementally load our Facts table.
This example is based on the PostgreSQL database, but the trigger-based part should work on any database that supports triggers.
In this example, suppose we have a table called 'users', which has the columns user_id, username and email, and we want to track the changes made to this table.
For this to work we will use a feature of PostgreSQL called the Write-Ahead Log (WAL). To set this up we need to make some changes to the Postgres configuration file postgresql.conf (see the appendix below).
For our example, we will first create the 'users' table with this SQL code:
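A minimal sketch of such a statement is shown here, since the listing itself is not reproduced. The column names come from the description above; the column types, the primary key and the publication name users_publication are assumptions made for illustration.

```sql
-- Source table to be tracked. Column types are assumed; only the
-- column names are given in the text.
CREATE TABLE users (
    user_id  INTEGER PRIMARY KEY,   -- the primary key also serves as replica identity
    username TEXT NOT NULL,
    email    TEXT
);

-- Publish changes to 'users'. The publication name is hypothetical.
CREATE PUBLICATION users_publication FOR TABLE users;
```

A primary key (or another replica identity) is needed on a published table before UPDATE and DELETE operations can be replicated.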
The Change Data Capture uses a Pub/Sub mechanism. That's why the CREATE PUBLICATION statement is there.
The next step will be to create the table where we will log the changes to our 'users' table:
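The change table might look like the sketch below. The column names are the ones the 'capture_changes' function writes to; the column types are assumptions.

```sql
-- Log table: one row per captured change to 'users'.
-- The *_before columns stay NULL for INSERTs, the *_after columns for DELETEs.
CREATE TABLE users_changes (
    user_id         INTEGER     NOT NULL,   -- key of the affected 'users' row
    operation_type  TEXT        NOT NULL,   -- 'INSERT', 'UPDATE' or 'DELETE'
    "timestamp"     TIMESTAMPTZ NOT NULL,   -- when the change was captured
    username_before TEXT,
    email_before    TEXT,
    username_after  TEXT,
    email_after     TEXT
);
```

Keeping both the before and after values makes it possible to see exactly what an UPDATE changed without looking at the source table again.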
Now we create a Subscription to the Publication we set up before:
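As a rough sketch, a subscription could be created as follows. The subscription name, the publication name and every value in the connection string are placeholders, not taken from the original setup.

```sql
-- Run on the subscribing database, where a 'users' table with a matching
-- structure must already exist. All names and connection values below
-- are hypothetical placeholders.
CREATE SUBSCRIPTION users_subscription
    CONNECTION 'host=source-host port=5432 dbname=source_db user=replication_user password=change_me'
    PUBLICATION users_publication;
```

A subscription normally lives in a different database or server than the publication. If both are in the same database cluster, the replication slot has to be created separately, otherwise the CREATE SUBSCRIPTION command will hang.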
The next step is to create a function called 'capture_changes', whose code is shown below:
CREATE OR REPLACE FUNCTION capture_changes()
RETURNS TRIGGER AS $$
BEGIN
    -- Log the old and/or new column values, depending on the operation
    -- that fired the trigger.
    IF (TG_OP = 'DELETE') THEN
        INSERT INTO users_changes (user_id, operation_type, timestamp, username_before, email_before)
        VALUES (OLD.user_id, 'DELETE', NOW(), OLD.username, OLD.email);
    ELSIF (TG_OP = 'UPDATE') THEN
        INSERT INTO users_changes (user_id, operation_type, timestamp, username_before, email_before, username_after, email_after)
        VALUES (NEW.user_id, 'UPDATE', NOW(), OLD.username, OLD.email, NEW.username, NEW.email);
    ELSIF (TG_OP = 'INSERT') THEN
        INSERT INTO users_changes (user_id, operation_type, timestamp, username_after, email_after)
        VALUES (NEW.user_id, 'INSERT', NOW(), NEW.username, NEW.email);
    END IF;
    -- The return value of an AFTER row-level trigger is ignored.
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
And finally, we need a trigger which fires on changes to the 'users' table and calls our function:
CREATE TRIGGER users_trigger
AFTER INSERT OR UPDATE OR DELETE
ON users
FOR EACH ROW
EXECUTE FUNCTION capture_changes();
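To check that the trigger is wired up correctly, a quick smoke test along the following lines can help. The sample values and the user_id 1001 are made up for illustration, and the test assumes the 'users' and 'users_changes' tables already exist.

```sql
-- Each statement below should add one row to users_changes.
INSERT INTO users (user_id, username, email)
VALUES (1001, 'alice', 'alice@example.com');

UPDATE users SET email = 'alice@example.org' WHERE user_id = 1001;

DELETE FROM users WHERE user_id = 1001;

-- Inspect what was captured for the test row.
-- Note: NOW() returns the transaction start time, so statements run inside
-- a single transaction will share the same timestamp.
SELECT operation_type, username_before, email_before, username_after, email_after
FROM users_changes
WHERE user_id = 1001
ORDER BY "timestamp";
```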
On the next page we will see what happens when we make changes to our 'users' table.
Appendix:
Changes made to the postgresql.conf file:
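The exact settings used are not listed here. As a hedged sketch, logical replication (publications and subscriptions) typically needs at least the following. The numeric values are illustrative assumptions, not the original configuration.

```conf
# Write enough information to the WAL to support logical decoding.
wal_level = logical

# Upper limits for replication slots and WAL sender processes.
# Illustrative values; at least one of each is needed per subscription.
max_replication_slots = 10
max_wal_senders = 10
```

A change to wal_level only takes effect after the PostgreSQL server has been restarted.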