Unity Catalog 中 (UDF) 的用户定义函数

重要

此功能目前以公共预览版提供。

Unity Catalog 中的用户定义函数 (UDF) 扩展了 Azure Databricks 中的 SQL 和 Python 功能。 它们允许在计算环境中定义、使用以及安全地共享和管理自定义函数。

在 Unity Catalog 中注册为函数的 Python UDF 在范围和支持方面不同于作用域为笔记本或 SparkSession 的 PySpark UDF。 请参阅用户定义标量函数 - Python

有关完整的 SQL 语言参考,请参阅 CREATE FUNCTION (SQL 和 Python)

要求

若要在 Unity Catalog 中使用 UDF,必须满足以下要求:

  • 要在 Unity Catalog 中注册的 UDF 中使用 Python 代码,必须使用专业 SQL 仓库或运行 Databricks Runtime 13.3 LTS 或更高版本的群集。
  • 如果视图包含 UC Python UDF,它将在 SQL 经典仓库上失败。

在 Unity Catalog 中创建 UDF

若要在 Unity Catalog 中创建 UDF,用户需要对架构拥有 USAGE 和 CREATE 权限,对目录拥有 USAGE 权限。 有关更多详细信息,请参阅 Unity Catalog

若要运行 UDF,用户需要对 UDF 拥有 EXECUTE 权限。 用户还需要对架构和目录具有 USAGE 权限。

以下示例会将新函数注册到 my_schema Unity Catalog 架构:

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight DOUBLE, height DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
AS
SELECT weight / (height * height);

适用于 Unity Catalog 的 Python UDF 使用语句偏移量,由双美元符号 ($$).括起来。 还需要指定数据类型映射。 以下示例注册计算体重指数的 UDF:

CREATE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
return weight_kg / (height_m ** 2)
$$;

现在可以在 SQL 查询或 PySpark 代码中使用此 Unity Catalog 函数:

SELECT person_id, my_catalog.my_schema.calculate_bmi(weight_kg, height_m) AS bmi
FROM person_data;

在 PySpark 中使用 Unity Catalog UDF

from pyspark.sql.functions import expr

result = df.withColumn("bmi", expr("my_catalog.my_schema.calculate_bmi(weight_kg, height_m)"))
display(result)

升级会话范围的 UDF

注意

Unity Catalog 中 Python UDF 的语法和语义不同于注册到 SparkSession 的 Python UDF。 请参阅用户定义标量函数 - Python

假设 Azure Databricks 笔记本中有以下基于会话的 UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(StringType())
def greet(name):
    return f"Hello, {name}!"

# Using the session-based UDF
result = df.withColumn("greeting", greet("name"))
result.show()

若要将其注册为 Unity Catalog 函数,请使用 SQL CREATE FUNCTION 语句,如以下示例所示:

CREATE OR REPLACE FUNCTION my_catalog.my_schema.greet(name STRING)
RETURNS STRING
LANGUAGE PYTHON
AS $$
return f"Hello, {name}!"
$$

在 Unity Catalog 中共享 UDF

UDF 的权限基于应用于注册 UDF 的目录、架构或数据库的访问控制进行管理。 有关详细信息,请参阅 Unity Catalog

使用 Azure Databricks SQL 或 Azure Databricks 工作区 UI 向用户或组授予权限(建议)。

工作区 UI 中的权限

  1. 找到存储 UDF 的目录和架构,然后选择 UDF。
  2. 在 UDF 设置中查找“权限”选项。 添加用户或组并指定他们应具有的访问权限类型,例如 EXECUTE 或 MANAGE。

![工作区 UI 中的权限](../_static/images/udf/high res udf permission.gif)

使用 Azure Databricks SQL 的权限

以下示例授予用户对函数的 EXECUTE 权限。

GRANT EXECUTE ON FUNCTION my_catalog.my_schema.calculate_bmi TO user@example.com;

若要删除权限,请使用 REVOKE 命令,如以下示例所示:

REVOKE EXECUTE ON FUNCTION my_catalog.my_schema.calculate_bmi FROM user@example.com;

UDF 的最佳做法

对于需要可供所有用户访问的 UDF,Databricks 建议使用相应的访问控制创建专用目录和架构。

对于特定于团队的 UDF,请使用团队目录中的专用架构进行存储和管理。

Azure Databricks 建议在 UDF 的文档字符串中包含以下信息:

  • 当前版本号
  • 用于跟踪各个版本的修改的更改日志
  • UDF 的用途、参数和返回值
  • 如何使用 UDF 的示例

下面是 UDF 最佳做法的示例:

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
COMMENT "Calculates Body Mass Index (BMI) from weight and height."
LANGUAGE PYTHON
AS $$
 """
Parameters:
calculate_bmi (version 1.2):
- weight_kg (float): Weight of the individual in kilograms.
- height_m (float): Height of the individual in meters.

Returns:
- float: The calculated BMI.

Example Usage:

SELECT calculate_bmi(weight, height) AS bmi FROM person_data;

Change Log:
- 1.0: Initial version.
- 1.1: Improved error handling for zero or negative height values.
- 1.2: Optimized calculation for performance.

 Note: BMI is calculated as weight in kilograms divided by the square of height in meters.
 """
if height_m <= 0:
 return None  # Avoid division by zero and ensure height is positive
return weight_kg / (height_m ** 2)
$$;

用于访问外部 API 的 UDF

可以使用 UDF 从 SQL 访问外部 API。 下面的示例使用 Python requests 库发出 HTTP 请求。

注意

Python UDF 使用无服务器计算或通过共享访问模式配置的计算,允许通过端口 80、443 和 53 传输 TCP/UDP 网络流量。

CREATE FUNCTION my_catalog.my_schema.get_food_calories(food_name STRING)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
import requests

api_url = f"https://example-food-api.com/nutrition?food={food_name}"
response = requests.get(api_url)

if response.status_code == 200:
   data = response.json()
   # Assuming the API returns a JSON object with a 'calories' field
   calories = data.get('calories', 0)
   return calories
else:
   return None  # API request failed

$$;

使用 UDF 实现安全性和合规性

使用 Python UDF 实现自定义词汇切分、数据掩码、数据修订或加密机制。

以下示例在保持长度和域的同时屏蔽电子邮件地址的标识:

CREATE OR REPLACE FUNCTION my_catalog.my_schema.mask_email(email STRING)
RETURNS STRING
LANGUAGE PYTHON
AS $$
parts = email.split('@')
masked_username = username[0] + '*' * (len(username) - 2) + username[-1]
return f"{masked_username}@{domain}"
$$

以下示例在动态视图定义中应用此 UDF:

-- First, create the view
CREATE OR REPLACE VIEW my_catalog.my_schema.masked_customer_view AS
SELECT
  id,
  name,
  my_catalog.my_schema.mask_email(email) AS email
FROM my_catalog.my_schema.customer_data;

-- Now you can query the view
SELECT * FROM my_catalog.my_schema.masked_customer_view;
+---+------------+------------------------+------------------------+
| id|        name|                   email|           masked_email |
+---+------------+------------------------+------------------------+
|  1|    John Doe|   john.doe@example.com |  j*******e@example.com |
|  2| Alice Smith|alice.smith@company.com |a**********h@company.com|
|  3|   Bob Jones|    bob.jones@email.org |   b********s@email.org |
+---+------------+------------------------+------------------------+

限制

  • 可以在 Python UDF 中定义任意数量的 Python 函数,但必须返回标量值。
  • Python 函数必须独立处理 NULL 值,并且所有类型映射都必须遵循 Azure Databricks SQL 语言映射。
  • 可以导入 Azure Databricks 包含的标准 Python 库,但不能包括自定义库或外部依赖项。
  • 如果未指定目录或架构,则 Python UDF 将注册到当前活动架构。
  • Python UDF 在安全隔离的环境中执行,无权访问文件系统或内部服务。