SQL CDC Introduction

Overview

SQL CDC is the integration scripting language created by Enzo Unified; it is specifically designed to provide a scripting layer on top of the DataZen platform for rapid integration projects. The SQL CDC syntax is loosely based on SQL and provides commands specific to ETL processing and to source- and target-agnostic data consumption and ingestion. SQL CDC is designed to read from any system, perform any number of transformations if needed (either in-memory or by leveraging a database engine for more complex scenarios), and push the data (or changes to the data) into any other system (or systems) in the format expected by each target system.

SQL CDC can be used to implement any modern integration architectural pattern, including:

  • Point-to-point
  • Hub-and-Spoke
  • Webhook
  • Messaging
  • Orchestration & Choreography

Change Log

DataZen is designed around the concept of change logs. A change log is a portable, proprietary file format that contains compressed data, schema information, metadata, and security artifacts. The dataset contained in the change log is source-agnostic and may be a complete initial load or a change capture (CDC) from a previously known set, and it may also contain a secondary stream of deleted records. Change logs may be stored on a local server or in cloud stores such as Azure Blobs or AWS S3 Buckets. Read pipelines create change logs and Write pipelines play them back.

Because of this design, change logs may be played, replayed, and sent to multiple targets in a fully decoupled manner using an eventually consistent replication model.

SQL CDC Scripts

A pipeline is implemented using SQL CDC scripts and contains either a SELECT or a PUSH operation, or both. In addition, a SQL CDC script may contain a number of data transformations (ETL) that shape the data as intended.

A SQL CDC script is made of one or more commands; available commands fall into these main categories:

  • A DATA operation (SELECT, PUSH, LOAD, CAPTURE)
    • These operations either load data into the pipeline or send data to a target or change log
    • Usually, a SQL CDC script uses one of these patterns (a minimal example follows this list):
      • SELECT
      • SELECT + PUSH
      • SELECT + CAPTURE
      • SELECT + CAPTURE + PUSH
      • LOAD + PUSH
  • An INLINE ETL operation (APPLY TX, APPLY SCHEMA, HASH...)
  • A CONTROL operation (IF, APPLY PIPELINE)
  • A FLOW operation (TRIGGER, START, EXIT)
  • A DECLARE operation providing input parameters for MCP calls
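
For example, a minimal SELECT + CAPTURE script can be sketched as follows; the [RSS Feed] connection, the APPLY TX transform, and the CAPTURE options are borrowed from examples later in this document, and the capture name is illustrative:

SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
CAPTURE 'rssItems' INSERT UPDATE ON KEYS 'guid' WITH PATH 'c:\tmp\logs\';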

Pipeline Types

Three types of pipelines can be created:

  • Read Pipeline: Reads data from a source system and loads the pipeline data
  • Write Pipeline: Loads the pipeline with data from the next available change log and pushes data to a target system
  • Direct Pipeline: Performs both a Read + Write pipeline as a single script

Notation

The SQL CDC documentation uses the following notation:

Notation   Description
COMMAND    A SQL CDC command
{   }      An optional verb or command parameter
'...'      A field name, string, or value to be used as the parameter
N          An integer value
<    >     A list of valid options separated by a pipe |
+          Repeated 0, 1, or any number of times

SQL CDC Basic syntax rules

  • DECLARE statements must appear before any other statement in a SQL CDC script and are not supported for Write scripts
  • A SQL CDC Script is made of one or more commands and must have a SELECT, SELECT + PUSH, or a LOAD + PUSH operation to be valid.
    • When LOAD is used, it must be the first operation in a SQL CDC script, and only a single LOAD operation can be specified
    • Multiple SELECT operations may be used
    • Only a single PUSH operation may be used
  • Each individual command MUST end with a semicolon to be valid (see the sketch after this list).
  • Comments are allowed anywhere in the SQL CDC script using:
    • A comment block that starts with /* and ends with */
    • A comment line that starts with --
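
For reference, the following sketch shows a minimal valid script that follows these rules; it reuses the [RSS Feed] connection and the APPLY SCHEMA operation shown in examples later in this document:

/*
    This comment block is ignored during execution
*/
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);

-- every command ends with a semicolon
APPLY SCHEMA (
    NVARCHAR(36)  [guid],
    NVARCHAR(500) [title]
);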

Using DataZen Functions and Pipeline Variables

SQL CDC scripts may contain DataZen functions and Pipeline variables embedded in the code. These functions and variables are evaluated at various stages during the execution of the script.

Pipeline variables vary for Readers and Writers; see the documentation for additional information.

Pre-Parsing

The pre-parsing phase inspects a script for DECLARE operations only and evaluates any expressions found in these declarations.

DECLARE @myguid = '#rndguid()';
DECLARE @lastVal = '#getvar(lastValue)';

Once pre-parsing has completed, any {{@myguid}} and {{@lastVal}} tokens found in the SQL CDC script are replaced before proceeding.
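
For instance, the declared value can then be referenced anywhere in the rest of the script using the {{...}} token. The following is a minimal sketch; the [sql2017] connection is taken from the Environment Variables example later in this document, and the lastValue variable name is illustrative:

DECLARE @lastVal = '#getvar(lastValue)';

SELECT * FROM DB [sql2017]
(
    -- {{@lastVal}} was replaced with the evaluated value during pre-parsing
    SELECT * FROM sys.tables WHERE create_date > '{{@lastVal}}'
);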

Parsing

The parsing phase happens when the SQL CDC script is ready to be executed, immediately before executing commands. This phase evaluates the entire script and replaces Pipeline variables only; it does not execute DataZen functions.

The @sourceexecutionid Pipeline variable is not evaluated in this phase as it is not known at this stage

PRINT 'Job @jobkey is starting with execution id: @executionid...';

Execution

DataZen functions and the @sourceexecutionid Pipeline variable are evaluated before executing each command. DataZen functions are evaluated per row when using the late-bound notation for supported commands.

  • Immediately before the execution of a command (ex: #rndguid())
  • During the parsing of each row in the Pipeline data for row-based ETL commands (ex: APPLY SCHEMA), late-bound functions are evaluated (ex: @rndguid())

Because late-bound functions apply a DataZen function to each row of data, they may have a significant performance impact if not used carefully. Consider switching your ETL approach to an EXEC operation to leverage the full processing capabilities of a relational engine if performance degrades.

-- The uid column will have the same guid value for all rows
-- The rowuid column will have a different value for each row
APPLY SCHEMA (
 NVARCHAR(255) url,
 NVARCHAR(36)  uid = '#rndguid()',
 NVARCHAR(36)  rowuid = '@rndguid()'
);
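
If the per-row @rndguid() call above becomes a performance concern, the same per-row value can be produced by the database engine instead. The following is a minimal sketch of that alternative, assuming a SQL Server connection named [sqldb] as used in the Complex SQL ETL example later in this document:

-- NEWID() generates a distinct value for each row on the database side
EXEC ON DB [sqldb]
(SELECT [url], CAST(NEWID() AS NVARCHAR(36)) AS [rowuid] FROM @pipelinedata())
WITH REPLACE;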

Read Pipelines

Read pipelines are used to extract data from a source system, perform transformations if necessary, and save the data into a change log. The change log can then be loaded by one or more Write pipelines. Because some of the INLINE ETL commands can also save data directly for point-to-point integration scenarios, the change log may not be necessary; in that case, the CAPTURE operation can be omitted.

Some of the SELECT operations support the concept of paging, such as reading from an HTTP source system. When paging is supported, the SELECT operation also provides an inner pipeline that can execute per-page of data. This can be useful for performance when large datasets are returned from the source system.

{ DECLARE } + 
{ INLINE ETL | START } +  
SELECT ... { APPLY PIPELINE ( { INLINE ETL | CONTROL OPERATION | START } + ) } 
{ INLINE ETL | CONTROL OPERATION | START | { APPEND } SELECT } +  
{ CAPTURE ... }
{ TRIGGER } +
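
For illustration, the following is a minimal Read pipeline sketch following this pattern; it reuses the [NWS Weather] connection shown later in this document, while the capture name, key column, and change log path are illustrative assumptions:

SELECT * FROM HTTP [NWS Weather] (GET /alerts/active?area=VA)
APPLY TX 'features';

-- capture inserts and updates based on an assumed 'id' column and store the change log locally
CAPTURE 'nwsAlertsVA' INSERT UPDATE ON KEYS 'id' WITH PATH 'c:\tmp\logs\';

-- optionally start a Write pipeline when changes were captured
TRIGGER 'pipeline2' ON SUCCESS_WITH_DATA DELAY 5;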

Write Pipelines

Use a Write Pipeline when you need to send data previously captured by another pipeline or when you need to react to a webhook call. In addition, you may need to use this pattern when multiple target systems must be notified of changes in a multicast replication topology. When using this pattern, the LOAD operation must be the first command. Because target pipelines can have their own INLINE ETL, you can apply different target-specific transformations depending on the system. As an example, if one of the Write pipelines copies data to a Test environment, you may choose to filter out certain records or mask sensitive fields.

By design, each Writer keeps an internal Last Execution Id pointer that tells the replication engine which change log was last pushed successfully. As a result, a Write pipeline always knows which change log to operate on and ensures that no data will be missed.

While a Read pipeline may contain multiple SELECT commands, Write pipelines must specify one and only one PUSH command.

LOAD
{ INLINE ETL | CONTROL OPERATION | START | { APPEND } SELECT } +  
PUSH ...
{ TRIGGER } +
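
For illustration, the following is a minimal Write pipeline sketch following this pattern. The LOAD, APPLY FILTER, and TRIGGER commands use the forms shown elsewhere in this documentation; the PUSH command is intentionally abbreviated with the '...' notation because its options are target-specific and are not covered in this introduction:

-- load the next available change log for this Writer
LOAD;

-- target-specific ETL: for example, only send a subset of records to this target
APPLY FILTER ([licenseState]='NC');

-- push the data to the target system (target-specific options omitted)
PUSH ... ;

-- optionally start another pipeline once data has been pushed
TRIGGER 'pipeline2' ON SUCCESS_WITH_DATA DELAY 5;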

Direct Pipelines

A Direct Pipeline combines a Read and Write pipeline together into a single script. The Writer is then referred to as a Direct Writer. You may create additional Write pipelines separately, but the Direct Writer will be executed first. When using this pattern, the LOAD command is not supported. Creating a Direct Pipeline involves a script with a SELECT and a PUSH operation. Similarly to Write pipelines, a target-specific ETL section may be defined for Direct Writers.

As with a dedicated Write pipeline, the Direct Writer keeps an internal Last Execution Id pointer that tells the replication engine which change log was last pushed successfully. As a result, it always knows which change log to operate on and ensures that no data will be missed.

TRIGGER commands must appear at the end of the entire script.

When you create a Direct pipeline, the Write pipeline section of the script always executes after completion of the Read pipeline, even if no data has been captured. However, dedicated Write pipelines must be started declaratively using a TRIGGER operation from another pipeline, by a Schedule, or programmatically using the DataZen REST API.

Keep in mind that while a Direct pipeline script looks like a single pipeline, it is actually the combination of two entirely independent scripts. The Read script section ends with the CAPTURE operation, and the Write script section starts at the first command right after the CAPTURE command. If no CAPTURE command is specified, the Write section starts at the PUSH command. DECLARE commands are only applied to the Read section of a SQL CDC script.

{ DECLARE } + 
{ INLINE ETL | START } +  
SELECT ... { APPLY PIPELINE ( { INLINE ETL | CONTROL OPERATION | START } + ) } 
{ INLINE ETL | CONTROL OPERATION | START | { APPEND } SELECT } +  
{ CAPTURE ... }
{ INLINE ETL | CONTROL OPERATION | START | { APPEND } SELECT } +  
PUSH ...
{ TRIGGER } +
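
The following sketch shows a minimal Direct pipeline assembled from the RSS examples used throughout this document; the capture name is illustrative, and the PUSH command is abbreviated because its options are target-specific:

-- Read section
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
APPLY SCHEMA (
    NVARCHAR(36)  [guid],
    NVARCHAR(500) [title]
);
CAPTURE 'rssItems' INSERT UPDATE ON KEYS 'guid' WITH PATH 'c:\tmp\logs\';

-- Write section (the Direct Writer starts at the first command after CAPTURE)
PUSH ... ;

-- TRIGGER commands appear at the end of the entire script
TRIGGER 'pipeline2' ON SUCCESS_WITH_DATA DELAY 5;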

Additional Considerations

Adding Fields


You can add any number of fields to the SELECT operation; these fields are added after the data has been extracted from the source system. Added fields are always treated as strings; however, you can use the APPLY SCHEMA operation in an after pipeline to convert fields to specific data types. You can combine the "*" with additional fields, and DataZen functions are supported along with late-bound (per row) function calls. In the example below, after the read operation against the source system, an automatic transformation on the data node turns the JSON response into rows and columns; the new fields are added last, allowing the uppername field to be calculated for each row. The SELECT operation also supports the @rowid() function, which increments by one for each row, allowing you to create a unique row identifier if needed.

SELECT [uid] = '@rowid()', [uppername] = '@toupper({{name}})', * 
FROM HTTP [Samsara]
(GET /fleet/drivers?filterBy=drivers&limit=50&driverActivationStatus=active)
WITH 	HWM 'updatedAtTime'
 	HWM_PARAM 'startTime'  
	HWM_AS_DATETIME
	PAGING 'token' 
	PAGING_PARAM 'after' 
	PAGING_PATH '$.pagination.endCursor'
	DEBUG_MAX_PAGES 2 
APPLY TX 'data'
; 
                    

Multiple Reads

You can execute multiple read operations anywhere, although only the first one supports the built-in high watermark feature. If you use multiple SELECT operations in your ETL script, each SELECT replaces the pipeline data automatically; using the APPEND SELECT operation instead unions the current pipeline data with the output of the SELECT operation. For example, the following operation combines two outputs from the HTTP endpoint. This approach works well for APIs that return a forward-only continuation token, like the Sage Intacct API.

SELECT * FROM HTTP [NWS Weather] (GET /alerts/active?area=VA);
APPEND SELECT * FROM HTTP [NWS Weather] (GET /alerts/active?area=NC);
APPLY TX 'features' ON 'payload';
                    
When the source system is an HTTP Endpoint, you can also store a list of items in a database table and use the records as parameters to loop over the endpoint. Assuming a table contains a state column, the following loops over all states found in the table and combines all records accordingly; the state column is also added to the final output:

USING (SELECT [state] FROM [USStates]) ON DB [sqlconnection] WITH INCLUDE_FIELDS 
SELECT * FROM HTTP [NWS Weather] (GET /alerts/active?area={{@param.state}})
APPLY TX 'features';
                    

Complex SQL ETL

To apply complex SQL-based transformations on the fly, use the EXEC ON DB operation. Note that this operation only runs on SQL Server databases at the moment. The following script retrieves all available RSS feed items, executes an inline SQL operation that uses the @pipelinedata() function to further transform the data, and replaces the pipeline data with the new output.

SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
EXEC ON DB [sqldb] (SELECT TOP 50 CAST([guid] as NVARCHAR(36)) [guid], CAST([title] as NVARCHAR(500)) [title] FROM @pipelinedata()) WITH REPLACE;
                    
Note that you can execute full database scripts, including stored procedures.
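
For example, a script along the following lines could stage the pipeline data into a temporary table, invoke a stored procedure, and return the transformed rows as the new pipeline data. This is a hedged sketch: dbo.usp_CleanTitles is a hypothetical procedure, and the way your own T-SQL consumes the staged data is up to you.

EXEC ON DB [sqldb]
(
    SELECT [guid], [title] INTO #items FROM @pipelinedata();
    EXEC dbo.usp_CleanTitles;          -- hypothetical stored procedure that updates #items
    SELECT [guid], [title] FROM #items
)
WITH REPLACE;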

Schema Enforcement

Instead of using the EXEC ON DB approach to make simple changes to your schema, it is usually more efficient to use the APPLY SCHEMA operation:

SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
APPLY SCHEMA (
    NVARCHAR(36) [guid],
    NVARCHAR(500) [title]
);

ETL Components: Inline vs. After vs. Block

ETL Components are operations that modify data or perform an action outside of the main SELECT, PUSH, CAPTURE, or LOAD operation. In other words, ETL Components execute between the main operations. ETL Components can be added on their own or within a pipeline block. The following examples show the three ways to use ETL Components:

Inline Block
When added as part of a read operation block, ETL Components execute "per batch" or "per page" of data being retrieved. In other words, they may execute multiple times depending on the read operation. This option is most useful for HTTP Readers but can also be used for DB readers.

Inner Pipelines are only supported as part of the first SELECT operation.

SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) 
    APPLY TX (//item)
    APPLY PIPELINE (
        -- one or more ETL Components in this block will execute per page of data retrieved; 
        -- using this approach allows you to execute an ETL block on a chunk of data 
        APPLY SCHEMA (
            NVARCHAR(36) [guid],
            NVARCHAR(500) [title]
        ); 
    );
                    
After, Ad-Hoc
When added after a SELECT or LOAD operation, ETL Components can be specified on their own. Each ETL Component runs sequentially against the entire data set available in the pipeline.

SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) 
    APPLY TX (//item);

-- one or more ETL Components in this section will execute against the entire pipeline data set
-- after the read operation has completed. 
APPLY SCHEMA (
    NVARCHAR(36) [guid],
    NVARCHAR(500) [title]
);
                    
After, Pipeline Block
An after pipeline can also be included in a block. See the Apply Pipeline Block below for more information.

SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) 
    APPLY TX (//item);

-- You can declare a pipeline block. This allows you to control execution of an entire block
APPLY PIPELINE (
    -- one or more ETL Components in this section will execute against the entire pipeline data set
    -- after the read operation has completed. 
    APPLY SCHEMA (
        NVARCHAR(36) [guid],
        NVARCHAR(500) [title]
    );
);
Combining ETL Components
You can use all the methods described above together in a single script. In this example, we call a Samsara endpoint, use a paging token provided by the HTTP API to fetch multiple pages of data, execute an inner pipeline per page, and then execute an after-read pipeline block on the entire data set; a target-specific (after-write) pipeline could also be added before pushing the data.

SELECT * FROM HTTP [Samsara]
(GET /fleet/drivers?filterBy=drivers&limit=50&driverActivationStatus=active)
WITH 	HWM 'updatedAtTime'
 	HWM_PARAM 'startTime'  
	HWM_AS_DATETIME
	PAGING 'token' 
	PAGING_PARAM 'after' 
	PAGING_PATH '$.pagination.endCursor'
	DEBUG_MAX_PAGES 2 
APPLY TX 'data'
APPLY PIPELINE (
	-- Execute per page of data
	APPLY SCHEMA (
		string(50) id,
		string(255) name,
		string(2) licenseState
	) WITH STRICT_COLUMNS;
)
; 

-- Execute on the entire data set
APPLY PIPELINE (

	APPLY FILTER ([licenseState]='NC');	
	PRINT 'We have some data!' LEVEL 'WARNING';
	
) BREAK_ON_EMPTY;
                    

Apply Pipeline Block

As discussed above, you can group multiple ETL operations within a single block. Doing so allows you to quickly disable an entire block or to set a block-level option: BREAK_ON_EMPTY. This option force-exits the entire pipeline if at any point a transformation returns 0 records. You can also add comments to the SQL CDC script using either /*...*/ or -- at the start of a line.

/*
    This comment block will be ignored during execution
*/
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
APPLY PIPELINE (
   -- Apply a schema on two fields, but leave the remaining fields untouched 
   APPLY SCHEMA (
       NVARCHAR(36) [guid],
       NVARCHAR(500) [title]
   );
   PRINT 'This will print to the output log';
   
   --
   -- Apply an in-memory filter to demonstrate how to force-exit the pipeline
   --
   APPLY FILTER SQL '1=0';
   
   PRINT 'This will NOT print to the output log and the pipeline will force-exit';

) BREAK_ON_EMPTY;

Capture

If no CDC operation is applied on the source data, all the records identified will be saved into a change log. However, a CDC operation can optionally be applied to identify changes since the last execution of the pipeline; if changes are detected, a change log is created containing only the changes. In this example, a CDC operation is performed using the guid column and the change log is stored in the path specified (this can also be a cloud connection to an AWS S3 bucket or an Azure ADLS 2.0 container).

SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
APPLY SCHEMA (
       NVARCHAR(36) [guid],
       NVARCHAR(500) [title]
   );

-- Eliminate records that have been previously captured based on the guid column
-- and store the change log on a local drive. Also, in case the HTTP endpoint returns 
-- multiple guids (yes, some HTTP APIs return duplicate records), remove them without generating an error.
CAPTURE 'pipeline001' INSERT UPDATE ON KEYS 'guid' WITH PATH 'c:\tmp\logs\' IGNORE_DUPS;

-- Start another pipeline when some data has been captured; wait 5 seconds
TRIGGER 'pipeline2' ON SUCCESS_WITH_DATA DELAY 5;

Environment Variables

You can access and update environment variables within SQL CDC scripts. Environment variables can be used to store secrets that need to be used as part of scripts or payloads, to track custom high watermarks, or to save state information for advanced integration needs.

Environment Variables differ from Parameters in how they are used: script parameters are always read-only and are not persisted, whereas environment variables can be read and updated from within a script.

In the following example, we use an environment variable called hwm to keep track of the current date/time. The variable was created manually before executing this script. We are also updating it after completion of the SELECT operation if any records were retrieved.

PRINT 'Current timestamp: #getvar(hwm)';

SELECT * FROM DB [sql2017]
(
	-- order by date desc to simplify the ETL script
	SELECT * FROM sys.tables	
	WHERE create_date > '#getvar(hwm)'
	ORDER BY create_date DESC
);

-- skip this section while debugging
IF ('@preview' = 0)
BEGIN
	-- Add a calculated value to the data set, which uses the first record for evaluation
	ADD COLUMN (string _hwm = '{{create_date}}');
	
    -- Now set the new hwm value that also uses the first record for evaluation
	SET @hwm = '{{_hwm}}';

	PRINT 'Current timestamp was updated to: #getvar(hwm)';

END
                

Declaring Parameters for MCP Tool Usage

You can create pipelines that can be exposed as MCP Tools. One of the key capabilities of MCP Tools for Agentic AI use is to expose optional parameters that can be used to filter data or perform certain operations or calculations before returning the data.

In this example, we are defining two parameters that can drive different results. The intent of this pipeline is to return information about existing databases and indexes. Parameters can be injected by MCP Tools using a specific syntax by adding a comment section at the top of the script (see the MCP Tools section for details). During debugging, you can provide default values for parameters; this example declares an orderBy string parameter (with a default value of rows) and a limit integer parameter with a default of 0. These are then used in IF blocks to shape the response.

DECLARE PARAM @orderBy = 'rows';
DECLARE PARAM @limit = 0;

SELECT * FROM DB [sql2017]
(
 SELECT     
  t.name AS TableName,
      s.name AS SchemaName,
      p.rows AS Rows,
      SUM(a.total_pages) * 8 AS TotalSpaceKB, 
      CAST(ROUND(((SUM(a.total_pages) * 8) / 1024.00), 2) AS NUMERIC(36, 2)) AS TotalSpaceMB
 FROM 
      sys.tables t
  INNER JOIN sys.indexes i ON t.object_id = i.object_id
  INNER JOIN sys.partitions p ON i.object_id = p.object_id AND i.index_id = p.index_id
  INNER JOIN sys.allocation_units a ON p.partition_id = a.container_id
  LEFT OUTER JOIN sys.schemas s ON t.schema_id = s.schema_id
 WHERE 
  t.name NOT LIKE 'dt%' 
      AND t.is_ms_shipped = 0
      AND i.object_id > 255 
 GROUP BY 
      t.name, s.name, p.rows
	
);

IF ('{{@orderBy}}' = 'rows')
BEGIN
 APPLY FILTER '1=1' ORDER BY 'rows DESC';
END
ELSE
BEGIN
 APPLY FILTER '1=1' ORDER BY 'TotalSpaceKB DESC';
END
;

IF ({{@limit}} > 0)
BEGIN
 EXEC ON DB [sql2017]
 (SELECT TOP {{@limit}} * FROM @pipelinedata())
 WITH REPLACE;
END;

                

In this second example, we declare a severity parameter to filter on weather alerts provided by the US National Weather Service. A filter is conditionally applied based on the length of the value provided for the severity. By default, all severity types are returned in this example. Note that we are also limiting the list of fields being returned to minimize the number of tokens an LLM would need to process the data.

DECLARE PARAM @severity = '';

SELECT  
 [properties.areaDesc],
 [properties.category],
 [properties.severity],
 [properties.certainty],
 [properties.urgency],
 [properties.event],
 [properties.headline],
 [properties.description],
 [properties.geocode],
 [geometry.coordinates]
FROM HTTP [NWS Weather]
(GET /alerts/active)
WITH headers 'accept:application/geo+json'
APPLY TX 'features'
;

-- Apply an inline IF operation to filter records
-- A more efficient approach would be to use an IF block, but this approach demonstrates how to apply an inline row-based filter
APPLY FILTER ([properties.severity] = IIF(LEN('{{@severity}}') > 0, '{{@severity}}', [properties.severity]));