使用 Azure IoT 中心将文件从设备上传到云

本文演示了以下内容的操作方法:

  • 使用 IoT 中心的文件上传功能,通过 Azure IoT 设备和服务 SDK 将文件上传到 Azure Blob 存储。
  • 通知 IoT 中心文件已成功上传,并创建后端服务,以使用 Azure IoT 服务 SDK 从 IoT 中心接收文件上传通知。

在某些情况下,无法轻松地将设备发送的数据映射为 IoT 中心接受的相对较小的设备到云消息。 借助 IoT 中心中的文件上传功能,可以将大型或复杂数据移到云中。 例如:

  • 视频
  • 包含图像的大型文件
  • 以高频率采样的振动数据
  • 某种形式的预处理数据

通常使用 Azure 数据工厂Hadoop 堆栈等工具在云中批处理这些文件。 需要从设备上传文件时,仍可以使用 IoT 中心的安全性和可靠性。 本文介绍如何进行此操作。

本文旨在补充本文中引用的可运行 SDK 示例。

有关详细信息,请参阅:

重要

使用 X.509 证书颁发机构 (CA) 身份验证的设备上的文件上传功能为公共预览版,并且必须启用预览模式。 这是在一同使用 X.509 指纹身份验证或 X.509 证书证明与 Azure 设备预配服务的设备上的正式发布版本。 若要了解有关使用 IoT 中心进行 X.509 身份验证的详细信息,请参阅支持的 X.509 证书

先决条件

  • IoT 中心。 某些 SDK 调用需要 IoT 中心主连接字符串,因此请记下连接字符串。

  • 已注册的设备。 某些 SDK 调用需要设备主连接字符串,因此请记下连接字符串。

  • IoT 中心服务连接权限 - 若要接收文件上传通知消息,后端服务需要“服务连接”权限。 默认情况下,每个 IoT 中心都使用名为“服务”的共享访问策略创建,该策略会授予此权限。 有关详细信息,请参阅连接到 IoT 中心

  • 通过链接 Azure 存储帐户和 Azure Blob 存储容器,在 IoT 中心配置文件上传。 可以使用 Azure 门户Azure CLIAzure PowerShell 进行配置。

概述

本操作说明包含两个部分:

  • 从设备应用程序上传文件
  • 在后端应用程序中接收文件上传通知

从设备应用程序上传文件

本部分介绍如何使用适用于 .NET 的 Azure IoT SDK 中的 DeviceClient 类将文件从设备上传到 IoT 中心。

按照此过程将文件从设备上传到 IoT 中心:

  1. 连接到 IoT 中心
  2. 从 IoT 中心获取 SAS URI
  3. 将文件上传到 Azure 存储
  4. 通知 IoT 中心文件上传状态

连接到设备

调用 CreateFromConnectionString 以连接到设备。 传递设备主连接字符串。

AMQP 是默认传输协议。

static string connectionString = "{device primary connection string}";
deviceClient = DeviceClient.CreateFromConnectionString(connectionString);

从 IoT 中心获取 SAS URI

调用 GetFileUploadSasUriAsync 以获取文件上传详细信息。 下一步使用 SAS URI 将文件从设备上传到 Blob 存储。

const string filePath = "TestPayload.txt";
using var fileStreamSource = new FileStream(filePath, FileMode.Open);
var fileName = Path.GetFileName(fileStreamSource.Name);
var fileUploadSasUriRequest = new FileUploadSasUriRequest
{
    BlobName = fileName
};

FileUploadSasUriResponse sasUri = await _deviceClient.GetFileUploadSasUriAsync(fileUploadSasUriRequest, System.Threading.CancellationToken cancellationToken = default);
Uri uploadUri = sasUri.GetBlobUri();

将文件上传到 Azure 存储

将文件上传到 Azure 存储:

  1. 创建 blockBlobClient 对象,并传递文件上传 URI。

  2. 使用 UploadAsync 方法将文件上传到 Blob 存储,并传递 SAS URI。 可以选择性地添加 Blob 上传选项和取消令牌参数。

Azure Blob 客户端始终使用 HTTPS 作为协议来将文件上传到 Azure 存储。

在此示例中,将 SAS URI 传递给 BlockBlobClient,以创建 Azure 存储块 Blob 客户端并上传文件:

var blockBlobClient = new BlockBlobClient(uploadUri);
await blockBlobClient.UploadAsync(fileStreamSource, null, null);

通知 IoT 中心文件上传状态

使用 CompleteFileUploadAsync 通知 IoT 中心设备客户端已完成上传,并传递 FileUploadCompletionNotification 对象。 该 IsSuccess 标志指示上传是否成功。 收到通知后,IoT 中心将释放与上传关联的资源 (SAS URI)。

如果启用了文件上传通知,IoT 中心会将文件上传通知消息发送到为文件上传通知配置的后端服务。

var successfulFileUploadCompletionNotification = new FileUploadCompletionNotification
{
    // Mandatory. Must be the same value as the correlation id returned in the sas uri response
    CorrelationId = sasUri.CorrelationId,

    // Mandatory. Will be present when service client receives this file upload notification
    IsSuccess = true,

    // Optional, user defined status code. Will be present when service client receives this file upload notification
    StatusCode = 200,

    // Optional, user-defined status description. Will be present when service client receives this file upload notification
    StatusDescription = "Success"
};

await _deviceClient.CompleteFileUploadAsync(successfulFileUploadCompletionNotification);

SDK 文件上传示例

SDK 包含此文件上传示例

在后端应用程序中接收文件上传通知

创建一项后端服务,用于从 IoT 中心接收文件上传通知消息。

ServiceClient 类包含服务可用于接收文件上传通知的方法。

接收文件上传通知:

  1. 调用 CreateFromConnectionString 以连接到 IoT 中心。 传递 IoT 中心主连接字符串。
  2. 创建 CancellationToken
  3. 调用 GetFileNotificationReceiver 来创建通知接收方。
  4. 将循环与 ReceiveAsync 一起使用,等待文件上传通知。

例如:

using Microsoft.Azure.Devices;
static ServiceClient serviceClient;
static string connectionString = "{IoT hub connection string}";
serviceClient = ServiceClient.CreateFromConnectionString(connectionString);

// Define the cancellation token
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken token = source.Token;

// Create a notification receiver
var notificationReceiver = serviceClient.GetFileNotificationReceiver();
Console.WriteLine("\nReceiving file upload notification from service");

// Check for file upload notifications
while (true)
{
    var fileUploadNotification = await notificationReceiver.ReceiveAsync(token);
    if (fileUploadNotification == null) continue;
    Console.ForegroundColor = ConsoleColor.Yellow;
    Console.WriteLine("Received file upload notification: {0}", 
        string.Join(", ", fileUploadNotification.BlobName));
    Console.ResetColor();
    await notificationReceiver.CompleteAsync(fileUploadNotification);
}

概述

本操作说明包含两个部分:

  • 从设备应用程序上传文件
  • 在后端应用程序中接收文件上传通知

从设备应用程序上传文件

本部分介绍如何使用适用于 Java 的 Azure IoT SDK 中的 DeviceClient 类将文件从设备上传到 IoT 中心。

按照此过程将文件从设备上传到 IoT 中心:

  1. 连接到设备
  2. 从 IoT 中心获取 SAS URI
  3. 将文件上传到 Azure 存储
  4. 将文件上传状态通知发送到 IoT 中心

连接协议

文件上传操作始终使用 HTTPS,但 DeviceClient 可以为遥测、设备方法和设备孪生等其他服务定义 IotHubClientProtocol

IotHubClientProtocol protocol = IotHubClientProtocol.MQTT;

连接到设备

实例化 DeviceClient,以使用设备主连接字符串连接到设备。

String connString = "{IoT hub connection string}";
DeviceClient client = new DeviceClient(connString, protocol);

从 IoT 中心获取 SAS URI

调用 getFileUploadSasUri 以获取 FileUploadSasUriResponse 对象。

FileUploadSasUriResponse 包括这些方法和返回值。 可以将返回值传递给文件上传方法。

方法 返回值
getCorrelationId() 相关 ID
getContainerName() 容器名称
getBlobName() Blob 名称
getBlobUri() Blob URI

例如:

FileUploadSasUriResponse sasUriResponse = client.getFileUploadSasUri(new FileUploadSasUriRequest(file.getName()));

System.out.println("Successfully got SAS URI from IoT hub");
System.out.println("Correlation Id: " + sasUriResponse.getCorrelationId());
System.out.println("Container name: " + sasUriResponse.getContainerName());
System.out.println("Blob name: " + sasUriResponse.getBlobName());
System.out.println("Blob Uri: " + sasUriResponse.getBlobUri());

将文件上传到 Azure 存储

将 blob URI 终结点传递给 BlobClientBuilder.buildclient ,以创建 BlobClient 对象。

BlobClient blobClient =
    new BlobClientBuilder()
        .endpoint(sasUriResponse.getBlobUri().toString())
        .buildClient();

调用 uploadFromFile 将文件上传到 Blob 存储。

String fullFileName = "Path of the file to upload";
blobClient.uploadFromFile(fullFileName);

将文件上传状态通知发送到 IoT 中心

尝试文件上传后,将上传状态通知发送到 IoT 中心。

创建 FileUploadCompletionNotification 对象。 传递 correlationIdisSuccess 文件上传成功状态。 成功上传文件时传递 isSuccess true 值,未成功上传文件时传递 false

即使文件上传失败,也必须调用 FileUploadCompletionNotification。 IoT 中心具有固定数量的 SAS URI,它们可以在任何给定时间处于活动状态。 完成文件上传后,应释放 SAS URI,以便生成其他 SAS URI。 如果 SAS URI 未通过此 API 释放,则它最终会根据在 IoT 中心上配置的 SAS URI 存活时间来释放自己。

此示例传递成功状态。

FileUploadCompletionNotification completionNotification = new FileUploadCompletionNotification(sasUriResponse.getCorrelationId(), true);
client.completeFileUpload(completionNotification);

关闭客户端

释放 client 资源。

client.closeNow();

在后端应用程序中接收文件上传通知

可以创建后端应用程序来接收文件上传通知。

若要创建文件上传通知应用程序,请执行以下操作:

  1. 连接到 IoT 中心服务客户端
  2. 检查文件上传通知

ServiceClient 类包含服务可用于接收文件上传通知的方法。

连接到 IoT 中心服务客户端

创建 IotHubServiceClientProtocol 对象。 连接使用 AMQPS 协议。

调用 createFromConnectionString 以连接到 IoT 中心。 传递 IoT 中心主连接字符串。

private static final String connectionString = "{IoT hub primary connection string}";
private static final IotHubServiceClientProtocol protocol = IotHubServiceClientProtocol.AMQPS;
ServiceClient sc = ServiceClient.createFromConnectionString(connectionString, protocol);

检查文件上传状态

检查文件上传状态:

  1. 创建 getFileUploadNotificationReceiver 对象。
  2. 使用 open 连接到 IoT 中心。
  3. 调用 receive 以检查文件上传状态。 此方法返回 fileUploadNotification 对象。 如果收到上传通知,可以使用 fileUploadNotification 方法查看上传状态字段。

例如:

FileUploadNotificationReceiver receiver = sc.getFileUploadNotificationReceiver();
receiver.open();
FileUploadNotification fileUploadNotification = receiver.receive(2000);

if (fileUploadNotification != null)
{
    System.out.println("File Upload notification received");
    System.out.println("Device Id : " + fileUploadNotification.getDeviceId());
    System.out.println("Blob Uri: " + fileUploadNotification.getBlobUri());
    System.out.println("Blob Name: " + fileUploadNotification.getBlobName());
    System.out.println("Last Updated : " + fileUploadNotification.getLastUpdatedTimeDate());
    System.out.println("Blob Size (Bytes): " + fileUploadNotification.getBlobSizeInBytes());
    System.out.println("Enqueued Time: " + fileUploadNotification.getEnqueuedTimeUtcDate());
}
else
{
    System.out.println("No file upload notification");
}

// Close the receiver object
receiver.close();

SDK 文件上传示例

有两个 Java 文件上传示例

安装包

必须先安装 azure-iot-device 库,然后才能调用任何相关代码。

pip install azure-iot-device

azure.storage.blob 包用于执行文件上传。

pip install azure.storage.blob

从设备应用程序上传文件

本部分介绍如何使用适用于 Python 的 Azure IoT SDK 中的 IoTHubDeviceClient 类将文件从设备上传到 IoT 中心。

按照此过程将文件从设备上传到 IoT 中心:

  1. 连接到设备
  2. 获取 Blob 存储信息
  3. 将文件上传到 Blob 存储
  4. 通知 IoT 中心上传状态

导入库

import os
from azure.iot.device import IoTHubDeviceClient
from azure.core.exceptions import AzureError
from azure.storage.blob import BlobClient

连接到设备

若要连接到设备,请执行以下操作:

  1. 调用 create_from_connection_string,以添加设备主连接字符串。

  2. 调用 connect 以连接设备客户端。

例如:

# Add your IoT hub primary connection string
CONNECTION_STRING = "{Device primary connection string}"
device_client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)

# Connect the client
device_client.connect()

获取 Blob 存储信息

调用 get_storage_info_for_blob,从 IoT 中心获取有关链接的 Azure 存储帐户的信息。 此信息包括主机名、容器名称、blob 名称和 SAS 令牌。 get_storage_info_for_blob 方法还返回在 notify_blob_upload_status 方法中使用的 correlation_id。 IoT 中心使用 correlation_id 来标记你在处理的 Blob。

# Get the storage info for the blob
PATH_TO_FILE = "{Full path to local file}"
blob_name = os.path.basename(PATH_TO_FILE)
blob_info = device_client.get_storage_info_for_blob(blob_name)

将文件上传到 Blob 存储

若要将文件上传到 Blob 存储,请执行以下操作:

  1. 使用 from_blob_url 从 Blob URL 创建 BlobClient 对象。
  2. 调用 upload_blob 将文件上传到 Blob 存储。

此示例分析 blob_info 结构,以创建用于初始化 BlobClient 的 URL。 然后,它会调用 upload_blob 将文件上传到 Blob 存储。

try:
    sas_url = "https://{}/{}/{}{}".format(
        blob_info["hostName"],
        blob_info["containerName"],
        blob_info["blobName"],
        blob_info["sasToken"]
    )

    print("\nUploading file: {} to Azure Storage as blob: {} in container {}\n".format(file_name, blob_info["blobName"], blob_info["containerName"]))

    # Upload the specified file
    with BlobClient.from_blob_url(sas_url) as blob_client:
        with open(file_name, "rb") as f:
            result = blob_client.upload_blob(f, overwrite=True)
            return (True, result)

except FileNotFoundError as ex:
    # catch file not found and add an HTTP status code to return in notification to IoT hub
    ex.status_code = 404
    return (False, ex)

except AzureError as ex:
    # catch Azure errors that might result from the upload operation
    return (False, ex)

通知 IoT 中心上传状态

使用 notify_blob_upload_status 通知 IoT 中心 Blob 存储操作的状态。 传递 get_storage_info_for_blob 方法获取的 correlation_id。 IoT 中心使用 correlation_id 来通知任何可能正在侦听文件上传任务状态通知的服务。

此示例通知 IoT 中心已成功上传文件:

device_client.notify_blob_upload_status(storage_info["correlationId"], True, 200, "OK: {}".format(PATH_TO_FILE)

关闭设备客户端

关闭客户端。 调用此方法后,尝试进一步调用客户端会导致引发 ClientError

device_client.shutdown()

SDK 文件上传示例

SDK 包含两个文件上传示例:

概述

本操作说明包含两个部分:

  • 从设备应用程序上传文件
  • 在后端应用程序中接收文件上传通知

从设备应用程序上传文件

本部分介绍如何使用适用于 Node.js 的 Azure IoT SDK 中的 azure-iot-device 包将文件从设备上传到 IoT 中心。

安装 SDK 包

运行以下命令,在开发计算机上安装 azure-iot-device 设备 SDK、azure-iot-device-mqtt 和 @azure/storage-blob 包

npm install azure-iot-device azure-iot-device-mqtt @azure/storage-blob --save

azure-iot-device 包包含与 IoT 设备交互的对象。

按照此过程将文件从设备上传到 IoT 中心:

  1. 获取 Blob 共享访问签名
  2. 将文件上传到 Azure 存储
  3. 将文件上传状态通知发送到 IoT 中心

创建模块

使用已安装的包创建客户端、协议、错误和路径模块。

const Client = require('azure-iot-device').Client;
const Protocol = require('azure-iot-device-mqtt').Mqtt;
const errors = require('azure-iot-common').errors;
const path = require('path');

从 IoT 中心获取 SAS URI

使用 getBlobSharedAccessSignature 从 IoT 中心获取链接的存储帐户 SAS 令牌。 如先决条件中所述,IoT 中心会链接到 Blob 存储。

例如:

// make sure you set these environment variables prior to running the sample.
const localFilePath = process.env.PATH_TO_FILE;
const storageBlobName = path.basename(localFilePath);
const blobInfo = await client.getBlobSharedAccessSignature(storageBlobName);
if (!blobInfo) {
throw new errors.ArgumentError('Invalid upload parameters');
}

将文件上传到 IoT 中心

将文件从设备上传到 IoT 中心:

  1. 创建流管道
  2. 构造 Blob URL
  3. 创建 BlockBlobClient 以将文件上传到 Blob 存储
  4. 调用 uploadFile 将文件上传到 Blob 存储
  5. 调用 notifyBlobUploadStatus 以通知 IoT 中心上传是成功还是失败

例如:

// Open the pipeline
const pipeline = newPipeline(new AnonymousCredential(), {
retryOptions: { maxTries: 4 },
telemetry: { value: 'HighLevelSample V1.0.0' }, // Customized telemetry string
keepAliveOptions: { enable: false }
});

// Construct the blob URL
const { hostName, containerName, blobName, sasToken } = blobInfo;
const blobUrl = `https://${hostName}/${containerName}/${blobName}${sasToken}`;

// Create the BlockBlobClient for file upload to Blob Storage
const blobClient = new BlockBlobClient(blobUrl, pipeline);

// Setup blank status notification arguments to be filled in on success/failure
let isSuccess;
let statusCode;
let statusDescription;

const uploadStatus = await blobClient.uploadFile(localFilePath);
console.log('uploadStreamToBlockBlob success');

  try {
    const uploadStatus = await blobClient.uploadFile(localFilePath);
    console.log('uploadStreamToBlockBlob success');

    // Save successful status notification arguments
    isSuccess = true;
    statusCode = uploadStatus._response.status;
    statusDescription = uploadStatus._response.bodyAsText;

    // Notify IoT hub of upload to blob status (success)
    console.log('notifyBlobUploadStatus success');
  }
  catch (err) {
    isSuccess = false;
    statusCode = err.code;
    statusDescription = err.message;

    console.log('notifyBlobUploadStatus failed');
    console.log(err);
  }

// Send file upload status notification to IoT hub
await client.notifyBlobUploadStatus(blobInfo.correlationId, isSuccess, statusCode, statusDescription);

在后端应用程序中接收文件上传通知

可以创建后端应用程序来检查 IoT 中心服务客户端是否有设备文件上传通知。

若要创建文件上传通知应用程序,请执行以下操作:

  1. 连接到 IoT 中心服务客户端
  2. 检查文件上传通知

连接到 IoT 中心服务客户端

ServiceClient 类包含服务可用于接收文件上传通知的方法。

使用 fromConnectionString 连接到 IoT 中心。 传递 IoT 中心主连接字符串。

const Client = require('azure-iothub').Client;
const connectionString = "{IoT hub primary connection string}";
const serviceClient = Client.fromConnectionString(connectionString);

打开到 IoT 中心的连接。

//Open the connection to IoT hub
serviceClient.open(function (err) {
  if (err) {
    console.error('Could not connect: ' + err.message);
  } else {
    console.log('Service client connected');

检查文件上传通知

若要检查文件上传通知,请执行以下操作:

  1. 调用 getFileNotificationReceiver。 提供收到通知消息时调用的文件上传回调方法的名称。
  2. 在回调方法中处理文件上传通知。

此示例设置 receiveFileUploadNotification 通知回调接收方。 接收方解释文件上传状态信息,并将状态消息输出到控制台。

//Set up the receiveFileUploadNotification notification message callback receiver
serviceClient.getFileNotificationReceiver(function receiveFileUploadNotification(err, receiver){
if (err) {
  console.error('error getting the file notification receiver: ' + err.toString());
} else {
  receiver.on('message', function (msg) {
    console.log('File upload from device:')
    console.log(msg.getData().toString('utf-8'));
    receiver.complete(msg, function (err) {
      if (err) {
        console.error('Could not finish the upload: ' + err.message);
      } else {
        console.log('Upload complete');
      }
    });
  });
}

SDK 文件上传示例

此 SDK 包含一个上传到 Blob 高级示例。