将 JSON 格式的示例数据引入 Azure 数据资源管理器

本文介绍如何将 JSON 格式的数据引入 Azure 数据资源管理器数据库。 首先你将引入简单的原始映射 JSON 示例,再引入多行 JSON,然后处理包含数组和字典的更复杂 JSON 架构。 这些示例详细演示了使用 Kusto 查询语言 (KQL)、C# 或 Python 引入 JSON 格式数据的过程。

注意

不建议在生产场景中使用 .ingest 管理命令。 而是使用数据连接器,或使用其中一个 Kusto 客户端库以编程方式引入数据。

先决条件

  • Microsoft 帐户或 Microsoft Entra 用户标识。 无需 Azure 订阅。
  • Azure 数据资源管理器群集和数据库。 创建群集和数据库

JSON 格式

Azure 数据资源管理器支持两种 JSON 文件格式:

  • json:行分隔的 JSON。 输入数据中的每一行只包含一条 JSON 记录。 此格式支持对注释和括在单引号中的属性进行分析。 有关详细信息,请参阅 JSON 行
  • multijson:多行 JSON。 分析器将忽略行分隔符,并读取从前一位置到有效 JSON 末尾的一条记录。

注意

使用获取数据体验引入时,默认格式为 multijson。 该格式可以处理多行 JSON 记录和 JSON 记录数组。 遇到分析错误时,将放弃整个文件。 若要忽略无效的 JSON 记录,请选择“忽略数据格式错误”选项,该选项会将格式切换到 json (JSON 行)。

如果使用 JSON 行格式 (json),则分析期间会跳过不表示有效 JSON 记录的行。

引入和映射 JSON 格式的数据

引入 JSON格式的数据需要使用引入属性指定格式。 引入 JSON 数据需要执行映射,以将 JSON 源条目映射到其目标列。 引入数据时,将 IngestionMapping 属性与其 ingestionMappingReference(用于预定义的映射)引入属性或其 IngestionMappings 属性结合使用。 本文将使用 ingestionMappingReference 引入属性,该属性是在用于引入的表中预定义的。 以下示例首先将 JSON 记录作为原始数据引入到包含单个列的表中。 接下来,使用映射将每个属性引入到其映射列中。

简单 JSON 示例

以下示例是采用平面结构的简单 JSON。 数据包含多个设备收集的温度和湿度信息。 每条记录已使用 ID 和时间戳进行标记。

{
    "timestamp": "2019-05-02 15:23:50.0369439",
    "deviceId": "2945c8aa-f13e-4c48-4473-b81440bb5ca2",
    "messageId": "7f316225-839a-4593-92b5-1812949279b3",
    "temperature": 31.0301639051317,
    "humidity": 62.0791099602725
}

引入原始 JSON 记录

此示例将 JSON 记录作为原始数据引入到包含单个列的表中。 使用查询和更新策略的数据操作是在引入数据后执行的。

使用 Kusto 查询语言引入原始 JSON 格式的数据。

  1. 登录 https://dataexplorer.azure.cn

  2. 选择“添加群集”。

  3. 在“添加群集”对话框中,以 https://<ClusterName>.<Region>.kusto.chinacloudapi.cn/ 格式输入群集 URL,然后选择“添加”。

  4. 粘贴以下命令,然后选择“运行”以创建表。

    .create table RawEvents (Event: dynamic)
    

    此查询将创建一个表,其中包含单个动态数据类型的 Event 列。

  5. 创建 JSON 映射。

    .create table RawEvents ingestion json mapping 'RawEventMapping' '[{"column":"Event","Properties":{"path":"$"}}]'
    

    此命令创建一个映射,以将 JSON 根路径 $ 映射到 Event 列。

  6. 将数据引入 RawEvents 表中。

    .ingest into table RawEvents ('https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
    

引入映射的 JSON 记录

此示例引入 JSON 记录数据。 每个 JSON 属性映射到表中的单个列。

  1. 创建一个新表,该表采用类似于 JSON 输入数据的架构。 我们将对下面的所有示例和引入命令使用此表。

    .create table Events (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)
    
  2. 创建 JSON 映射。

    .create table Events ingestion json mapping 'FlatEventMapping' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'
    

    在此映射中,根据表架构的定义,timestamp 条目将作为 datetime 数据类型引入到 Time 列。

  3. 将数据引入 Events 表中。

    .ingest into table Events ('https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
    

    文件“simple.json”包含几条行分隔的 JSON 记录。 格式为 json,在引入命令中使用的映射是创建的 FlatEventMapping

引入多行 JSON 记录

此示例引入多行 JSON 记录。 每个 JSON 属性映射到表中的单个列。 文件“multilined.json”包含几条缩进的 JSON 记录。 格式 multijson 指示按 JSON 结构读取记录。

将数据引入 Events 表中。

.ingest into table Events ('https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'

引入包含数组的 JSON 记录

数组数据类型是按顺序排列的值集合。 JSON 数组的引入由更新策略来完成。 JSON 将按原样引入到中间表。 更新策略针对 RawEvents 表运行某个预定义的函数,并将结果重新引入到目标表。 我们将引入采用以下结构的数据:

{
    "records":
    [
        {
            "timestamp": "2019-05-02 15:23:50.0000000",
            "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
            "messageId": "7f316225-839a-4593-92b5-1812949279b3",
            "temperature": 31.0301639051317,
            "humidity": 62.0791099602725
        },
        {
            "timestamp": "2019-05-02 15:23:51.0000000",
            "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
            "messageId": "57de2821-7581-40e4-861e-ea3bde102364",
            "temperature": 33.7529423105311,
            "humidity": 75.4787976739364
        }
    ]
}
  1. 使用 mv-expand 运算符创建一个 update policy 函数用于扩展 records 的集合,使集合中的每个值收到一个单独的行。 我们将使用表 RawEvents 作为源表,使用 Events 作为目标表。

    .create function EventRecordsExpand() {
        RawEvents
        | mv-expand records = Event.records
        | project
            Time = todatetime(records["timestamp"]),
            Device = tostring(records["deviceId"]),
            MessageId = tostring(records["messageId"]),
            Temperature = todouble(records["temperature"]),
            Humidity = todouble(records["humidity"])
    }
    
  2. 该函数收到的架构必须与目标表的架构相匹配。 使用 getschema 运算符检查架构。

    EventRecordsExpand() | getschema
    
  3. 将更新策略添加到目标表。 此策略将自动对 RawEvents 中间表中的任何新引入数据运行查询,并将结果引入到 Events 表中。 定义零保留期策略,以避免持久保存中间表。

    .alter table Events policy update @'[{"Source": "RawEvents", "Query": "EventRecordsExpand()", "IsEnabled": "True"}]'
    
  4. 将数据引入 RawEvents 表中。

    .ingest into table RawEvents ('https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
    
  5. 检查 Events 表中的数据。

    Events