Common data loading patterns using COPY INTO
Learn common patterns for using COPY INTO
to load data from file sources into Delta Lake.
There are many options for using COPY INTO
. You can also use temporary credentials with COPY INTO in combination with these patterns.
See COPY INTO for a full reference of all options.
Create target tables for COPY INTO
COPY INTO
must target an existing Delta table. In Databricks Runtime 11.3 LTS and above, setting the schema for these tables is optional for formats that support schema evolution:
CREATE TABLE IF NOT EXISTS my_table
[(col_1 col_1_type, col_2 col_2_type, ...)]
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
Note that to infer the schema with COPY INTO
, you must pass additional options:
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
The following example creates a schemaless Delta table called my_pipe_data
and loads a pipe-delimited CSV with a header:
CREATE TABLE IF NOT EXISTS my_pipe_data;
COPY INTO my_pipe_data
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
FILEFORMAT = CSV
FORMAT_OPTIONS ('mergeSchema' = 'true',
'delimiter' = '|',
'header' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Load JSON data with COPY INTO
The following example loads JSON data from five files in Azure Data Lake Storage Gen2 (ADLS Gen2) into the Delta table called my_json_data
. This table must be created before COPY INTO
can be executed. If any data had already been loaded from one of the files, the data will not be reloaded for that file.
COPY INTO my_json_data
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
FILEFORMAT = JSON
FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')
-- The second execution will not copy any data since the first command already loaded the data
COPY INTO my_json_data
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
FILEFORMAT = JSON
FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')
Load Avro data with COPY INTO
The following example loads Avro data in ADLS Gen2 using additional SQL expressions as part of the SELECT
statement.
COPY INTO my_delta_table
FROM (SELECT to_date(dt) dt, event as measurement, quantity::double
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
FILEFORMAT = AVRO
Load CSV files with COPY INTO
The following example loads CSV files from Azure Data Lake Storage Gen2 under abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path/folder1
into a Delta table.
COPY INTO target_table
FROM (SELECT key, index, textData, 'constant_value'
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
FORMAT_OPTIONS('header' = 'true')
-- The example below loads CSV files without headers in ADLS Gen2 using COPY INTO.
-- By casting the data and renaming the columns, you can put the data in the schema you want
COPY INTO target_table
FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
Ignore corrupt files while loading data
If the data you're loading can't be read due to some corruption issue, those files can be skipped by setting ignoreCorruptFiles
to true
in the FORMAT_OPTIONS
.
The result of the COPY INTO
command returns how many files were skipped due to corruption in the num_skipped_corrupt_files
column. This metric also shows up in the operationMetrics
column under numSkippedCorruptFiles
after running DESCRIBE HISTORY
on the Delta table.
Corrupt files aren't tracked by COPY INTO
, so they can be reloaded in a subsequent run if the corruption is fixed. You can see which files are corrupt by running COPY INTO
in VALIDATE
mode.
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
[VALIDATE ALL]
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true')
Note
ignoreCorruptFiles
is available in Databricks Runtime 11.3 LTS and above.