AWS Redshift Change Data Capture (CDC) Part 01
CDC (Change Data Capture) is a technique used in data management to identify and capture changes made to data in a database over time. The goal of CDC is to track the history of changes made to data, allowing data analysts and data scientists to analyze and understand how the data has changed and evolved over time.
CDC works by monitoring the database for changes made to specific tables or columns. When a change is detected, the CDC process captures information about the change, such as the type of change (insert, update, or delete), the timestamp of the change, and the data that was changed.
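As a rough illustration of what a captured change might look like, the sketch below records the operation type, a timestamp, and the row data for each change. The `ChangeRecord` class and `diff_snapshots` function are hypothetical names, and comparing two full snapshots is only one simple way to detect changes, chosen here because it needs no database.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class ChangeRecord:
    operation: str            # 'I' (insert), 'U' (update) or 'D' (delete)
    key: int                  # primary key of the changed row
    captured_at: datetime     # when the change was detected
    before: Optional[dict]    # row before the change (None for inserts)
    after: Optional[dict]     # row after the change (None for deletes)


def diff_snapshots(old: dict, new: dict) -> list:
    """Compare two {key: row} snapshots and return the changes between them."""
    now = datetime.now(timezone.utc)
    changes = []
    for key in sorted(old.keys() | new.keys()):
        before, after = old.get(key), new.get(key)
        if before is None:
            changes.append(ChangeRecord("I", key, now, None, after))
        elif after is None:
            changes.append(ChangeRecord("D", key, now, before, None))
        elif before != after:
            changes.append(ChangeRecord("U", key, now, before, after))
    return changes
```

Calling `diff_snapshots` with yesterday's and today's copy of a small table returns one record per inserted, updated, or deleted key, which is the same information the staging table later in this article stores in its `cdc_operation` and `cdc_timestamp` columns.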
The captured changes can then be stored in a separate database or data store, allowing them to be queried and analyzed later. This can be useful in a variety of scenarios, such as auditing, compliance, data analysis, and data synchronization between different systems.
CDC can be implemented using various techniques, such as triggers, log-based replication, and polling-based approaches. The specific implementation of CDC will depend on the database system and the requirements of the use case.
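To make the trigger-based technique concrete, here is a minimal sketch that uses SQLite through Python's standard `sqlite3` module, only because it runs locally without any setup. The `users` and `users_changes` tables and the trigger names are made up for the example; other databases use different trigger syntax.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER);

-- Change log filled by the triggers below.
CREATE TABLE users_changes (
    id INTEGER,
    name TEXT,
    age INTEGER,
    cdc_operation TEXT,
    cdc_timestamp TEXT DEFAULT CURRENT_TIMESTAMP
);

CREATE TRIGGER users_ins AFTER INSERT ON users BEGIN
    INSERT INTO users_changes (id, name, age, cdc_operation)
    VALUES (NEW.id, NEW.name, NEW.age, 'I');
END;

CREATE TRIGGER users_upd AFTER UPDATE ON users BEGIN
    INSERT INTO users_changes (id, name, age, cdc_operation)
    VALUES (NEW.id, NEW.name, NEW.age, 'U');
END;

CREATE TRIGGER users_del AFTER DELETE ON users BEGIN
    INSERT INTO users_changes (id, name, age, cdc_operation)
    VALUES (OLD.id, OLD.name, OLD.age, 'D');
END;
""")

# Every write to users now leaves a row in users_changes.
conn.execute("INSERT INTO users VALUES (1, 'Ann', 30)")
conn.execute("UPDATE users SET age = 31 WHERE id = 1")
conn.execute("DELETE FROM users WHERE id = 1")

changes = conn.execute(
    "SELECT id, age, cdc_operation FROM users_changes ORDER BY rowid"
).fetchall()
```

The triggers capture each change at write time, so nothing has to poll the table afterwards. The cost is extra work on every write to the source table.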
Redshift CDC
Amazon Redshift does not have a built-in Change Data Capture (CDC) feature, and it does not support triggers, but it is still possible to implement CDC using other techniques. One way is to use stored procedures and a staging table, relying on a modified_date column in the source table to find changed rows.
Here’s an example implementation of CDC in Redshift using stored procedures and a staging table:
1. Create a staging table with the same schema as the source table, but with additional columns to capture CDC metadata:
CREATE TABLE my_source_table_stg (
    id            INT,
    name          VARCHAR(50),
    age           INT,
    modified_date TIMESTAMP,
    cdc_operation CHAR(1),                    -- 'I' for insert, 'U' for update, 'D' for delete
    cdc_timestamp TIMESTAMP DEFAULT GETDATE() -- when the change was captured
);
2. Create stored procedures that read changed rows from the source table and insert them into the staging table, one procedure per operation type:
-- The source table alone cannot tell a new row from a changed one, so the
-- same rows are staged as both 'I' and 'U'; the merge step decides which applies.
CREATE OR REPLACE PROCEDURE insert_into_stg()
AS $$
BEGIN
    INSERT INTO my_source_table_stg (id, name, age, modified_date, cdc_operation)
    SELECT id, name, age, modified_date, 'I'
    FROM my_source_table
    WHERE modified_date > (
        SELECT COALESCE(MAX(modified_date), '1900-01-01')
        FROM my_source_table_stg
        WHERE cdc_operation = 'I'
    );
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE PROCEDURE update_into_stg()
AS $$
BEGIN
    INSERT INTO my_source_table_stg (id, name, age, modified_date, cdc_operation)
    SELECT id, name, age, modified_date, 'U'
    FROM my_source_table
    WHERE modified_date > (
        SELECT COALESCE(MAX(modified_date), '1900-01-01')
        FROM my_source_table_stg
        WHERE cdc_operation = 'U'
    );
END;
$$ LANGUAGE plpgsql;

-- Assumes deleted rows are copied into deleted_source_table by the application,
-- because a deleted row leaves no trace in my_source_table.
CREATE OR REPLACE PROCEDURE delete_into_stg()
AS $$
BEGIN
    INSERT INTO my_source_table_stg (id, name, age, modified_date, cdc_operation)
    SELECT id, name, age, modified_date, 'D'
    FROM deleted_source_table
    WHERE modified_date > (
        SELECT COALESCE(MAX(modified_date), '1900-01-01')
        FROM my_source_table_stg
        WHERE cdc_operation = 'D'
    );
END;
$$ LANGUAGE plpgsql;
3. Create a stored procedure to merge the changes from the staging table into the target table, then truncate the staging table:
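CREATE OR REPLACE PROCEDURE my_source_table_cdc()
AS $$
BEGIN
    -- Redshift's documented MERGE syntax has no "WHEN MATCHED AND <condition>"
    -- form, so each operation type is applied with its own statement.

    -- Updates: overwrite target rows that already exist.
    UPDATE my_target_table
    SET name = my_source_table_stg.name,
        age = my_source_table_stg.age,
        modified_date = my_source_table_stg.modified_date
    FROM my_source_table_stg
    WHERE my_target_table.id = my_source_table_stg.id
      AND my_source_table_stg.cdc_operation = 'U';

    -- Inserts: add rows that are not in the target yet.
    INSERT INTO my_target_table (id, name, age, modified_date)
    SELECT src.id, src.name, src.age, src.modified_date
    FROM my_source_table_stg src
    LEFT JOIN my_target_table tgt ON tgt.id = src.id
    WHERE src.cdc_operation = 'I'
      AND tgt.id IS NULL;

    -- Deletes: remove rows flagged as deleted.
    DELETE FROM my_target_table
    USING my_source_table_stg
    WHERE my_target_table.id = my_source_table_stg.id
      AND my_source_table_stg.cdc_operation = 'D';

    -- Note: TRUNCATE commits the current transaction in Redshift.
    TRUNCATE TABLE my_source_table_stg;
END;
$$ LANGUAGE plpgsql;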
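The merge rules in step 3 can be mimicked in plain Python to reason about what a batch of staged rows should do to the target. This is a sketch for checking expectations, not a replacement for the stored procedure: `apply_changes` is a hypothetical helper, the tables are modelled as dictionaries keyed by `id`, and rows are applied one at a time in list order rather than as set-based SQL.

```python
def apply_changes(target: dict, staged: list) -> dict:
    """Apply staged (cdc_operation, row) pairs to a {id: row} target, in order."""
    result = dict(target)  # work on a copy so the input stays untouched
    for operation, row in staged:
        key = row["id"]
        if operation == "I":
            # Insert only when the row is not in the target yet.
            result.setdefault(key, row)
        elif operation == "U":
            # Update only rows that already exist in the target.
            if key in result:
                result[key] = row
        elif operation == "D":
            # Delete the row if it is present.
            result.pop(key, None)
        else:
            raise ValueError(f"unknown cdc_operation: {operation!r}")
    return result
```

An update for an `id` that is missing from the target is ignored here, which matches the "matched and 'U'" rule: only the 'I' row for that `id` can create it.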
4. Schedule the stored procedures to run periodically, e.g. every minute or every hour, depending on your needs. Redshift has no CREATE SCHEDULE statement, so the schedule is defined outside SQL, for example as a scheduled query in the Redshift query editor v2 or as an Amazon EventBridge rule (an hourly rule uses the expression cron(0 * * * ? *)). On each run, the scheduled job executes the procedures in order:
-- Capture changes into the staging table
CALL insert_into_stg();
CALL update_into_stg();
CALL delete_into_stg();
-- Apply the staged changes to the target table
CALL my_source_table_cdc();
Ensure that the modified_date column is updated on every insert and update in your source table; rows whose modified_date does not change are never captured. You may also need to adapt the scheduling frequency to your requirements and the volume of data changes.
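Whatever scheduler triggers the job, one CDC cycle comes down to calling the procedures in order and waiting for each to finish. The sketch below assumes the Redshift Data API, reached through a client that behaves like `boto3.client("redshift-data")` and authenticates with a database user; `run_cdc_cycle` and its arguments are hypothetical, and the client is passed in so the function can be tried with a stub.

```python
import time

# Procedures from steps 2 and 3, in the order they should run.
CDC_STATEMENTS = [
    "CALL insert_into_stg();",
    "CALL update_into_stg();",
    "CALL delete_into_stg();",
    "CALL my_source_table_cdc();",
]


def run_cdc_cycle(client, cluster_id, database, db_user, poll_seconds=1.0):
    """Run one CDC cycle and return the Data API statement ids."""
    statement_ids = []
    for sql in CDC_STATEMENTS:
        response = client.execute_statement(
            ClusterIdentifier=cluster_id,
            Database=database,
            DbUser=db_user,
            Sql=sql,
        )
        statement_id = response["Id"]
        statement_ids.append(statement_id)
        # execute_statement is asynchronous, so wait before the next step.
        while True:
            status = client.describe_statement(Id=statement_id)["Status"]
            if status == "FINISHED":
                break
            if status in ("FAILED", "ABORTED"):
                raise RuntimeError(f"{sql} ended with status {status}")
            time.sleep(poll_seconds)
    return statement_ids
```

A scheduled job, such as a Lambda function invoked by an hourly rule, could call `run_cdc_cycle(boto3.client("redshift-data"), "my-cluster", "dev", "etl_user")`, where the cluster, database, and user names are placeholders.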
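Another way to implement CDC is the watermark table technique, which stores the last processed modified_date in a dedicated table. It addresses a weakness of the example above: because step 3 truncates the staging table, the MAX(modified_date) lookups in step 2 fall back to '1900-01-01' after each merge, so every run rescans the whole source table.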
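A watermark table keeps the newest captured modified_date in its own row, so the capture step still knows where to resume after the staging table has been emptied. The sketch below shows the idea with SQLite as a local stand-in for Redshift; the `source`, `staging`, and `cdc_watermark` tables and the `capture_changes` function are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE source (id INTEGER PRIMARY KEY, name TEXT, modified_date TEXT);
CREATE TABLE staging (id INTEGER, name TEXT, modified_date TEXT);
-- One row per tracked table: the newest modified_date already captured.
CREATE TABLE cdc_watermark (table_name TEXT PRIMARY KEY, last_modified TEXT);
INSERT INTO cdc_watermark VALUES ('source', '1900-01-01 00:00:00');
""")


def capture_changes(conn) -> int:
    """Copy rows newer than the watermark into staging, then advance it."""
    (watermark,) = conn.execute(
        "SELECT last_modified FROM cdc_watermark WHERE table_name = 'source'"
    ).fetchone()
    rows = conn.execute(
        "SELECT id, name, modified_date FROM source WHERE modified_date > ?",
        (watermark,),
    ).fetchall()
    if rows:
        conn.executemany("INSERT INTO staging VALUES (?, ?, ?)", rows)
        newest = max(row[2] for row in rows)
        conn.execute(
            "UPDATE cdc_watermark SET last_modified = ? WHERE table_name = 'source'",
            (newest,),
        )
    conn.commit()
    return len(rows)


conn.execute("INSERT INTO source VALUES (1, 'Ann', '2024-01-01 10:00:00')")
conn.execute("INSERT INTO source VALUES (2, 'Bob', '2024-01-01 11:00:00')")
first = capture_changes(conn)   # both rows are new

conn.execute("DELETE FROM staging")  # stands in for the truncate after a merge
conn.execute(
    "UPDATE source SET name = 'Bobby', modified_date = '2024-01-02 09:00:00' "
    "WHERE id = 2"
)
second = capture_changes(conn)  # only the changed row
third = capture_changes(conn)   # nothing changed since the last run
```

Timestamps are stored as ISO-formatted text here so that string comparison matches chronological order; in Redshift the column would be a TIMESTAMP.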
By using stored procedures and a staging table, we can capture changes to the source table and merge them into the target table on a schedule. This approach is not as efficient as a built-in CDC feature, but it can still provide a reliable and effective way to capture changes in Redshift.