APPLY HTTP

Overview

The APPLY HTTP operation makes calls to HTTP/S endpoints as part of a data pipeline and provides options to enhance the current pipeline data set or replace it. Use this component when you want to:

  • Call an HTTP endpoint once then continue processing the data pipeline
  • Call an HTTP endpoint once and add the HTTP response data as additional columns
  • Call an HTTP endpoint once, transform the returned payload, and replace the pipeline data set with this one
  • Call an HTTP endpoint once per row from the pipeline data set and add the HTTP response data as additional columns
  • Call an HTTP endpoint once per row from the pipeline data set, transform the response of each request, and perform a LEFT or INNER join with the response

This component can be useful to enhance the current data with the output provided by an HTTP/S endpoint. For example, if the pipeline data contains a list of items with unique item IDs, you can use this component to make a call to an API that returns item details. Another useful scenario is to call an AI endpoint, such as an image description endpoint, and return information about the content of the image.

Syntax

Executes an inline HTTP operation and optionally enhances the data set with the result.

APPLY HTTP [CONNECTION] 
  (< GET | POST | PUT | PATCH | DELETE > ... )
  { WITH 
      { PAGING < 'none' | 'offset' | 'page' | 'link' | 'token' >
        { PAGING_PARAM '...' }
        { PAGING_PATH '...' }
      }
      { DEBUG_MAX_PAGES <n> }
      { TIMEOUT <n> }
      { HEADERS '...' }
      { PAYLOAD (...) }
      { CONTENT_TYPE '...' }
      { RETURN_BYTES }

      { PROCESSING < 'continue' | 'replace' | 'inner join' | 'left join' | 'enhance' > }
      { WHEN '...' }
      { SHIFT_ROWS N }
      { CONTINUE_ON_ERRORS }
      { CONTINUE_ON_HTTPERROR '...' }
      {
        < { RETRY_ON '...'  } | { RETRY_ONREGEX '...' } >
        { RETRY_PATH '...'  }
        { RETRY_INTERVAL N  }
        { RETRY_COUNT N }
      }
  }
  { TRACE INTO [CONNECTION] 
    TABLE '...'
    { WITH_PAYLOAD }
    { STAGE ON [CONNECTION] '...' CREDENTIALS '...' }
    { ADD_TRACKING_FIELDS }
    { TRUNCATE | TRUNCATE_APPEND }
    { SKIP_PREVIEW }
  }
  { APPLY TX '...' 
    { WITH RAWCOL '...' } 
  }
;

HEADERS

An array of key/value pair headers to add to the HTTP request (ex: [ { "key1": "value1" }, { "key2": "value2" } ])
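
For example, a bearer token and a custom header could be supplied as follows (the [MyService] connection, URL, and token value are illustrative placeholders):

APPLY HTTP [MyService]
	(GET https://api.example.com/items)
	WITH HEADERS '[ { "Authorization": "Bearer <token>" }, { "X-Client": "pipeline" } ]'
	;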

PAYLOAD

For POST, PUT, and PATCH operations, the optional payload to send

TIMEOUT

A timeout value in seconds

PAGING

The paging strategy to use; must be one of: 'none', 'offset', 'page', 'link', 'token'

PAGING_PARAM

The paging parameter to add to the URL as a query string; if not specified, paging variables can be used in the URL or the PAYLOAD (@pagingindex, @pagingmarker, @recordcount, @recordindex)

PAGING_PATH

The path within the raw response used to identify the paging token or link for the next call, or the node to use as the count of items returned when using an offset strategy (may be an object array, or a node whose value represents the total count of items retrieved so far)

DEBUG_MAX_PAGES

During preview operations, the maximum number of paging operations to perform
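
As a sketch, an offset-based strategy against a hypothetical endpoint can rely on the @recordcount paging variable in the URL and point PAGING_PATH at the array of items returned (the connection, URL, and path are illustrative):

APPLY HTTP [MyApi]
	(GET https://api.example.com/items?offset=@recordcount)
	WITH 
		PAGING 'offset'
		PAGING_PATH 'items'
		DEBUG_MAX_PAGES 2
	;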

CONTENT_TYPE

The HTTP Request Content-Type setting for POST, PUT, and PATCH operations

RETURN_BYTES

Indicates that the HTTP call returns binary data (ex: an image or PDF document) instead of a string
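
For instance, a hypothetical endpoint returning an image could be called as follows so the response is treated as binary data instead of being decoded as a string:

APPLY HTTP [ImageService]
	(GET https://images.example.com/photo.jpg)
	WITH RETURN_BYTES
	;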

PROCESSING

The processing mode to use: 'continue', 'replace', 'left join', 'inner join', or 'enhance' (default: 'continue')

SHIFT_ROWS

When the enhance processing mode is used, the number of rows to skip from the original data set; this option is useful when applying statistical operations to a data set where the result has fewer records, such as a moving average calculation
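
For example, if a hypothetical statistics endpoint computes a moving average that returns three fewer rows than the input, the new columns can be realigned with the tail of the original data set (the connection and URL are illustrative):

APPLY HTTP [StatsService]
	(POST https://stats.example.com/moving-average)
	CONTENT_TYPE 'application/json'
	PAYLOAD (@concatjsonarr({"value": "{{value}}"}))
	PROCESSING 'enhance'
	SHIFT_ROWS 3
	;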

CONTINUE_ON_ERRORS

Continues processing even if errors are detected

CONTINUE_ON_HTTPERROR

Continues processing even if the HTTP request returns specific HTTP codes (ex: '400,404')

RETRY_ON

Retry the HTTP operation if the text provided is found in the response body or in the node identified by the RETRY_PATH parameter

RETRY_ONREGEX

Retry the HTTP operation if the regular expression provided results in a positive match when applied on the response body or in the node identified by the RETRY_PATH parameter

RETRY_PATH

The path used to limit the scope of the retry test when searching in a JSON or XML document

RETRY_INTERVAL

The retry interval in seconds (default: 1)

RETRY_COUNT

The maximum number of retries to perform (default: 1)
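
Taken together, the retry options can be sketched as follows against a hypothetical job-status endpoint that returns 'pending' in its status node until the job completes (the connection, URL, and path are illustrative):

APPLY HTTP [JobService]
	(GET https://jobs.example.com/status/{{jobid}})
	WITH 
		RETRY_ON 'pending'
		RETRY_PATH '$.status'
		RETRY_INTERVAL 5
		RETRY_COUNT 10
	;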

TRACE INTO

Saves each request and response to a database table for raw data inspection and debugging purposes; the response payload is not saved unless WITH_PAYLOAD is also specified

TABLE

The table name in which each HTTP operation will be stored

WITH_PAYLOAD

When tracing is enabled, indicates that the response payload should also be logged

STAGE ON

Specifies the connection and folder/container used to stage the data first, depending on the target database

CREDENTIALS

When a staging environment is specified, this option provides the necessary credentials

ADD_TRACKING_FIELDS

Adds internal tracking fields including executionId, pipeline name, execution date and more

TRUNCATE

When specified, indicates the trace table should first be truncated if it exists

TRUNCATE_APPEND

The target table is first truncated if it exists, then appended to when multiple pages of data are being processed (ignored if ON COLUMNS is specified)

SKIP_PREVIEW

When tracing is enabled, indicates that nothing should be logged during preview operations
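
A minimal tracing setup, assuming a hypothetical [SqlServer] connection and a 'dbo.httptrace' table, might look like this:

APPLY HTTP [CloudFunction]
	(POST https://mycloudfunction.com/api/ready)
	CONTENT_TYPE 'application/json'
	PAYLOAD ({"guid" : "{{guid}}"})
	TRACE INTO [SqlServer]
		TABLE 'dbo.httptrace' WITH_PAYLOAD
		ADD_TRACKING_FIELDS
		TRUNCATE
		SKIP_PREVIEW
	;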

WHEN

Only performs the apply operation on the subset of rows that match the filter provided, leaving the other rows as-is
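
For instance, to call the endpoint only for rows flagged as active while passing the other rows through unchanged (the [active] column is illustrative):

APPLY HTTP [CloudFunction]
	(POST https://mycloudfunction.com/api/ready)
	CONTENT_TYPE 'application/json'
	PAYLOAD ({"id" : "{{id}}"})
	WITH 
		PROCESSING 'left join'
		WHEN '[active] = 1'
	;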

RAWCOL

When content transformation is enabled, indicates the column to add to the response that contains the raw content of each row

Example 1

-- Get data from an RSS Feed
SELECT * FROM HTTP [Rss Feed] (GET /econi.xml) APPLY TX (//item);

-- Call an HTTP/S cloud function and send the entire payload
-- as a JSON document, built as an array of objects containing
-- the guid, title, and link of the RSS feed, then continue 

APPLY HTTP [CloudFunction]
	(POST https://mycloudfunction.com/api/ready)
	CONTENT_TYPE 'application/json'
	PAYLOAD (@concatjsonarr({"guid" : "{{guid}}", "title": "{{title}}", "link": "{{link}}"}))
	PROCESSING 'continue'
	CONTINUE_ON_ERRORS
	;

Example 2

-- Get data from an RSS Feed
SELECT * FROM HTTP [Rss Feed] (GET /econi.xml) APPLY TX (//item);

-- Call an HTTP/S cloud function and send one row at a time 
-- as a JSON document, then transform the response into 
-- rows and columns and perform a LEFT JOIN using the source 
-- data; add a "_raw" column for each row transformed that contains
-- the full response object for the row

APPLY HTTP [CloudFunction]
	(POST https://mycloudfunction.com/api/ready)
	CONTENT_TYPE 'application/json'
	PAYLOAD ({"guid" : "{{guid}}", "title": "{{title}}", "link": "{{link}}"})
	PROCESSING 'left join'

	-- Transform the data using a left join operation with new columns, and 
	-- add the f_ prefix to new columns to avoid possible conflicts with the 
	-- existing ones - also add the _raw column to have the full payload available
	APPLY TX 'f_=$.data' WITH RAWCOL '_raw'

	CONTINUE_ON_ERRORS
	;

Example 3

--
-- Get last 30 days of open tickets from Zendesk with pagination
-- Zendesk expects a Unix timestamp 
--

SELECT * FROM HTTP [Enzo-ZenDesk] (GET /incremental/tickets.json?start_time=#toepoch(#dateadd(#now(), -30d)))
WITH 	
	PAGING 'link'
	PAGING_PATH 'next_page'
	DEBUG_MAX_PAGES  2
APPLY TX 'tickets'
APPLY PIPELINE (
	-- Inner pipeline that eliminates closed/solved tickets from the result
	APPLY FILTER ([status] <> 'solved' AND [status] <> 'closed');
)
;

-- Reorder by the updated_at field 
APPLY FILTER '1=1' ORDER BY '[updated_at] DESC';

-- Apply a strong schema and extract custom fields from the current payload
-- called hours and billable using their unique identifiers 
APPLY SCHEMA (
	string [url],
	int [id],
	string [updated_at],
	string [subject],
	string [status],
	long [requester_id],
	long [submitter_id],
	long [organization_id],
	string [custom_fields],
	string [fields],
	bool billable = '@parsejson2({{custom_fields}}, $.[?(@.id==646985539)].value)',
	string hours = '@parsejson2({{custom_fields}}, $.[?(@.id==646993666)].value)'

) STRICT_COLUMNS;

-- Keep billable hours
APPLY FILTER '[billable] = 1 AND [hours] > 0' ORDER BY '[updated_at] DESC';

-- Find the original requester and add the u_ prefix to the columns added
-- by this left join operation; since we are not using the APPLY TX option,
-- this call will return the standard HTTP output columns (u_payload, u_httpheaders...)
APPLY HTTP [Enzo-ZenDesk] (GET /users/{{requester_id}}.json)
WITH PROCESSING 'left join'
APPLY TX 'u_=user'
;

-- Enforce any additional schema on the new fields 
APPLY SCHEMA (
	[url],
	[id],
	[updated_at],
	[subject],
	[status],
	[requester_id],
	[submitter_id],
	[organization_id],
	[custom_fields],
	[fields],
	billable ,
	hours,
	[u_name],
	[u_email],
	[u_organization_id] 
) STRICT_COLUMNS;

-- Finally continue to expand the data with organizational details 
-- and add the o_ prefix to the fields added by this call. 
APPLY HTTP [Enzo-ZenDesk] (GET /organizations/{{u_organization_id}}.json)
WITH PROCESSING 'left join'
APPLY TX 'o_=organization'
;