使用 Azure Synapse Link 中的 Apache Spark 3 与 Azure Cosmos DB 进行交互

本文介绍如何使用 Synapse Apache Spark 3 与 Azure Cosmos DB 进行交互。 客户可以在 Azure Synapse Link for Azure Cosmos DB 中使用 Scala、Python、SparkSQL 和 C# 进行分析、数据工程、数据科学和数据探索方案。

与 Azure Cosmos DB 进行交互时支持以下功能:

  • 利用 Synapse Apache Spark 3,可以近乎实时地分析通过 Azure Synapse Link 启用的 Azure Cosmos DB 容器中的数据,而不会影响事务工作负载的性能。 以下两个选项可用于查询 Spark 中的 Azure Cosmos DB 分析存储
    • 加载到 Spark 数据帧
    • 创建 Spark 表
  • 还可以利用 Synapse Apache Spark 将数据引入 Azure Cosmos DB。 需要注意的是,数据始终通过事务存储引入到 Azure Cosmos DB 容器中。 启用 Synapse Link 后,任何新的插入、更新和删除操作都将自动同步到分析存储。
  • Synapse Apache Spark 还支持将 Azure Cosmos DB 作为源和接收器的 Spark 结构化流。

以下部分将为你讲解语法结构。 你还可以查看 Learn 模块,了解如何使用适用于 Azure Synapse Analytics 的 Apache Spark 查询 Azure Cosmos DB。 Azure Synapse Analytics 工作区中的交互手势旨在提供一种简单易用的开箱即用体验。 在 Synapse 工作区的“数据”选项卡中右键单击 Azure Cosmos DB 容器时,可以看到手势。 借助笔势,可以快速生成代码,并根据需要对其进行定制。 手势非常适合通过单击一下来发现数据。

重要

应注意分析架构中可能会导致数据加载操作出现意外行为的某些约束。 例如,在分析架构中,只有前 1,000 个事务架构中的属性可用,空格的属性不可用,等等。如果遇到一些意外结果,请查看 分析存储架构约束 以了解更多详细信息。

查询 Azure Cosmos DB 分析存储

客户可以将分析存储数据加载到 Spark 数据帧或创建 Spark 表。

体验上的差异在于,Azure Cosmos DB 容器中的基础数据更改是否应自动反映在 Spark 中执行的分析中。 注册 Spark 数据帧或创建 Spark 表时,Spark 会提取分析存储元数据,以便高效下推。 务必注意,由于 Spark 遵循延迟评估策略。 需要采取行动,从 Spark 数据帧或 SparkSQL 查询中获取最后一个数据快照。

在“加载到 Spark 数据帧”的情况下,在 Spark 会话的整个生存期中都会缓存获取的元数据,因此,在创建数据帧时,将针对分析存储的快照评估在数据帧上调用的后续操作。

而在“创建 Spark 表”的情况下,分析存储状态的元数据不会缓存在 Spark 中,而是在针对 Spark 表的每个 SparkSQL 查询执行中重新加载。

最后,可以选择将快照加载到 Spark 数据帧或查询 Spark 表以获取最新快照。

注意

若要查询 Azure Cosmos DB for MongoDB 帐户,请详细了解分析存储中的完全保真架构表示形式,以及要使用的扩展属性名。

注意

所有 options 都区分大小写。

身份验证

现在,Spark 3.x 客户可以使用受信任的标识访问令牌或数据库帐户密钥向 Azure Cosmos DB 分析存储进行身份验证。 由于令牌的生存期较短,因此更安全,并通过 Cosmos DB RBAC 分配所需的权限。

连接器现在支持两种身份验证类型,MasterKeyAccessToken,用于 spark.cosmos.auth.type 属性。

主密钥身份验证

使用密钥通过 spark 读取数据帧:

val config = Map(
    "spark.cosmos.accountEndpoint" -> "<endpoint>",
    "spark.cosmos.accountKey" -> "<key>",
    "spark.cosmos.database" -> "<db>",
    "spark.cosmos.container" -> "<container>"
)

val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)

访问令牌身份验证

新的无密钥身份验证引入了对访问令牌的支持:

val config = Map(
    "spark.cosmos.accountEndpoint" -> "<endpoint>",
    "spark.cosmos.auth.type" -> "AccessToken",
    "spark.cosmos.auth.accessToken" -> "<accessToken>",
    "spark.cosmos.database" -> "<db>",
    "spark.cosmos.container" -> "<container>"
)

val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)

访问令牌身份验证需要角色分配

若要使用访问令牌方法,需要生成访问令牌。 由于访问令牌与 Azure 标识相关联,因此必须将正确的基于角色的访问控制(RBAC)分配给该标识。 角色分配位于数据平面级别,并且必须具有执行角色分配的最低控制平面权限。

Azure 门户中的“标识访问管理”(IAM)角色分配处于控制平面级别,不会影响数据平面上的角色分配。 数据平面角色分配只能通过 Azure CLI 使用。 为了从 Cosmos DB 中的分析存储读取数据,必须执行 readAnalytics 行动,该行动不属于任何预定义角色。 因此,我们必须创建自定义角色定义。 除了 readAnalytics 操作之外,还添加数据读取器所需的操作。 创建包含以下内容的 JSON 文件,并将其命名为 role_definition.json

{
  "RoleName": "CosmosAnalyticsRole",
  "Type": "CustomRole",
  "AssignableScopes": ["/"],
  "Permissions": [{
    "DataActions": [
      "Microsoft.DocumentDB/databaseAccounts/readAnalytics",
      "Microsoft.DocumentDB/databaseAccounts/readMetadata",
      "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read",
      "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/executeQuery",
      "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed"
    ]
  }]
}

访问令牌身份验证需要 Azure CLI

  • 登录到 Azure CLI: az login
  • 将具有您 Cosmos DB 帐户的默认订阅设置为: az account set --subscription <name or id>
  • 在所需的 Cosmos DB 帐户中创建角色定义: az cosmosdb sql role definition create --account-name <cosmos-account-name> --resource-group <resource-group-name> --body @role_definition.json
  • 复制并覆盖返回的角色 definition id/subscriptions/<subscription-id>/resourceGroups/<resource-group-name>/providers/Microsoft.DocumentDB/databaseAccounts/< cosmos-account-name >/sqlRoleDefinitions/<a-random-generated-guid>
  • 获取要向其分配角色的标识的主体 ID。 标识可以是 Azure 应用注册、虚拟机或任何其他支持的 Azure 资源。 使用以下命令将角色分配给主体:az cosmosdb sql role assignment create --account-name "<cosmos-account-name>" --resource-group "<resource-group>" --scope "/" --principal-id "<principal-id-of-identity>" --role-definition-id "<role-definition-id-from-previous-step>"

注意

使用 Azure 应用注册时,请使用 Object Id 服务主体 ID。 此外,主体 ID 和 Cosmos DB 帐户必须位于同一租户中。

生成访问令牌 - Synapse Notebooks

Synapse Notebooks 的建议方法是将服务主体与证书一起使用以生成访问令牌。 单击此处了解更多信息。

The following code snippet has been validated to work in a Synapse notebook
val tenantId = "<azure-tenant-id>"
val clientId = "<client-id-of-service-principal>"
val kvLinkedService = "<azure-key-vault-linked-service>"
val certName = "<certificate-name>"
val token = mssparkutils.credentials.getSPTokenWithCertLS(
  "https://<cosmos-account-name>.documents.azure.cn/.default",
  "https://login.partner.microsoftonline.cn/" + tenantId, clientId, kvLinkedService, certName)

现在,可以使用此步骤中生成的访问令牌在身份验证类型设置为访问令牌时从分析存储中读取数据。

注意

使用 Azure 应用注册时,请使用应用程序(客户端 ID)。

注意

目前,Synapse 不支持在笔记本中使用 azure 标识包生成访问令牌。 此外,Synapse VHD 不包括 azure-identity 包及其依赖项。 单击此处了解更多信息。

加载到 Spark 数据帧

在此示例中,你将创建一个指向 Azure Cosmos DB 分析存储的 Spark 数据帧。 然后,可以通过调用 Spark 操作对 DataFrame 进行更多分析。 此操作不会影响事务存储。

“Python”中的语法如下所示:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

“Scala”中的等效语法如下所示:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

创建 Spark 表

在此示例中,你将创建一个 Spark 表,该表指向 Azure Cosmos DB 分析存储。 然后,可以通过对表调用 SparkSQL 查询来执行其他分析。 此操作不会影响事务存储或导致数据移动。 如果决定删除该 Spark 表,基础 Azure Cosmos DB 容器以及相应的分析存储不会受到影响。

此方案非常方便,可以通过第三方工具重复使用 Spark 表,并提供对运行时基础数据的访问。

创建 Spark 表的语法如下所示:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

注意

如果你有一些方案,其中基础 Azure Cosmos DB 容器的架构随时间而变化;且如果你希望更新的架构自动反映在针对 Spark 表的查询中,那么可通过将 Spark 表选项中的 spark.cosmos.autoSchemaMerge 选项设置为 true 来实现。

将 Spark 数据帧写入 Azure Cosmos DB 容器

在此示例中,将 Spark 数据帧写入 Azure Cosmos DB 容器。 此操作会影响事务性工作负载的性能,并消耗在 Azure Cosmos DB 容器或共享数据库上预配的请求单位。

“Python”中的语法如下所示:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .mode('append')\
    .save()

“Scala”中的等效语法如下所示:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    mode(SaveMode.Append).
    save()

从容器加载流式处理数据帧

在此手势中,你将使用 Spark 流式处理功能将数据从容器加载到数据帧中。 数据将存储在连接到工作区的主数据湖帐户(和文件系统)中。

注意

如果想在 Synapse Apache Spark 中引用外部库,请访问此处了解详细。 例如,如果要将 Spark 数据帧引入到 Azure Cosmos DB for MongoDB 的容器,则可以 在此处使用适用于 Spark 的 MongoDB 连接器。

从 Azure Cosmos DB 容器加载流式处理数据帧

在此示例中,使用 Spark 的结构化流式处理,使用 Azure Cosmos DB 中的更改源功能将数据从 Azure Cosmos DB 容器加载到 Spark 流式处理数据帧中。 Spark 使用的检查点数据将存储在连接到工作区的主数据湖帐户(和文件系统)中。

“Python”中的语法如下所示:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp.changeFeed")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")\
    .option("spark.cosmos.changeFeed.mode", "Incremental")\
    .load()

“Scala”中的等效语法如下所示:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp.changeFeed").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.startFrom", "Beginning").
    option("spark.cosmos.changeFeed.mode", "Incremental").
    load()

将流数据帧写入 Azure Cosmos DB 容器

在此示例中,将一个流数据帧写入 Azure Cosmos DB 容器。 该操作会影响事务工作负载的性能,并消耗在 Azure Cosmos DB 容器或共享数据库上预配的请求单位。 如果未创建 文件夹 /localWriteCheckpointFolder (在下面的示例中),则会自动创建它。

“Python”中的语法如下所示:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

streamQuery = dfStream\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("checkpointLocation", "/tmp/myRunId/")\
    .outputMode("append")\
    .start()

streamQuery.awaitTermination()

“Scala”中的等效语法如下所示:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val query = dfStream.
            writeStream.
            format("cosmos.oltp").
            outputMode("append").
            option("spark.synapse.linkedService", "<enter linked service name>").
            option("spark.cosmos.container", "<enter container name>").
            option("checkpointLocation", "/tmp/myRunId/").
            start()

query.awaitTermination()

后续步骤