适用于 Python 的 Databricks SQL 连接器
适用于 Python 的 Databricks SQL 连接器是一个 Python 库,让你能够使用 Python 代码在 Azure Databricks 群集和 Databricks SQL 仓库上运行 SQL 命令。 相比类似的 Python 库(如 pyodbc),适用于 Python 的 Databricks SQL 连接器更易于设置和使用。 此库遵循 PEP 249 - Python 数据库 API 规范 v2.0。
注意
Python 的 Databricks SQL 连接器还包括适用于 Azure Databricks 的 SQLAlchemy 方言。 请参阅将 SQLAlchemy 与 Azure Databricks 配合使用。
要求
- 运行 Python >=3.8 和<=3.11 的开发计算机。
- Databricks 建议使用 Python 虚拟环境,例如 python 随附的 venv 提供的环境。 虚拟环境有助于确保同时使用正确版本的 Python 和适用于 Python 的 Databricks SQL 连接器。 设置和使用虚拟环境不在本文的讨论范围之内。 有关详细信息,请参阅创建虚拟环境。
- 现有群集或 SQL 仓库。
开始使用
通过运行
pip install databricks-sql-connector
或python -m pip install databricks-sql-connector
,在开发计算机上安装适用于 Python 的 Databricks SQL 连接器库。收集想要使用的群集或 SQL 仓库的以下信息:
群集
- 群集的服务器主机名。 从群集的“高级选项”>“JDBC/ODBC”选项卡的“服务器主机名”值中可以获取此主机名。
- 群集的 HTTP 路径。 从群集的“高级选项”>“JDBC/ODBC”选项卡的“HTTP 路径”值中可以获取此路径。
SQL 仓库
身份验证
适用于 Python 的 Databricks SQL 连接器支持以下 Azure Databricks 身份验证类型:
适用于 Python 的 Databricks SQL 连接器尚不支持以下 Azure Databricks 身份验证类型:
Databricks 个人访问令牌身份验证
要将适用于 Python 的 Databricks SQL 连接器与 Azure Databricks 个人访问令牌身份验证配合使用,你必须先创建一个 Azure Databricks 个人访问令牌。 为此,请遵循适用于工作区用户的 Azure Databricks 个人访问令牌中的步骤。
要对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:
DATABRICKS_SERVER_HOSTNAME
,设置为你的群集或 SQL 仓库的服务器主机名值。DATABRICKS_HTTP_PATH
,设置为你的群集或 SQL 仓库的 HTTP 路径值。DATABRICKS_TOKEN
,设置为 Azure Databricks 个人访问令牌。
若要设置环境变量,请参阅操作系统的文档。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
# ...
OAuth 计算机到计算机 (M2M) 身份验证
适用于 Python 的 Databricks SQL 连接器版本 2.7.0 及更高版本支持 OAuth 计算机到计算机 (M2M) 身份验证。 还必须安装用于 Python 0.18.0 或更高版本的 Databricks SDK(例如通过运行 pip install databricks-sdk
或 python -m pip install databricks-sdk
)。
要将 Databricks SQL Connector for Python 与 OAuth M2M 身份验证配合使用,必须执行以下操作:
在 Azure Databricks 工作区中创建 Azure Databricks 服务主体,并为该服务主体创建 OAuth 机密。
若要创建服务主体及其 OAuth 机密,请参阅使用 OAuth (OAuth M2M) 通过服务主体对 Azure Databricks 的访问进行身份验证。 记下服务主体的 UUID 或应用程序 ID 值,以及服务主体的 OAuth 机密的机密值。
授予该服务主体对群集或仓库的访问权限。
要对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:
DATABRICKS_SERVER_HOSTNAME
设置为你的群集或 SQL 仓库的服务器主机名值。DATABRICKS_HTTP_PATH
,设置为你的群集或 SQL 仓库的 HTTP 路径值。DATABRICKS_CLIENT_ID
,设置为服务主体的 UUID 或应用程序 ID 值。DATABRICKS_CLIENT_SECRET
,设置为服务主体的 OAuth 机密的“机密”值。
若要设置环境变量,请参阅操作系统的文档。
from databricks.sdk.core import Config, oauth_service_principal
from databricks import sql
import os
server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME")
def credential_provider():
config = Config(
host = f"https://{server_hostname}",
client_id = os.getenv("DATABRICKS_CLIENT_ID"),
client_secret = os.getenv("DATABRICKS_CLIENT_SECRET"))
return oauth_service_principal(config)
with sql.connect(server_hostname = server_hostname,
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
credentials_provider = credential_provider) as connection:
# ...
OAuth 用户到计算机 (U2M) 身份验证
适用于 Python 的 Databricks SQL 连接器版本 2.7.0 及更高版本支持 OAuth 用户到计算机 (U2M) 身份验证。 还必须安装用于 Python 0.19.0 或更高版本的 Databricks SDK(例如通过运行 pip install databricks-sdk
或 python -m pip install databricks-sdk
)。
要使用 OAuth U2M 身份验证对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 OAuth U2M 身份验证使用实时人工登录和同意对目标 Azure Databricks 用户帐户进行身份验证。 此代码片段假定你已设置以下环境变量:
DATABRICKS_SERVER_HOSTNAME
,设置为你的群集或 SQL 仓库的服务器主机名值。DATABRICKS_HTTP_PATH
,设置为你的群集或 SQL 仓库的 HTTP 路径值。
若要设置环境变量,请参阅操作系统的文档。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
auth_type = "databricks-oauth") as connection:
# ...
示例
以下代码示例演示如何使用用于 Python 的 Databricks SQL 连接器来查询和插入数据、查询元数据、管理游标和连接,以及配置日志记录。
注意
以下代码示例演示如何使用 Azure Databricks 个人访问令牌进行身份验证。 若要改用其他可用的 Azure Databricks 身份验证类型,请参阅身份验证。
这些代码示例从以下环境变量中检索它们的 server_hostname
、http_path
和 access_token
连接变量值:
DATABRICKS_SERVER_HOSTNAME
,表示要求中的“服务器主机名”值。DATABRICKS_HTTP_PATH
,表示要求中的“HTTP 路径”值。DATABRICKS_TOKEN
,表示要求中的访问令牌。
可以使用其他方法来检索这些连接变量值。 使用环境变量只是众多方法中的一种。
查询数据
以下代码示例演示如何调用适用于 Python 的 Databricks SQL 连接器在群集或 SQL 仓库上运行基本 SQL 命令。 此命令返回 samples
目录的 nyctaxi
架构中的trips
表中的前两行。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
result = cursor.fetchall()
for row in result:
print(row)
插入数据
以下示例演示如何插入少量数据(数千行):
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")
squares = [(i, i * i) for i in range(100)]
values = ",".join([f"({x}, {y})" for (x, y) in squares])
cursor.execute(f"INSERT INTO squares VALUES {values}")
cursor.execute("SELECT * FROM squares LIMIT 10")
result = cursor.fetchall()
for row in result:
print(row)
要插入大量数据,请先将数据上传到云存储,然后执行 COPY INTO 命令。
查询元数据
可通过一些专用方法检索元数据。 以下示例检索有关示例表中的列的元数据:
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.columns(schema_name="default", table_name="squares")
print(cursor.fetchall())
管理游标和连接
最好关闭不再使用的任何连接和游标。 这可以释放 Azure Databricks 群集和 Databricks SQL 仓库上的资源。
可以使用上下文管理器(在前面示例中使用的 with
语法)来管理资源,或显式调用 close
:
from databricks import sql
import os
connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())
cursor.close()
connection.close()
管理 Unity Catalog 卷中的文件
Databricks SQL 连接器可让你将本地文件写入 Unity Catalog 卷、从卷下载文件以及从卷中删除文件,如下例所示:
from databricks import sql
import os
# For writing local files to volumes and downloading files from volumes,
# you must set the staging_allows_local_path argument to the path to the
# local folder that contains the files to be written or downloaded.
# For deleting files in volumes, you must also specify the
# staging_allows_local_path argument, but its value is ignored,
# so in that case its value can be set for example to an empty string.
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"),
staging_allowed_local_path = "/tmp/") as connection:
with connection.cursor() as cursor:
# Write a local file to the specified path in a volume.
# Specify OVERWRITE to overwrite any existing file in that path.
cursor.execute(
"PUT '/temp/my-data.csv' INTO '/Volumes/main/default/my-volume/my-data.csv' OVERWRITE"
)
# Download a file from the specified path in a volume.
cursor.execute(
"GET '/Volumes/main/default/my-volume/my-data.csv' TO '/tmp/my-downloaded-data.csv'"
)
# Delete a file from the specified path in a volume.
cursor.execute(
"REMOVE '/Volumes/main/default/my-volume/my-data.csv'"
)
配置日志记录
Databricks SQL 连接器使用 Python 的标准日志记录模块。 可以配置类似于以下内容的日志记录级别:
from databricks import sql
import os, logging
logging.getLogger("databricks.sql").setLevel(logging.DEBUG)
logging.basicConfig(filename = "results.log",
level = logging.DEBUG)
connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
result = cursor.fetchall()
for row in result:
logging.debug(row)
cursor.close()
connection.close()
测试
若要测试代码,请使用 Python 测试框架,例如 pytest。 若要在模拟条件下测试代码,而不调用 Azure Databricks REST API 终结点或更改 Azure Databricks 帐户或工作区的状态,可以使用 Python 模拟库(如 unittest.mock)。
例如,如果以下名为“helpers.py
”的文件,其中包含使用 Azure Databricks 个人访问令牌返回到 Azure Databricks 工作区的连接的 get_connection_personal_access_token
函数,以及使用连接从 samples
目录的 nyctaxi
架构中的 trips
表中获取指定数目的数据行的 select_nyctaxi_trips
函数:
# helpers.py
from databricks import sql
from databricks.sql.client import Connection, List, Row, Cursor
def get_connection_personal_access_token(
server_hostname: str,
http_path: str,
access_token: str
) -> Connection:
return sql.connect(
server_hostname = server_hostname,
http_path = http_path,
access_token = access_token
)
def select_nyctaxi_trips(
connection: Connection,
num_rows: int
) -> List[Row]:
cursor: Cursor = connection.cursor()
cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
result: List[Row] = cursor.fetchall()
return result
另外,假设以下名为 main.py
的文件调用 get_connection_personal_access_token
和 select_nyctaxi_trips
函数:
# main.py
from databricks.sql.client import Connection, List, Row
import os
from helpers import get_connection_personal_access_token, select_nyctaxi_trips
connection: Connection = get_connection_personal_access_token(
server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")
)
rows: List[Row] = select_nyctaxi_trips(
connection = connection,
num_rows = 2
)
for row in rows:
print(row)
以下名为 test_helpers.py
的文件测试 select_nyctaxi_trips
函数是否返回预期的响应。 此测试将模拟 Connection
对象,而不是创建与目标工作区的真实连接。 该测试还模拟一些符合真实数据中的架构和值的数据。 该测试通过模拟连接返回模拟数据,然后检查其中一个模拟数据行的值是否与预期值匹配。
# test_helpers.py
import pytest
from databricks.sql.client import Connection, List, Row
from datetime import datetime
from helpers import select_nyctaxi_trips
from unittest.mock import create_autospec
@pytest.fixture
def mock_data() -> List[Row]:
return [
Row(
tpep_pickup_datetime = datetime(2016, 2, 14, 16, 52, 13),
tpep_dropoff_datetime = datetime(2016, 2, 14, 17, 16, 4),
trip_distance = 4.94,
fare_amount = 19.0,
pickup_zip = 10282,
dropoff_zip = 10171
),
Row(
tpep_pickup_datetime = datetime(2016, 2, 4, 18, 44, 19),
tpep_dropoff_datetime = datetime(2016, 2, 4, 18, 46),
trip_distance = 0.28,
fare_amount = 3.5,
pickup_zip = 10110,
dropoff_zip = 10110
)
]
def test_select_nyctaxi_trips(mock_data: List[Row]):
# Create a mock Connection.
mock_connection = create_autospec(Connection)
# Set the mock Connection's cursor().fetchall() to the mock data.
mock_connection.cursor().fetchall.return_value = mock_data
# Call the real function with the mock Connection.
response: List[Row] = select_nyctaxi_trips(
connection = mock_connection,
num_rows = 2)
# Check the value of one of the mocked data row's columns.
assert response[1].fare_amount == 3.5
select_nyctaxi_trips
函数包含 SELECT
语句,因此不会更改 trips
表的状态,在此示例中并不是一定需要模拟。 但是,模拟让你能够快速运行测试,而无需等待与工作区建立实际连接。 此外,通过模拟,可以多次针对可能更改表状态的函数运行模拟测试,例如 INSERT INTO
、UPDATE
和 DELETE FROM
。
API 参考
程序包
databricks-sql-connector
使用情况:pip install databricks-sql-connector
另请参阅 Python 包索引 (PyPI) 中的 databricks-sql-connector。
模块
databricks.sql
使用情况:from databricks import sql
类
选定的类包括:
类 |
---|
Connection 一种 Azure Databricks 计算资源的会话。 |
Cursor 一种遍历数据记录的机制。 |
Row SQL 查询结果中的一行数据。 |
Connection
类
若要创建 Connection
对象,请使用以下参数调用 databricks.sql.connect
方法:
参数 |
---|
server_hostname 类型: str 群集或 SQL 仓库的服务器主机名。 要获取服务器主机名,请参阅本文前面部分的说明。 此参数是必需的。 示例: adb-1234567890123456.7.databricks.azure.cn |
http_path 类型: str 群集或 SQL 仓库的 HTTP 路径。 要获取 HTTP 路径,请参阅本文前面部分的说明。 此参数是必需的。 例如: sql/protocolv1/o/1234567890123456/1234-567890-test123 适用于群集。SQL 仓库的 /sql/1.0/warehouses/a1b234c567d8e9fa 。 |
access_token ,auth_type 类型: str 有关 Azure Databricks 身份验证设置的信息。 有关详细信息,请参阅身份验证。 |
session_configuration 类型: dict[str, Any] Spark 会话配置参数的字典。 设置一个等效于使用 SET key=val SQL 命令的配置。 运行 SQL 命令 SET -v 可以获取可用配置的完整列表。默认为 None 。此参数是可选的。 示例: {"spark.sql.variable.substitute": True} |
http_headers 类型: List[Tuple[str, str]]] 在客户端发出的每个 RPC 请求的 HTTP 标头中设置的其他(键、值)对。 典型用法不会设置任何额外的 HTTP 标头。 默认为 None 。此参数是可选的。 从版本 2.0 开始 |
catalog 类型: str 用于连接的初始目录。 默认为 None (在这种情况下,将使用默认目录,通常为 hive_metastore )。此参数是可选的。 从版本 2.0 开始 |
schema 类型: str 用于连接的初始架构。 默认为 None (在这种情况下,将使用默认架构 default )。此参数是可选的。 从版本 2.0 开始 |
use_cloud_fetch 类型: bool True 将提取请求直接发送到云对象存储以下载数据区块。 False 默认将提取请求直接发送到 Azure Databricks。如果 use_cloud_fetch 设置为 True 但网络访问被阻止,则提取请求将失败。从版本 2.8 开始 |
选定的 Connection
方法包括:
方法 |
---|
close 关闭与数据库的连接,并释放服务器上所有关联的资源。 对此连接的任何其他调用都将引发 Error 。无参数。 没有返回值。 |
cursor 返回一个新的 Cursor 对象,该对象允许遍历数据库中的各种记录。无参数。 |
Cursor
类
若要创建 Cursor
对象,请调用 Connection
类的 cursor
方法。
选定的 Cursor
属性包括:
特性 |
---|
arraysize 与 fetchmany 方法一起使用,指定内部缓冲区大小,该大小也是一次性从服务器实际提取的行数。 默认值为 10000 。 对于窄结果(在结果中每行未包含大量数据),应增大该值以提高性能。读写访问。 |
description 包含 tuple 对象的 Python list 。 每个 tuple 对象都包含 7 个值,并且每个 tuple 对象的前 2 个项目包含如下所示的描述单个结果列的信息:- name :列的名称。- type_code :表示列的类型的字符串。 例如,整数列的类型代码为 int 。每个 7 项目 tuple 对象的剩余 5 个项目未实现,并且其值未定义。 它们通常以 4 个None 值的形式返回,后跟一个 True 值。只读访问。 |
选定的 Cursor
方法包括:
方法 |
---|
cancel 中断运行游标启动的任何数据库查询或命令。 要释放服务器上关联的资源,请在 调用 cancel 方法后,调用 close 方法。无参数。 没有返回值。 |
close 关闭游标并释放服务器上关联的资源。 关闭已关闭的游标可能会引发错误。 无参数。 没有返回值。 |
execute 准备并运行数据库查询或命令。 无返回值。 参数: operation 类型: str 要准备并运行的查询或命令。 此参数是必需的。 不使用 parameters 参数的示例:cursor.execute( 'SELECT * FROM samples.nyctaxi.trips WHERE pickup_zip="10019" LIMIT 2' ) 不使用 parameters 参数的示例:cursor.execute( 'SELECT * FROM samples.nyctaxi.trips WHERE zip=%(pickup_zip)s LIMIT 2', { 'pickup_zip': '10019' } ) parameters 类型:字典 要与 operation 参数一起使用的参数序列。此参数是可选的。 默认为 None 。 |
executemany 使用 seq_of_parameters 实参中的所有形参序列来准备并运行数据库查询或命令。 仅保留最终结果集。无返回值。 参数: operation 类型: str 要准备并运行的查询或命令。 此参数是必需的。 seq_of_parameters 类型: dict 的 list 与 operation 参数一起使用的多个参数集的序列。此参数是必需的。 |
catalogs 执行有关目录的元数据查询。 然后应使用 fetchmany 或 fetchall 获取实际结果。结果集中的重要字段包括: - 字段名称: TABLE_CAT 。 键入:str 。 目录的名称。无参数。 无返回值。 从版本 1.0 开始 |
schemas 执行有关架构的元数据查询。 然后应使用 fetchmany 或 fetchall 获取实际结果。结果集中的重要字段包括: - 字段名称: TABLE_SCHEM 。 键入:str 。 架构的名称。- 字段名称: TABLE_CATALOG 。 键入:str 。 架构所属的目录。无返回值。 从版本 1.0 开始 参数: catalog_name 类型: str 要检索其相关信息的目录名称。 % 字符解释为通配符。此参数是可选的。 schema_name 类型: str 要检索其相关信息的架构名称。 % 字符解释为通配符。此参数是可选的。 |
tables 执行有关表和视图的元数据查询。 然后应使用 fetchmany 或 fetchall 获取实际结果。结果集中的重要字段包括: - 字段名称: TABLE_CAT 。 键入:str 。 表所属的目录。- 字段名称: TABLE_SCHEM 。 键入:str 。 表所属的架构。- 字段名称: TABLE_NAME 。 键入:str 。 表的名称。- 字段名称: TABLE_TYPE 。 键入:str 。 关系类型,例如 VIEW 或 TABLE (适用于 Databricks Runtime 10.4 LTS 和更高版本以及 Databricks SQL;对更低版本的 Databricks Runtime 使用会返回空字符串)。无返回值。 从版本 1.0 开始 参数 catalog_name 类型: str 要检索其相关信息的目录名称。 % 字符解释为通配符。此参数是可选的。 schema_name 类型: str 要检索其相关信息的架构名称。 % 字符解释为通配符。此参数是可选的。 table_name 类型: str 要检索其相关信息的表名称。 % 字符解释为通配符。此参数是可选的。 table_types 类型: List[str] 要匹配的表类型列表,例如 TABLE 或 VIEW 。此参数是可选的。 |
columns 执行有关列的元数据查询。 然后应使用 fetchmany 或 fetchall 获取实际结果。结果集中的重要字段包括: - 字段名称: TABLE_CAT 。 键入:str 。 列所属的目录。- 字段名称: TABLE_SCHEM 。 键入:str 。 列所属的架构。- 字段名称: TABLE_NAME 。 键入:str 。 列所属的表的名称。- 字段名称: COLUMN_NAME 。 键入:str 。 列的名称。无返回值。 从版本 1.0 开始 参数: catalog_name 类型: str 要检索其相关信息的目录名称。 % 字符解释为通配符。此参数是可选的。 schema_name 类型: str 要检索其相关信息的架构名称。 % 字符解释为通配符。此参数是可选的。 table_name 类型: str 要检索其相关信息的表名称。 % 字符解释为通配符。此参数是可选的。 column_name 类型: str 要检索其相关信息的列名称。 % 字符解释为通配符。此参数是可选的。 |
fetchall 获取查询的所有(或所有剩余)行。 无参数。 以 Python list 形式返回查询的所有(或所有剩余)行,该形式属于 Row 对象。如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error 。 |
fetchmany 获取查询的后续行。 以 Row 对象的 Python list 的形式返回查询后续行中 size 行及以前的行(如果没有指定 size ,则返回 arraysize 属性)。如果要提取的行数少于 size ,将返回所有剩余的行。如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error 。参数: size 类型: int 要获取的后续行数。 此参数是可选的。 如果未指定,则使用 arraysize 属性的值。示例: cursor.fetchmany(10) |
fetchone 获取数据集的下一行。 无参数。 将数据集的下一行以 Python tuple 对象的形式作为单个序列返回,或者如果没有更多可用数据,则返回 None 。如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error 。 |
fetchall_arrow 以 PyArrow Table 对象的形式获取查询的所有(或所有剩余)行。 返回大量数据的查询应改用 fetchmany_arrow ,以减少内存消耗。无参数。 以 PyArrow 表的形式返回查询的所有(或所有剩余)行。 如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error 。从版本 2.0 开始 |
fetchmany_arrow 以 PyArrow Table 对象的形式获取查询后续行。以 Python PyArrow 的形式返回查询后续行中 size 参数行及以前的行(如果没有指定 size ,则返回 arraysize 属性)Table 对象。如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error 。从版本 2.0 开始 参数: size 类型: int 要获取的后续行数。 此参数是可选的。 如果未指定,则使用 arraysize 属性的值。示例: cursor.fetchmany_arrow(10) |
Row
类
行类是一个类似于元组的数据结构,表示单个结果行。
如果行包含名为 "my_column"
的列,则你可以通过 row.my_column
访问 row
的 "my_column"
字段。 还可以使用数字索引来访问字段,例如 row[0]
。
如果不允许将列名称用作属性方法名称(例如,它以数字开头),则可以将字段作为 row["1_my_column"]
来访问。
从版本 1.0 开始
选定的 Row
方法包括:
方法 |
---|
asDict 返回行的字典表示形式,此值按字段名称编制索引。 如果存在重复的字段名称,则在字典中返回一个重复字段(但只有一个)。 未定义返回哪个重复字段。 无参数。 返回字段的 dict 。 |
类型转换
下表将 Apache Spark SQL 数据类型映射到其 Python 数据类型等效项。
Apache Spark SQL 数据类型 | Python 数据类型 |
---|---|
array |
numpy.ndarray |
bigint |
int |
binary |
bytearray |
boolean |
bool |
date |
datetime.date |
decimal |
decimal.Decimal |
double |
float |
int |
int |
map |
str |
null |
NoneType |
smallint |
int |
string |
str |
struct |
str |
timestamp |
datetime.datetime |
tinyint |
int |
疑难解答
tokenAuthWrapperInvalidAccessToken: Invalid access token
消息
问题:运行代码时看到类似于 Error during request to server: tokenAuthWrapperInvalidAccessToken: Invalid access token
的消息。
可能的原因:传递给 access_token
的值不是有效的 Azure Databricks 个人访问令牌。
建议的解决方法:检查传递给 access_token
的值是否正确,然后重试。
gaierror(8, 'nodename nor servname provided, or not known')
消息
问题:运行代码时看到类似于 Error during request to server: gaierror(8, 'nodename nor servname provided, or not known')
的消息。
可能的原因:传递给 server_hostname
的值不是正确的主机名。
建议的解决方法:检查传递给 server_hostname
的值是否正确,然后重试。
有关查找服务器主机名的详细信息,请参阅获取 Azure Databricks 计算资源的连接详细信息。
IpAclError
消息
问题:在运行代码期间,尝试使用 Azure Databricks 笔记本上的连接器时看到消息 Error during request to server: IpAclValidation
。
可能的原因:可能为 Azure Databricks 工作区启用了 IP 允许列表。 使用 IP 允许列表时,默认不允许从 Spark 群集连接回到控制平面。
建议的解决方法:请管理员将计算平面子网添加到 IP 允许列表。
其他资源
有关详细信息,请参阅: