SQL CDC Introduction
Overview
SQL CDC is a scripting language that uses a lightweight version of the SQL language to facilitate access to a large number of endpoints, sources and targets, including HTTP/S platforms, and provides data engineering functions that enable advanced ETL processing scenarios. Once an SQL CDC pipeline has been created, it can be managed like any other pipeline in DataZen, and can be scheduled for execution or triggered from other pipelines. The implementation of this design follows a prescribed architecture established by our DataZen implementation approach and is centered on a Change Log integration paradigm. As a result, the SQL CDC language has specific constraints that ensure pipelines are built for ease of maintenance and flexibility.
Fundamentals
A SQL CDC pipeline is made of two distinct parts: read operations and write operations.
While read and write operations can be included in a single script, they are always treated as two distinct parts in DataZen;
deciding whether to create a single script or two separate scripts is an implementation choice with subtle differences.
The two examples below are functionally equivalent:

Single SQL Script

A single SQL CDC script that has both read and write operations is called a "Direct" Job Pipeline.

SELECT * FROM DB [az-sql-db] (SELECT name FROM sys.tables);
-- The capture operation marks the end of the read operation
-- and applies change capture automatically on the data set
CAPTURE 'tablelog' INSERT ON KEYS 'id';
-- Although this section is part of the same script, it is
-- physically treated as a "target" script with an implied
-- LOAD operation before running:
PUSH INTO DRIVE [adls] FORMAT 'CSV' FILE 'tables.csv';

Two Separate SQL Scripts

A SQL CDC script that only has read operations is called a "Read" Job Pipeline. A reader can have multiple SELECT operations but a single CAPTURE command.

SELECT * FROM DB [az-sql-db] (SELECT name FROM sys.tables);
-- The capture operation marks the end of the read operation
-- and applies change capture automatically on the data set
CAPTURE 'tablelog' INSERT ON KEYS 'id';

A SQL CDC script that only has a load/push operation is called a "Write" Job Pipeline. Only a single LOAD and a single PUSH operation are permitted in a writer. However, multiple writer operations can be created against the same change log.

-- Because this write operation is created as a separate
-- script, the LOAD operation is now required.
LOAD FROM 'tablelog';
PUSH INTO DRIVE [adls] FORMAT 'CSV' FILE 'tables.csv';
Three fundamental implementation patterns are supported with SQL CDC:
- Read Pipeline: Allows you to read source data with optional change capture and transformation of data.
  A simple SQL HTTP read pipeline that captures a change log for new and updated records would look like this:

  SELECT * FROM HTTP [connection] (GET /test);
  ADD COLUMN 'capturedOn' WITH EXPRESSION '#utcnow(s)';
  CAPTURE 'test' INSERT UPDATE ON KEYS 'id' WITH PATH [adls] 'logs';

- Write Pipeline: Allows you to load previously created change logs and forward them to a target of your choice with optional further target-specific transformations.
  A simple SQL write pipeline that pushes changes captured in a change log into a database table would look like this:

  LOAD UPSERTS FROM [adls] 'test' PATH 'logs' WITH KEY_COLUMNS 'id';
  ADD COLUMN 'pushedOn' WITH EXPRESSION '#utcnow(s)';
  PUSH INTO DB [mysql] TABLE 'testdata' AUTO_MERGE;

- Read + Write Pipeline (a.k.a. Direct Pipeline): Allows you to read source data with optional change capture and transformation of data, and forward the change logs to a target of your choice with optional further target-specific transformations.
  A simple SQL direct pipeline that reads from two database tables and creates a Parquet file of newly created records in the cloud, with SSN masking on the target, would look like this:

  SELECT * FROM DB [postgresql] (SELECT * FROM table;);
  APPEND SELECT * FROM DB [postgresql] (SELECT * FROM table2;);
  CAPTURE 'test2' INSERT ON KEYS 'id' WITH PATH [adls] 'logs';
  MASK COLUMN 'ssn' AS SSN KEEP '0,4';
  PUSH INTO DRIVE [adls] FORMAT PARQUET FILE 'myfile_@executionid.parquet' COMPRESSION 'snappy';
SQL CDC allows you to transform data inline, without the need to stage your data in most instances. Transforming data is done through a Component, which can be used in three different places depending on integration objectives: as part of the SELECT operation, after the SELECT operation but before the CAPTURE command, and after either the CAPTURE or LOAD operation:
| Placement | Behavior |
| Inside SELECT | When included inside a SELECT operation, the transformation block within APPLY PIPELINE executes one or more components, one after the other, for every page of data being read from the source. |
| After SELECT, before CAPTURE | When added after a SELECT operation but before a CAPTURE operation, the transformation block within APPLY PIPELINE, or any individual component specified, executes one after the other on the entire data set collected after the completion of the SELECT command. |
| After CAPTURE or LOAD | When added after a CAPTURE or LOAD operation but before a PUSH operation, the transformation block within APPLY PIPELINE, or any individual component specified, executes one after the other on the entire data set available at that time, before the PUSH operation starts. Note that if both UPSERT and DELETE streams are available, the pipeline components execute on both streams. If no data leaves the transformation components, the PUSH operation will not be executed. |
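As a quick illustration (a condensed variant of the Combining ETL Components example shown later in this section; the 'rsslog' change log name and target file are made up for this sketch), the following uses all three placements in a single script:

SELECT * FROM HTTP [RSS Feed] (GET /econi.xml)
   APPLY TX (//item)
   APPLY PIPELINE (
      -- Inside SELECT: executes for every page of data read from the source
      APPLY SCHEMA (
         NVARCHAR(36) [guid],
         NVARCHAR(500) [title]
      );
   );
-- After SELECT, before CAPTURE: executes once on the entire data set
ADD COLUMN 'capturedOn' WITH EXPRESSION '#utcnow(s)';
CAPTURE 'rsslog' INSERT UPDATE ON KEYS 'guid';
-- After CAPTURE: executes before the PUSH operation and is not captured
ADD COLUMN 'pushedOn' WITH EXPRESSION '#utcnow(s)';
PUSH INTO DRIVE [adls] FORMAT 'CSV' FILE 'rss_@executionid.csv';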
Read Pipeline
SQL CDC is built as a data pipeline engine made of a Read Pipeline, one or more data engineering functions, optionally followed by a change capture operation. The output of a read pipeline is a change log that can then be played against one or more targets using write pipelines.
Syntax
The SQL CDC batch syntax for a read pipeline is as follows:
{ DECLARE PARAM ... }
{ USING ... }
SELECT ... APPLY PIPELINE (...) ;
{ APPEND SELECT ... APPLY PIPELINE (...) };
{ APPLY PIPELINE (...) };
{ CAPTURE ... };
{ TRIGGER ... };
The USING operation is only supported for SELECT HTTP operations. While the USING operation provides advanced capabilities, consider using the APPLY HTTP operation instead for more flexibility.
You may use other ETL operations before the first SELECT operation, such as SET and PRINT commands. However, IF and APPLY PIPELINE blocks are only supported after the first SELECT command.
You may make any number of SELECT or APPEND SELECT operations to combine data from any number of source systems; however, only the first SELECT operation supports inner pipelines.
Applying a pipeline inside the SELECT operation for HTTP, Databases, and Big Data sources implies that all its transformations will be executed for every page of data retrieved from the source; by contrast, any transformation declared after the SELECT command will execute once for the entire data set retrieved. Both can be used in the same batch.
The CAPTURE operation is optional, but if omitted, the change log will not be persisted upon batch completion.
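For illustration, here is a minimal sketch that places a PRINT command ahead of the first SELECT (the RSS Feed connection is reused from the examples below; the 'rsslog' change log name is made up for this sketch):

PRINT 'Starting the RSS read pipeline' LEVEL 'WARNING';
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
CAPTURE 'rsslog' INSERT UPDATE ON KEYS 'guid';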
Comments
The following read pipeline gets the latest RSS feed available
from an HTTP/S connection previously defined, called RSS Feed,
and automatically transforms the XML output into rows and columns using the
item nodes returned.
SELECT TOP 100 * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
Adding Fields
You can add any number of fields to the SELECT operation; these fields are added after the data has been extracted from the source system, so you can add calculated fields. These fields are always treated as strings; you can use the
APPLY SCHEMA operation in an after pipeline to convert fields to specific data types.
You can combine the "*" with additional fields.
DataZen functions are supported along with late-bound (per row) function calls. In this example, after the read operation
against the source system, an automatic transformation on the data node is performed to turn the JSON response into
rows and columns; the addition of the new fields is performed last, allowing the uppername field to be
calculated for each row. The SELECT operation also supports the @rowid() function, which returns an incrementing number
for each row, allowing you to create a unique row identifier if needed.
SELECT [uid] = '@rowid()', [uppername] = '@toupper({{name}})', *
FROM HTTP [Samsara]
(GET /fleet/drivers?filterBy=drivers&limit=50&driverActivationStatus=active)
WITH HWM 'updatedAtTime'
HWM_PARAM 'startTime'
HWM_AS_DATETIME
PAGING 'token'
PAGING_PARAM 'after'
PAGING_PATH '$.pagination.endCursor'
DEBUG_MAX_PAGES 2
APPLY TX 'data'
;
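As a hedged follow-up to the note above about converting added fields, the calculated [uid] field could then be typed in an after pipeline (the INT and NVARCHAR(255) type names are assumptions for this sketch):

-- Convert the calculated fields, which are strings by default
APPLY SCHEMA (
   INT [uid],
   NVARCHAR(255) [uppername]
);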
Multiple Reads
You can execute multiple read operations anywhere, although only the first one will support the built-in high watermark feature. If you use multiple SELECT operations in your ETL script, the pipeline data will be replaced automatically. However,
using the APPEND SELECT operation will union the current pipeline data with the output of the select operation.
For example, this operation combines two outputs from the HTTP endpoint. Using this approach works well for APIs that return
a forward-only continuation token, like the Sage Intacct API.
SELECT * FROM HTTP [NWS Weather] (GET /alerts/active?area=VA);
APPEND SELECT * FROM HTTP [NWS Weather] (GET /alerts/active?area=NC);
APPLY TX 'features' ON 'payload';
When the source system is an HTTP Endpoint, you can also store a list of items in a database table, and use the
records as parameters that will loop on the endpoint. Assuming a table contains the state column, the
following would loop on all states found in the table and combine all records accordingly; the state column will
also be added to the final output:
USING (SELECT [state] FROM [USStates]) ON DB [sqlconnection] WITH INCLUDE_FIELDS
SELECT * FROM HTTP [NWS Weather] (GET /alerts/active?area={{@param.state}})
APPLY TX 'features';
Complex SQL ETL
To apply complex SQL-based transformations on the fly, use the EXEC ON DB operation. Note that this operation only runs on SQL Server
databases at the moment. The following script retrieves all available RSS feeds,
and executes an inline SQL operation using the @pipelinedata() function to
further transform the data, and replaces the pipeline data with the new output.
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
EXEC ON DB [sqldb] (SELECT TOP 50 CAST([guid] as NVARCHAR(36)) [guid], CAST([title] as NVARCHAR(500)) [title] FROM @pipelinedata()) WITH REPLACE;
Note that you can execute full database scripts, including stored procedures.
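As a hedged sketch of that note (dbo.RefreshFeedStats is a hypothetical stored procedure on the [sqldb] connection, not part of this documentation):

SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
-- Invoke a stored procedure on the target database after the read completes
EXEC ON DB [sqldb] (EXEC dbo.RefreshFeedStats);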
Schema Enforcement
Instead of using the EXEC ON DB approach to make simple changes to your schema, it is usually more efficient
to use the APPLY SCHEMA operation:
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
APPLY SCHEMA (
NVARCHAR(36) [guid],
NVARCHAR(500) [title]
);
ETL Components: Inline vs. After vs. Block
ETL Components are operations that modify data or perform an action outside of the main
SELECT, PUSH, CAPTURE or LOAD operation. In other words, ETL Components execute in the middle of the main operations.
ETL Components can be added on their own or within a pipeline block.
The following examples show you the three ways to use Components:
Inline Block
When added as part of a read operation block, ETL Components execute "per batch" or "per page" of data being retrieved. In other words, they may execute multiple times depending on the read operation. This option is most useful for HTTP Readers but can also be used for DB readers.
Inner Pipelines are only supported as part of the first SELECT operation.
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml)
APPLY TX (//item)
APPLY PIPELINE (
-- one or more ETL Components in this block will execute per page of data retrieved;
-- using this approach allows you to execute an ETL block on a chunk of data
APPLY SCHEMA (
NVARCHAR(36) [guid],
NVARCHAR(500) [title]
);
);
After, Ad-Hoc
When added after a SELECT or LOAD operation, ETL Components can be specified on their own.
Each ETL component runs sequentially on the entire data set available in the data pipeline.
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml)
APPLY TX (//item);
-- one or more ETL Components in this section will execute against the entire pipeline data set
-- after the read operation has completed.
APPLY SCHEMA (
NVARCHAR(36) [guid],
NVARCHAR(500) [title]
);
After, Pipeline Block
An after pipeline can also be included in a block. See the Apply Pipeline Block section below for more information.
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml)
APPLY TX (//item);
-- You can declare a pipeline block. This allows you to control execution of an entire block
APPLY PIPELINE (
-- one or more ETL Components in this section will execute against the entire pipeline data set
-- after the read operation has completed.
APPLY SCHEMA (
NVARCHAR(36) [guid],
NVARCHAR(500) [title]
);
);
Combining ETL Components
You can combine all of the methods described above in a single script. In this example, we make a call to a Samsara endpoint, use a paging token provided by the HTTP API to fetch multiple pages of data, and execute an inner pipeline, an after-read pipeline, and an after-write pipeline before pushing the data.
SELECT * FROM HTTP [Samsara]
(GET /fleet/drivers?filterBy=drivers&limit=50&driverActivationStatus=active)
WITH HWM 'updatedAtTime'
HWM_PARAM 'startTime'
HWM_AS_DATETIME
PAGING 'token'
PAGING_PARAM 'after'
PAGING_PATH '$.pagination.endCursor'
DEBUG_MAX_PAGES 2
APPLY TX 'data'
APPLY PIPELINE (
-- Execute per page of data
APPLY SCHEMA (
string(50) id,
string(255) name,
string(2) licenseState
) WITH STRICT_COLUMNS;
)
;
-- Execute on the entire data set
APPLY PIPELINE (
APPLY FILTER ([licenseState]='NC');
PRINT 'We have some data!' LEVEL 'WARNING';
) BREAK_ON_EMPTY;
Apply Pipeline Block
As discussed above, you can group multiple ETL operations within a single block. Doing so allows you to quickly disable an entire block or to set a block-level option: BREAK_ON_EMPTY.
This option will force-exit the entire pipeline if at any point a transformation returns 0 records.
You can also add comments to the SQL CDC script using either /*...*/ or -- at the start of a line.
/*
This comment block will be ignored during execution
*/
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
APPLY PIPELINE (
-- Apply a schema on two fields, but leave the remaining fields untouched
APPLY SCHEMA (
NVARCHAR(36) [guid],
NVARCHAR(500) [title]
);
PRINT 'This will print to the output log';
--
-- Apply an in-memory filter to demonstrate how to force-exit the pipeline
--
APPLY FILTER SQL '1=0';
PRINT 'This will NOT print to the output log and the pipeline will force-exit';
) BREAK_ON_EMPTY;
Capture
If no CDC operation is applied on the source data, all the records identified will be saved into a change log. However, a CDC operation can optionally be applied to identify changes from the last execution of the pipeline; if changes are detected, a change log is created containing only the changes. In this example, a CDC operation is performed using the guid column and the change log is
stored in the path specified (this can also be a cloud connection to an AWS S3 bucket or
an Azure ADLS 2.0 container).
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
APPLY SCHEMA (
NVARCHAR(36) [guid],
NVARCHAR(500) [title]
);
-- Eliminate records that have been previously captured based on the guid column
-- and store the change log on a local drive. Also, in case the HTTP endpoint returns
-- multiple guids (yes, some HTTP APIs return duplicate records), remove them without generating an error.
CAPTURE 'pipeline001' INSERT UPDATE ON KEYS 'guid' WITH PATH 'c:\tmp\logs\' IGNORE_DUPS;
-- Start another pipeline when some data has been captured; wait 5 seconds
TRIGGER 'pipeline2' ON SUCCESS_WITH_DATA DELAY 5;
Write Pipeline
Write pipelines start by reading the last available change log previously created by a read pipeline, apply optional ETL processing, and push the data to a specific target.
Syntax
The SQL CDC batch syntax for a write pipeline is as follows:
LOAD ...
{ APPLY ... };
PUSH ...
{ TRIGGER ... }
The LOAD operation must be the first operation in a write pipeline, and only a single LOAD operation is allowed.
Any number of transformations is allowed following the LOAD operation.
A single PUSH operation is allowed in a Write pipeline and only TRIGGERS may be declared afterwards.
Comments
The first step involves loading a change log using the LOAD
command. By default, this operation loads the next available change log; this information
is stored as metadata in the pipeline definition.
LOAD FROM 'pipeline001' PATH 'c:\tmp\logs\';

Once loaded, you can apply ETL functions similarly to the read pipeline, and finally call the PUSH operation. In this example, a different file name is created every time the pipeline executes, and the data is further distributed across folders in ADLS 2.0. Also, the LOAD operation uses a specific change log timestamp (execution id) when previewing data on the screen.

LOAD FROM 'pipeline001' PATH 'c:\tmp\logs\' WITH PREVIEW_FILE '1758142467502';
ADD COLUMN 'savedOn' WITH EXPRESSION '#utcnow()';
PUSH INTO DRIVE [adls2.0] FORMAT 'parquet' COMPRESSION 'snappy' CONTAINER 'scada' FILE '[yyyy]/[mm]/[dd]/target_@executionid.parquet';
Direct Pipeline
Direct pipelines have both a read and write pipeline combined into a single SQL CDC Batch operation. Although the read pipeline still creates change logs when data is available, direct pipelines have the option to discard the change log upon successful execution of the write operation.
Syntax
The SQL CDC batch syntax for a direct pipeline is as follows:
{ USING ... }
SELECT ... APPLY PIPELINE (...) ;
{ APPLY ... };
{ CAPTURE ... };
{ APPLY ... };
PUSH ...
{ TRIGGER ... }
The USING operation is only supported for SELECT HTTP operations.
Any number of transformations is allowed following the SELECT operation. If a CAPTURE operation is specified, any number of additional transformations is allowed following the CAPTURE operation; the results of these transformations are not included in the captured change log.
A single PUSH operation is allowed and only TRIGGERS may be declared afterwards.
Comments
Here is an example of a SQL CDC batch operation that reads and writes captured changes:
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
CAPTURE 'pipeline001' INSERT UPDATE ON KEYS 'guid' WITH PATH 'c:\tmp\logs\';
PUSH INTO DRIVE [adls2.0] FORMAT 'parquet' COMPRESSION 'snappy' CONTAINER 'scada' FILE '[yyyy]/[mm]/[dd]/target_@executionid.parquet';
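As a hedged variant of the script above, a target-only transformation can be placed after the CAPTURE operation; the added column is pushed to the target but is not part of the captured change log:

SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
CAPTURE 'pipeline001' INSERT UPDATE ON KEYS 'guid' WITH PATH 'c:\tmp\logs\';
-- This column is added after the capture, so it only affects the data being pushed
ADD COLUMN 'pushedOn' WITH EXPRESSION '#utcnow(s)';
PUSH INTO DRIVE [adls2.0] FORMAT 'parquet' COMPRESSION 'snappy' CONTAINER 'scada' FILE '[yyyy]/[mm]/[dd]/target_@executionid.parquet';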
Environment Variables
You can access and update environment variables within SQL CDC scripts. Environment variables can be used to store secrets that need to be used as part of scripts or payloads, to track custom high watermarks, or to save state information for advanced integration needs.
Environment Variables differ from Parameters in how they are used: script parameters are always read-only and are not persisted, while environment variables can be read, updated, and persisted across executions.
In the following example, we use an environment variable called hwm to keep track of the current date/time. The variable was created manually before executing this script. We are also updating it after completion of the SELECT operation if any records were retrieved.
PRINT 'Current timestamp: #getvar(hwm)';
SELECT * FROM DB [sql2017]
(
-- order by date desc to simplify the ETL script
SELECT * FROM sys.tables
WHERE create_date > '#getvar(hwm)'
ORDER BY create_date DESC
);
-- skip this section while debugging
IF ('@preview' = 0)
BEGIN
-- Add a calculated value to the data set, which uses the first record for evaluation
ADD COLUMN (string _hwm = '{{create_date}}');
-- Now set the new hwm value that also uses the first records for evaluation
SET @hwm = '{{_hwm}}';
PRINT 'Current timestamp was updated to: #getvar(hwm)';
END
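As noted above, environment variables can also carry secrets. Here is a hedged sketch (the apikey environment variable is assumed to already exist, the Samsara connection is reused from earlier examples, and the expansion of #getvar() inside the headers clause is an assumption):

SELECT * FROM HTTP [Samsara]
   (GET /fleet/drivers?limit=50)
   -- 'apikey' is an assumed, pre-existing environment variable holding the secret
   WITH headers 'Authorization:Bearer #getvar(apikey)'
   APPLY TX 'data'
;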
Declaring Parameters for MCP Tool Usage
You can create pipelines that can be exposed as MCP Tools. One of the key capabilities of MCP Tools for Agentic AI use is to expose optional parameters that can be used to filter data or perform certain operations or calculations before returning the data.
In this example, we are defining two parameters that can drive different results. The intent of this pipeline is to return information about existing databases and indexes. Parameters can be injected by MCP Tools using a specific syntax by adding a comment section on top of the script (see the MCP Tools section for details). During debugging, you can provide default values to parameters; this example declares an orderBy string parameter (with a default value set to rows) and a limit integer parameter with a default of 0. These are then used as part of IF blocks to shape the response.
DECLARE PARAM @orderBy = 'rows';
DECLARE PARAM @limit = 0;
SELECT * FROM DB [sql2017]
(
SELECT
t.name AS TableName,
s.name AS SchemaName,
p.rows AS Rows,
SUM(a.total_pages) * 8 AS TotalSpaceKB,
CAST(ROUND(((SUM(a.total_pages) * 8) / 1024.00), 2) AS NUMERIC(36, 2)) AS TotalSpaceMB
FROM
sys.tables t
INNER JOIN sys.indexes i ON t.object_id = i.object_id
INNER JOIN sys.partitions p ON i.object_id = p.object_id AND i.index_id = p.index_id
INNER JOIN sys.allocation_units a ON p.partition_id = a.container_id
LEFT OUTER JOIN sys.schemas s ON t.schema_id = s.schema_id
WHERE
t.name NOT LIKE 'dt%'
AND t.is_ms_shipped = 0
AND i.object_id > 255
GROUP BY
t.name, s.name, p.rows
);
IF ('{{@orderBy}}' = 'rows')
BEGIN
APPLY FILTER '1=1' ORDER BY 'rows DESC';
END
ELSE
BEGIN
APPLY FILTER '1=1' ORDER BY 'TotalSpaceKB DESC';
END
;
IF ({{@limit}} > 0)
BEGIN
EXEC ON DB [sql2017]
(SELECT TOP {{@limit}} * FROM @pipelinedata())
WITH REPLACE;
END;
In this second example, we declare a severity parameter to filter on weather alerts provided by the US National Weather Service. A filter is conditionally applied based on the length of the value provided for the severity. By default, all severity types are returned in this example. Note that we are also limiting the list of fields being returned to minimize the number of tokens an LLM would need to process the data.
DECLARE PARAM @severity = '';
SELECT
[properties.areaDesc],
[properties.category],
[properties.severity],
[properties.certainty],
[properties.urgency],
[properties.event],
[properties.headline],
[properties.description],
[properties.geocode],
[geometry.coordinates]
FROM HTTP [NWS Weather]
(GET /alerts/active)
WITH headers 'accept:application/geo+json'
APPLY TX 'features'
;
-- Apply an inline IF operation to filter records
-- A more efficient approach would be to use an IF block, but this approach demonstrates how to apply an inline row-based filter
APPLY FILTER ([properties.severity] = IIF(LEN('{{@severity}}') > 0, '{{@severity}}', [properties.severity]));
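Here is a hedged sketch of the IF block alternative mentioned in the comment above (assuming the LEN() function is also valid inside an IF condition):

-- Only apply the filter when a severity value was provided
IF (LEN('{{@severity}}') > 0)
BEGIN
   APPLY FILTER ([properties.severity] = '{{@severity}}');
END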