PUSH HTTP

Overview

The PUSH HTTP operation sends the Upsert and Delete streams to an HTTP/S endpoint. A single statement can define both the Upsert and the Delete operation.

Some endpoints accept only a single item per request; use the BATCH option to control how many records are sent in each call.

Syntax

Executes HTTP operations for upserted and/or deleted records, either one at a time or in batches.

PUSH INTO HTTP [CONNECTION] 
    { ON_UPSERT 
        (...)   -- HTTP Command to execute on upserted records
        { HEADERS '...' }
        { CONTENT_TYPE '...' }
        { PAYLOAD (...) }
        { TIMEOUT <n> }
        { BATCH <n> }
        { CONTINUE_ON_HTTP '...' } 
        { THROW_ON_REGEX '...' } 
        { TRACE INTO [CONNECTION] 
            { TABLE '...' { INCLUDE HTTP_RESPONSE } }
            { EXEC (...) }
        }
    }
    { ON_DELETE 
        (...)      -- HTTP Command to execute on deleted records
        { HEADERS '...' }
        { CONTENT_TYPE '...' }
        { PAYLOAD (...) }
        { TIMEOUT <n> }
        { BATCH <n> }
        { CONTINUE_ON_HTTP '...' } 
        { THROW_ON_REGEX '...' } 
        { TRACE INTO [CONNECTION] 
            { TABLE '...' { INCLUDE HTTP_RESPONSE } }
            { EXEC (...) }
        }
    }
    { WITH 
        { DISCARD_ON_SUCCESS }
        { DISCARD_ON_ERROR }
        { DLQ_ON_ERROR '...' }
        { RETRY < LINEAR | EXPONENTIAL > (<n>,<n>) }
    }

;

ON_UPSERT

Section that defines the HTTP operation to execute when processing upserted records

ON_DELETE

Section that defines the HTTP operation to execute when processing deleted records

COMMAND

The HTTP operation to execute, enclosed in parentheses; accepts DataZen functions and pipeline variables (ex: GET https://...)

HEADERS

An array of key/value pair headers to add to the HTTP request (ex: [ { "key1": "value1" }, { "key2": "value2" } ])

PAYLOAD

For POST, PUT, and PATCH operations, the optional payload to send; accepts field names, DataZen functions, and pipeline variables. When the BATCH setting is greater than 1, the payload can be built with partially automated content generation using @concatjson, @concatjsonarr, @concatxml, @sql, @sql_union, @sql_unionall, or @sql_concat

TIMEOUT

A timeout value in seconds

BATCH

The number of records to process in a single call

CONTENT_TYPE

The HTTP Request Content-Type setting for POST, PUT, and PATCH operations

THROW_ON_REGEX

Throws an error if the regular expression provided matches the response payload, even if the HTTP response indicates success

CONTINUE_ON_HTTP

Continues processing even if the HTTP request returns specific HTTP codes (ex: '400,404')

RETRY_ON

Retries the HTTP operation if the text provided is found in the response body or in the node identified by the RETRY_PATH parameter

RETRY_ONREGEX

Retries the HTTP operation if the regular expression provided matches the response body or the node identified by the RETRY_PATH parameter

RETRY_INTERVAL

The retry interval in seconds (default: 1)

RETRY_COUNT

The maximum number of retries to perform (default: 1)

WITH_PAYLOAD

When tracing is enabled, indicates that the response payload should also be logged

DISCARD_ON_SUCCESS

Deletes the change log after successful completion of the push operation

DISCARD_ON_ERROR

Deletes the change log if the push operation failed

DLQ_ON_ERROR

Moves the change log to a subfolder or directory if the push operation failed

RETRY EXPONENTIAL

Retries the operation on failure up to N times, waiting exponentially longer each time based on an interval of P seconds (N,P)

RETRY LINEAR

Retries the operation on failure up to N times, waiting P seconds every time (N,P)
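
The options described above can be combined in a single statement. The following is a minimal sketch that uses only clauses listed in the Syntax section. The [myApi] connection, the /items endpoint, the header name and value, the regular expression, and the 'failed' folder name are hypothetical placeholders, and the exact clause order and payload format your endpoint requires may differ.

```sql
-- Load captured upserts (same LOAD statement as in the examples on this page)
LOAD UPSERTS FROM [adls2.0] 'mycdc' PATH '/container' KEY_COLUMNS 'guid';

-- Hypothetical connection and endpoint; adjust to your environment
PUSH INTO HTTP [myApi]
	ON_UPSERT
		(POST /items)
		HEADERS '[ { "x-source": "datazen" } ]'
		CONTENT_TYPE 'application/json'
		PAYLOAD ({ "guid": "{{guid}}", "title": "{{title}}" })
		TIMEOUT 30                  -- seconds
		BATCH 1                     -- one record per HTTP call
		CONTINUE_ON_HTTP '400,404'  -- keep processing on these HTTP codes
		THROW_ON_REGEX 'error'      -- fail if the response payload matches
	WITH
		DLQ_ON_ERROR 'failed'       -- move the change log here on failure
		RETRY EXPONENTIAL (3,5)     -- (N,P): up to 3 retries, based on 5 seconds
;
```

In this sketch, CONTINUE_ON_HTTP and THROW_ON_REGEX apply to each HTTP call, while the WITH options apply to the push operation as a whole.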

Example 1


-- Load the next inserts previously captured in a cloud folder using
-- CAPTURE 'mycdc' INSERT ON KEYS 'guid' WITH PATH [adls2.0] 'container'

LOAD UPSERTS FROM [adls2.0] 'mycdc' PATH '/container' KEY_COLUMNS 'guid';

-- Make a call to a Slack Channel when a new RSS feed is detected
PUSH INTO HTTP [slackConnection] 
	ON_UPSERT 
		(POST chat.postMessage)
		BATCH 1
		CONTENT_TYPE 'application/x-www-form-urlencoded'
		PAYLOAD (channel=C000CDE0PAA&text=New RSS Feed: 
Title: {{title}})
;

Example 2


-- Load the next inserts previously captured in a cloud folder using
-- CAPTURE 'mycdc' INSERT ON KEYS 'guid' WITH PATH [adls2.0] 'container'

LOAD UPSERTS FROM [adls2.0] 'mycdc' PATH '/container' KEY_COLUMNS 'guid';

-- Make a call to a cloud function and send an array of JSON documents containing
-- up to 100 objects at a time for both the Upsert and Delete streams
PUSH INTO HTTP [cloudFx] 
	ON_UPSERT 
		(POST /processData)
		BATCH 100
		CONTENT_TYPE 'application/json'
		PAYLOAD ([@concatjson({ 
"guid": "{{guid}}", 
"title": "{{title}}", 
"sendOn": "#utcnow()"
 })]
)
	ON_DELETE 
		(POST /processDeletes)
		BATCH 100
		CONTENT_TYPE 'application/json'
		PAYLOAD ([@concatjson({ 
"guid": "{{guid}}", 
"title": null, 
"sendOn": "#utcnow()"
 })]
)

	-- delete the change log upon success
	WITH DISCARD_ON_SUCCESS
;
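
To keep a record of each request for troubleshooting, a TRACE clause can be added to the same kind of statement. The sketch below is assembled from the Syntax section only and has not been run against a live pipeline. The [sqlLog] connection and the 'http_trace' table name are hypothetical placeholders.

```sql
LOAD UPSERTS FROM [adls2.0] 'mycdc' PATH '/container' KEY_COLUMNS 'guid';

PUSH INTO HTTP [cloudFx]
	ON_UPSERT
		(POST /processData)
		BATCH 100
		CONTENT_TYPE 'application/json'
		PAYLOAD ([@concatjson({
"guid": "{{guid}}",
"sendOn": "#utcnow()"
 })]
)
		-- Hypothetical trace target; INCLUDE HTTP_RESPONSE is assumed
		-- to log the HTTP response along with the request
		TRACE INTO [sqlLog] TABLE 'http_trace' INCLUDE HTTP_RESPONSE

	-- retry up to 3 times, waiting 10 seconds every time
	WITH RETRY LINEAR (3,10)
;
```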