PUSH DRIVE

Overview

The Push Drive operation sends the Upsert and Delete streams to one or more files in any supported drive. Unlike other targets, the Push Drive does not update the content of existing files; it treats the target files as immutable.

If a file with the same name already exists in the target drive, it is replaced automatically. Likewise, if the target container/folder does not exist, it is created automatically.

Syntax

Saves the pipeline data set into a file (or files) as a push operation.

PUSH INTO DRIVE [CONNECTION] 
    FORMAT < PARQUET | CSV | JSON | XML | RAW >
    
    { DATE_FIELD '<field>' }
    { CONTAINER '...' }
    { FILE '...' }

    -- Parquet options
    { COMPRESSION < 'NONE' | 'SNAPPY' | 'GZIP' | 'LZ4RAW' | 'ZSTD' | 'BROTLI' | 'LZO' | 'LZ4' > }
    { ROWS_PER_GROUP <n> }

    -- CSV options
    { DELIMITER '...' }
    { COMMENT_TOKENS '...' }
    { FIXED_LENGTH '...' }
    { DATE_FORMAT '...' }
    { ADD_HEADER }
    { FORCE_QUOTED }
    { FLATTEN }
    { TRIM }

    -- JSON + XML options
    { FILE_PER_ROW }

    -- XML options
    { ROOT_NODE '...' }
    { ELEMENT '...' }
    
    -- Raw option
    { COLUMN '<field>' }

    { WITH 
        { DISCARD_ON_SUCCESS }
        { DISCARD_ON_ERROR }
        { DLQ_ON_ERROR '...' }
        { RETRY < LINEAR | EXPONENTIAL > (<n>,<n>) }
    }

;

FORMAT

The file format to use: PARQUET, CSV, JSON, XML, RAW

DATE_FIELD

The field from the pipeline data set to use as the date/time value when date tokens appear in the FILE or CONTAINER name; if not specified, the current run date of the job pipeline is used

CONTAINER

Overrides the target folder, container or bucket to use; can use pipeline variables, field names, and DataZen functions; when field names are used, the data is automatically partitioned accordingly

FILE

The target file name or file name pattern to use; can use pipeline variables, field names, and DataZen functions; when field names are used, the data is automatically partitioned accordingly

COMPRESSION

When using the PARQUET format, specifies the compression to use (default: SNAPPY): 'NONE', 'SNAPPY', 'GZIP', 'LZ4RAW', 'ZSTD', 'BROTLI', 'LZO', 'LZ4'

ROWS_PER_GROUP

When using the PARQUET format, determines the number of rows per row group in the Parquet file (default: 5000)
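
For example, a Parquet push that overrides both the compression and the row group size could look like the sketch below; the [adls2.0] connection, 'logs' container, and file name are illustrative and mirror the examples at the end of this page:

PUSH INTO DRIVE [adls2.0]
  FORMAT 'Parquet'
  CONTAINER 'logs'
  FILE 'orders_@executionid.parquet'
  COMPRESSION 'GZIP'
  ROWS_PER_GROUP 10000
;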

DELIMITER

Column delimiter to use (for CSV only)

FIXED_LENGTH

Comma-separated list of fixed lengths when creating a fixed-length file; if specified, the DELIMITER option is ignored (for CSV only)

DATE_FORMAT

The date format to use when a column is a date/time data type; any valid formatter is accepted (ex: 'o' or 'yyyyMMdd HH:mm:ss.fff') (for CSV only)

ADD_HEADER

Adds a header row to the file (for CSV only)

FORCE_QUOTED

Adds double-quotes to field names and values (for CSV only)

FLATTEN

Removes line feeds from strings (for CSV only)

TRIM

Trims string values (for CSV only)
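
For example, the following sketch writes a pipe-delimited CSV file with a header, quoted values, flattened and trimmed strings, and an explicit date format; the connection, container, and file names are illustrative:

PUSH INTO DRIVE [adls2.0]
  FORMAT 'CSV'
  CONTAINER 'logs'
  FILE 'orders_@executionid.csv'
  DELIMITER '|'
  DATE_FORMAT 'yyyyMMdd HH:mm:ss.fff'
  ADD_HEADER
  FORCE_QUOTED
  FLATTEN
  TRIM
;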

FILE_PER_ROW

Creates a single file per row of data (for JSON/XML only)
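
For example, the following sketch creates one JSON document per row of the pipeline data set; the connection, container, and file name pattern are illustrative:

PUSH INTO DRIVE [adls2.0]
  FORMAT 'JSON'
  CONTAINER 'logs'
  FILE 'rss_@executionid.json'
  FILE_PER_ROW
;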

ROOT_NODE

Uses the root node name specified; default: root (for XML only)

ELEMENT

Uses the element name specified for each record; default: item (for XML only)
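
For example, the following sketch writes an XML file that replaces the default 'root' and 'item' node names; the connection, container, file, and node names are illustrative:

PUSH INTO DRIVE [adls2.0]
  FORMAT 'XML'
  CONTAINER 'logs'
  FILE 'rss_@executionid.xml'
  ROOT_NODE 'orders'
  ELEMENT 'order'
;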

COLUMN

Uses the column name specified as the content to save (for RAW only)
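
For example, the following sketch saves the content of a single column as-is; 'payload' is a hypothetical field holding the raw content, and the other names are illustrative:

PUSH INTO DRIVE [adls2.0]
  FORMAT 'RAW'
  CONTAINER 'logs'
  FILE 'payload_@executionid.bin'
  COLUMN 'payload'
;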

DISCARD_ON_SUCCESS

Deletes the change log after successful completion of the push operation

DISCARD_ON_ERROR

Deletes the change log if the push operation failed

DLQ_ON_ERROR

Moves the change log to a subfolder or directory if the push operation failed

RETRY EXPONENTIAL

Retries the operation on failure up to N times, waiting exponentially longer between attempts starting at P seconds (N,P)

RETRY LINEAR

Retries the operation on failure up to N times, waiting P seconds between attempts (N,P)
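
For example, the following sketch retries a failed push up to 3 times with an exponentially increasing wait starting at 5 seconds, moves the change log to a dead-letter location if the operation still fails, and discards it on success; the 'dlq' path and other names are illustrative:

PUSH INTO DRIVE [adls2.0]
  FORMAT 'Parquet'
  CONTAINER 'logs'
  FILE 'rss_@executionid.parquet'
  WITH
    DISCARD_ON_SUCCESS
    DLQ_ON_ERROR 'dlq'
    RETRY EXPONENTIAL (3,5)
;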

Example 1


-- Load the next inserts/updates previously captured in a cloud folder using
-- CAPTURE 'mycdc' INSERT UPDATE DELETE ON KEYS 'guid' WITH PATH [adls2.0] 'container'

LOAD UPSERTS FROM [adls2.0] 'mycdc' PATH '/container' KEY_COLUMNS 'guid';

-- Send all changes into a single Parquet file
-- and override the default rows per group used in the file
PUSH INTO DRIVE [adls2.0] 
  FORMAT 'Parquet'
  CONTAINER 'logs'
  FILE 'rss_@executionid.parquet'
  ROWS_PER_GROUP 10000
;

Example 2


-- Load the next inserts/updates previously captured in a cloud folder using
-- CAPTURE 'mycdc' INSERT UPDATE DELETE ON KEYS 'guid' WITH PATH [adls2.0] 'container'

LOAD DELETES FROM [adls2.0] 'mycdc' PATH '/container' KEY_COLUMNS 'guid';

-- Send all deleted records into a single CSV file with header
PUSH INTO DRIVE [adls2.0] 
  FORMAT 'CSV'
  CONTAINER 'logs'
  FILE 'rss_deleted_@executionid.csv'
  DELIMITER ','
  ADD_HEADER
;