SINK INTO DRIVE

Overview

This operation dumps the current pipeline data set into one or more files. Writing the data to files can be useful for generating immutable data sets, such as Parquet files, that can later be ingested by an external pipeline engine, such as dbt or Azure Fabric.

Creating Parquet files in a cloud store and leveraging the cloud database's native pipeline ingestion engine usually offers significantly better performance.

Syntax

Saves the pipeline data set into a file (or files).

SINK INTO DRIVE [CONNECTION] 
    FORMAT < PARQUET | CSV | JSON | XML | RAW >
    
    { DATE_FIELD '<field>' }
    { CONTAINER '...' }
    { FILE '...' }

    -- Parquet options
    { COMPRESSION < 'NONE' | 'SNAPPY' | 'GZIP' | 'LZ4RAW' | 'ZSTD' | 'BROTLI' | 'LZO' | 'LZ4' > }
    { ROWS_PER_GROUP <n> }

    -- CSV options
    { DELIMITER '...' }
    { COMMENT_TOKENS '...' }
    { FIXED_LENGTH '...' }
    { DATE_FORMAT '...' }
    { ADD_HEADER }
    { FORCE_QUOTED }
    { FLATTEN }
    { TRIM }

    -- JSON + XML options
    { FILE_PER_ROW }

    -- XML options
    { ROOT_NODE '...' }
    { ELEMENT '...' }
    
    -- Raw option
    { COLUMN '<field>' }

    -- General option
    { REPLACE }
;

FORMAT

The file format to use: PARQUET, CSV, JSON, XML, RAW

DATE_FIELD

The field from the pipeline data set to use as the date/time value when date tokens are used in the FILE or CONTAINER name; if not specified, the current run date of the job pipeline is used

CONTAINER

Overrides the target folder, container or bucket to use; can use pipeline variables, field names, and DataZen functions; when field names are used, the data is automatically partitioned accordingly

FILE

The target file name or file name pattern to use; can use pipeline variables, field names, and DataZen functions; when field names are used, the data is automatically partitioned accordingly
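
For example, a minimal sketch that partitions the output by a field value (the [adls] connection, container, and file names are illustrative, and referencing a field named category with the same @name convention used for pipeline variables such as @executionid is an assumption to verify against the DataZen documentation):

-- Sketch: one Parquet file per distinct value of the 'category' field,
-- relying on the automatic partitioning described above
SINK INTO DRIVE [adls]
	FORMAT 'PARQUET'
	CONTAINER 'inbound'
	FILE '/load/@category/data_@executionid.parquet'
	;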

COMPRESSION

When using the PARQUET format, specifies the compression to use (default: SNAPPY): 'NONE', 'SNAPPY', 'GZIP', 'LZ4RAW', 'ZSTD', 'BROTLI', 'LZO', 'LZ4'

ROWS_PER_GROUP

When using the PARQUET format, determines the number of rows per row group in the Parquet file (default: 5000)
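
For Parquet output, the compression codec and row-group size can be tuned together; a minimal sketch (the connection, container, and file names are illustrative):

-- Sketch: a single Parquet file using ZSTD compression and larger row groups
SINK INTO DRIVE [adls]
	FORMAT 'PARQUET'
	CONTAINER 'inbound'
	FILE '/load/data_@executionid.parquet'
	COMPRESSION 'ZSTD'
	ROWS_PER_GROUP 100000
	;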

DELIMITER

Column delimiter to use (for CSV only)

FIXED_LENGTH

Comma-separated list of fixed lengths when creating a fixed-length file; if specified, the DELIMITER option is ignored (for CSV only)

DATE_FORMAT

The date format to use when a column is a date/time data type; any valid formatter is accepted (ex: 'o' or 'yyyyMMdd HH:mm:ss.fff') (for CSV only)

ADD_HEADER

Adds a header row to the file (for CSV only)

FORCE_QUOTED

Adds double-quotes to field names and values (for CSV only)

FLATTEN

Removes line feeds from strings (for CSV only)

TRIM

Trims string values (for CSV only)
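
Taken together, a CSV sink could look like the following sketch (the connection, container, and file names are illustrative):

-- Sketch: delimited CSV with a header row, quoted values, an explicit
-- date format, and strings flattened and trimmed for downstream loaders
SINK INTO DRIVE [adls]
	FORMAT 'CSV'
	CONTAINER 'exports'
	FILE '/csv/data_@executionid.csv'
	DELIMITER ','
	DATE_FORMAT 'yyyy-MM-dd HH:mm:ss'
	ADD_HEADER
	FORCE_QUOTED
	FLATTEN
	TRIM
	;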

FILE_PER_ROW

Creates a single file per row of data (for JSON/XML only)
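
For example, a sketch of a JSON sink that writes one document per record; referencing a field named guid in the file name with the same @name convention as pipeline variables is an assumption here:

-- Sketch: one JSON file per record; the file name references a field
-- (assumed @guid syntax) so each record gets a unique file name
SINK INTO DRIVE [adls]
	FORMAT 'JSON'
	CONTAINER 'documents'
	FILE '/json/item_@guid.json'
	FILE_PER_ROW
	;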

ROOT_NODE

Uses the root node name specified; default: root (for XML only)

ELEMENT

Uses the element name specified for each record; default: item (for XML only)
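
A sketch of an XML sink that overrides the default node names (the connection, container, and file names are illustrative):

-- Sketch: wrap all records in <feed>...</feed>, one <entry> per record
SINK INTO DRIVE [adls]
	FORMAT 'XML'
	CONTAINER 'exports'
	FILE '/xml/data_@executionid.xml'
	ROOT_NODE 'feed'
	ELEMENT 'entry'
	;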

COLUMN

Uses the column name specified as the content to save (for RAW only)
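
A sketch of a RAW sink that writes the content of a single column to a file (the column, connection, and file names are illustrative):

-- Sketch: write the raw payload stored in the 'body' column to a file
SINK INTO DRIVE [adls]
	FORMAT 'RAW'
	CONTAINER 'payloads'
	FILE '/raw/payload_@executionid.txt'
	COLUMN 'body'
	;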

REPLACE

Replaces the pipeline data set with the outcome of this operation

Example 1

-- Get data from an RSS Feed
SELECT * FROM HTTP [Rss Feed] (GET /econi.xml) APPLY TX (//item);

-- Only capture newly created feeds since the last call
-- Note: the capture may contain hundreds of records
CAPTURE 'rssdata' 
	INSERT ON KEYS 'guid' 
	WITH PATH [adls] '/logs'
	;

-- Dump the data into a single Parquet file
-- and replace the pipeline data set with the outcome of this operation,
-- since the PUSH operation should execute only once for the Parquet file
SINK INTO DRIVE [adls] 
	FORMAT 'PARQUET'
	CONTAINER 'inbound'
	FILE '/load/data_@executionid.parquet'
	COMPRESSION 'snappy'
	REPLACE
	;
	
-- Push the above Parquet file into an Azure Fabric data warehouse.
-- With the REPLACE option above, the pipeline now contains only 1 record,
-- so this operation will be executed only once

PUSH INTO DB [mywarehouse]
-- specifying the table here allows DataZen to create the target table
-- if it doesn't exist
TABLE 'rsstable'
ON_UPSERT 
(
  COPY INTO [rsstable]
  FROM 'https://myadlsendpoint.dfs.core.windows.net/inbound/load/data_@executionid.parquet'
  WITH ( 
    FILE_TYPE = 'PARQUET',
    CREDENTIAL=(IDENTITY= 'Shared Access Signature', SECRET='#getvar(adls-sas)'),
    COMPRESSION = 'Snappy'
  )
);