适用于 Python 的 Databricks SQL 连接器
适用于 Python 的 Databricks SQL 连接器是一个 Python 库,让你能够使用 Python 代码在 Azure Databricks 群集和 Databricks SQL 仓库上运行 SQL 命令。 相比类似的 Python 库(如 pyodbc),适用于 Python 的 Databricks SQL 连接器更易于设置和使用。 此库遵循 PEP 249 - Python 数据库 API 规范 v2.0。
注意
Python 的 Databricks SQL 连接器还包括适用于 Azure Databricks 的 SQLAlchemy 方言。 请参阅将 SQLAlchemy 与 Azure Databricks 配合使用。
要求
- 运行 Python >=3.8 和<=3.11 的开发计算机。
- Databricks 建议使用 Python 虚拟环境,例如 python 随附的 venv 提供的环境。 虚拟环境有助于确保同时使用正确版本的 Python 和适用于 Python 的 Databricks SQL 连接器。 设置和使用虚拟环境不在本文的讨论范围之内。 有关详细信息,请参阅创建虚拟环境。
- 现有群集或 SQL 仓库。
开始使用
通过运行
pip install databricks-sql-connector
或python -m pip install databricks-sql-connector
,在开发计算机上安装适用于 Python 的 Databricks SQL 连接器库。收集想要使用的群集或 SQL 仓库的以下信息:
群集
- 群集的服务器主机名。 从群集的“高级选项”>“JDBC/ODBC”选项卡的“服务器主机名”值中可以获取此主机名。
- 群集的 HTTP 路径。 从群集的“高级选项”>“JDBC/ODBC”选项卡的“HTTP 路径”值中可以获取此路径。
SQL 仓库
身份验证
适用于 Python 的 Databricks SQL 连接器支持以下 Azure Databricks 身份验证类型:
适用于 Python 的 Databricks SQL 连接器尚不支持以下 Azure Databricks 身份验证类型:
Databricks 个人访问令牌身份验证
要将适用于 Python 的 Databricks SQL 连接器与 Azure Databricks 个人访问令牌身份验证配合使用,你必须先创建一个 Azure Databricks 个人访问令牌,如下所示:
- 在 Azure Databricks 工作区中,单击顶部栏中的 Azure Databricks 用户名,然后从下拉列表中选择“设置”。
- 单击“开发人员”。
- 在“访问令牌”旁边,单击“管理”。
- 单击“生成新令牌”。
- (可选)输入有助于将来识别此令牌的注释,并将令牌的默认生存期更改为 90 天。 若要创建没有生存期的令牌(不建议),请将“生存期(天)”框留空(保留空白)。
- 单击“生成” 。
- 将显示的令牌复制到安全位置,然后单击“完成”。
注意
请务必将复制的令牌保存到安全的位置。 请勿与他人共享复制的令牌。 如果丢失了复制的令牌,你将无法重新生成完全相同的令牌, 而必须重复此过程来创建新令牌。 如果丢失了复制的令牌,或者认为令牌已泄露,Databricks 强烈建议通过单击“访问令牌”页上令牌旁边的垃圾桶(撤销)图标立即从工作区中删除该令牌。
如果你无法在工作区中创建或使用令牌,可能是因为工作区管理员已禁用令牌或未授予你创建或使用令牌的权限。 请与工作区管理员联系,或参阅以下主题:
要对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:
DATABRICKS_SERVER_HOSTNAME
,设置为你的群集或 SQL 仓库的服务器主机名值。DATABRICKS_HTTP_PATH
,设置为你的群集或 SQL 仓库的 HTTP 路径值。DATABRICKS_TOKEN
,设置为 Azure Databricks 个人访问令牌。
若要设置环境变量,请参阅操作系统的文档。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
# ...
OAuth 计算机到计算机 (M2M) 身份验证
适用于 Python 的 Databricks SQL 连接器版本 2.7.0 及更高版本支持 OAuth 计算机到计算机 (M2M) 身份验证。 还必须安装用于 Python 0.18.0 或更高版本的 Databricks SDK(例如通过运行 pip install databricks-sdk
或 python -m pip install databricks-sdk
)。
要将 Databricks SQL Connector for Python 与 OAuth M2M 身份验证配合使用,必须执行以下操作:
在 Azure Databricks 工作区中创建 Azure Databricks 服务主体,并为该服务主体创建 OAuth 机密。
若要创建服务主体及其 OAuth 机密,请参阅使用 OAuth (OAuth M2M) 通过服务主体对 Azure Databricks 的访问进行身份验证。 记下服务主体的 UUID 或 应用程序 ID 值,以及服务主体的OAuth 机密的机密值。
授予该服务主体对群集或仓库的访问权限。
要对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:
DATABRICKS_SERVER_HOSTNAME
设置为你的群集或 SQL 仓库的服务器主机名值。DATABRICKS_HTTP_PATH
,设置为你的群集或 SQL 仓库的 HTTP 路径值。DATABRICKS_CLIENT_ID
,设置为服务主体的 UUID 或应用程序 ID 值。DATABRICKS_CLIENT_SECRET
,设置为服务主体的 OAuth 机密的“机密”值。
若要设置环境变量,请参阅操作系统的文档。
from databricks.sdk.core import Config, oauth_service_principal
from databricks import sql
import os
server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME")
def credential_provider():
config = Config(
host = f"https://{server_hostname}",
client_id = os.getenv("DATABRICKS_CLIENT_ID"),
client_secret = os.getenv("DATABRICKS_CLIENT_SECRET"))
return oauth_service_principal(config)
with sql.connect(server_hostname = server_hostname,
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
credentials_provider = credential_provider) as connection:
# ...
OAuth 用户到计算机 (U2M) 身份验证
适用于 Python 的 Databricks SQL 连接器版本 2.7.0 及更高版本支持 OAuth 用户到计算机 (U2M) 身份验证。 还必须安装用于 Python 0.19.0 或更高版本的 Databricks SDK(例如通过运行 pip install databricks-sdk
或 python -m pip install databricks-sdk
)。
要使用 OAuth U2M 身份验证对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 OAuth U2M 身份验证使用实时人工登录和同意对目标 Azure Databricks 用户帐户进行身份验证。 此代码片段假定你已设置以下环境变量:
DATABRICKS_SERVER_HOSTNAME
,设置为你的群集或 SQL 仓库的服务器主机名值。DATABRICKS_HTTP_PATH
,设置为你的群集或 SQL 仓库的 HTTP 路径值。
若要设置环境变量,请参阅操作系统的文档。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
auth_type = "databricks-oauth") as connection:
# ...
示例
以下代码示例演示如何使用用于 Python 的 Databricks SQL 连接器来查询和插入数据、查询元数据、管理游标和连接,以及配置日志记录。
注意
以下代码示例演示如何使用 Azure Databricks 个人访问令牌进行身份验证。 若要改用其他可用的 Azure Databricks 身份验证类型,请参阅身份验证。
这些代码示例从以下环境变量中检索它们的 server_hostname
、http_path
和 access_token
连接变量值:
DATABRICKS_SERVER_HOSTNAME
,表示要求中的“服务器主机名”值。DATABRICKS_HTTP_PATH
,表示要求中的“HTTP 路径”值。DATABRICKS_TOKEN
,表示要求中的访问令牌。
可以使用其他方法来检索这些连接变量值。 使用环境变量只是众多方法中的一种。
查询数据
以下代码示例演示如何调用适用于 Python 的 Databricks SQL 连接器在群集或 SQL 仓库上运行基本 SQL 命令。 此命令返回 samples
目录的 nyctaxi
架构中的trips
表中的前两行。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
result = cursor.fetchall()
for row in result:
print(row)
插入数据
以下示例演示如何插入少量数据(数千行):
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")
squares = [(i, i * i) for i in range(100)]
values = ",".join([f"({x}, {y})" for (x, y) in squares])
cursor.execute(f"INSERT INTO squares VALUES {values}")
cursor.execute("SELECT * FROM squares LIMIT 10")
result = cursor.fetchall()
for row in result:
print(row)
要插入大量数据,请先将数据上传到云存储,然后执行 COPY INTO 命令。
查询元数据
可通过一些专用方法检索元数据。 以下示例检索有关示例表中的列的元数据:
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.columns(schema_name="default", table_name="squares")
print(cursor.fetchall())
管理游标和连接
最好关闭不再使用的任何连接和游标。 这可以释放 Azure Databricks 群集和 Databricks SQL 仓库上的资源。
可以使用上下文管理器(在前面示例中使用的 with
语法)来管理资源,或显式调用 close
:
from databricks import sql
import os
connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())
cursor.close()
connection.close()
管理 Unity Catalog 卷中的文件
Databricks SQL 连接器可让你将本地文件写入 Unity Catalog 卷、从卷下载文件以及从卷中删除文件,如下例所示:
from databricks import sql
import os
# For writing local files to volumes and downloading files from volumes,
# you must set the staging_allows_local_path argument to the path to the
# local folder that contains the files to be written or downloaded.
# For deleting files in volumes, you must also specify the
# staging_allows_local_path argument, but its value is ignored,
# so in that case its value can be set for example to an empty string.
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"),
staging_allowed_local_path = "/tmp/") as connection:
with connection.cursor() as cursor:
# Write a local file to the specified path in a volume.
# Specify OVERWRITE to overwrite any existing file in that path.
cursor.execute(
"PUT '/temp/my-data.csv' INTO '/Volumes/main/default/my-volume/my-data.csv' OVERWRITE"
)
# Download a file from the specified path in a volume.
cursor.execute(
"GET '/Volumes/main/default/my-volume/my-data.csv' TO '/tmp/my-downloaded-data.csv'"
)
# Delete a file from the specified path in a volume.
cursor.execute(
"REMOVE '/Volumes/main/default/my-volume/my-data.csv'"
)
配置日志记录
Databricks SQL 连接器使用 Python 的标准日志记录模块。 可以配置类似于以下内容的日志记录级别:
from databricks import sql
import os, logging
logging.getLogger("databricks.sql").setLevel(logging.DEBUG)
logging.basicConfig(filename = "results.log",
level = logging.DEBUG)
connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
result = cursor.fetchall()
for row in result:
logging.debug(row)
cursor.close()
connection.close()
测试
若要测试代码,请使用 Python 测试框架,例如 pytest。 若要在模拟条件下测试代码,而不调用 Azure Databricks REST API 终结点或更改 Azure Databricks 帐户或工作区的状态,可以使用 Python 模拟库(如 unittest.mock)。
例如,如果以下名为“helpers.py
”的文件,其中包含使用 Azure Databricks 个人访问令牌返回到 Azure Databricks 工作区的连接的 get_connection_personal_access_token
函数,以及使用连接从 samples
目录的 nyctaxi
架构中的 trips
表中获取指定数目的数据行的 select_nyctaxi_trips
函数:
# helpers.py
from databricks import sql
from databricks.sql.client import Connection, List, Row, Cursor
def get_connection_personal_access_token(
server_hostname: str,
http_path: str,
access_token: str
) -> Connection:
return sql.connect(
server_hostname = server_hostname,
http_path = http_path,
access_token = access_token
)
def select_nyctaxi_trips(
connection: Connection,
num_rows: int
) -> List[Row]:
cursor: Cursor = connection.cursor()
cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
result: List[Row] = cursor.fetchall()
return result
另外,假设以下名为 main.py
的文件调用 get_connection_personal_access_token
和 select_nyctaxi_trips
函数:
# main.py
from databricks.sql.client import Connection, List, Row
import os
from helpers import get_connection_personal_access_token, select_nyctaxi_trips
connection: Connection = get_connection_personal_access_token(
server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")
)
rows: List[Row] = select_nyctaxi_trips(
connection = connection,
num_rows = 2
)
for row in rows:
print(row)
以下名为 test_helpers.py
的文件测试 select_nyctaxi_trips
函数是否返回预期的响应。 此测试将模拟 Connection
对象,而不是创建与目标工作区的真实连接。 该测试还模拟一些符合真实数据中的架构和值的数据。 该测试通过模拟连接返回模拟数据,然后检查其中一个模拟数据行的值是否与预期值匹配。
# test_helpers.py
import pytest
from databricks.sql.client import Connection, List, Row
from datetime import datetime
from helpers import select_nyctaxi_trips
from unittest.mock import create_autospec
@pytest.fixture
def mock_data() -> List[Row]:
return [
Row(
tpep_pickup_datetime = datetime(2016, 2, 14, 16, 52, 13),
tpep_dropoff_datetime = datetime(2016, 2, 14, 17, 16, 4),
trip_distance = 4.94,
fare_amount = 19.0,
pickup_zip = 10282,
dropoff_zip = 10171
),
Row(
tpep_pickup_datetime = datetime(2016, 2, 4, 18, 44, 19),
tpep_dropoff_datetime = datetime(2016, 2, 4, 18, 46),
trip_distance = 0.28,
fare_amount = 3.5,
pickup_zip = 10110,
dropoff_zip = 10110
)
]
def test_select_nyctaxi_trips(mock_data: List[Row]):
# Create a mock Connection.
mock_connection = create_autospec(Connection)
# Set the mock Connection's cursor().fetchall() to the mock data.
mock_connection.cursor().fetchall.return_value = mock_data
# Call the real function with the mock Connection.
response: List[Row] = select_nyctaxi_trips(
connection = mock_connection,
num_rows = 2)
# Check the value of one of the mocked data row's columns.
assert response[1].fare_amount == 3.5
select_nyctaxi_trips
函数包含 SELECT
语句,因此不会更改 trips
表的状态,在此示例中并不是一定需要模拟。 但是,模拟让你能够快速运行测试,而无需等待与工作区建立实际连接。 此外,通过模拟,可以多次针对可能更改表状态的函数运行模拟测试,例如 INSERT INTO
、UPDATE
和 DELETE FROM
。
API 参考
程序包
databricks-sql-connector
使用情况:pip install databricks-sql-connector
另请参阅 Python 包索引 (PyPI) 中的 databricks-sql-connector。
模块
databricks.sql
使用情况:from databricks import sql
类
选定的类包括:
类 |
---|
Connection 一种 Azure Databricks 计算资源的会话。 |
Cursor 一种遍历数据记录的机制。 |
Row SQL 查询结果中的一行数据。 |
Connection
类
若要创建 Connection
对象,请使用以下参数调用 databricks.sql.connect
方法:
参数 |
---|
server_hostname 类型: str 群集或 SQL 仓库的服务器主机名。 要获取服务器主机名,请参阅本文前面部分的说明。 此参数是必需的。 示例: adb-1234567890123456.7.databricks.azure.cn |
http_path 类型: str 群集或 SQL 仓库的 HTTP 路径。 要获取 HTTP 路径,请参阅本文前面部分的说明。 此参数是必需的。 例如: sql/protocolv1/o/1234567890123456/1234-567890-test123 适用于群集。SQL 仓库的 /sql/1.0/warehouses/a1b234c567d8e9fa 。 |
access_token , auth_type 类型: str 有关 Azure Databricks 身份验证设置的信息。 有关详细信息,请参阅身份验证。 |
session_configuration 类型: dict[str, Any] Spark 会话配置参数的字典。 设置一个等效于使用 SET key=val SQL 命令的配置。 运行 SQL 命令 SET -v 可以获取可用配置的完整列表。默认为 None 。此参数是可选的。 示例: {"spark.sql.variable.substitute": True} |
http_headers 类型: List[Tuple[str, str]]] 在客户端发出的每个 RPC 请求的 HTTP 标头中设置的其他(键、值)对。 典型用法不会设置任何额外的 HTTP 标头。 默认为 None 。此参数是可选的。 从版本 2.0 开始 |
catalog 类型: str 用于连接的初始目录。 默认为 None (在这种情况下,将使用默认目录,通常为 hive_metastore )。此参数是可选的。 从版本 2.0 开始 |
schema 类型: str 用于连接的初始架构。 默认为 None (在这种情况下,将使用默认架构 default )。此参数是可选的。 从版本 2.0 开始 |
use_cloud_fetch 类型: bool True 将提取请求直接发送到云对象存储以下载数据区块。 False 默认将提取请求直接发送到 Azure Databricks。如果 use_cloud_fetch 设置为 True 但网络访问被阻止,则提取请求将失败。从版本 2.8 开始 |
选定的 Connection
方法包括:
方法 |
---|
close 关闭与数据库的连接,并释放服务器上所有关联的资源。 对此连接的任何其他调用都将引发 Error 。无参数。 没有返回值。 |
cursor 返回一个新的 Cursor 对象,该对象允许遍历数据库中的各种记录。无参数。 |
Cursor
类
若要创建 Cursor
对象,请调用 Connection
类的 cursor
方法。
选定的 Cursor
属性包括:
特性 |
---|
arraysize 与 fetchmany 方法一起使用,指定内部缓冲区大小,该大小也是一次性从服务器实际提取的行数。 默认值为 10000 。 对于窄结果(在结果中每行未包含大量数据),应增大该值以提高性能。读写访问。 |
description 包含 tuple 对象的 Python list 。 每个 tuple 对象都包含 7 个值,并且每个 tuple 对象的前 2 个项目包含如下所示的描述单个结果列的信息:- name :列的名称。- type_code :表示列的类型的字符串。 例如,整数列的类型代码为 int 。每个 7 项目 tuple 对象的剩余 5 个项目未实现,并且其值未定义。 它们通常以 4 个None 值的形式返回,后跟一个 True 值。只读访问。 |
选定的 Cursor
方法包括:
方法 |
---|
cancel 中断运行游标启动的任何数据库查询或命令。 要释放服务器上关联的资源,请在 调用 cancel 方法后,调用 close 方法。无参数。 没有返回值。 |
close 关闭游标并释放服务器上关联的资源。 关闭已关闭的游标可能会引发错误。 无参数。 没有返回值。 |
execute 准备并运行数据库查询或命令。 无返回值。 参数: operation 类型: str 要准备并运行的查询或命令。 此参数是必需的。 不使用 parameters 参数的示例:cursor.execute( 'SELECT * FROM samples.nyctaxi.trips WHERE pickup_zip="10019" LIMIT 2' ) 不使用 parameters 参数的示例:cursor.execute( 'SELECT * FROM samples.nyctaxi.trips WHERE zip=%(pickup_zip)s LIMIT 2', { 'pickup_zip': '10019' } ) parameters 类型:字典 要与 operation 参数一起使用的参数序列。此参数是可选的。 默认为 None 。 |
executemany 使用 seq_of_parameters 实参中的所有形参序列来准备并运行数据库查询或命令。 仅保留最终结果集。无返回值。 参数: operation 类型: str 要准备并运行的查询或命令。 此参数是必需的。 seq_of_parameters 类型: dict 的 list 与 operation 参数一起使用的多个参数集的序列。此参数是必需的。 |
catalogs 执行有关目录的元数据查询。 然后应使用 fetchmany 或 fetchall 获取实际结果。结果集中的重要字段包括: - 字段名称: TABLE_CAT 。 键入:str 。 目录的名称。无参数。 无返回值。 从版本 1.0 开始 |
schemas 执行有关架构的元数据查询。 然后应使用 fetchmany 或 fetchall 获取实际结果。结果集中的重要字段包括: - 字段名称: TABLE_SCHEM 。 键入:str 。 架构的名称。- 字段名称: TABLE_CATALOG 。 键入:str 。 架构所属的目录。无返回值。 从版本 1.0 开始 参数: catalog_name 类型: str 要检索其相关信息的目录名称。 % 字符解释为通配符。此参数是可选的。 schema_name 类型: str 要检索其相关信息的架构名称。 % 字符解释为通配符。此参数是可选的。 |
tables 执行有关表和视图的元数据查询。 然后应使用 fetchmany 或 fetchall 获取实际结果。结果集中的重要字段包括: - 字段名称: TABLE_CAT 。 键入:str 。 表所属的目录。- 字段名称: TABLE_SCHEM 。 键入:str 。 表所属的架构。- 字段名称: TABLE_NAME 。 键入:str 。 表的名称。- 字段名称: TABLE_TYPE 。 键入:str 。 关系类型,例如 VIEW 或 TABLE (适用于 Databricks Runtime 10.4 LTS 和更高版本以及 Databricks SQL;对更低版本的 Databricks Runtime 使用会返回空字符串)。无返回值。 从版本 1.0 开始 参数 catalog_name 类型: str 要检索其相关信息的目录名称。 % 字符解释为通配符。此参数是可选的。 schema_name 类型: str 要检索其相关信息的架构名称。 % 字符解释为通配符。此参数是可选的。 table_name 类型: str 要检索其相关信息的表名称。 % 字符解释为通配符。此参数是可选的。 table_types 类型: List[str] 要匹配的表类型列表,例如 TABLE 或 VIEW 。此参数是可选的。 |
columns 执行有关列的元数据查询。 然后应使用 fetchmany 或 fetchall 获取实际结果。结果集中的重要字段包括: - 字段名称: TABLE_CAT 。 键入:str 。 列所属的目录。- 字段名称: TABLE_SCHEM 。 键入:str 。 列所属的架构。- 字段名称: TABLE_NAME 。 键入:str 。 列所属的表的名称。- 字段名称: COLUMN_NAME 。 键入:str 。 列的名称。无返回值。 从版本 1.0 开始 参数: catalog_name 类型: str 要检索其相关信息的目录名称。 % 字符解释为通配符。此参数是可选的。 schema_name 类型: str 要检索其相关信息的架构名称。 % 字符解释为通配符。此参数是可选的。 table_name 类型: str 要检索其相关信息的表名称。 % 字符解释为通配符。此参数是可选的。 column_name 类型: str 要检索其相关信息的列名称。 % 字符解释为通配符。此参数是可选的。 |
fetchall 获取查询的所有(或所有剩余)行。 无参数。 以 Python list 形式返回查询的所有(或所有剩余)行,该形式属于 Row 对象。如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error 。 |
fetchmany 获取查询的后续行。 以 Row 对象的 Python list 的形式返回查询后续行中 size 行及以前的行(如果没有指定 size ,则返回 arraysize 属性)。如果要提取的行数少于 size ,将返回所有剩余的行。如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error 。参数: size 类型: int 要获取的后续行数。 此参数是可选的。 如果未指定,则使用 arraysize 属性的值。示例: cursor.fetchmany(10) |
fetchone 获取数据集的下一行。 无参数。 将数据集的下一行以 Python tuple 对象的形式作为单个序列返回,或者如果没有更多可用数据,则返回 None 。如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error 。 |
fetchall_arrow 以 PyArrow Table 对象的形式获取查询的所有(或所有剩余)行。 返回大量数据的查询应改用 fetchmany_arrow ,以减少内存消耗。无参数。 以 PyArrow 表的形式返回查询的所有(或所有剩余)行。 如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error 。从版本 2.0 开始 |
fetchmany_arrow 以 PyArrow Table 对象的形式获取查询后续行。以 Python PyArrow 的形式返回查询后续行中 size 参数行及以前的行(如果没有指定 size ,则返回 arraysize 属性)Table 对象。如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error 。从版本 2.0 开始 参数: size 类型: int 要获取的后续行数。 此参数是可选的。 如果未指定,则使用 arraysize 属性的值。示例: cursor.fetchmany_arrow(10) |
Row
类
行类是一个类似于元组的数据结构,表示单个结果行。
如果行包含名为 "my_column"
的列,则你可以通过 row.my_column
访问 row
的 "my_column"
字段。 还可以使用数字索引来访问字段,例如 row[0]
。
如果不允许将列名称用作属性方法名称(例如,它以数字开头),则可以将字段作为 row["1_my_column"]
来访问。
从版本 1.0 开始
选定的 Row
方法包括:
方法 |
---|
asDict 返回行的字典表示形式,此值按字段名称编制索引。 如果存在重复的字段名称,则在字典中返回一个重复字段(但只有一个)。 未定义返回哪个重复字段。 无参数。 返回字段的 dict 。 |
类型转换
下表将 Apache Spark SQL 数据类型映射到其 Python 数据类型等效项。
Apache Spark SQL 数据类型 | Python 数据类型 |
---|---|
array |
numpy.ndarray |
bigint |
int |
binary |
bytearray |
boolean |
bool |
date |
datetime.date |
decimal |
decimal.Decimal |
double |
float |
int |
int |
map |
str |
null |
NoneType |
smallint |
int |
string |
str |
struct |
str |
timestamp |
datetime.datetime |
tinyint |
int |
疑难解答
tokenAuthWrapperInvalidAccessToken: Invalid access token
消息
问题:运行代码时看到类似于 Error during request to server: tokenAuthWrapperInvalidAccessToken: Invalid access token
的消息。
可能的原因:传递给 access_token
的值不是有效的 Azure Databricks 个人访问令牌。
建议的解决方法:检查传递给 access_token
的值是否正确,然后重试。
gaierror(8, 'nodename nor servname provided, or not known')
消息
问题:运行代码时看到类似于 Error during request to server: gaierror(8, 'nodename nor servname provided, or not known')
的消息。
可能的原因:传递给 server_hostname
的值不是正确的主机名。
建议的解决方法:检查传递给 server_hostname
的值是否正确,然后重试。
有关查找服务器主机名的详细信息,请参阅获取 Azure Databricks 计算资源的连接详细信息。
IpAclError
消息
问题:在运行代码期间,尝试使用 Azure Databricks 笔记本上的连接器时看到消息 Error during request to server: IpAclValidation
。
可能的原因:可能为 Azure Databricks 工作区启用了 IP 允许列表。 使用 IP 允许列表时,默认不允许从 Spark 群集连接回到控制平面。
建议的解决方法:请管理员将计算平面子网添加到 IP 允许列表。
其他资源
有关详细信息,请参阅: