PUSH HTTP
Overview
The PUSH HTTP operation allows you to send the Upsert and Delete streams to an HTTP/S endpoint. A single PUSH HTTP statement can specify both the Upsert and the Delete operations at the same time.
In some scenarios, you may need to send a single item at a time; use the BATCH option to control this behavior.
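The effect of the BATCH option can be pictured with a short Python sketch. This is only an illustration of the grouping idea, not DataZen's implementation; the `batches` helper and the sample records are hypothetical.

```python
from typing import Iterator, List, Sequence


def batches(records: Sequence[dict], batch_size: int) -> Iterator[List[dict]]:
    """Yield consecutive groups of at most batch_size records."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(records), batch_size):
        yield list(records[start:start + batch_size])


# Hypothetical change stream of five records.
records = [{"guid": f"g{i}"} for i in range(5)]

one_at_a_time = list(batches(records, 1))  # like BATCH 1: one record per HTTP call
in_pairs = list(batches(records, 2))       # like BATCH 2: the last call carries the remainder
```

With five records, a batch size of 1 results in five calls, while a batch size of 2 results in three calls carrying 2, 2, and 1 records.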
Syntax
Executes HTTP operations for upserted and/or deleted records one at a time or in batch.
PUSH INTO HTTP [CONNECTION]
  { ON_UPSERT (...)                -- HTTP Command to execute on upserted records
      { HEADERS '...' }
      { CONTENT_TYPE '...' }
      { PAYLOAD (...) }
      { TIMEOUT <n> }
      { BATCH <n> }
      { CONTINUE_ON_HTTP '...' }
      { THROW_ON_REGEX '...' }
      { TRACE INTO [CONNECTION]
          { TABLE '...' { INCLUDE HTTP_RESPONSE } }
          { EXEC (...) }
      }
  }
  { ON_DELETE (...)                -- HTTP Command to execute on deleted records
      { HEADERS '...' }
      { CONTENT_TYPE '...' }
      { PAYLOAD (...) }
      { TIMEOUT <n> }
      { BATCH <n> }
      { CONTINUE_ON_HTTP '...' }
      { THROW_ON_REGEX '...' }
      { TRACE INTO [CONNECTION]
          { TABLE '...' { INCLUDE HTTP_RESPONSE } }
          { EXEC (...) }
      }
  }
  { WITH
      { DISCARD_ON_SUCCESS }
      { DISCARD_ON_ERROR }
      { DLQ_ON_ERROR '...' }
      { RETRY < LINEAR | EXPONENTIAL > (<n>,<n>) }
  }
;
ON_UPSERT | Section that defines the HTTP operation to execute when processing upserted records
ON_DELETE | Section that defines the HTTP operation to execute when processing deleted records
COMMAND | The HTTP operation to execute, in parentheses; accepts DataZen functions and pipeline variables (ex: GET https://...)
HEADERS | An array of key/value pair headers to add to the HTTP request (ex: [ { "key1": "value1" }, { "key2": "value2" } ])
PAYLOAD | For POST, PUT, and PATCH operations, the optional payload to send; accepts field names, DataZen functions, and pipeline variables. When the BATCH setting is greater than 1, the payload can be built with partially automated content generation using @concatjson, @concatjsonarr, @concatxml, @sql, @sql_union, @sql_unionall, or @sql_concat
TIMEOUT | A timeout value in seconds
BATCH | The number of records to process in a single call
CONTENT_TYPE | The HTTP request Content-Type setting for POST, PUT, and PATCH operations
THROW_ON_REGEX | Throws an error if the regular expression provided matches the response payload, even if the HTTP response indicates success
CONTINUE_ON_HTTP | Continues processing even if the HTTP request returns one of the specified HTTP status codes (ex: '400,404')
RETRY_ON | Retries the HTTP operation if the text provided is found in the response body or in the node identified by the RETRY_PATH parameter
RETRY_ONREGEX | Retries the HTTP operation if the regular expression provided matches the response body or the node identified by the RETRY_PATH parameter
RETRY_INTERVAL | The retry interval in seconds (default: 1)
RETRY_COUNT | The maximum number of retries to perform (default: 1)
WITH_PAYLOAD | When tracing is enabled, indicates that the response payload should also be logged
DISCARD_ON_SUCCESS | Deletes the change log after successful completion of the push operation
DISCARD_ON_ERROR | Deletes the change log if the push operation fails
DLQ_ON_ERROR | Moves the change log to a subfolder or directory if the push operation fails
RETRY EXPONENTIAL | Retries the operation on failure up to N times, with a wait based on P seconds that grows exponentially with every attempt (N,P)
RETRY LINEAR | Retries the operation on failure up to N times, waiting P seconds every time (N,P)
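To make the interaction between CONTINUE_ON_HTTP, THROW_ON_REGEX, and the RETRY options easier to follow, here is a rough Python model of the behavior described above. It is a sketch under stated assumptions, not DataZen's actual logic: the function names are hypothetical, "success" is assumed to mean a 2xx status code, and the exponential schedule is assumed to double the wait on each attempt, since the exact growth factor is not documented here.

```python
import re
from typing import List, Optional, Set


def retry_delays(mode: str, count: int, seconds: float) -> List[float]:
    """Return the wait before each retry for RETRY <mode> (count, seconds)."""
    if mode == "LINEAR":
        # Same wait before every retry.
        return [seconds] * count
    if mode == "EXPONENTIAL":
        # ASSUMPTION: the wait doubles on each attempt; the real factor may differ.
        return [seconds * (2 ** attempt) for attempt in range(count)]
    raise ValueError(f"unknown retry mode: {mode}")


def evaluate(status: int, body: str, continue_on: Set[int],
             throw_on_regex: Optional[str]) -> str:
    """Classify one HTTP response as 'ok', 'continue', or 'error'."""
    is_success = 200 <= status < 300  # ASSUMPTION: success means a 2xx status
    if is_success and throw_on_regex and re.search(throw_on_regex, body):
        # THROW_ON_REGEX: a matching body is an error despite the success status.
        return "error"
    if is_success:
        return "ok"
    if status in continue_on:
        # CONTINUE_ON_HTTP: a listed status code does not stop processing.
        return "continue"
    return "error"
```

Under these assumptions, `retry_delays("LINEAR", 3, 2)` gives waits of 2, 2, and 2 seconds, while `retry_delays("EXPONENTIAL", 3, 2)` gives 2, 4, and 8 seconds. A 200 response whose body matches the THROW_ON_REGEX pattern is treated as an error, and a 404 response is tolerated when CONTINUE_ON_HTTP is '400,404'.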
Example 1
-- Load the next inserts previously captured in a cloud folder using
-- CAPTURE 'mycdc' INSERT ON KEYS 'guid' WITH PATH [adls2.0] 'container'
LOAD UPSERTS FROM [adls2.0] 'mycdc' PATH '/container' KEY_COLUMNS 'guid';

-- Make a call to a Slack Channel when a new RSS feed is detected
PUSH INTO HTTP [slackConnection]
  ON_UPSERT (POST chat.postMessage)
    BATCH 1
    CONTENT_TYPE 'application/x-www-form-urlencoded'
    PAYLOAD (channel=C000CDE0PAA&text=New RSS Feed: Title: {{title}})
;
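The {{title}} placeholder in the payload above is filled in from each record's field of the same name. The following Python sketch shows that substitution for a single record, as with BATCH 1. The `render` helper and the sample record are hypothetical, and whether DataZen additionally URL-encodes the substituted value is not stated in this page, so no encoding is applied here.

```python
import re


def render(template: str, record: dict) -> str:
    """Replace each {{field}} placeholder with the matching record value."""
    def lookup(match):
        # group(1) is the field name found between the double braces.
        return str(record[match.group(1)])
    return re.sub(r"\{\{(\w+)\}\}", lookup, template)


template = "channel=C000CDE0PAA&text=New RSS Feed: Title: {{title}}"
record = {"guid": "abc-123", "title": "Release notes"}  # hypothetical record

body = render(template, record)
```

For this sample record, `body` becomes `channel=C000CDE0PAA&text=New RSS Feed: Title: Release notes`.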
Example 2
-- Load the next inserts previously captured in a cloud folder using
-- CAPTURE 'mycdc' INSERT ON KEYS 'guid' WITH PATH [adls2.0] 'container'
LOAD UPSERTS FROM [adls2.0] 'mycdc' PATH '/container' KEY_COLUMNS 'guid';

-- Make a call to a cloud function and send an array of JSON documents containing
-- up to 100 objects at a time for both the Upsert and Delete streams
PUSH INTO HTTP [cloudFx]
  ON_UPSERT (POST /processData)
    BATCH 100
    CONTENT_TYPE 'application/json'
    PAYLOAD ([@concatjson({ "guid": "{{guid}}", "title": "title", "sendOn": "#utcnow()" })] )
  ON_DELETE (POST /processDeletes)
    BATCH 100
    CONTENT_TYPE 'application/json'
    PAYLOAD ([@concatjson({ "guid": "{{guid}}", "title": null, "sendOn": "#utcnow()" })] )
  -- delete the change log upon success
  WITH DISCARD_ON_SUCCESS
;
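As a rough picture of what the endpoint receives in this example, the Python sketch below builds one JSON array per batch of up to 100 records, mirroring the shape of the [@concatjson({...})] payloads. It only approximates the output: the `build_payload` helper and the 250 sample records are hypothetical, and the timestamp stands in for #utcnow() without claiming to match its exact format.

```python
import json
from datetime import datetime, timezone
from typing import List, Optional


def build_payload(batch: List[dict], title: Optional[str], sent_on: str) -> str:
    """Build one JSON array holding an object for every record in the batch."""
    objects = [
        {"guid": record["guid"], "title": title, "sendOn": sent_on}
        for record in batch
    ]
    return json.dumps(objects)


# Hypothetical upsert stream of 250 records.
records = [{"guid": f"guid-{i}"} for i in range(250)]
sent_on = datetime.now(timezone.utc).isoformat()  # stand-in for #utcnow()

# BATCH 100: one HTTP call, and therefore one payload, per group of up to 100 records.
groups = [records[i:i + 100] for i in range(0, len(records), 100)]
upsert_payloads = [build_payload(group, "title", sent_on) for group in groups]

# The delete payload in the example sends a null title.
delete_payload = build_payload(records[:2], None, sent_on)
```

With 250 records, this produces three upsert payloads containing 100, 100, and 50 objects. In the delete payload, the Python value `None` is serialized as JSON `null`.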