Load and process data incrementally with Delta Live Tables flows
This article explains what flows are and how you can use flows in Delta Live Tables pipelines to incrementally process data from a source to a target streaming table. In Delta Live Tables, flows are defined in two ways:
- A flow is defined automatically when you create a query that updates a streaming table.
- Delta Live Tables also provides functionality to explicitly define flows for more complex processing such as appending to a streaming table from multiple streaming sources.
This article discusses the implicit flows that are created when you define a query to update a streaming table, and then provides details on the syntax to define more complex flows.
What is a flow?
In Delta Live Tables, a flow is a streaming query that processes source data incrementally to update a target streaming table. Most Delta Live Tables datasets you create in a pipeline define the flow as part of the query and do not require explicitly defining the flow. For example, you create a streaming table in Delta Live Tables in a single DDL command instead of using separate table and flow statements to create the streaming table:
Note
This CREATE FLOW example is provided for illustrative purposes only and includes keywords that are not valid Delta Live Tables syntax.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
In addition to the default flow defined by a query, the Delta Live Tables Python and SQL interfaces provide append flow functionality. Append flow supports processing that requires reading data from multiple streaming sources to update a single streaming table. For example, you can use append flow functionality when you have an existing streaming table and flow and want to add a new streaming source that writes to this existing streaming table.
Use append flow to write to a streaming table from multiple source streams
Note
To use append flow processing, your pipeline must be configured to use the preview channel.
Use the @append_flow decorator in the Python interface or the CREATE FLOW clause in the SQL interface to write to a streaming table from multiple streaming sources. Use append flow for processing tasks such as the following:
- Add streaming sources that append data to an existing streaming table without requiring a full refresh. For example, you might have a table combining regional data from every region you operate in. As new regions are rolled out, you can add the new region data to the table without performing a full refresh. See Example: Write to a streaming table from multiple Kafka topics.
- Update a streaming table by appending missing historical data (backfilling). For example, you have an existing streaming table that is populated from an Apache Kafka topic. You also have historical data stored in a table that must be inserted exactly once into the streaming table, and you cannot stream that data because your processing requires a complex aggregation before the insert. See Example: Run a one-time data backfill.
- Combine data from multiple sources and write to a single streaming table instead of using the UNION clause in a query. Using append flow processing instead of UNION allows you to update the target table incrementally without running a full refresh update. See Example: Use append flow processing instead of UNION.
The target for the records output by the append flow processing can be an existing table or a new table. For Python queries, use the create_streaming_table() function to create a target table.
Important
- If you need to define data quality constraints with expectations, define the expectations on the target table as part of the create_streaming_table() function or on an existing table definition. You cannot define expectations in the @append_flow definition.
- Flows are identified by a flow name, and this name is used to identify streaming checkpoints. The use of the flow name to identify the checkpoint means the following:
  - If an existing flow in a pipeline is renamed, the checkpoint does not carry over, and the renamed flow is effectively an entirely new flow.
  - You cannot reuse a flow name in a pipeline, because the existing checkpoint won't match the new flow definition.
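The effect of tying checkpoints to flow names can be illustrated with a toy model in plain Python. This is only a sketch: the ToyPipeline class, its checkpoints dictionary, and the run_flow method are hypothetical names invented for illustration and are not part of the Delta Live Tables API. The model keeps one offset per flow name, so a renamed flow finds no checkpoint and appends records that the old name had already processed.

```python
class ToyPipeline:
    """Toy model (not the Delta Live Tables API): one checkpoint per flow name."""

    def __init__(self):
        self.checkpoints = {}  # flow name -> number of source records already processed
        self.target = []       # stands in for the target streaming table

    def run_flow(self, flow_name, source):
        """Append only the records this flow name has not processed yet."""
        offset = self.checkpoints.get(flow_name, 0)
        new_records = source[offset:]
        self.target.extend(new_records)
        self.checkpoints[flow_name] = len(source)
        return len(new_records)


pipeline = ToyPipeline()
source = ["r1", "r2", "r3"]

# First update: no checkpoint exists yet, so all three records are appended.
first_run = pipeline.run_flow("ingest_orders", source)

# Second update: the checkpoint for "ingest_orders" skips the first three records.
source.append("r4")
second_run = pipeline.run_flow("ingest_orders", source)

# Renamed flow: no checkpoint exists under the new name, so every record is
# appended again, including the ones already written by the old flow name.
renamed_run = pipeline.run_flow("ingest_orders_v2", source)
```

In this model the rename leaves duplicate records in the target, which is one reason to choose flow names carefully before a pipeline first runs.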
The following is the syntax for @append_flow:
Python
import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE target_table; -- Required only if the target table doesn't exist.
CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;
Example: Write to a streaming table from multiple Kafka topics
The following examples create a streaming table named kafka_target and write to that streaming table from two Kafka topics:
Python
import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
To learn more about the read_kafka() table-valued function used in the SQL queries, see read_kafka in the SQL language reference.
Example: Run a one-time data backfill
The following examples run a query to append historical data to a streaming table:
Note
To ensure a true one-time backfill when the backfill query is part of a pipeline that runs on a scheduled basis or continuously, remove the query after running the pipeline once. To append new data if it arrives in the backfill directory, leave the query in place.
Python
import dlt

# Streaming table that continuously ingests CSV files from the main source directory.
@dlt.table()
def csv_target():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format","csv")
      .load("path/to/sourceDir")
  )

# Append flow that adds the historical files to the same table.
@dlt.append_flow(target = "csv_target")
def backfill():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format","csv")
      .load("path/to/backfill/data/dir")
  )
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );
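The note in this example distinguishes between removing the backfill query after one run and leaving it in place. The following plain-Python sketch models that difference. It is a toy, not Delta Live Tables code: the run_update function, the directories dictionary, and the file names are all hypothetical, and the model simply assumes that each flow remembers which files it has already appended.

```python
def run_update(flows, directories, processed, target):
    """Run one toy pipeline update: each flow appends files it has not seen before."""
    for flow_name, directory in flows.items():
        seen = processed.setdefault(flow_name, set())
        for file_name in sorted(directories[directory]):
            if file_name not in seen:
                target.append(file_name)
                seen.add(file_name)


directories = {"backfill_dir": {"2020.csv", "2021.csv"}}
processed, target = {}, []

# Update 1: the backfill flow is defined and appends both historical files.
run_update({"backfill": "backfill_dir"}, directories, processed, target)
after_first = list(target)

# Update 2: the flow is still defined, but nothing new arrived, so nothing is appended.
run_update({"backfill": "backfill_dir"}, directories, processed, target)
after_second = list(target)

# A late file lands in the backfill directory.
directories["backfill_dir"].add("2019.csv")

# Update 3a: with the backfill flow removed, the late file is ignored.
target_removed = list(target)
run_update({}, directories, processed, target_removed)

# Update 3b: with the backfill flow left in place, the late file is appended.
target_kept = list(target)
run_update({"backfill": "backfill_dir"}, directories, processed, target_kept)
```

Under these assumptions, removing the flow gives a strict one-time backfill, while keeping it behaves like an additional ongoing source for the target table.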
Example: Use append flow processing instead of UNION
Instead of using a query with a UNION clause, you can use append flow queries to combine multiple sources and write to a single streaming table. Using append flow queries instead of UNION allows you to append to a streaming table from multiple sources without running a full refresh.
The following Python example includes a query that combines multiple data sources with a UNION clause:
import dlt

@dlt.table(name="raw_orders")
def unioned_raw_orders():
  # Each source is read as its own stream, then the streams are combined.
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )
  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )
  return raw_orders_us.union(raw_orders_eu)
The following examples replace the UNION query with append flow queries:
Python
import dlt

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_orders_us():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/apac")
  )
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/apac",
    "csv"
  );
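Because the Python flows in this example differ only in their name and source path, they could also be registered in a loop. The following is a minimal sketch of that pattern, not an official recipe. The register_append_flows helper, the recording_append_flow stand-in, and the placeholder reader are hypothetical names introduced so the sketch can run outside a pipeline. It assumes that the decorator is called with the target and name arguments shown in the syntax section and that the flow function is invoked without arguments.

```python
def register_append_flows(append_flow, read_source, target, sources):
    """Register one append flow per entry in `sources` (flow name -> source location)."""
    for flow_name, location in sources.items():
        # Bind `location` through a default argument so each flow keeps its own
        # value instead of the last value of the loop variable.
        def flow(location=location):
            return read_source(location)

        # Applying the decorator by hand is equivalent to the @ syntax.
        append_flow(target=target, name=flow_name)(flow)


# Stand-in decorator (hypothetical) that records what would be registered.
# Inside a pipeline, dlt.append_flow would be passed instead.
registered = {}

def recording_append_flow(target, name):
    def decorator(func):
        registered[name] = (target, func)
        return func
    return decorator


register_append_flows(
    append_flow=recording_append_flow,
    # Placeholder for a real streaming read such as spark.readStream...load(location).
    read_source=lambda location: f"stream from {location}",
    target="raw_orders",
    sources={
        "raw_orders_us": "/path/to/orders/us",
        "raw_orders_eu": "/path/to/orders/eu",
    },
)
```

Passing an explicit name for each flow matters here, because every generated function is called flow and the flow name otherwise defaults to the function name.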