使用增量实时表加载数据
可以使用增量实时表在 Azure Databricks 上从 Apache Spark 支持的任何数据源加载数据。 可以在增量实时表中针对返回 Spark 数据帧的任何查询定义数据集(表和视图),包括流式处理数据帧和 Pandas for Spark 数据帧。 对于数据引入任务,Databricks 建议为大多数用例使用流式表。 流式表非常适合用于通过自动加载程序或 Kafka 等消息总线从云对象存储引入数据。 以下示例演示了一些常用模式。
重要
并非所有数据源都支持 SQL。 可以在增量实时表管道中混合使用 SQL 和 Python 笔记本,以便将 SQL 用于除引入之外的所有操作。
若要详细了解如何使用默认未打包在 Delta Live Tables 中的库,请参阅管理 Delta Live Tables 管道的 Python 依赖项。
从云对象存储加载文件
对于从云对象存储引入数据的大多数任务,Databricks 建议将自动加载程序与增量实时表配合使用。 自动加载程序和增量实时表能够以增量方式和幂等方式加载进入云存储的、不断增长的数据。 以下示例使用自动加载程序从 CSV 和 JSON 文件创建数据集:
注意
若要在启用了 Unity Catalog 的管道中使用自动加载程序加载文件,必须使用外部位置。 若要详细了解如何将 Unity Catalog 与 Delta Live Tables 配合使用,请参阅将 Unity Catalog 与 Delta Live Tables 管道配合使用。
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
警告
如果将自动加载程序与文件通知配合使用,并对管道或流式表运行完全刷新,则必须手动清理资源。 可以使用笔记本中的 CloudFilesResourceManager 执行清理。
从消息总线加载数据
可以将增量实时表管道配置为使用流式表从消息总线引入数据。 Databricks 建议将流式表与连续执行和增强型自动缩放相结合,以最高效地引入来自消息总线的低延迟加载。 请参阅使用增强型自动缩放以优化增量实时表管道的群集利用率。
例如,以下代码将流式表配置为从 Kafka 引入数据:
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
可以使用纯 SQL 编写下游操作来对这些数据执行流式转换,如以下示例所示:
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(LIVE.kafka_raw)
WHERE ...
有关使用事件中心的示例,请参阅使用 Azure 事件中心作为增量实时表数据源。
请参阅配置流式处理数据源。
从外部系统加载数据
增量实时表支持从 Azure Databricks 所支持的任何数据源加载数据。 请参阅连接到数据源。 也可使用 Lakehouse Federation 为受支持的数据源加载外部数据。 由于 Lakehouse Federation 需要 Databricks Runtime 13.3 LTS 或更高版本,因此,若要使用 Lakehouse Federation,管道必须配置为使用预览通道。
某些数据源在 SQL 中没有等效支持。 如果无法将 Lakehouse 联合身份验证与其中一个数据源配合使用,则可以使用 Python 笔记本从源引入数据。 可以将 Python 和 SQL 源代码添加到同一增量实时表管道。 以下示例声明了一个具体化视图,用于访问远程 PostgreSQL 表中数据的当前状态:
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
从云对象存储加载小型或静态数据集
可以使用 Apache Spark 加载语法加载小型或静态数据集。 增量实时表支持 Azure Databricks 上的 Apache Spark 所支持的所有文件格式。 要获取完整列表,请参阅“数据格式”选项。
以下示例演示如何加载 JSON 以创建增量实时表表:
Python
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
注意
SELECT * FROM format.`path`;
SQL 构造对于 Azure Databricks 上的所有 SQL 环境是通用的。 这是结合使用 SQL 和增量实时表进行直接文件访问的建议模式。
使用管道中的机密安全访问存储凭据
可以使用 Azure Databricks 机密来存储凭据(例如访问密钥或密码)。 若要在管道中配置机密,请在管道设置群集配置中使用一个 Spark 属性。 请参阅配置增量实时表管道的计算。
以下示例使用机密来存储通过自动加载程序从 Azure Data Lake Storage Gen2 (ADLS Gen2) 存储帐户读取输入数据所需的访问密钥。 同样可以使用这种方法来配置管道所需的任何机密,例如用于访问 S3 的 AWS 密钥,或用于访问 Apache Hive 元存储的密码。
若要详细了解如何使用 Azure Data Lake Storage Gen2,请参阅连接到 Azure Data Lake Storage Gen2 和 Blob 存储。
注意
必须将 spark.hadoop.
前缀添加到用于设置机密值的 spark_conf
配置密钥。
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.chinacloudapi.cn": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
Replace
<storage-account-name>
替换为 ADLS Gen2 存储帐户名称。<scope-name>
替换为 Azure Databricks 机密范围名称。<secret-name>
替换为包含 Azure 存储帐户访问密钥的密钥的名称。
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
替换
- 将
<container-name>
替换为用于存储输入数据的 Azure 存储帐户容器的名称。 - 将
<storage-account-name>
替换为 ADLS Gen2 存储帐户名称。 - 将
<path-to-input-dataset>
替换为输入数据集的路径。
从 Azure 事件中心加载数据
Azure 事件中心是一种数据流式处理服务,提供 Apache Kafka 兼容接口。 可以使用 Delta Live Tables 运行时中包含的结构化流式处理 Kafka 连接器从 Azure 事件中心加载消息。 若要详细了解如何从 Azure 事件中心加载和处理消息,请参阅使用 Azure 事件中心作为 Delta Live Tables 数据源。