SINK INTO DRIVE
Overview
This operation dumps the current pipeline data set into one or more files. Writing the data to files can be useful for generating immutable data sets, such as Parquet files, that can later be ingested by an external pipeline engine, such as dbt or Azure Fabric.
Creating Parquet files in a cloud store and loading them with the cloud database's native ingestion engine usually offers significantly better performance.
Syntax
Saves the pipeline data set into a file (or files).
SINK INTO DRIVE [CONNECTION]
  FORMAT < PARQUET | CSV | JSON | XML | RAW >
  { DATE_FIELD '<field>' }
  { CONTAINER '...' }
  { FILE '...' }
  -- Parquet options
  { COMPRESSION < 'NONE' | 'SNAPPY' | 'GZIP' | 'LZ4RAW' | 'ZSTD' | 'BROTLI' | 'LZO' | 'LZ4' > }
  { ROWS_PER_GROUP <n> }
  -- CSV options
  { DELIMITER '...' }
  { COMMENT_TOKENS '...' }
  { FIXED_LENGTH '...' }
  { DATE_FORMAT '...' }
  { ADD_HEADER }
  { FORCE_QUOTED }
  { FLATTEN }
  { TRIM }
  -- JSON + XML options
  { FILE_PER_ROW }
  -- XML options
  { ROOT_NODE '...' }
  { ELEMENT '...' }
  -- Raw option
  { COLUMN '<field>' }
  { REPLACE }
  ;
Option | Description
FORMAT | The file format to use: PARQUET, CSV, JSON, XML, RAW
DATE_FIELD | The field from the pipeline data set to use as the date/time value when using date tokens in the FILE or CONTAINER name; if not specified, the current run date of the job pipeline is used
CONTAINER | Overrides the target folder, container, or bucket to use; can use pipeline variables, field names, and DataZen functions; when field names are used, the data is automatically partitioned accordingly
FILE | The target file name or file name pattern to use; can use pipeline variables, field names, and DataZen functions; when field names are used, the data is automatically partitioned accordingly
COMPRESSION | When using the PARQUET format, specifies the compression to use (default: SNAPPY): 'NONE', 'SNAPPY', 'GZIP', 'LZ4RAW', 'ZSTD', 'BROTLI', 'LZO', 'LZ4'
ROWS_PER_GROUP | When using the PARQUET format, determines the number of rows per row group in the Parquet file (default: 5000)
DELIMITER | Column delimiter to use (for CSV only; see the sketch after this table)
FIXED_LENGTH | Comma-separated list of fixed lengths when creating a fixed-length file; if specified, the DELIMITER option is ignored (for CSV only)
DATE_FORMAT | The date format to use when a column is a date/time data type; any valid format string is accepted (ex: 'o' or 'yyyyMMdd HH:mm:ss.fff') (for CSV only)
ADD_HEADER | Adds a header row to the file (for CSV only)
FORCE_QUOTED | Adds double quotes around field names and values (for CSV only)
FLATTEN | Removes line feeds from string values (for CSV only)
TRIM | Trims string values (for CSV only)
FILE_PER_ROW | Creates a single file per row of data (for JSON/XML only)
ROOT_NODE | Uses the root node name specified; default: root (for XML only)
ELEMENT | Uses the element name specified for each record; default: item (for XML only)
COLUMN | Uses the column name specified as the content to save (for RAW only)
REPLACE | Replaces the pipeline data set with the outcome of this operation
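For instance, a minimal CSV sink might look like the sketch below. This is illustrative only: the [adls] connection and the 'outbound' container are placeholders, and the options shown are the CSV options documented in the table above.

-- Sketch: write the pipeline data set as a pipe-delimited CSV file with a header row
-- (the [adls] connection and 'outbound' container are hypothetical)
SINK INTO DRIVE [adls]
  FORMAT 'CSV'
  CONTAINER 'outbound'
  FILE '/export/data_@executionid.csv'
  DELIMITER '|'
  ADD_HEADER
  DATE_FORMAT 'yyyyMMdd HH:mm:ss.fff'
  FLATTEN
  TRIM
  ;

Because FIXED_LENGTH takes precedence over DELIMITER when both are specified, only DELIMITER is used here.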
Example 1
-- Get data from an RSS Feed
SELECT * FROM HTTP [Rss Feed] (GET /econi.xml) APPLY TX (--item);

-- Only capture newly created feeds since the last call
-- Note: the capture may contain hundreds of records
CAPTURE 'rssdata' INSERT ON KEYS 'guid'
  WITH PATH [adls] '/logs'
  ;

-- Dump the data into a single Parquet file
-- and replace the pipeline data with the outcome of this operation,
-- since the PUSH operation should execute once for the Parquet file
SINK INTO DRIVE [adls]
  FORMAT 'PARQUET'
  CONTAINER 'inbound'
  FILE '/load/data_@executionid.parquet'
  COMPRESSION 'snappy'
  REPLACE
  ;

-- Push the above Parquet file into an Azure Fabric data warehouse
-- With the REPLACE option above, the pipeline now only has 1 record,
-- so this operation will be executed only once
PUSH INTO DB [mywarehouse]
  -- specifying the table here allows DataZen to create the target table
  -- if it doesn't exist
  TABLE 'rsstable'
  ON_UPSERT (
    COPY INTO [rsstable]
    FROM 'https://myadlsendpoint.dfs.core.windows.net/inbound/load/data_@executionid.parquet'
    WITH (
      FILE_TYPE = 'PARQUET',
      CREDENTIAL = (IDENTITY = 'Shared Access Signature', SECRET = '#getvar(adls-sas)'),
      COMPRESSION = 'Snappy'
    )
  );
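As a variation on the sink step above, the same feed could be written as XML instead of Parquet. The sketch below reuses the hypothetical [adls] connection and container; ROOT_NODE and ELEMENT override the documented defaults of 'root' and 'item'.

-- Sketch: write the pipeline data set as a single XML file
-- (connection, container, and file name are placeholders)
SINK INTO DRIVE [adls]
  FORMAT 'XML'
  CONTAINER 'inbound'
  FILE '/load/rss_@executionid.xml'
  ROOT_NODE 'feed'
  ELEMENT 'entry'
  ;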