AWS Redshift Change Data Capture (CDC) Part 02

Huzefa Khan
3 min read · Mar 11, 2023

Change Data Capture (CDC) is a technique used to track changes in a database and propagate those changes to downstream systems. Watermarking is a common approach to implementing CDC, where a watermark table is used to keep track of the latest change processed by the CDC system.
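
In essence, the watermark pattern boils down to three steps: read the last processed timestamp, copy only rows newer than it, and then advance it. The sketch below shows the idea with hypothetical table names (my_source_table, my_target_table) purely for illustration; the actual tables used in this post are introduced in the steps that follow.

-- 1. Read the last processed timestamp for the tracked table
SELECT transact_timestamp
FROM watermark
WHERE table_name = 'my_source_table';

-- 2. Copy only the rows that changed since that timestamp
INSERT INTO my_target_table
SELECT *
FROM my_source_table
WHERE transact_timestamp >= (SELECT transact_timestamp FROM watermark WHERE table_name = 'my_source_table');

-- 3. Advance the watermark to the newest change just processed
UPDATE watermark
SET transact_timestamp = (SELECT MAX(transact_timestamp) FROM my_target_table)
WHERE table_name = 'my_source_table';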

In Amazon Redshift, CDC can be implemented using a combination of SQL scripts. Here’s a high-level overview of how you can implement CDC using a watermark table in Redshift:

1. Create a watermark table that stores, for each tracked table, the watermark value (the last processed timestamp of its data).

-- One row per tracked table: the table name and its last processed timestamp
create table watermark
(
table_name varchar(255),
transact_timestamp datetime
);

2. Seed the watermark table with the source table name and an initial high-watermark value.

INSERT INTO watermark
VALUES ('data_source_table', sysdate);

3. Create the table on the target schema as a copy of the source table and add the necessary columns. The following procedure also adds a transact_timestamp column to the source table and registers the target table in the watermark table.

CREATE OR REPLACE PROCEDURE public.create_table (source_schema varchar, target_schema varchar, vtable varchar)
AS $$
DECLARE tname VARCHAR(255) := target_schema || '.' || vtable;

BEGIN

-- Add a change-tracking timestamp column to the source table
execute 'alter table ' || source_schema || '.' || vtable || ' add column transact_timestamp datetime default sysdate';

-- Create the target table with the same structure as the source table
execute 'create table ' || tname || ' (LIKE ' || source_schema || '.' || vtable || ')'; -- TODO auto distkey / auto sortkey

-- Add an operation column to the target table to flag inserts and updates
execute 'alter table ' || tname || ' add column operation VARCHAR(64) encode lzo';

-- Register the target table in the watermark table with an empty watermark
execute 'INSERT INTO public.watermark VALUES (''' || tname || ''',NULL);';

END;
$$ LANGUAGE plpgsql;
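
For reference, a hypothetical call such as call public.create_table('public', 'target', 'sales'); would effectively run statements along these lines (schema and table names here are illustrative only):

alter table public.sales add column transact_timestamp datetime default sysdate;
create table target.sales (LIKE public.sales);
alter table target.sales add column operation VARCHAR(64) encode lzo;
INSERT INTO public.watermark VALUES ('target.sales', NULL);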

4. The final procedure below detects changes in the data and moves them to the desired destination table. It can be customized to fit specific needs.

The example below moves the changed data from the sales_dms_landing table to the sales_target table.

CREATE OR REPLACE PROCEDURE public.sales_dms_staging_to_final()
AS $$
DECLARE
watermark_timestamp TIMESTAMP WITHOUT TIME ZONE;
BEGIN
SELECT INTO watermark_timestamp
w.transact_timestamp FROM public.watermark w
WHERE w.table_name = 'public.sales_target';

RAISE INFO 'Initial watermark_timestamp = %', watermark_timestamp;

DROP TABLE IF EXISTS example_staging_temp;
CREATE TEMPORARY TABLE example_staging_temp (LIKE public.sales_target);

-- Initial load has no watermark timestamp
IF watermark_timestamp IS NULL THEN
-- For very large tables (~350 million rows in this example), consider limiting
-- or paginating the initial load; a CDC batch can have a maximum of 8,000 records.
-- This assumes the landing table's column layout matches the target table.
INSERT INTO public.sales_target
SELECT *
FROM public.sales_dms_landing;

SELECT INTO watermark_timestamp
MAX(transact_timestamp) FROM public.sales_target;
RAISE INFO 'Final watermark_timestamp = %', watermark_timestamp;
END IF;

-- Subsequent loads will have a watermark timestamp set
IF watermark_timestamp IS NOT NULL THEN
INSERT INTO example_staging_temp
SELECT * FROM public.sales_dms_landing t
WHERE t.transact_timestamp >= watermark_timestamp;

-- Flag new rows as inserts and existing rows as updates
UPDATE example_staging_temp SET operation = 'I'
WHERE salesid NOT IN (SELECT sales_target.salesid FROM sales_target);

UPDATE example_staging_temp SET operation = 'U'
WHERE salesid IN (SELECT sales_target.salesid FROM sales_target);

-- Remove the changed records from the final table so they can be re-inserted
DELETE FROM public.sales_target USING example_staging_temp
WHERE sales_target.salesid = example_staging_temp.salesid;

-- Insert the changed records into the final table.
-- All staged records can be inserted, since updated records
-- have already been deleted from the final table above.
INSERT INTO public.sales_target
SELECT * FROM example_staging_temp;

SELECT INTO watermark_timestamp
MAX(transact_timestamp) FROM example_staging_temp;
RAISE INFO 'Final watermark_timestamp = %', watermark_timestamp;
END IF;

DELETE FROM public.watermark WHERE table_name = 'public.sales_target';
INSERT INTO public.watermark VALUES('public.sales_target', watermark_timestamp);

DROP TABLE example_staging_temp;
END;
$$ LANGUAGE plpgsql;
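
To sanity-check the incremental path, one option is to modify a row in the landing table and re-run the procedure. The snippet below is illustrative only; it assumes the landing table has a salesid key, a qtysold column, and the same column layout as sales_target:

-- Simulate a change arriving in the landing table (hypothetical row and columns)
UPDATE public.sales_dms_landing
SET qtysold = qtysold + 1, transact_timestamp = sysdate
WHERE salesid = 1;

-- Re-run the incremental load
call public.sales_dms_staging_to_final();

-- The changed row should now be in the target flagged as 'U',
-- and the watermark should have advanced
SELECT salesid, operation, transact_timestamp FROM public.sales_target WHERE salesid = 1;
SELECT * FROM public.watermark WHERE table_name = 'public.sales_target';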

5. For the first load of a specific table, you have to create the table on the target schema and add the timestamp and operation columns. Run the following procedures for that first load:

call public.create_table('source_schema', 'target_schema', 'table_name');
call public.sales_dms_staging_to_final();

After the initial load, you just need to run the final procedure.

call public.sales_dms_staging_to_final();

The last step is scheduling, which automates these scripts according to your requirements. There are multiple ways to do this, such as scheduled queries in Redshift, executing the scripts from Lambda functions through the Redshift Data API, or simple Python jobs.


Huzefa Khan

Passionate Sr. Data Engineer with years of experience in developing and architecting high-class data solutions. https://www.linkedin.com/in/huzzefakhan/