MERGE

Overview

The Merge operation compares the records available in the data pipeline with a target table in a supported database engine, and performs Insert, Update, and/or Delete operations based on the options specified. As with the SINK operation, the target table will be created automatically if needed. Unlike the SINK operation, however, the table schema will not be upgraded unless the SCHEMA_DRIFT option is also specified; this difference is by design.

If none of the operations are specified (INSERTS, UPDATES, DELETES), all of them are assumed.

The DELETES option should be used only if the data pipeline is expected to always return all available records from the source system. To avoid accidental deletes, consider adding an EXIT condition before the Merge operation that stops the pipeline if the record count is 0 or unexpectedly low.

The Merge operation is designed to work with Key Columns; if no key columns are provided, the Merge command will ignore UPDATES and DELETES, and all the records from the data pipeline will be appended to the table.
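
The rules above can be illustrated with a small conceptual sketch in Python. This is not how DataZen implements the operation; it only models the documented behavior on in-memory rows. The function name merge, its parameters, and the whole-row replacement used for updates are assumptions made for illustration.

```python
def merge(target, incoming, key_columns=None,
          inserts=False, updates=False, deletes=False):
    """Return the rows of `target` after merging `incoming` into it (illustrative only)."""
    # If no operation is specified, all three are assumed.
    if not (inserts or updates or deletes):
        inserts = updates = deletes = True

    # Without key columns there is nothing to match on:
    # UPDATES and DELETES are ignored and every incoming row is appended.
    if not key_columns:
        return target + incoming

    def key_of(row):
        return tuple(row[c] for c in key_columns)

    incoming_by_key = {key_of(r): r for r in incoming}
    existing_keys = {key_of(r) for r in target}

    result = []
    for row in target:
        k = key_of(row)
        if k in incoming_by_key:
            # Matching key: replace the row only when UPDATES is enabled.
            result.append(incoming_by_key[k] if updates else row)
        elif not deletes:
            # Key missing from the pipeline: kept unless DELETES is enabled.
            result.append(row)
    if inserts:
        # Keys present in the pipeline but not in the table are added.
        result.extend(r for k, r in incoming_by_key.items()
                      if k not in existing_keys)
    return result


table = [{"guid": "a", "title": "old"}, {"guid": "b", "title": "keep?"}]
feed = [{"guid": "a", "title": "new"}, {"guid": "c", "title": "added"}]

no_deletes = merge(table, feed, key_columns=["guid"], inserts=True, updates=True)
all_ops = merge(table, feed, key_columns=["guid"])
append_only = merge(table, feed)
```

In this sketch, no_deletes keeps row "b" because DELETES was not requested, all_ops drops it because all three operations are assumed, and append_only simply contains the four rows from both lists.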

When using the Merge operation in an Inner Pipeline, the use of the DELETES option may yield unexpected results because the operations in this block may be executed with partial record sets.
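
To see why partial record sets and DELETES do not mix, consider the following conceptual Python sketch, which tracks only the key values in the target table. It assumes each partial batch is merged independently of the others, which is one reading of the behavior described above; the function merge_keys and the sample keys are hypothetical.

```python
def merge_keys(target_keys, batch_keys, inserts=True, deletes=True):
    """Model a merge on key values only (illustrative, not DataZen code)."""
    result = set(target_keys)
    if deletes:
        result &= batch_keys   # drop keys that are missing from this batch
    if inserts:
        result |= batch_keys   # add keys that are new to the target
    return result


target = {"a", "b", "c", "d"}
batches = [{"a", "b"}, {"c", "d"}]   # the full source, delivered in two parts

# Merging the complete record set at once removes nothing.
full = merge_keys(target, set().union(*batches))

# Merging each partial batch with DELETES removes rows owned by other batches.
partial = set(target)
for batch in batches:
    partial = merge_keys(partial, batch)

# The same batches merged without DELETES leave the table intact.
safe = set(target)
for batch in batches:
    safe = merge_keys(safe, batch, deletes=False)
```

Under these assumptions, partial ends up holding only the keys of the last batch, while full and safe still contain all four keys.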

Syntax

Implements an automated merge operation for Inserts, Updates, and Deletes based on a unique row identifier

MERGE INTO [CONNECTION] 
    TABLE '...' 
    { WITH  
        { KEY COLUMNS '...' }
        { { INSERTS } { UPDATES } { DELETES } }
        { STAGE ON [CONNECTION] '...' CREDENTIALS '...' }
        { SCHEMA_DRIFT }
        { SCHEMA_UPGRADE }
    }
;

TABLE

The table name to use; may contain DataZen functions and pipeline variables; the table will be created automatically if not found, using the schema of the pipeline data

STAGE ON

Specifies the connection and folder/container used to stage the data before it is merged; whether staging applies depends on the target database

CREDENTIALS

When a stage environment is specified, this option is used to provide the necessary credentials

KEY COLUMNS

A comma-separated list of fields to use as the unique record identifier

SCHEMA_DRIFT

When specified, this option will attempt to add columns if needed

SCHEMA_UPGRADE

When specified, this option will attempt to upgrade data types; some database engines have limited support for this operation

INSERTS

When specified, this option will add missing records based on the key columns provided, or all records if no key columns are specified

UPDATES

When specified, this option will update existing records based on the key columns provided; key columns must be provided for this option

DELETES

When specified, this option will delete records if not found in the data pipeline based on the key columns provided; key columns must be provided for this option

Example 1

-- Get data from an RSS Feed
SELECT * FROM HTTP [Rss Feed] (GET /econi.xml) APPLY TX (//item);

-- merges the data set into a table but does not delete records 
-- that may be missing from the source 
-- the table will be created if it does not exist and missing columns 
-- will be added if necessary (SCHEMA_DRIFT)
MERGE INTO [sql2017] TABLE '[feed].[rssdata]'
WITH 
	KEY COLUMNS 'guid' 
	INSERTS UPDATES
	SCHEMA_DRIFT
;

Example 2

-- Get data from an RSS Feed
SELECT * FROM HTTP [Rss Feed] (GET /econi.xml) APPLY TX (//item);

-- merges the data set into a table in a Fabric environment
-- the table will be created if it does not exist and columns will 
-- be adjusted or added if necessary; the ADLS connection provided
-- will be used to stage the data first; credentials stored in the 
-- environment variable called adlsSecret will be used 

-- When no operations are specified, all three are assumed: 
-- INSERTS UPDATES DELETES 

MERGE INTO [fabric] TABLE '[feed].[rssdata]'
WITH 
	STAGE ON [adls2.0] 'stagedata' 
	CREDENTIALS (#getvar(adlsSecret))
	KEY COLUMNS 'guid' 
	SCHEMA_DRIFT
	SCHEMA_UPGRADE 
;