创建可使用排队引入功能获取数据的应用

适用于:✅Azure 数据资源管理器

Kusto 能够通过批处理管理器优化和批处理引入的数据,从而处理大量数据引入。 批处理管理器会在引入的数据到达其目标表之前聚合数据,从而实现更高效的处理和改进的性能。 批处理通常以 1 GB 的原始数据、1000 个单独文件或默认 5 分钟的处理时间为单位完成。 可以在数据库和表级别更新批处理策略,这通常可缩短批处理时间并减少延迟。 有关引入批处理的详细信息,请参阅 IngestionBatching 策略以编程方式更改表级别引入批处理策略

注意

批处理还会考虑各种因素,例如目标数据库和表、运行引入的用户以及与引入关联的各种属性(例如特殊标记)。

在本文中,学习如何:

先决条件

开始之前

  • 使用以下方法之一创建 MyStormEvents 表,并且由于仅引入少量数据,因此将其引入批处理策略超时设置为 10 秒:

    1. 通过在管理命令中运行第一个应用,在数据库中创建名为 MyStormEvents 的目标表。
    2. 通过在管理命令中运行第二个应用,将引入批处理策略超时设置为 10 秒。 在运行应用之前,请将超时值更改为 00:00:10

    注意

    新的批处理策略设置可能需要几分钟才能传播到批处理管理器。

  • 下载 stormevent.csv 示例数据文件。 该文件包含 1000 条风暴事件记录。

注意

以下示例假定引入数据的列与目标表的架构之间存在简单匹配。 如果引入的数据与表架构不匹配,则必须使用引入映射使数据的列与表架构保持一致。

将文件排队以供引入和查询结果

在首选 IDE 或文本编辑器中,使用适合首选语言的约定创建名为“基本引入”的项目或文件。 将 stormevent.csv 文件放置在与应用相同的位置。

注意

在以下示例中,会使用两个客户端,一个用于查询群集,另一个用于将数据引入群集。 对于客户端库支持的语言,由于两个客户端共享同一个用户提示验证器,因此会生成单个用户提示,而不是每个客户端一个用户提示。

添加以下代码:

  1. 创建连接到群集并打印 MyStormEvents 表中的行数的客户端应用。 你将使用此计数作为基线,以便与每个引入方法后的行数进行比较。 将 <your_cluster_uri><your_database> 占位符分别替换为群集 URI 和数据库名称。

    using Kusto.Data;
    using Kusto.Data.Net.Client;
    
    namespace BatchIngest {
      class BatchIngest {
        static void Main(string[] args) {
          string clusterUri = "<your_cluster_uri>";
          var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
            .WithAadUserPromptAuthentication();
    
          using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
            string database = "<your_database>";
            string table = "MyStormEvents";
    
            string query = table + " | count";
            using (var response = kustoClient.ExecuteQuery(database, query, null)) {
              Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
              PrintResultsAsValueList(response);
            }
          }
        }
    
        static void PrintResultsAsValueList(IDataReader response) {
          string value;
          while (response.Read()) {
            for (int i = 0; i < response.FieldCount; i++) {
              value = "";
              if (response.GetDataTypeName(i) == "Int32")
                  value = response.GetInt32(i).ToString();
              else if (response.GetDataTypeName(i) == "Int64")
                value = response.GetInt64(i).ToString();
              else if (response.GetDataTypeName(i) == "DateTime")
                value = response.GetDateTime(i).ToString();
              else if (response.GetDataTypeName(i) == "Object")
                value = response.GetValue(i).ToString() ?? "{}";
              else
                value = response.GetString(i);
    
              Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
          }
        }
      }
    }
    
  2. 在可能的情况下,使用与群集 URI 共享相同的身份验证凭据创建定义数据引入 URI 的连接字符串生成器对象。 将 <your_ingestion_uri> 占位符替换为数据引入 URI。

    using Kusto.Data.Common;
    using Kusto.Ingest;
    using System.Data;
    
    string ingestUri = "<your_ingestion_uri>";
    var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
      .WithAadUserPromptAuthentication();
    
  3. 通过将 stormevent.csv 文件添加到批处理队列来引入该文件。 你将使用以下对象和属性:

    • QueuedIngestClient 用于创建引入客户端。
    • IngestionProperties 用于设置引入属性。
    • DataFormat 用于将文件格式指定为 CSV
    • ignore_first_record 使用以下逻辑指定是否忽略 CSV 中的第一行和类似的文件类型:
      • True:忽略第一行。 使用此选项可从表格文本数据中删除标题行。
      • False:将第一行作为常规行引入。

    注意

    引入支持的最大文件大小为 6 GB。 建议引入 100 MB 到 1 GB 的文件。

    using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
      string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
      Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
      var ingestProps = new KustoIngestionProperties(database, table) {
        Format = DataSourceFormat.csv,
        AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
      };
      _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
    }
    
  4. 引入文件后查询表中的行数,并显示引入的最后一行。

    注意

    为了让引入完成,请等待 30 秒,再查询表。 对于 C#,请等待 60 秒,以便有时间以异步方式将文件添加到引入队列。

    Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
    Thread.Sleep(TimeSpan.FromSeconds(60));
    
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
      PrintResultsAsValueList(response);
    }
    
    query = table + " | top 1 by ingestion_time()";
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nLast ingested row:");
      PrintResultsAsValueList(response);
    }
    

完整代码应如下所示:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      string clusterUri = "<your_cluster_uri>";
      var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
        .WithAadUserPromptAuthentication();
      string ingestUri = "<your_ingestion_uri>";
      var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
        .WithAadUserPromptAuthentication();


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";
        string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

        string query = table + " | count";
        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
          PrintResultsAsValueList(response);
        }

        Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
        var ingestProps = new KustoIngestionProperties(database, table) {
          Format = DataSourceFormat.csv,
          AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
        };
        _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;

        Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
        Thread.Sleep(TimeSpan.FromSeconds(60));

        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
          PrintResultsAsValueList(response);
        }

        query = table + " | top 1 by ingestion_time()";
        using (var response = kustoClient.ExecuteQuery(database, query, null))
        {
          Console.WriteLine("\nLast ingested row:");
          PrintResultsAsValueList(response);
        }
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      while (response.Read()) {
        for (int i = 0; i < response.FieldCount; i++) {
          if (response.GetDataTypeName(i) == "Int64")
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetInt64(i));
          else
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetString(i));
        }
      }
    }
  }
}

运行应用

在命令行界面中,使用以下命令运行应用:

# Change directory to the folder that contains the management commands project
dotnet run .

你应会看到如下所示的结果:

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 0

Ingesting data from file: 
        C:\MyApp\stormevents.csv

Waiting 30 seconds for ingestion to complete

Number of rows in MyStormEvents AFTER ingesting the file:
         Count - 1000

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

将内存中数据排队以用于引入和查询结果

可以通过创建包含数据的流,然后排队进行引入,以从内存中引入数据。

例如,可以修改应用以替换 从文件引入 代码,如下所示:

  1. 将流描述符包添加到文件顶部的导入。

    不需要任何其他包。

  2. 添加包含要引入的数据的内存中字符串。

    string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
    var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
    
  3. 将引入属性设置为不忽略第一条记录,因为内存中字符串没有标题行。

    ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
    
  4. 通过将内存中数据添加到批处理队列来引入数据。 如果可能,请提供原始数据的大小。

    _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
    

更新代码的概述应如下所示:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
      var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));

      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
        _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

运行应用时,应看到类似于以下所示的结果。 请注意,引入后,表中的行数增加了 1。

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1000

Ingesting data from memory:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from memory:
         Count - 1001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

将 blob 排队以用于引入和查询结果

可以从 Azure 存储 Blob、Azure Data Lake 文件和 Amazon S3 文件引入数据。

例如,可以通过将 从内存引入 代码替换为以下内容,修改应用:

  1. 首先将 stormevent.csv 文件上传到存储帐户,并生成具有读取权限的 URI,例如,使用 Azure Blob 的 SAS 令牌

  2. 将 Blob 描述符包添加到文件顶部的导入。

    不需要任何其他包。

  3. 使用 Blob URI 创建 Blob 描述符,设置引入属性,然后从 Blob 引入数据。 将 <your_blob_uri> 占位符替换为 blob URI。

    string blobUri = "<your_blob_uri>";
    
    ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
    _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
    

更新代码的概述应如下所示:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string blobUri = "<your_blob_uri>";


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
        _=_ ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

运行应用时,应看到类似于以下所示的结果。 请注意,引入后,表中的行数增加了 1000。

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1001

Ingesting data from a blob:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from a blob:
         Count - 2001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

下一步