PUSH DB

Overview

The PUSH DB operation allows you to send data to any database target by providing upsert and delete scripts.

SQL Server targets support automatic script creation and schema drifting. Snowflake, PostgreSQL, and MySQL endpoints support automatic script creation, but not schema drifting at this time.

Syntax

Executes SQL operations for upserted and/or deleted records one at a time or in batch.

PUSH INTO DB [CONNECTION] 
    { TABLE '...' } 
    { AUTO_MERGE { SCHEMA_DRIFT } }
    { ON_UPSERT 
        (...)   -- SQL Command to execute on upserted records        
        { TIMEOUT <n> }
        { BATCH <n> }
        }
    { ON_DELETE 
        (...)      -- SQL Command to execute on deleted records
        { TIMEOUT <n> }
        { BATCH <n> }
        }
    { ON_INIT (...) }
    { ON_COMPLETION (...) }
    { WITH 
        { DISCARD_ON_SUCCESS }
        { DISCARD_ON_ERROR }
        { DLQ_ON_ERROR '...' }
        { RETRY < LINEAR | EXPONENTIAL > (<n>,<n>) }
    }
;

TABLE

The target table name to use; may contain DataZen functions and pipeline variables to distribute records across multiple tables; also used as the name of the object to create if the table does not exist
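
For instance, assuming the {{column}} record tokens used in the examples below are also valid in the table name, records can be routed to per-value tables; the 'category' column and table name here are illustrative:

-- Hypothetical sketch: distribute records across tables named after
-- each record's 'category' value, creating tables as needed
PUSH INTO DB [sql2017] 
	TABLE 'dbo.rssTarget_{{category}}'
	AUTO_MERGE
;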

AUTO_MERGE

Automatically ingests the data by generating the necessary database scripts; available for some database engines only

SCHEMA_DRIFT

Automatically adjusts column names and data types if the schema changes over time; for SQL Server targets only

ON_UPSERT

Section that defines the SQL operation to execute when processing upserted records

ON_DELETE

Section that defines the SQL operation to execute when processing deleted records
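
For targets where AUTO_MERGE is not available, both sections can carry explicit per-record statements; a minimal sketch (the [mysql] connection, table, and column names are illustrative):

-- REPLACE INTO acts as a keyed upsert on MySQL
PUSH INTO DB [mysql] 
	ON_UPSERT (
		REPLACE INTO rssTarget (guid, title, link)
		VALUES ('{{guid}}', '{{title}}', '{{link}}')
	)
	ON_DELETE (
		DELETE FROM rssTarget WHERE guid = '{{guid}}'
	)
;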

ON_INIT

Preliminary SQL operation that executes before each batch of data

ON_COMPLETION

SQL script that executes after each batch of data completes; the @success variable is available (set to either 'success' or 'failure')
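
A sketch that logs each batch outcome using @success; the dbo.pushAudit table is illustrative:

PUSH INTO DB [sql2017] 
	TABLE 'dbo.rssTarget'
	AUTO_MERGE
	ON_INIT (
		-- runs before each batch
		INSERT INTO dbo.pushAudit (event, loggedOn) VALUES ('batch started', GETDATE())
	)
	ON_COMPLETION (
		-- @success resolves to 'success' or 'failure'
		INSERT INTO dbo.pushAudit (event, loggedOn) VALUES ('@success', GETDATE())
	)
;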

COMMAND

The SQL operation to execute, enclosed in parentheses; accepts DataZen functions and pipeline variables; ignored if AUTO_MERGE is used

TIMEOUT

A timeout value in seconds

BATCH

The number of records to process in a single call
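
Per the syntax above, TIMEOUT and BATCH follow the command of the section they qualify. A sketch sending 500 records per call with a 120-second timeout, assuming (as in Example 2 below) that @sql_union expands the inner SELECT into a UNION across the records of the batch:

PUSH INTO DB [sql2017] 
	ON_UPSERT (
		INSERT INTO dbo.rssTarget (guid, title)
		@sql_union(SELECT '{{guid}}', '{{title}}')
	)
	TIMEOUT 120
	BATCH 500
;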

DISCARD_ON_SUCCESS

Deletes the change log after successful completion of the push operation

DISCARD_ON_ERROR

Deletes the change log if the push operation failed

DLQ_ON_ERROR

Moves the change log to a subfolder or directory if the push operation failed

RETRY EXPONENTIAL

Retries the operation on failure up to N times, with the wait growing exponentially from a base of P seconds (N,P)

RETRY LINEAR

Retries the operation on failure up to N times, waiting P seconds between attempts (N,P)
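
A sketch combining these options: the change log is discarded on success, moved to a dead-letter folder on failure (the 'dlq' name is illustrative), and the operation is retried up to 3 times, 5 seconds apart; RETRY EXPONENTIAL (3,5) would instead grow the wait exponentially from 5 seconds:

PUSH INTO DB [sql2017] 
	TABLE 'dbo.rssTarget'
	AUTO_MERGE
	WITH 
		DISCARD_ON_SUCCESS
		DLQ_ON_ERROR 'dlq'
		RETRY LINEAR (3,5)
;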

Example 1


-- Load the next inserts/updates previously captured in a cloud folder using
-- CAPTURE 'mycdc' INSERT UPDATE DELETE ON KEYS 'guid' WITH PATH [adls2.0] 'container'

LOAD UPSERTS FROM [adls2.0] 'mycdc' PATH '/container' KEY_COLUMNS 'guid';

-- Send all changes, including deleted records, to a target SQL Server table
-- with automatic merge and schema drifting support
PUSH INTO DB [sql2017] 
	TABLE 'dbo.rssTarget'
	AUTO_MERGE 
	SCHEMA_DRIFT
;

Example 2


-- Load the next inserts/updates previously captured in a cloud folder using
-- CAPTURE 'mycdc' INSERT UPDATE DELETE ON KEYS 'guid' WITH PATH [adls2.0] 'container'

LOAD UPSERTS FROM [adls2.0] 'mycdc' PATH '/container' KEY_COLUMNS 'guid';

-- Send all inserts and updates to a target Databricks table using an ODBC driver,
-- using the @sql_union() function to build the source data set with up to 1000
-- records at a time, and leveraging Databricks' schema evolution option
PUSH INTO DB [databricksodbc] 
	TABLE 'rssTarget'
	ON_UPSERT (
		MERGE WITH SCHEMA EVOLUTION 
		INTO rssTarget 
		USING 
			( 
			@sql_union(SELECT '{{title}}' as title, '{{link}}' as link, '{{guid}}' as guid)
			) source 
		ON source.guid = rssTarget.guid
		WHEN MATCHED THEN UPDATE SET * 
		WHEN NOT MATCHED THEN INSERT * 
	)
	BATCH 1000
;