使用 Kusto .NET SDK 引入数据

有两个适用于 .NET 的客户端库:引入库数据库。 有关 .NET SDK 的详细信息,请参阅关于 .NET SDK。 可以使用这些库在群集中引入(加载)数据并从代码中查询数据。 本文首先在测试群集中创建一个表和数据映射。 然后将引入排列到群集并验证结果。

先决条件

  • Microsoft 帐户或 Microsoft Entra 用户标识。 无需 Azure 订阅。
  • 群集和数据库。 创建群集和数据库

安装引入库

Install-Package Microsoft.Azure.Kusto.Ingest

添加身份验证和构造连接字符串

身份验证

SDK 使用 Microsoft Entra 租户 ID,以对应用程序进行身份验证。 要查找租户 ID,请使用以下 URL,并将域替换为 YourDomain

https://login.partner.microsoftonline.cn/<YourDomain>/.well-known/openid-configuration/

例如,如果域名为 contoso.com,则该 URL 将是:https://login.partner.microsoftonline.cn/contoso.com/.well-known/openid-configuration/。 单击此 URL 以查看结果;第一行如下所示。

"authorization_endpoint":"https://login.partner.microsoftonline.cn/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

在这种情况下,租户 ID 为 6babcaad-604b-40ac-a9d7-9fd97c0b779f

此示例使用交互式 Microsoft Entra 用户身份验证来访问群集。 还可以使用通过证书或应用程序机密完成的 Microsoft Entra 应用程序身份验证。 在运行此代码之前,请确保为 tenantIdclusterUri 设置正确的值。

SDK 提供了一种简便方法,可将身份验证方法设置为连接字符串的一部分。 有关连接字符串的完整文档,请参阅连接字符串

注意

当前版本的 SDK 不支持 .NET Core 上的交互式用户身份验证。 如果需要,请改用 Microsoft Entra 用户名/密码或应用程序身份验证。

构造连接字符串

现在可以构造连接字符串。 你将在后续步骤中创建目标表和映射。

var kustoUri = "https://<ClusterName>.<Region>.kusto.chinacloudapi.cn/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

设置源文件信息

设置源文件的路径。 此示例使用 Azure Blob 存储上托管的示例文件。 StormEvents 示例数据集包含美国国家环境信息中心的与天气相关的数据。

var blobPath = "https://kustosamples.blob.core.chinacloudapi.cn/samplefiles/StormEvents.csv";

在测试群集上创建表

创建与 StormEvents.csv 文件中的数据架构匹配的名为 StormEvents 的表。

提示

下面的代码段为几乎每个调用创建一个客户端实例。 这样做是为了使每个片段可单独运行。 在生产环境中,客户端实例是可重入的,应根据需要保留。 即使使用多个数据库(可以在命令级别指定数据库),每个 URI 一个客户端实例也已足够。

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

定义引入映射

将传入的 CSV 数据映射到创建表时使用的列名称。 在该表上预配 CSV 列映射对象

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

为表定义批处理策略

批处理传入数据可优化数据分片大小,该大小由引入批处理策略控制。 使用引入批处理策略管理命令修改策略。 使用此策略可以减少缓慢到达的数据的延迟。

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

建议为引入的数据定义一个 Raw Data Size 值,并将大小逐渐减小到 250 MB,同时检查性能是否有所提高。

可使用 Flush Immediately 属性跳过批处理,但不建议进行大规模引入时使用,因为这可能会导致性能不佳。

列入一条引入消息

将一条消息排入队列,以便从 Blob 存储中拉取数据并引入该数据。 将与引入群集建立连接,并创建另一个客户端来使用该终结点。

提示

下面的代码段为几乎每个调用创建一个客户端实例。 这样做是为了使每个片段可单独运行。 在生产环境中,客户端实例是可重入的,应根据需要保留。 即使使用多个数据库(可以在命令级别指定数据库),每个 URI 一个客户端实例也已足够。

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.chinacloudapi.cn";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

验证数据已引入表中

等待五到十分钟,直到排入队列的引入已计划在群集中进行引入并加载数据。 然后运行以下代码,以获取 StormEvents 表中记录的计数。

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

运行故障排除查询

登录到 https://dataexplorer.azure.cn 并连接到群集。 在数据库中运行以下命令以查看过去四个小时内是否存在任何失败引入。 在运行之前替换数据库名称。

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

运行以下命令以查看过去四个小时内所有引入操作的状态。 在运行之前替换数据库名称。

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

清理资源

如果计划学习我们的其他文章,请保留已创建的资源。 否则,在数据库中运行以下命令以清除 StormEvents 表。

.drop table StormEvents