本文介绍如何使用事件网格数据连接将 blob 从存储帐户引入到 Azure 数据资源管理器。 你将创建一个用于设置 Azure 事件网格订阅的事件网格数据连接。 事件网格订阅通过 Azure 事件中心将事件从存储帐户路由到 Azure 数据资源管理器。
若要了解如何在 Azure 门户中或使用 ARM 模板创建连接,请参阅创建事件网格数据连接。
有关如何从事件网格引入 Azure 数据资源管理器的一般信息,请参阅连接到事件网格。
注意
若要使用事件网格连接实现最佳性能,请通过 Blob 元数据设置 rawSizeBytes
引入属性。 有关详细信息,请参阅引入属性。
有关基于以前的 SDK 版本的代码示例,请参阅存档的文章。
先决条件
创建事件网格数据连接
在本部分,我们将在事件网格与 Azure 数据资源管理器表之间建立连接。
安装 Microsoft.Azure.Management.Kusto NuGet 包。
创建用于身份验证的 Microsoft Entra 应用程序主体。 需要目录(租户)ID、应用程序 ID 和客户端机密。
运行以下代码。
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Application ID
var clientSecret = "PlaceholderClientSecret"; //Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var credentials = new ClientSecretCredential(tenantId, clientId, clientSecret);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var database = (await cluster.GetKustoDatabaseAsync(databaseName)).Value;
var dataConnections = database.GetKustoDataConnections();
var eventGridConnectionName = "myeventgridconnect";
//The event hub and storage account that are created as part of the Prerequisites
var eventHubResourceId = new ResourceIdentifier("/subscriptions/<storageAccountSubscriptionId>/resourceGroups/<storageAccountResourceGroupName>/providers/Microsoft.Storage/storageAccounts/<storageAccountName>");
var storageAccountResourceId = new ResourceIdentifier("/subscriptions/<eventHubSubscriptionId>/resourceGroups/<eventHubResourceGroupName>/providers/Microsoft.EventHub/namespaces/<eventHubNamespaceName>/eventhubs/<eventHubName>");
var consumerGroup = "$Default";
var location = AzureLocation.chinaeast2;
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = KustoEventGridDataFormat.Csv;
var blobStorageEventType = BlobStorageEventType.MicrosoftStorageBlobCreated;
var databaseRouting = KustoDatabaseRouting.Multi;
var eventGridConnectionData = new KustoEventGridDataConnection
{
StorageAccountResourceId = storageAccountResourceId, EventHubResourceId = eventHubResourceId,
ConsumerGroup = consumerGroup, TableName = tableName, Location = location, MappingRuleName = mappingRuleName,
DataFormat = dataFormat, BlobStorageEventType = blobStorageEventType, DatabaseRouting = databaseRouting
};
await dataConnections.CreateOrUpdateAsync(WaitUntil.Completed, eventGridConnectionName, eventGridConnectionData);
设置 |
建议的值 |
字段说明 |
tenantId |
xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx |
租户 ID。 也称为目录 ID。 |
subscriptionId |
xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx |
用于创建资源的订阅 ID。 |
clientId |
xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx |
可以访问租户中资源的应用程序的客户端 ID。 |
clientSecret |
PlaceholderClientSecret |
可以访问租户中资源的应用程序的客户端密码。 |
resourceGroupName |
testrg |
包含群集的资源组的名称。 |
clusterName |
mykustocluster |
群集的名称。 |
databaseName |
mykustodatabase |
群集中目标数据库的名称。 |
eventGridConnectionName |
myeventgridconnect |
所需的数据连接名称。 |
tableName |
StormEvents |
目标数据库中目标表的名称。 |
mappingRuleName |
StormEvents_CSV_Mapping |
与目标表相关的列映射的名称。 |
dataFormat |
csv |
消息的数据格式。 |
eventHubResourceId |
资源 ID |
将事件网格配置为发送事件的事件中心的资源 ID。 |
storageAccountResourceId |
资源 ID |
包含要引入数据的存储帐户的资源 ID。 |
consumerGroup |
$Default |
事件中心的使用者组。 |
location |
中国东部 2 |
数据连接资源的位置。 |
blobStorageEventType |
Microsoft.Storage.BlobCreated |
触发引入的事件类型。 支持的事件为:Microsoft.Storage.BlobCreated 或 Microsoft.Storage.BlobRenamed。 仅 ADLSv2 存储支持 Blob 重命名。 |
databaseRouting |
多或单 |
连接的数据库路由。 如果将此值设置为“单”,数据连接将按“databaseName”设置中指定的那样路由到群集中的单个数据库。 如果将此值设置为“多”,可使用数据库引入属性重写默认目标数据库。 有关详细信息,请参阅事件路由。 |
安装所需的库。
pip install azure-common
pip install azure-mgmt-kusto
创建用于身份验证的 Microsoft Entra 应用程序主体。 需要目录(租户)ID、应用程序 ID 和客户端机密。
运行以下代码。
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventGridDataConnection
from azure.common.credentials import ServicePrincipalCredentials
#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)
resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub and storage account that are created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx"
storage_account_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.Storage/storageAccounts/xxxxxx"
consumer_group = "$Default"
location = "China East 2"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
database_routing = "Multi"
blob_storage_event_type = "Microsoft.Storage.BlobCreated"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.begin_create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
parameters=EventGridDataConnection(storage_account_resource_id=storage_account_resource_id, event_hub_resource_id=event_hub_resource_id,
consumer_group=consumer_group, table_name=table_name, location=location, mapping_rule_name=mapping_rule_name, data_format=data_format, database_routing=database_routing,
blob_storage_event_type=blob_storage_event_type))
# The creation of the connection is async. Validation errors are only visible if you wait for the results.
poller.wait()
print(poller.result())
设置 |
建议的值 |
字段说明 |
tenant_id |
xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx |
租户 ID。 也称为目录 ID。 |
subscription_id |
xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx |
用于创建资源的订阅 ID。 |
client_id |
xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx |
可以访问租户中资源的应用程序的客户端 ID。 |
client_secret |
xxxxxxxxxxxxxx |
可以访问租户中资源的应用程序的客户端密码。 |
resource_group_name |
testrg |
包含群集的资源组的名称。 |
cluster_name |
mykustocluster |
群集的名称。 |
database_name |
mykustodatabase |
群集中目标数据库的名称。 |
data_connection_name |
myeventhubconnect |
所需的数据连接名称。 |
table_name |
StormEvents |
目标数据库中目标表的名称。 |
mapping_rule_name |
StormEvents_CSV_Mapping |
与目标表相关的列映射的名称。 |
database_routing |
多或单 |
连接的数据库路由。 如果将此值设置为“单”,数据连接将按“databaseName”设置中指定的那样路由到群集中的单个数据库。 如果将此值设置为“多”,可使用数据库引入属性重写默认目标数据库。 有关详细信息,请参阅事件路由。 |
data_format |
csv |
消息的数据格式。 |
event_hub_resource_id |
资源 ID |
将事件网格配置为发送事件的事件中心的资源 ID。 |
storage_account_resource_id |
资源 ID |
包含要引入的数据的存储帐户的资源 ID。 |
consumer_group |
$Default |
事件中心的使用者组。 |
location |
中国东部 2 |
数据连接资源的位置。 |
blob_storage_event_type |
Microsoft.Storage.BlobCreated |
触发引入的事件类型。 支持的事件为:Microsoft.Storage.BlobCreated 或 Microsoft.Storage.BlobRenamed。 仅 ADLSv2 存储支持 Blob 重命名。 |
使用事件网格数据连接
本部分介绍如何在创建 Blob 或重命名 Blob 后,触发从 Azure Blob 存储或 Azure Data Lake Gen 2 到群集的引入。
根据用于上传 Blob 的存储 SDK 类型选择相关的选项卡。
以下代码示例使用 Azure Blob 存储 SDK 将文件上传到 Azure Blob 存储。 上传会触发事件网格数据连接,该连接将数据引入到 Azure 数据资源管理器中。
var azureStorageAccountConnectionString=<storage_account_connection_string>;
var containerName = <container_name>;
var blobName = <blob_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mappingReference>;
// Create a new container in your storage account.
var azureStorageAccount = CloudStorageAccount.Parse(azureStorageAccountConnectionString);
var blobClient = azureStorageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
container.CreateIfNotExists();
// Set metadata and upload a file to the blob.
var blob = container.GetBlockBlobReference(blobName);
blob.Metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
blob.Metadata.Add("kustoIngestionMappingReference", mapping);
blob.UploadFromFile(localFileName);
// Confirm success of the upload by listing the blobs in your container.
var blobs = container.ListBlobs();
以下代码示例使用 Azure Data Lake SDK 将文件上传到 Data Lake Storage Gen2。 上传会触发事件网格数据连接,该连接将数据引入到 Azure 数据资源管理器中。
var accountName = <storage_account_name>;
var accountKey = <storage_account_key>;
var fileSystemName = <file_system_name>;
var fileName = <file_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mapping_reference>;
var sharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
var dfsUri = "https://" + accountName + ".dfs.core.chinacloudapi.cn";
var dataLakeServiceClient = new DataLakeServiceClient(new Uri(dfsUri), sharedKeyCredential);
// Create the filesystem.
var dataLakeFileSystemClient = dataLakeServiceClient.CreateFileSystem(fileSystemName).Value;
// Define file metadata and uploading options.
IDictionary<String, String> metadata = new Dictionary<string, string>();
metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
metadata.Add("kustoIngestionMappingReference", mapping);
var uploadOptions = new DataLakeFileUploadOptions
{
Metadata = metadata,
Close = true // Note: The close option triggers the event being processed by the data connection.
};
// Write to the file.
var dataLakeFileClient = dataLakeFileSystemClient.GetFileClient(fileName);
dataLakeFileClient.Upload(localFileName, uploadOptions);
注意
- 使用 Azure Data Lake SDK 上传文件时,初始文件创建事件的大小为 0,Azure 数据资源管理器在数据引入期间会忽略该事件。 为确保正常引入,请将
Close
参数设置为 true
。 此参数使上传方法触发 FlushAndClose 事件,表明已做出最终更新并关闭文件流。
- 为了减少将事件引入 Azure 数据资源管理器时来自事件网格和后续处理的流量,我们建议筛选 data.api 键以仅包含 FlushAndClose 事件,从而删除大小为 0 的文件创建事件。 有关刷新的详细信息,请参阅 Azure Data Lake 刷新方法。
重命名 blob
在 ADLSv2 中,可以重命名目录。 但请务必注意,重命名目录不会触发 Blob 重命名事件,也不会启动目录中包含的 Blob 的引入。 如果你想要确保在重命名目录后引入 Blob,应直接重命名目录中的各个 Blob。
以下代码示例演示如何重命名 ADLSv2 存储帐户中的 Blob。
var accountName = <storage_account_name>;
var accountKey = <storage_account_key>;
var fileSystemName = <file_system_name>;
var sourceFilePath = <source_file_path>;
var destinationFilePath = <destination_file_path>;
var sharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
var dfsUri = "https://" + accountName + ".dfs.core.chinacloudapi.cn";
var dataLakeServiceClient = new DataLakeServiceClient(new Uri(dfsUri), sharedKeyCredential);
// Get a client to the the filesystem
var dataLakeFileSystemClient = dataLakeServiceClient.GetFileSystemClient(fileSystemName);
// Rename a file in the file system
var dataLakeFileClient = dataLakeFileSystemClient.GetFileClient(sourceFilePath);
dataLakeFileClient.Rename(destinationFilePath);
注意
启用了分层命名空间功能的存储帐户不支持在 CopyBlob
操作后触发引入。
重要
我们强烈建议不要从自定义代码生成存储事件并将其发送到事件中心。 如果选择这样做,请确保生成的事件严格遵循相应的存储事件架构和 JSON 格式规范。
删除事件网格数据连接
若要删除事件网格连接,请运行以下命令:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);
若要删除事件网格连接,请运行以下命令:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)
相关内容