适用于 Python 的 Databricks SQL 连接器

适用于 Python 的 Databricks SQL 连接器是一个 Python 库,让你能够使用 Python 代码在 Azure Databricks 群集和 Databricks SQL 仓库上运行 SQL 命令。 相比类似的 Python 库(如 pyodbc),适用于 Python 的 Databricks SQL 连接器更易于设置和使用。 此库遵循 PEP 249 - Python 数据库 API 规范 v2.0

注意

Python 的 Databricks SQL 连接器还包括适用于 Azure Databricks 的 SQLAlchemy 方言。 请参阅将 SQLAlchemy 与 Azure Databricks 配合使用

要求

  • 运行 Python >=3.8 和<=3.11 的开发计算机。
  • Databricks 建议使用 Python 虚拟环境,例如 python 随附的 venv 提供的环境。 虚拟环境有助于确保同时使用正确版本的 Python 和适用于 Python 的 Databricks SQL 连接器。 设置和使用虚拟环境不在本文的讨论范围之内。 有关详细信息,请参阅创建虚拟环境
  • 现有群集SQL 仓库

开始使用

  • 通过运行 pip install databricks-sql-connectorpython -m pip install databricks-sql-connector,在开发计算机上安装适用于 Python 的 Databricks SQL 连接器库。

  • 收集想要使用的群集或 SQL 仓库的以下信息:

    群集

    SQL 仓库

    • SQL 仓库的服务器主机名。 从 SQL 仓库的“连接详细信息”选项卡的“服务器主机名”值中可以获取此主机名。
    • SQL 仓库的 HTTP 路径。 从 SQL 仓库的“连接详细信息”选项卡的“HTTP 路径”值中可以获取此路径。

身份验证

适用于 Python 的 Databricks SQL 连接器支持以下 Azure Databricks 身份验证类型:

适用于 Python 的 Databricks SQL 连接器尚不支持以下 Azure Databricks 身份验证类型:

Databricks 个人访问令牌身份验证

要将适用于 Python 的 Databricks SQL 连接器与 Azure Databricks 个人访问令牌身份验证配合使用,你必须先创建一个 Azure Databricks 个人访问令牌。 为此,请遵循适用于工作区用户的 Azure Databricks 个人访问令牌中的步骤。

要对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:

  • DATABRICKS_SERVER_HOSTNAME,设置为你的群集或 SQL 仓库的服务器主机名值。
  • DATABRICKS_HTTP_PATH,设置为你的群集或 SQL 仓库的 HTTP 路径值。
  • DATABRICKS_TOKEN,设置为 Azure Databricks 个人访问令牌。

若要设置环境变量,请参阅操作系统的文档。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
# ...

OAuth 计算机到计算机 (M2M) 身份验证

适用于 Python 的 Databricks SQL 连接器版本 2.7.0 及更高版本支持 OAuth 计算机到计算机 (M2M) 身份验证。 还必须安装用于 Python 0.18.0 或更高版本的 Databricks SDK(例如通过运行 pip install databricks-sdkpython -m pip install databricks-sdk)。

要将 Databricks SQL Connector for Python 与 OAuth M2M 身份验证配合使用,必须执行以下操作:

  1. 在 Azure Databricks 工作区中创建 Azure Databricks 服务主体,并为该服务主体创建 OAuth 机密。

    若要创建服务主体及其 OAuth 机密,请参阅使用 OAuth (OAuth M2M) 通过服务主体对 Azure Databricks 的访问进行身份验证。 记下服务主体的 UUID应用程序 ID 值,以及服务主体的 OAuth 机密的机密值。

  2. 授予该服务主体对群集或仓库的访问权限。

    若要向服务主体授予访问群集或仓库的权限,请参阅计算权限管理 SQL 仓库

要对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:

  • DATABRICKS_SERVER_HOSTNAME 设置为你的群集或 SQL 仓库的服务器主机名值。
  • DATABRICKS_HTTP_PATH,设置为你的群集或 SQL 仓库的 HTTP 路径值。
  • DATABRICKS_CLIENT_ID,设置为服务主体的 UUID应用程序 ID 值。
  • DATABRICKS_CLIENT_SECRET,设置为服务主体的 OAuth 机密的“机密”值。

若要设置环境变量,请参阅操作系统的文档。

from databricks.sdk.core import Config, oauth_service_principal
from databricks import sql
import os

server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME")

def credential_provider():
  config = Config(
    host          = f"https://{server_hostname}",
    client_id     = os.getenv("DATABRICKS_CLIENT_ID"),
    client_secret = os.getenv("DATABRICKS_CLIENT_SECRET"))
  return oauth_service_principal(config)

with sql.connect(server_hostname      = server_hostname,
                 http_path            = os.getenv("DATABRICKS_HTTP_PATH"),
                 credentials_provider = credential_provider) as connection:
# ...

OAuth 用户到计算机 (U2M) 身份验证

适用于 Python 的 Databricks SQL 连接器版本 2.7.0 及更高版本支持 OAuth 用户到计算机 (U2M) 身份验证。 还必须安装用于 Python 0.19.0 或更高版本的 Databricks SDK(例如通过运行 pip install databricks-sdkpython -m pip install databricks-sdk)。

要使用 OAuth U2M 身份验证对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 OAuth U2M 身份验证使用实时人工登录和同意对目标 Azure Databricks 用户帐户进行身份验证。 此代码片段假定你已设置以下环境变量:

  • DATABRICKS_SERVER_HOSTNAME,设置为你的群集或 SQL 仓库的服务器主机名值。
  • DATABRICKS_HTTP_PATH,设置为你的群集或 SQL 仓库的 HTTP 路径值。

若要设置环境变量,请参阅操作系统的文档。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 auth_type       = "databricks-oauth") as connection:
# ...

示例

以下代码示例演示如何使用用于 Python 的 Databricks SQL 连接器来查询和插入数据、查询元数据、管理游标和连接,以及配置日志记录。

注意

以下代码示例演示如何使用 Azure Databricks 个人访问令牌进行身份验证。 若要改用其他可用的 Azure Databricks 身份验证类型,请参阅身份验证

这些代码示例从以下环境变量中检索它们的 server_hostnamehttp_pathaccess_token 连接变量值:

  • DATABRICKS_SERVER_HOSTNAME,表示要求中的“服务器主机名”值。
  • DATABRICKS_HTTP_PATH,表示要求中的“HTTP 路径”值。
  • DATABRICKS_TOKEN,表示要求中的访问令牌。

可以使用其他方法来检索这些连接变量值。 使用环境变量只是众多方法中的一种。

查询数据

以下代码示例演示如何调用适用于 Python 的 Databricks SQL 连接器在群集或 SQL 仓库上运行基本 SQL 命令。 此命令返回 samples 目录的 nyctaxi 架构中的trips 表中的前两行。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:

  with connection.cursor() as cursor:
    cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
    result = cursor.fetchall()

    for row in result:
      print(row)

插入数据

以下示例演示如何插入少量数据(数千行):

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:

  with connection.cursor() as cursor:
    cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")

    squares = [(i, i * i) for i in range(100)]
    values = ",".join([f"({x}, {y})" for (x, y) in squares])

    cursor.execute(f"INSERT INTO squares VALUES {values}")

    cursor.execute("SELECT * FROM squares LIMIT 10")

    result = cursor.fetchall()

    for row in result:
      print(row)

要插入大量数据,请先将数据上传到云存储,然后执行 COPY INTO 命令。

查询元数据

可通过一些专用方法检索元数据。 以下示例检索有关示例表中的列的元数据:

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:

  with connection.cursor() as cursor:
    cursor.columns(schema_name="default", table_name="squares")
    print(cursor.fetchall())

管理游标和连接

最好关闭不再使用的任何连接和游标。 这可以释放 Azure Databricks 群集和 Databricks SQL 仓库上的资源。

可以使用上下文管理器(在前面示例中使用的 with 语法)来管理资源,或显式调用 close

from databricks import sql
import os

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))

cursor = connection.cursor()

cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())

cursor.close()
connection.close()

管理 Unity Catalog 卷中的文件

Databricks SQL 连接器可让你将本地文件写入 Unity Catalog 、从卷下载文件以及从卷中删除文件,如下例所示:

from databricks import sql
import os

# For writing local files to volumes and downloading files from volumes,
# you must set the staging_allows_local_path argument to the path to the
# local folder that contains the files to be written or downloaded.
# For deleting files in volumes, you must also specify the
# staging_allows_local_path argument, but its value is ignored,
# so in that case its value can be set for example to an empty string.
with sql.connect(server_hostname            = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path                  = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token               = os.getenv("DATABRICKS_TOKEN"),
                 staging_allowed_local_path = "/tmp/") as connection:

  with connection.cursor() as cursor:

    # Write a local file to the specified path in a volume.
    # Specify OVERWRITE to overwrite any existing file in that path.
    cursor.execute(
      "PUT '/temp/my-data.csv' INTO '/Volumes/main/default/my-volume/my-data.csv' OVERWRITE"
    )

    # Download a file from the specified path in a volume.
    cursor.execute(
      "GET '/Volumes/main/default/my-volume/my-data.csv' TO '/tmp/my-downloaded-data.csv'"
    )

    # Delete a file from the specified path in a volume.
    cursor.execute(
      "REMOVE '/Volumes/main/default/my-volume/my-data.csv'"
    )

配置日志记录

Databricks SQL 连接器使用 Python 的标准日志记录模块。 可以配置类似于以下内容的日志记录级别:

from databricks import sql
import os, logging

logging.getLogger("databricks.sql").setLevel(logging.DEBUG)
logging.basicConfig(filename = "results.log",
                    level    = logging.DEBUG)

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))

cursor = connection.cursor()

cursor.execute("SELECT * from range(10)")

result = cursor.fetchall()

for row in result:
   logging.debug(row)

cursor.close()
connection.close()

测试

若要测试代码,请使用 Python 测试框架,例如 pytest。 若要在模拟条件下测试代码,而不调用 Azure Databricks REST API 终结点或更改 Azure Databricks 帐户或工作区的状态,可以使用 Python 模拟库(如 unittest.mock)。

例如,如果以下名为“helpers.py”的文件,其中包含使用 Azure Databricks 个人访问令牌返回到 Azure Databricks 工作区的连接的 get_connection_personal_access_token 函数,以及使用连接从 samples 目录的 nyctaxi 架构中的 trips 表中获取指定数目的数据行的 select_nyctaxi_trips 函数:

# helpers.py

from databricks import sql
from databricks.sql.client import Connection, List, Row, Cursor

def get_connection_personal_access_token(
  server_hostname: str,
  http_path: str,
  access_token: str
) -> Connection:
  return sql.connect(
    server_hostname = server_hostname,
    http_path = http_path,
    access_token = access_token
  )

def select_nyctaxi_trips(
  connection: Connection,
  num_rows: int
) -> List[Row]:
  cursor: Cursor = connection.cursor()
  cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
  result: List[Row] = cursor.fetchall()
  return result

另外,假设以下名为 main.py 的文件调用 get_connection_personal_access_tokenselect_nyctaxi_trips 函数:

# main.py

from databricks.sql.client import Connection, List, Row
import os
from helpers import get_connection_personal_access_token, select_nyctaxi_trips

connection: Connection = get_connection_personal_access_token(
  server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
  http_path = os.getenv("DATABRICKS_HTTP_PATH"),
  access_token = os.getenv("DATABRICKS_TOKEN")
)

rows: List[Row] = select_nyctaxi_trips(
  connection = connection,
  num_rows = 2
)

for row in rows:
  print(row)

以下名为 test_helpers.py 的文件测试 select_nyctaxi_trips 函数是否返回预期的响应。 此测试将模拟 Connection 对象,而不是创建与目标工作区的真实连接。 该测试还模拟一些符合真实数据中的架构和值的数据。 该测试通过模拟连接返回模拟数据,然后检查其中一个模拟数据行的值是否与预期值匹配。

# test_helpers.py

import pytest
from databricks.sql.client import Connection, List, Row
from datetime import datetime
from helpers import select_nyctaxi_trips
from unittest.mock import create_autospec

@pytest.fixture
def mock_data() -> List[Row]:
  return [
    Row(
      tpep_pickup_datetime = datetime(2016, 2, 14, 16, 52, 13),
      tpep_dropoff_datetime = datetime(2016, 2, 14, 17, 16, 4),
      trip_distance = 4.94,
      fare_amount = 19.0,
      pickup_zip = 10282,
      dropoff_zip = 10171
    ),
    Row(
      tpep_pickup_datetime = datetime(2016, 2, 4, 18, 44, 19),
      tpep_dropoff_datetime = datetime(2016, 2, 4, 18, 46),
      trip_distance = 0.28,
      fare_amount = 3.5,
      pickup_zip = 10110,
      dropoff_zip = 10110
    )
  ]

def test_select_nyctaxi_trips(mock_data: List[Row]):
  # Create a mock Connection.
  mock_connection = create_autospec(Connection)

  # Set the mock Connection's cursor().fetchall() to the mock data.
  mock_connection.cursor().fetchall.return_value = mock_data

  # Call the real function with the mock Connection.
  response: List[Row] = select_nyctaxi_trips(
    connection = mock_connection,
    num_rows = 2)

  # Check the value of one of the mocked data row's columns.
  assert response[1].fare_amount == 3.5

select_nyctaxi_trips 函数包含 SELECT 语句,因此不会更改 trips 表的状态,在此示例中并不是一定需要模拟。 但是,模拟让你能够快速运行测试,而无需等待与工作区建立实际连接。 此外,通过模拟,可以多次针对可能更改表状态的函数运行模拟测试,例如 INSERT INTOUPDATEDELETE FROM

API 参考

程序包

databricks-sql-connector

使用情况:pip install databricks-sql-connector

另请参阅 Python 包索引 (PyPI) 中的 databricks-sql-connector

模块

databricks.sql

使用情况:from databricks import sql

选定的类包括:

Connection

一种 Azure Databricks 计算资源的会话。
Cursor

一种遍历数据记录的机制。
Row

SQL 查询结果中的一行数据。

Connection

若要创建 Connection 对象,请使用以下参数调用 databricks.sql.connect 方法:

参数
server_hostname

类型:str

群集或 SQL 仓库的服务器主机名。 要获取服务器主机名,请参阅本文前面部分的说明。

此参数是必需的。

示例: adb-1234567890123456.7.databricks.azure.cn
http_path

类型:str

群集或 SQL 仓库的 HTTP 路径。 要获取 HTTP 路径,请参阅本文前面部分的说明。

此参数是必需的。

例如:
sql/protocolv1/o/1234567890123456/1234-567890-test123 适用于群集。
SQL 仓库的 /sql/1.0/warehouses/a1b234c567d8e9fa
access_tokenauth_type

类型:str

有关 Azure Databricks 身份验证设置的信息。 有关详细信息,请参阅身份验证
session_configuration

类型:dict[str, Any]

Spark 会话配置参数的字典。 设置一个等效于使用 SET key=val SQL 命令的配置。 运行 SQL 命令 SET -v 可以获取可用配置的完整列表。

默认为 None

此参数是可选的。

示例: {"spark.sql.variable.substitute": True}
http_headers

类型:List[Tuple[str, str]]]

在客户端发出的每个 RPC 请求的 HTTP 标头中设置的其他(键、值)对。 典型用法不会设置任何额外的 HTTP 标头。 默认为 None

此参数是可选的。

从版本 2.0 开始
catalog

类型:str

用于连接的初始目录。 默认为 None(在这种情况下,将使用默认目录,通常为 hive_metastore)。

此参数是可选的。

从版本 2.0 开始
schema

类型:str

用于连接的初始架构。 默认为 None(在这种情况下,将使用默认架构 default)。

此参数是可选的。

从版本 2.0 开始
use_cloud_fetch

类型:bool

True 将提取请求直接发送到云对象存储以下载数据区块。 False 默认将提取请求直接发送到 Azure Databricks。

如果 use_cloud_fetch 设置为 True 但网络访问被阻止,则提取请求将失败。

从版本 2.8 开始

选定的 Connection 方法包括:

方法
close

关闭与数据库的连接,并释放服务器上所有关联的资源。 对此连接的任何其他调用都将引发 Error

无参数。

没有返回值。
cursor

返回一个新的 Cursor 对象,该对象允许遍历数据库中的各种记录。

无参数。

Cursor

若要创建 Cursor 对象,请调用 Connection 类的 cursor 方法。

选定的 Cursor 属性包括:

特性
arraysize

fetchmany 方法一起使用,指定内部缓冲区大小,该大小也是一次性从服务器实际提取的行数。 默认值为 10000。 对于窄结果(在结果中每行未包含大量数据),应增大该值以提高性能。

读写访问。
description

包含 tuple 对象的 Python list。 每个 tuple 对象都包含 7 个值,并且每个 tuple 对象的前 2 个项目包含如下所示的描述单个结果列的信息:

- name:列的名称。
- type_code:表示列的类型的字符串。 例如,整数列的类型代码为 int

每个 7 项目 tuple 对象的剩余 5 个项目未实现,并且其值未定义。 它们通常以 4 个
None 值的形式返回,后跟一个 True 值。

只读访问。

选定的 Cursor 方法包括:

方法
cancel

中断运行游标启动的任何数据库查询或命令。 要释放服务器上关联的资源,请在
调用 cancel 方法后,调用 close 方法。

无参数。

没有返回值。
close

关闭游标并释放服务器上关联的资源。 关闭已关闭的游标可能会引发错误。

无参数。

没有返回值。
execute

准备并运行数据库查询或命令。

无返回值。

参数:

operation

类型:str

要准备并运行的查询或命令。

此参数是必需的。

不使用 parameters 参数的示例:


cursor.execute(
'SELECT * FROM samples.nyctaxi.trips WHERE pickup_zip="10019" LIMIT 2'
)

不使用 parameters 参数的示例:


cursor.execute(
'SELECT * FROM samples.nyctaxi.trips WHERE zip=%(pickup_zip)s LIMIT 2',
{ 'pickup_zip': '10019' }
)

parameters

类型:字典

要与 operation 参数一起使用的参数序列。

此参数是可选的。 默认为 None
executemany

使用 seq_of_parameters 实参中的所有形参序列来准备并运行数据库查询或命令。 仅保留最终结果集。

无返回值。

参数:

operation

类型:str

要准备并运行的查询或命令。

此参数是必需的。

seq_of_parameters

类型:dictlist


operation 参数一起使用的多个参数集的序列。

此参数是必需的。
catalogs

执行有关目录的元数据查询。 然后应使用 fetchmanyfetchall 获取实际结果。

结果集中的重要字段包括:

- 字段名称:TABLE_CAT。 键入:str。 目录的名称。

无参数。

无返回值。

从版本 1.0 开始
schemas

执行有关架构的元数据查询。 然后应使用 fetchmanyfetchall 获取实际结果。

结果集中的重要字段包括:

- 字段名称:TABLE_SCHEM。 键入:str。 架构的名称。
- 字段名称:TABLE_CATALOG。 键入:str。 架构所属的目录。

无返回值。

从版本 1.0 开始

参数:

catalog_name

类型:str

要检索其相关信息的目录名称。 % 字符解释为通配符。

此参数是可选的。

schema_name

类型:str

要检索其相关信息的架构名称。 % 字符解释为通配符。

此参数是可选的。
tables

执行有关表和视图的元数据查询。 然后应使用 fetchmanyfetchall 获取实际结果。

结果集中的重要字段包括:

- 字段名称:TABLE_CAT。 键入:str。 表所属的目录。
- 字段名称:TABLE_SCHEM。 键入:str。 表所属的架构。
- 字段名称:TABLE_NAME。 键入:str。 表的名称。
- 字段名称:TABLE_TYPE。 键入:str。 关系类型,例如 VIEWTABLE(适用于 Databricks Runtime 10.4 LTS 和更高版本以及 Databricks SQL;对更低版本的 Databricks Runtime 使用会返回空字符串)。

无返回值。

从版本 1.0 开始

参数

catalog_name

类型:str

要检索其相关信息的目录名称。 % 字符解释为通配符。

此参数是可选的。

schema_name

类型:str

要检索其相关信息的架构名称。 % 字符解释为通配符。

此参数是可选的。

table_name

类型:str

要检索其相关信息的表名称。 % 字符解释为通配符。

此参数是可选的。

table_types

类型:List[str]

要匹配的表类型列表,例如 TABLEVIEW

此参数是可选的。
columns

执行有关列的元数据查询。 然后应使用 fetchmanyfetchall 获取实际结果。

结果集中的重要字段包括:

- 字段名称:TABLE_CAT。 键入:str。 列所属的目录。
- 字段名称:TABLE_SCHEM。 键入:str。 列所属的架构。
- 字段名称:TABLE_NAME。 键入:str。 列所属的表的名称。
- 字段名称:COLUMN_NAME。 键入:str。 列的名称。

无返回值。

从版本 1.0 开始

参数:

catalog_name

类型:str

要检索其相关信息的目录名称。 % 字符解释为通配符。

此参数是可选的。

schema_name

类型:str

要检索其相关信息的架构名称。 % 字符解释为通配符。

此参数是可选的。

table_name

类型:str

要检索其相关信息的表名称。 % 字符解释为通配符。

此参数是可选的。

column_name

类型:str

要检索其相关信息的列名称。 % 字符解释为通配符。

此参数是可选的。
fetchall

获取查询的所有(或所有剩余)行。

无参数。

以 Python list 形式返回查询的所有(或所有剩余)行,
该形式属于 Row 对象。

如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error
fetchmany

获取查询的后续行。

Row 对象的 Python list 的形式返回查询后续行中 size 行及以前的行(如果没有指定 size,则返回 arraysize 属性)。

如果要提取的行数少于 size,将返回所有剩余的行。

如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error

参数:

size

类型:int

要获取的后续行数。

此参数是可选的。 如果未指定,则使用 arraysize 属性的值。

示例: cursor.fetchmany(10)
fetchone

获取数据集的下一行。

无参数。

将数据集的下一行以 Python
tuple 对象的形式作为单个序列返回,或者如果没有更多可用数据,则返回 None

如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error
fetchall_arrow

以 PyArrow Table 对象的形式获取查询的所有(或所有剩余)行。 返回大量数据的查询应改用 fetchmany_arrow,以减少内存消耗。

无参数。

以 PyArrow 表的形式返回查询的所有(或所有剩余)行。

如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error

从版本 2.0 开始
fetchmany_arrow

以 PyArrow Table 对象的形式获取查询后续行。

以 Python PyArrow 的形式返回查询后续行中 size 参数行及以前的行(如果没有指定 size,则返回 arraysize 属性)
Table 对象。

如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error

从版本 2.0 开始

参数:

size

类型:int

要获取的后续行数。

此参数是可选的。 如果未指定,则使用 arraysize 属性的值。

示例: cursor.fetchmany_arrow(10)

Row

行类是一个类似于元组的数据结构,表示单个结果行。 如果行包含名为 "my_column" 的列,则你可以通过 row.my_column 访问 row"my_column" 字段。 还可以使用数字索引来访问字段,例如 row[0]。 如果不允许将列名称用作属性方法名称(例如,它以数字开头),则可以将字段作为 row["1_my_column"] 来访问。

从版本 1.0 开始

选定的 Row 方法包括:

方法
asDict

返回行的字典表示形式,此值按字段名称编制索引。 如果存在重复的字段名称,则在字典中返回一个重复字段(但只有一个)。 未定义返回哪个重复字段。

无参数。

返回字段的 dict

类型转换

下表将 Apache Spark SQL 数据类型映射到其 Python 数据类型等效项。

Apache Spark SQL 数据类型 Python 数据类型
array numpy.ndarray
bigint int
binary bytearray
boolean bool
date datetime.date
decimal decimal.Decimal
double float
int int
map str
null NoneType
smallint int
string str
struct str
timestamp datetime.datetime
tinyint int

疑难解答

tokenAuthWrapperInvalidAccessToken: Invalid access token 消息

问题:运行代码时看到类似于 Error during request to server: tokenAuthWrapperInvalidAccessToken: Invalid access token 的消息。

可能的原因:传递给 access_token 的值不是有效的 Azure Databricks 个人访问令牌。

建议的解决方法:检查传递给 access_token 的值是否正确,然后重试。

gaierror(8, 'nodename nor servname provided, or not known') 消息

问题:运行代码时看到类似于 Error during request to server: gaierror(8, 'nodename nor servname provided, or not known') 的消息。

可能的原因:传递给 server_hostname 的值不是正确的主机名。

建议的解决方法:检查传递给 server_hostname 的值是否正确,然后重试。

有关查找服务器主机名的详细信息,请参阅获取 Azure Databricks 计算资源的连接详细信息

IpAclError 消息

问题:在运行代码期间,尝试使用 Azure Databricks 笔记本上的连接器时看到消息 Error during request to server: IpAclValidation

可能的原因:可能为 Azure Databricks 工作区启用了 IP 允许列表。 使用 IP 允许列表时,默认不允许从 Spark 群集连接回到控制平面。

建议的解决方法:请管理员将计算平面子网添加到 IP 允许列表

其他资源

有关详细信息,请参阅: