将一对多关系数据迁移到 Azure Cosmos DB for NoSQL 帐户中

适用于: NoSQL

若要从关系数据库迁移到 Azure Cosmos DB for NoSQL,可能需要更改数据模型以进行优化。

一种常见的转换方法是,通过将相关子项嵌入到一个 JSON 文档来反规范化数据。 本文探讨使用 Azure 数据工厂或 Azure Databricks 实现此目的的几个选项。 有关 Azure Cosmos DB 的数据建模的详细信息,请参阅 Azure Cosmos DB 中的数据建模

示例方案

假设 SQL 数据库中包含以下两个表:Orders 和 OrderDetails。

Screenshot that shows the Orders and OrderDetails tables in the SQL database.

我们希望在迁移期间,将此一对多关系合并到一个 JSON 文档中。 若要创建单个文档,请使用 FOR JSON 创建 T-SQL 查询:

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

此查询的结果将包括 Orders 表中的数据:

Screenshot of a query that results in details of various orders.

理想情况下,你希望使用单个 Azure 数据工厂 (ADF) 复制活动来查询用作源的 SQL 数据,并将输出作为适当的 JSON 对象直接写入 Azure Cosmos DB 接收器。 目前,无法在一个复制活动中执行所需的 JSON 转换。 如果我们尝试将上述查询的结果复制到 Azure Cosmos DB for NoSQL 容器中,将会看到文档的字符串属性形式的 OrderDetails 字段,而不是预期的 JSON 数组。

可通过以下方式之一解决当前的这种限制:

  • 使用包含两个复制活动的 Azure 数据工厂
    1. 将 SQL 中的 JSON 格式的数据提取到位于中间 Blob 存储位置的某个文本文件
    2. 将 JSON 文本文件中的数据加载到 Azure Cosmos DB 中的某个容器。
  • 使用 Azure Databricks 从 SQL 中读取数据并将其写入 Azure Cosmos DB - 我们将演示这两个选项。

让我们更详细地了解这些方法:

Azure 数据工厂

尽管我们无法将 OrderDetails 作为 JSON 数组嵌入到目标 Azure Cosmos DB 文档中,但可以使用两个独立的复制活动来解决该问题。

复制活动 #1:SqlJsonToBlobText

对于源数据,我们使用 SQL 查询通过 SQL Server OPENJSON 和 FOR JSON PATH 功能获取结果集,该结果集以单列的形式提供,每行包含一个 JSON 对象(表示订单):

SELECT [value] FROM OPENJSON(
  (SELECT
    id = o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
   FROM Orders o FOR JSON PATH)
)

Screenshot of the preview values in the ADF copy operation.

对于 SqlJsonToBlobText 复制活动的接收器,我们选择了“分隔文本”并将其指向 Azure Blob 存储中的特定文件夹。 此接收器包含一个动态生成的唯一文件名(例如 @concat(pipeline().RunId,'.json'))。 我们的文本文件实际上并不是“分隔的”,并且我们不希望使用逗号将其分析成单独的列。 我们还希望保留双引号 (""),并将“列分隔符”设置为制表符 ("\t") 或数据中未出现其他字符,然后将“引号字符”设置为“无引号字符”。

Screenshot that highlights the Column delimiter and Quote character settings.

复制活动 #2:BlobJsonToCosmos

接下来,我们修改 ADF 管道:添加第二个复制活动,用于在 Azure Blob 存储中查找第一个活动创建的文本文件。 第二个复制活动将结果作为“JSON”源进行处理,将文本文件中找到的每个 JSON 行作为一个文档插入到 Azure Cosmos DB 接收器中。

Screenshot that highlights the JSON source file and the File path fields.

(可选)我们还将一个“删除”活动添加到了管道,以便在每次运行之前删除 /Orders/ 文件夹中剩余的所有旧文件。 现在,我们的 ADF 管道如下所示:

Screenshot that highlights the Delete activity.

触发上述管道后,会看到中间 Azure Blob 存储位置创建了一个文件,其中的每行包含一个 JSON 对象:

Screenshot that shows the created file that contains the JSON objects.

我们还会看到 Orders 文档,其中适当嵌入的 OrderDetails 已插入到 Azure Cosmos DB 集合中:

Screenshot that shows the order details as a part of the Azure Cosmos DB document

Azure Databricks

我们还可以在 Azure Databricks 中使用 Spark,将 SQL 数据库源中的数据复制到 Azure Cosmos DB 目标,而无需在 Azure Blob 存储中创建中间文本/JSON 文件。

注意

为简单清晰起见,代码片段显式包含了虚拟数据库密码,但在理想情况下,你应使用 Azure Databricks 机密。

首先,创建所需的 SQL 连接器 并将 Azure Cosmos DB 连接器库并连接到 Azure Databricks 群集。 重启群集以确保加载库。

Screenshot that shows where to create and attach the required SQL connector and Azure Cosmos DB connector libraries to our Azure Databricks cluster.

接下来,我们为 Scala 和 Python 提供了两个示例。

Scala

在这里,我们将得到 SQL 查询的结果,并将“FOR JSON”输出到数据帧中:

// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
  "url"          -> "xxxx.database.chinacloudapi.cn",
  "databaseName" -> "xxxx",
  "queryCustom"  -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user"         -> "xxxx", 
  "password"     -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)

Screenshot that shows the SQL query output in a DataFrame.

接下来,我们将连接到 Azure Cosmos DB 数据库和集合:

// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.cn:443/",
  // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)

最后,我们定义架构并使用 from_json 应用数据帧,然后将数据帧保存到 Cosmos DB 集合。

// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
    StructField("OrderDetailId",IntegerType,true),
    StructField("ProductId",IntegerType,true),
    StructField("UnitPrice",DoubleType,true),
    StructField("Quantity",IntegerType,true)
  )))
val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)

Screenshot that highlights the proper array for saving to an Azure Cosmos DB collection.

Python

作为替代方法,如果源数据库不支持 FOR JSON 或类似操作,则你可能需要在 Spark 中执行 JSON 转换。 或者,可以对大型数据集使用并行操作。 这里提供了一个 PySpark 示例。 首先配置第一个单元中的源数据库和目标数据库连接:

import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
 
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
 
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
 
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "OrdersDB",
    "Collection": "Orders",
    "Upsert": "true"
}

然后,我们将查询源数据库(在本案例中为 SQL Server)中的订单及订单明细记录,并将结果放入 Spark 数据帧。 我们还将创建一个列表,其中包含所有订单 ID 和并行操作的线程池:

import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
 
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
 
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
 
#get all OrderId values to pass to map function 
orderids = orders.select('OrderId').collect()
 
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)

然后,创建一个函数用于将 Orders 写入目标 API for NoSQL 集合。 此函数筛选给定订单 ID 的所有订单详细信息,将其转换为 JSON 数组,并将该数组插入 JSON 文档。 然后,该 JSON 文档将按该顺序写入目标 API for NoSQL 容器:

def writeOrder(orderid):
  #filter the order on current value passed from map function
  order = orders.filter(orders['OrderId'] == orderid[0])
  
  #set id to be a uuid
  order = order.withColumn("id", lit(str(uuid.uuid1())))
  
  #add details field to order dataframe
  order = order.withColumn("details", lit(''))
  
  #filter order details dataframe to get details we want to merge into the order document
  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
  
  #convert dataframe to pandas
  orderpandas = order.toPandas()
  
  #convert the order dataframe to json and remove enclosing brackets
  orderjson = orderpandas.to_json(orient='records', force_ascii=False)
  orderjson = orderjson[1:-1] 
  
  #convert orderjson to a dictionaory so we can set the details element with order details later
  orderjsondata = json.loads(orderjson)
  
  
  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
  if (orderdetailsgroup.count() !=0):
    #convert orderdetailsgroup to pandas dataframe to work better with json
    orderdetailsgroup = orderdetailsgroup.toPandas()
    
    #convert orderdetailsgroup to json string
    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
    
    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
    jsonstring = json.loads(jsonstring)
    
    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
    orderjsondata['details'] = jsonstring
 
  #convert dictionary to json
  orderjsondata = json.dumps(orderjsondata)
 
  #read the json into spark dataframe
  df = spark.read.json(sc.parallelize([orderjsondata]))
  
  #write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
  #https://docs.azure.cn/cosmos-db/spark-connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

最后,我们将在线程池上使用映射函数调用要并行执行的 Python writeOrder 函数,并传入先前创建的订单 ID 列表:

#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)

在这两种方法中,最终都将在 Azure Cosmos DB 集合中的每个 Order 文档中获得正确保存的嵌入 OrderDetails:

Screenshot of the resulting data after migration.

后续步骤