SQL CDC Introduction
Overview
This feature is in preview
SQL CDC is a scripting language that uses a lightweight version of
the SQL language to facilitate access to a large number of
endpoints, sources and targets, including HTTP/S platforms, and provides data engineering functions that
enable advanced ETL processing scenarios. Once an SQL CDC pipeline has been created, it can
be managed like any other pipeline in DataZen, and can be scheduled for execution or triggered
from other pipelines. The implementation of this design follows a prescribed architecture established by our DataZen implementation
approach and is centered on a Change Log integration paradigm. As a result, the SQL CDC language has specific
constraints that ensure pipelines are built for ease of maintenance and flexibility.
Three fundamental implementation patterns are supported with SQL CDC:
- Read Pipeline: Allows you to read source data with optional change capture and transformation of data
A simple SQL HTTP read pipeline that captures a change log for new and updated records would look like this:
SELECT * FROM HTTP [connection] (GET /test); ADD COLUMN 'capturedOn' WITH EXPRESSION '#utcnow(s)'; CAPTURE 'test' INSERT UPDATE ON KEYS 'id' WITH PATH [adls] 'logs';
- Write Pipeline: Allows you to load previously created change logs and forward them
to a target of your choice with optional further target-specific transformations
A simple SQL write pipeline that pushes changes captured in a change log into a database table would look like this:
LOAD UPSERTS FROM [adls] 'test' PATH 'logs' WITH KEY_COLUMNS 'id'; ADD COLUMN 'pushedOn' WITH EXPRESSION '#utcnow(s)'; PUSH INTO DB [mysql] TABLE 'testdata' AUTO_MERGE;
- Read + Write Pipeline (a.k.a. Direct Pipeline):
Allows you to read source data with optional change capture and transformation of data, and forward the change logs
to a target of your choice with optional further target-specific transformations
A simple SQL direct pipeline that reads from two database tables and creates a Parquet file of newly created records in the cloud, with SSN Masking on the target, would look like this:
SELECT * FROM DB [postgresql] (SELECT * FROM table;); APPEND SELECT * FROM DB [postgresql] (SELECT * FROM table2;); CAPTURE 'test2' INSERT ON KEYS 'id' WITH PATH [adls] 'logs'; MASK COLUMN 'ssn' AS SSN KEEP '0,4'; PUSH INTO DRIVE [adls] FORMAT PARQUET FILE 'myfile_@executionid.parquet' COMPRESSION 'snappy';
SQL CDC allows you to transform data inline, without the need to stage your data in most instances. Transforming data is done through a Component, which can be used in three different places depending on integration objectives: as part of the SELECT operation, after the SELECT operation but before the CAPTURE command, and after either the CAPTURE or LOAD operation:
- Inside SELECT: When included inside a SELECT operation, the transformation block within APPLY PIPELINE executes one or more components, one after the other, for every page of data being read from the source.
- After SELECT, before CAPTURE: When added after a SELECT operation but before a CAPTURE operation, the transformation block within APPLY PIPELINE, or any individually specified component, executes one after the other against the entire data set collected after the completion of the SELECT command.
- After CAPTURE or LOAD: When added after a CAPTURE or LOAD operation but before a PUSH operation, the transformation block within APPLY PIPELINE, or any individually specified component, executes one after the other against the entire data set available at that time, before the PUSH operation starts. Note that if both UPSERT and DELETE streams are available, the pipeline components execute on both streams. If no data leaves the transformation components, the PUSH operation will not be executed. A sketch combining all three placements follows this list.
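For reference, here is a minimal sketch that places a component in each of the three spots. It only recombines statements shown elsewhere in this article (the RSS Feed and adls connections, APPLY SCHEMA, ADD COLUMN, CAPTURE, and PUSH), so treat it as an illustration of the placements rather than a pipeline for your environment.
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item)
APPLY PIPELINE (
-- Inside SELECT: executes for every page of data read from the source
APPLY SCHEMA ( NVARCHAR(36) [guid], NVARCHAR(500) [title] );
);
-- After SELECT, before CAPTURE: executes once on the entire data set
ADD COLUMN 'capturedOn' WITH EXPRESSION '#utcnow(s)';
CAPTURE 'rss001' INSERT UPDATE ON KEYS 'guid' WITH PATH [adls] 'logs';
-- After CAPTURE, before PUSH: executes on the change log streams only
ADD COLUMN 'pushedOn' WITH EXPRESSION '#utcnow(s)';
PUSH INTO DRIVE [adls] FORMAT PARQUET FILE 'rss_@executionid.parquet' COMPRESSION 'snappy';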
Read Pipeline
SQL CDC is built as a data pipeline engine: a read pipeline is made of a read operation and one or more data engineering functions, optionally followed by a change capture operation. The output of a read pipeline is a change log that can then be played against one or more targets, using write pipelines.
Syntax
The SQL CDC batch syntax for a read pipeline is as follows:
{ DECLARE PARAM ... } { USING ... } SELECT ... APPLY PIPELINE (...) ; { APPLY PIPELINE (...) }; { CAPTURE ... }; { TRIGGER ... };
The USING operation is only supported for SELECT HTTP operations.
Applying a pipeline inside the SELECT operation for HTTP, Databases, and Big Data sources implies that all its transformations will be executed for every page of data retrieved from the source; by contrast, any transformation declared after the SELECT command will execute once for the entire data set retrieved. Both can be used in the same batch.
The CAPTURE operation is optional, but if omitted, the change log will not be persisted upon batch completion.
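To show how a complete batch maps onto this template, here is a minimal sketch assembled from the statements used in the Capture example later in this article; the optional DECLARE PARAM and USING clauses are omitted.
-- SELECT ... with an optional transformation
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
-- { APPLY PIPELINE (...) } executed once on the entire data set
APPLY PIPELINE (
APPLY SCHEMA ( NVARCHAR(36) [guid], NVARCHAR(500) [title] );
);
-- { CAPTURE ... } persists the change log
CAPTURE 'pipeline001' INSERT UPDATE ON KEYS 'guid' WITH PATH 'c:\tmp\logs\';
-- { TRIGGER ... } optionally starts another pipeline
TRIGGER 'pipeline2' ON SUCCESS_WITH_DATA DELAY 5;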
Comments
The following read pipeline gets the latest RSS feed available from an HTTP/S connection previously defined, called RSS Feed, and automatically transforms the XML output into rows and columns using the item nodes returned.
SELECT TOP 100 * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
Adding Fields
You can add any number of fields to the SELECT operation; these fields are added after the data has been extracted from the source system, so you can add calculated fields. These fields are always treated as strings; you can use the APPLY SCHEMA operation in an after pipeline to convert fields to specific data types.
You can combine the "*" with additional fields; DataZen functions are supported along with late-bound (per row) function calls. In this example, after the read operation against the source system, an automatic transformation on the data node is performed to turn the JSON response into rows and columns; the addition of the new fields is performed last, allowing the uppername field to be calculated for each row. The SELECT operation also supports the @rowid() function, which increments by 1 for each row, allowing you to create a unique row identifier if needed.
SELECT [uid] = '@rowid()', [uppername] = '@toupper({{name}})', * FROM HTTP [Samsara] (GET /fleet/drivers?filterBy=drivers&limit=50&driverActivationStatus=active) WITH HWM 'updatedAtTime' HWM_PARAM 'startTime' HWM_AS_DATETIME PAGING 'token' PAGING_PARAM 'after' PAGING_PATH '$.pagination.endCursor' DEBUG_MAX_PAGES 2 APPLY TX 'data' ;
Multiple Reads
You can execute multiple read operations anywhere, although only the first one will support the built-in high watermark feature. If you use multiple SELECT operations in your ETL script, the pipeline data will be replaced automatically. However, using the APPEND SELECT operation will union the current pipeline data with the output of the select operation.
For example, this operation combines two outputs from the HTTP endpoint. This approach works well for APIs that return a forward-only continuation token, like the Sage Intacct API.
SELECT * FROM HTTP [NWS Weather] (GET /alerts/active?area=VA); APPEND SELECT * FROM HTTP [NWS Weather] (GET /alerts/active?area=NC); APPLY TX 'features' ON 'payload';
When the source system is an HTTP Endpoint, you can also store a list of items in a database table, and use the records as parameters that will loop on the endpoint. Assuming a table contains the state column, the following would loop on all states found in the table and combine all records accordingly; the state column will also be added to the final output:
USING (SELECT [state] FROM [USStates]) ON DB [sqlconnection] WITH INCLUDE_FIELDS SELECT * FROM HTTP [NWS Weather] (GET /alerts/active?area={{@param.state}}) APPLY TX 'features';
Complex SQL ETL
To apply complex SQL-based transformations on the fly, use the EXEC ON DB operation. Note that this operation only runs on SQL Server databases at the moment. The following script retrieves all available RSS feeds, executes an inline SQL operation using the @pipelinedata() function to further transform the data, and replaces the pipeline data with the new output.
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item); EXEC ON DB [sqldb] (SELECT TOP 50 CAST([guid] as NVARCHAR(36)) [guid], CAST([title] as NVARCHAR(500)) [title] FROM @pipelinedata()) WITH REPLACE;
Note that you can execute full database scripts, including stored procedures.
Schema Enforcement
Instead of using the EXEC ON DB approach to make simple changes to your schema, it is usually more efficient to use the APPLY SCHEMA operation:
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item); APPLY SCHEMA ( NVARCHAR(36) [guid], NVARCHAR(500) [title] );
ETL Components: Inline vs. After vs. Block
ETL Components are operations that modify data or perform an action outside of the main SELECT, PUSH, CAPTURE or LOAD operation. In other words, ETL Components execute in the middle of the main operations.
ETL Components can be added on their own or within a pipeline block.
The following examples show you the three ways to use Components:
Inline Block
When added as part of a read operation block, ETL Components execute "per batch" or "per page" of data being retrieved. In other words, they may execute multiple times depending on the read operation. This option is most useful for HTTP Readers but can also be used for DB readers.
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item)
APPLY PIPELINE (
-- one or more ETL Components in this block will execute per page of data retrieved;
-- using this approach allows you to execute an ETL block on a chunk of data
APPLY SCHEMA ( NVARCHAR(36) [guid], NVARCHAR(500) [title] );
);
After, Ad-Hoc
When added after a SELECT or LOAD operation, ETL Components can be specified on their own. Each ETL component runs sequentially on the entire data set available in the data pipeline.
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
-- one or more ETL Components in this section will execute against the entire pipeline data set
-- after the read operation has completed.
APPLY SCHEMA ( NVARCHAR(36) [guid], NVARCHAR(500) [title] );
After, Pipeline Block
An after pipeline can also be included in a block. See the Apply Pipeline Block below for more information.
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
-- You can declare a pipeline block. This allows you to control execution of an entire block
APPLY PIPELINE (
-- one or more ETL Components in this section will execute against the entire pipeline data set
-- after the read operation has completed.
APPLY SCHEMA ( NVARCHAR(36) [guid], NVARCHAR(500) [title] );
);
Combining ETL Components
You can combine all the methods described above in a single script. In this example, we make a call to a Samsara endpoint, use a paging token provided by the HTTP API to fetch multiple pages of data, and execute an inner pipeline, an after read pipeline, and an after write pipeline before pushing the data.
SELECT * FROM HTTP [Samsara] (GET /fleet/drivers?filterBy=drivers&limit=50&driverActivationStatus=active) WITH HWM 'updatedAtTime' HWM_PARAM 'startTime' HWM_AS_DATETIME PAGING 'token' PAGING_PARAM 'after' PAGING_PATH '$.pagination.endCursor' DEBUG_MAX_PAGES 2 APPLY TX 'data'
APPLY PIPELINE (
-- Execute per page of data
APPLY SCHEMA ( string(50) id, string(255) name, string(2) licenseState ) WITH STRICT_COLUMNS;
);
-- Execute on the entire data set
APPLY PIPELINE (
APPLY FILTER ([licenseState]='NC');
PRINT 'We have some data!' LEVEL 'WARNING';
) BREAK_ON_EMPTY;
Apply Pipeline Block
As discussed above, you can group multiple ETL operations within a single block. Doing so allows you to quickly disable an entire block or to set a block-level option: BREAK_ON_EMPTY. This option will force-exit the entire pipeline if at any point a transformation returns 0 records.
You can also add comments to the SQL CDC script using either /*...*/ or -- at the start of a line.
/* This comment block will be ignored during execution */
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
APPLY PIPELINE (
-- Apply a schema on two fields, but leave the remaining fields untouched
APPLY SCHEMA ( NVARCHAR(36) [guid], NVARCHAR(500) [title] );
PRINT 'This will print to the output log';
--
-- Apply an in-memory filter to demonstrate how to force-exit the pipeline
--
APPLY FILTER SQL '1=0';
PRINT 'This will NOT print to the output log and the pipeline will force-exit';
) BREAK_ON_EMPTY;
Capture
If no CDC operation is applied on the source data, all the records identified will be saved into a change log. However, a CDC operation can optionally be applied to identify changes from the last execution of the pipeline, and if changes are detected, a change log is created containing only the changes. In this example, a CDC operation is performed using the guid column and the change log is stored in the path specified (this can also be a cloud connection to an AWS S3 bucket or an Azure ADLS 2.0 container).
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
APPLY SCHEMA ( NVARCHAR(36) [guid], NVARCHAR(500) [title] );
-- Eliminate records that have been previously captured based on the guid column
-- and store the change log on a local drive. Also, in case the HTTP endpoint returns
-- multiple guids (yes, some HTTP APIs return duplicate records), remove them without generating an error.
CAPTURE 'pipeline001' INSERT UPDATE ON KEYS 'guid' WITH PATH 'c:\tmp\logs\' IGNORE_DUPS;
-- Start another pipeline when some data has been captured; wait 5 seconds
TRIGGER 'pipeline2' ON SUCCESS_WITH_DATA DELAY 5;
Write Pipeline
Write pipelines start by reading the last available change log previously created by a read pipeline, apply optional ETL processing, and push the data to a specific target.
Syntax
The SQL CDC batch syntax for a write pipeline is as follows:
LOAD ... { APPLY ... }; PUSH ... { TRIGGER ... }
The LOAD operation must be the first operation in a write pipeline, and only a single LOAD operation is allowed.
Any number of transformations is allowed following the LOAD operation.
A single PUSH operation is allowed in a Write pipeline and only TRIGGERS may be declared afterwards.
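As a point of reference, a minimal write-pipeline batch assembled from the LOAD, ADD COLUMN, PUSH, and TRIGGER statements shown in this article might look like the sketch below; the connection, pipeline, and table names are reused from the earlier examples.
-- LOAD: read the next available change log for this pipeline
LOAD FROM 'pipeline001' PATH 'c:\tmp\logs\';
-- { APPLY ... }: optional target-specific transformations
ADD COLUMN 'pushedOn' WITH EXPRESSION '#utcnow(s)';
-- PUSH: forward the changes to the target
PUSH INTO DB [mysql] TABLE 'testdata' AUTO_MERGE;
-- { TRIGGER ... }: optionally start another pipeline
TRIGGER 'pipeline2' ON SUCCESS_WITH_DATA DELAY 5;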
Comments
The first step involves loading a change log using the LOAD command. By default, this operation loads the next available change log; this information is stored as metadata in the pipeline definition.
LOAD FROM 'pipeline001' PATH 'c:\tmp\logs\';
Once loaded, you can apply ETL functions similarly to the read pipeline, and finally call the PUSH operation. In this example, a different file name is created every time the pipeline executes, and the data is further distributed across folders in ADLS 2.0. Also, the LOAD operation uses a specific change log timestamp (execution id) when previewing data on the screen.
LOAD FROM 'pipeline001' PATH 'c:\tmp\logs\' WITH PREVIEW_FILE '1758142467502'; ADD COLUMN 'savedOn' WITH EXPRESSION '#utcnow()'; PUSH INTO DRIVE [adls2.0] FORMAT 'parquet' COMPRESSION 'snappy' CONTAINER 'scada' FILE '[yyyy]/[mm]/[dd]/target_@executionid.parquet';
Direct Pipeline
Direct pipelines have both a read and write pipeline combined into a single SQL CDC Batch operation. Although the read pipeline still creates change logs when data is available, direct pipelines have the option to discard the change log upon successful execution of the write operation.
Syntax
The SQL CDC batch syntax for a direct pipeline is as follows:
{ USING ... } SELECT ... APPLY PIPELINE (...) ; { APPLY ... }; { CAPTURE ... }; { APPLY ... }; PUSH ... { TRIGGER ... }
The USING operation is only supported for SELECT HTTP operations.
Any number of transformations is allowed following the SELECT operation. If a CAPTURE operation is specified, any number of additional transformations is allowed following the CAPTURE operation; these transformations will not be captured.
A single PUSH operation is allowed and only TRIGGERS may be declared afterwards.
Comments
Here is an example of a SQL CDC batch operation that reads and writes captured changes:
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item); CAPTURE 'pipeline001' INSERT UPDATE ON KEYS 'guid' WITH PATH 'c:\tmp\logs\'; PUSH INTO DRIVE [adls2.0] FORMAT 'parquet' COMPRESSION 'snappy' CONTAINER 'scada' FILE '[yyyy]/[mm]/[dd]/target_@executionid.parquet';
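To illustrate the earlier note that transformations declared after the CAPTURE operation are not persisted in the change log, the sketch below adds a pushedOn column after the capture (reusing the ADD COLUMN expression from the Overview examples), so the column reaches the Parquet target but is not written to the change log.
SELECT * FROM HTTP [RSS Feed] (GET /econi.xml) APPLY TX (//item);
-- The change log contains the source columns only
CAPTURE 'pipeline001' INSERT UPDATE ON KEYS 'guid' WITH PATH 'c:\tmp\logs\';
-- Added after CAPTURE: pushed to the target but not captured
ADD COLUMN 'pushedOn' WITH EXPRESSION '#utcnow(s)';
PUSH INTO DRIVE [adls2.0] FORMAT 'parquet' COMPRESSION 'snappy' CONTAINER 'scada' FILE '[yyyy]/[mm]/[dd]/target_@executionid.parquet';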