APPLY HTTP
Overview
The APPLY HTTP operation is used to make calls to HTTP/S endpoints as part of a data pipeline and provides options to enhance the current pipeline data set or replace it. Use this component when you would like to:
- Call an HTTP endpoint once, then continue processing the data pipeline
- Call an HTTP endpoint once and add the HTTP response data as additional columns
- Call an HTTP endpoint once, transform the returned payload, and replace the pipeline data set with this one
- Call an HTTP endpoint once per row from the pipeline data set and add the HTTP response data as additional columns
- Call an HTTP endpoint once per row from the pipeline data set, transform the response of each request, and perform a LEFT or INNER join with the response
This component can be useful to enhance the current data with the output provided by an HTTP/S endpoint. For example, if the pipeline data contains a list of items with unique item IDs, you can use this component to make a call to an API that returns item details. Another useful scenario is to call an AI endpoint, such as an image description endpoint, and return information about the content of the image.
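As a sketch of the item-detail scenario above, a per-row enrichment call could look like the following; the [ItemService] connection, the /items endpoint, and the itemId column are hypothetical names used for illustration:

```sql
-- Hypothetical: call the endpoint once per row, passing each row's itemId,
-- and LEFT JOIN the transformed response back onto the pipeline data
APPLY HTTP [ItemService]
  (GET /items/{{itemId}})
  WITH PROCESSING 'left join'
APPLY TX 'item_=$.item'
;
```

The item_ prefix keeps the new columns from colliding with existing ones, as shown in Example 2 below.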
Syntax
Executes an inline HTTP operation and optionally enhances the data set with the result.
APPLY HTTP [CONNECTION]
(< GET | POST | PUT | PATCH | DELETE > ... )
{ WITH
{ PAGING < 'none' | 'offset' | 'page' | 'link' | 'token' >
{ PAGING_PARAM '...' }
{ PAGING_PATH '...' }
{ DEBUG_MAX_PAGES N }
}
{ TIMEOUT <n> }
{ HEADERS '...' }
{ PAYLOAD (...) }
{ CONTENT_TYPE '...' }
{ RETURN_BYTES }
{ PROCESSING < 'continue' | 'replace' | 'inner join' | 'left join' | 'enhance' > }
{ WHEN '...' }
{ SHIFT_ROWS N }
{ CONTINUE_ON_ERRORS }
{ CONTINUE_ON_HTTPERROR '...' }
{
< { RETRY_ON '...' } | { RETRY_ONREGEX '...' } >
{ RETRY_PATH '...' }
{ RETRY_INTERVAL N }
{ RETRY_COUNT N }
}
}
{ TRACE INTO [CONNECTION]
TABLE '...'
{ WITH_PAYLOAD }
{ STAGE ON [CONNECTION] '...' CREDENTIALS '...' }
{ ADD_TRACKING_FIELDS }
{ TRUNCATE | TRUNCATE_APPEND }
{ SKIP_PREVIEW }
}
{ APPLY TX '...'
{ WITH RAWCOL '...' }
}
;
Parameters
HEADERS | An array of key/value pair headers to add to the HTTP request (ex: [ { "key1": "value1" }, { "key2": "value2" } ])
PAYLOAD | For POST, PUT, and PATCH operations, the optional payload to send
TIMEOUT | A timeout value in seconds
PAGING | The paging strategy to use; must be one of: 'none', 'offset', 'page', 'link', 'token'
PAGING_PARAM | The paging parameter to add to the URL as a query string; if not specified, paging variables can be used in the URL or the PAYLOAD (@pagingindex, @pagingmarker, @recordcount, @recordindex)
PAGING_PATH | The path within the raw response that identifies the paging token or link to use on the next call, or the node to use as the count of items returned when using an offset strategy (may be an object array or a node containing a value representing the total count of items retrieved so far)
DEBUG_MAX_PAGES | During preview operations, the maximum number of paging operations to perform
CONTENT_TYPE | The HTTP request Content-Type setting for POST, PUT, and PATCH operations
RETURN_BYTES | Indicates that the HTTP call returns binary data (ex: an image or PDF document) instead of a string
PROCESSING | The processing mode to use: continue, replace, left join, inner join, enhance (default: continue)
SHIFT_ROWS | When enhance is used, the number of rows to skip from the original data set; useful when applying statistical operations that return fewer records than the source, such as a moving average calculation
CONTINUE_ON_ERRORS | Continues processing even if errors are detected
CONTINUE_ON_HTTPERROR | Continues processing even if the HTTP request returns specific HTTP codes (ex: '400,404')
RETRY_ON | Retries the HTTP operation if the text provided is found in the response body or in the node identified by the RETRY_PATH parameter
RETRY_ONREGEX | Retries the HTTP operation if the regular expression provided results in a positive match against the response body or the node identified by the RETRY_PATH parameter
RETRY_PATH | The path used to limit the retry test to a specific node when searching a JSON or XML document
RETRY_INTERVAL | The retry interval in seconds (default: 1)
RETRY_COUNT | The maximum number of retries to perform (default: 1)
TRACE INTO | Saves each request and response to a database table for raw data inspection and debugging purposes; the response payload is excluded unless WITH_PAYLOAD is also specified
TABLE | The table name in which each HTTP operation will be stored
WITH_PAYLOAD | When tracing is enabled, indicates that the response payload should also be logged
STAGE ON | Specifies the connection and folder/container used to stage the data first, depending on the target database
CREDENTIALS | When a stage environment is specified, provides the necessary credentials
ADD_TRACKING_FIELDS | Adds internal tracking fields, including executionId, pipeline name, execution date, and more
TRUNCATE | When specified, indicates the trace table should first be truncated if it exists
TRUNCATE_APPEND | The target table is first truncated if it exists, then appended to when multiple pages of data are being processed (ignored if ON COLUMNS is specified)
SKIP_PREVIEW | When tracing is enabled, indicates that nothing should be logged during preview operations
WHEN | Only performs the apply operation on the subset of rows that match the filter provided, leaving the other rows as-is
RAWCOL | When content transformation is enabled, indicates the column to add to the response containing the raw content of each row
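As a sketch of the paging and retry options above (the [MyApi] connection, the /records endpoint, and the 'throttled' marker are hypothetical), an offset strategy with a simple retry rule could be declared as:

```sql
-- Hypothetical: page through /records using the @recordcount paging variable,
-- counting returned items via the 'records' node, and retry up to 3 times
-- (5 seconds apart) when the response body contains the word 'throttled'
APPLY HTTP [MyApi]
  (GET /records?offset=@recordcount)
  WITH
    PAGING 'offset'
    PAGING_PATH 'records'
    TIMEOUT 30
    RETRY_ON 'throttled'
    RETRY_INTERVAL 5
    RETRY_COUNT 3
;
```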
Example 1
-- Get data from an RSS Feed
SELECT * FROM HTTP [Rss Feed] (GET /econi.xml) APPLY TX (//item);
-- Call an HTTP/S cloud function and send the entire payload
-- as a JSON document, built as an array of objects containing
-- the guid, title, and link of the RSS feed, then continue
APPLY HTTP [CloudFunction]
(POST https://mycloudfunction.com/api/ready)
CONTENT_TYPE 'application/json'
PAYLOAD (@concatjsonarr({"guid" : "{{guid}}", "title": "{{title}}", "link": "{{link}}"}))
PROCESSING 'continue'
CONTINUE_ON_ERRORS
;
Example 2
-- Get data from an RSS Feed
SELECT * FROM HTTP [Rss Feed] (GET /econi.xml) APPLY TX (//item);
-- Call an HTTP/S cloud function and send one row at a time
-- as a JSON document, then transform the response into
-- rows and columns and perform a LEFT JOIN using the source
-- data; add a "_raw" column for each row transformed that contains
-- the full response object for the row
APPLY HTTP [CloudFunction]
(POST https://mycloudfunction.com/api/ready)
CONTENT_TYPE 'application/json'
PAYLOAD ({"guid" : "{{guid}}", "title": "{{title}}", "link": "{{link}}"})
PROCESSING 'left join'
-- Transform the data using a left join operation with new columns, and
-- add the f_ prefix to new columns to avoid possible conflicts with the
-- existing ones - also add the _raw column to have the full payload available
APPLY TX 'f_=$.data' WITH RAWCOL '_raw'
CONTINUE_ON_ERRORS
;
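When troubleshooting a per-row call like the one above, the TRACE INTO option can record every request and response; the [LogDb] connection and the httptrace table name below are hypothetical:

```sql
-- Hypothetical: same call as Example 2, with each request/response logged
-- (including the payload) into a SQL table that is truncated on each run;
-- nothing is logged during preview operations
APPLY HTTP [CloudFunction]
  (POST https://mycloudfunction.com/api/ready)
  CONTENT_TYPE 'application/json'
  PAYLOAD ({"guid" : "{{guid}}", "title": "{{title}}", "link": "{{link}}"})
  PROCESSING 'left join'
TRACE INTO [LogDb]
  TABLE 'httptrace'
  WITH_PAYLOAD
  ADD_TRACKING_FIELDS
  TRUNCATE
  SKIP_PREVIEW
APPLY TX 'f_=$.data' WITH RAWCOL '_raw'
;
```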
Example 3
--
-- Get last 30 days of open tickets from Zendesk with pagination
-- Zendesk expects a Unix timestamp
--
SELECT * FROM HTTP [Enzo-ZenDesk] (GET /incremental/tickets.json?start_time=#toepoch(#dateadd(#now(), -30d)))
WITH
PAGING 'link'
PAGING_PATH 'next_page'
DEBUG_MAX_PAGES 2
APPLY TX 'tickets'
APPLY PIPELINE (
-- Inner pipeline that eliminates closed/solved tickets from the result
APPLY FILTER ([status] <> 'solved' AND [status] <> 'closed');
)
;
-- Reorder by the updated_at field
APPLY FILTER '1=1' ORDER BY '[updated_at] DESC';
-- Apply a strong schema and extract custom fields from the current payload
-- called hours and billable using their unique identifiers
APPLY SCHEMA (
string [url],
int [id],
string [updated_at],
string [subject],
string [status],
long [requester_id],
long [submitter_id],
long [organization_id],
string [custom_fields],
string [fields],
bool billable = '@parsejson2({{custom_fields}}, $.[?(@.id==646985539)].value)',
string hours = '@parsejson2({{custom_fields}}, $.[?(@.id==646993666)].value)'
) STRICT_COLUMNS;
-- Keep billable hours
APPLY FILTER '[billable] = 1 AND [hours] > 0' ORDER BY '[updated_at] DESC';
-- Find the original requester and add the u_ prefix to the columns added
-- by this left join operation; since we are not using the APPLY TX option,
-- this call will return the standard HTTP output columns (u_payload, u_httpheaders...)
APPLY HTTP [Enzo-ZenDesk] (GET /users/{{requester_id}}.json)
WITH PROCESSING 'left join'
APPLY TX 'u_=user'
;
-- Enforce any additional schema on the new fields
APPLY SCHEMA (
[url],
[id],
[updated_at],
[subject],
[status],
[requester_id],
[submitter_id],
[organization_id],
[custom_fields],
[fields],
billable ,
hours,
[u_name],
[u_email],
[u_organization_id]
) STRICT_COLUMNS;
-- Finally continue to expand the data with organizational details
-- and add the o_ prefix to the fields added by this call.
APPLY HTTP [Enzo-ZenDesk] (GET /organizations/{{u_organization_id}}.json)
WITH PROCESSING 'left join'
APPLY TX 'o_=organization'
;