从 Git 文件夹或工作区文件导入 Python 模块
可以将 Python 代码存储在 Databricks Git 文件夹或工作区文件中,然后将该 Python 代码导入到增量实时表管道中。 有关在 Databricks Git 文件夹或工作区文件中使用模块的详细信息,请参阅使用 Python 和 R 模块。
注意
无法从存储在 Databricks Git 文件夹或工作区文件中的笔记本导入源代码。 可以在创建或编辑管道时直接添加笔记本。 请参阅配置增量实时表管道。
将 Python 模块导入到增量实时表管道
以下示例演示如何从工作区文件将数据集查询作为 Python 模块导入。 尽管此示例说明了如何使用工作区文件来存储管道源代码,但你可以将其与存储在 Git 文件夹中的源代码配合使用。
若要运行此示例,请使用以下步骤:
单击 Azure Databricks 工作区边栏中的 “工作区”以打开工作区浏览器。
使用工作区浏览器选择 Python 模块的目录。
单击所选目录最右侧列中的 ,然后单击“创建”>“文件”。
输入文件的名称,例如
clickstream_raw_module.py
。 文件编辑器随即打开。 若要创建一个用于将源数据读入表中的模块,请在编辑器窗口中输入以下内容:from dlt import * json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" def create_clickstream_raw_table(spark): @table def clickstream_raw(): return ( spark.read.json(json_path) )
若要创建一个模块来创建包含已准备好的数据的新表,请在同一目录中创建一个新文件,输入该文件的名称(例如
clickstream_prepared_module.py
),然后在新编辑器窗口中输入以下内容:from clickstream_raw_module import * from dlt import read from pyspark.sql.functions import * from pyspark.sql.types import * def create_clickstream_prepared_table(spark): create_clickstream_raw_table(spark) @table @expect("valid_current_page_title", "current_page_title IS NOT NULL") @expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") )
接下来,创建一个管道笔记本。 转到 Azure Databricks 登陆页,选择“创建笔记本”,或单击边栏中的“新建”,然后选择“笔记本”。 还可以在工作区浏览器中创建笔记本,方法是单击 ,然后单击“创建”>“笔记本”。
为笔记本命名,并确认“Python”是默认语言。
单击 “创建” 。
在笔记本中输入示例代码。
注意
如果笔记本从工作区文件路径或 Git 文件夹路径导入与笔记本目录不同的模块或包,则必须使用
sys.path.append()
手动将路径追加到文件。如果从 Git 文件夹导入文件,则必须在路径前面添加
/Workspace/
。 例如sys.path.append('/Workspace/...')
。 省略路径中的/Workspace/
会导致错误。如果模块或包存储在笔记本所在的同一目录中,则无需手动追加路径。 从 Git 文件夹的根目录导入时,也不需要手动追加路径,因为根目录会自动追加到路径。
import sys, os # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook. sys.path.append(os.path.abspath('<module-path>')) import dlt from clickstream_prepared_module import * from pyspark.sql.functions import * from pyspark.sql.types import * create_clickstream_prepared_table(spark) @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( spark.read.table("LIVE.clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
将
<module-path>
替换为包含要导入的 Python 模块的目录的路径。使用新笔记本创建管道。
若要运行管道,请在“管道详细信息”页中单击“启动”。
还可以将 Python 代码作为包导入。 增量实时表笔记本中的以下代码片段从笔记本所在目录中的 dlt_packages
目录导入 test_utils
包。 dlt_packages
目录包含文件 test_utils.py
和 __init__.py
,test_utils.py
定义函数 create_test_table()
:
import dlt
@dlt.table
def my_table():
return spark.read.table(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)