从部署的用于实时推理的模型中收集生产数据

适用范围:Azure CLI ml 扩展 v2(最新版)Python SDK azure-ai-ml v2(最新版)

在本文中,你将了解如何使用 Azure 机器学习数据收集器从部署到 Azure 机器学习托管联机终结点或 Kubernetes 联机终结点的模型中收集生产推理数据

可以为新的或现有的联机终结点部署启用数据收集。 Azure 机器学习数据收集器在 Azure Blob 存储中记录推理数据。 使用 Python SDK 收集的数据将自动注册为 Azure 机器学习工作区中的数据资产。 此数据资产可用于监控模型。

如果有兴趣为部署到实时终结点的 MLflow 模型收集生产推理数据,请参阅 MLflow 模型的数据收集

先决条件

在按照本文中的步骤操作之前,请确保满足以下先决条件:

  • Azure 基于角色的访问控制 (Azure RBAC) 用于授予对 Azure 机器学习中的操作的访问权限。 若要执行本文中的步骤,必须为用户帐户分配 Azure 机器学习工作区的所有者或参与者角色,或者分配一个允许 Microsoft.MachineLearningServices/workspaces/onlineEndpoints/* 的自定义角色。 有关详细信息,请参阅管理对 Azure 机器学习工作区的访问

执行模型监视的自定义日志记录

使用自定义日志记录的数据收集允许你在任何数据转换之前、期间和之后直接从记分脚本记录 Pandas DataFrame。 使用自定义日志记录时,表格数据将实时记录到工作区 Blob 存储或自定义 Blob 存储容器。 模型监视器可以使用存储中的数据。

使用自定义日志记录代码更新评分脚本

首先,将自定义日志记录代码添加到评分脚本 (score.py)。 对于自定义日志记录,你需要 azureml-ai-monitoring 包。 有关此包的更多信息,请参阅 数据收集 SDK 的 PyPI 页面

  1. 通过将以下行添加到评分脚本的顶部来导入 azureml-ai-monitoring 包:

    from azureml.ai.monitoring import Collector
    
  2. init() 函数中声明数据收集变量(最多五个):

    注意

    如果你对 Collector 对象使用名称 model_inputsmodel_outputs,模型监控系统将自动识别自动注册的数据资产以提供更加无缝的模型监视体验。

    global inputs_collector, outputs_collector
    inputs_collector = Collector(name='model_inputs')          
    outputs_collector = Collector(name='model_outputs')
    

    默认情况下,如果在数据收集过程中失败,Azure 机器学习会引发异常。 (可选)可以使用 on_error 参数指定在发生日志记录失败时要运行的函数。 例如,在以下代码中使用 on_error 参数,Azure 机器学习将记录错误,而不是抛出异常:

    inputs_collector = Collector(name='model_inputs', on_error=lambda e: logging.info("ex:{}".format(e)))
    
  3. run() 函数中,使用 collect() 函数记录评分前后的数据帧。 第一次调用 collect() 返回 context,其中包含稍后关联模型输入和模型输出的信息。

    context = inputs_collector.collect(data) 
    result = model.predict(data)
    outputs_collector.collect(result, context)
    

    注意

    目前,collect() API 仅记录 pandas DataFrame。 如果数据在传递到 collect() 时不在 DataFrame 中,则不会被记录到存储中,系统将报告错误。

以下代码是使用自定义日志记录 Python SDK 的完整评分脚本 (score.py) 的示例。

import pandas as pd
import json
from azureml.ai.monitoring import Collector

def init():
  global inputs_collector, outputs_collector, inputs_outputs_collector

  # instantiate collectors with appropriate names, make sure align with deployment spec
  inputs_collector = Collector(name='model_inputs')                    
  outputs_collector = Collector(name='model_outputs')

def run(data): 
  # json data: { "data" : {  "col1": [1,2,3], "col2": [2,3,4] } }
  pdf_data = preprocess(json.loads(data))
  
  # tabular data: {  "col1": [1,2,3], "col2": [2,3,4] }
  input_df = pd.DataFrame(pdf_data)

  # collect inputs data, store correlation_context
  context = inputs_collector.collect(input_df)

  # perform scoring with pandas Dataframe, return value is also pandas Dataframe
  output_df = predict(input_df) 

  # collect outputs data, pass in correlation_context so inputs and outputs data can be correlated later
  outputs_collector.collect(output_df, context)
  
  return output_df.to_dict()
  
def preprocess(json_data):
  # preprocess the payload to ensure it can be converted to pandas DataFrame
  return json_data["data"]

def predict(input_df):
  # process input and return with outputs
  ...
  
  return output_df

更新评分脚本以记录自定义唯一 ID

除了直接在评分脚本中记录 pandas 数据帧,还可以使用所选的唯一 ID 记录数据。 这些 ID 可以来自应用程序、外部系统,也可以生成它们。 如果未提供自定义 ID(如本部分所述),数据收集器将自动生成唯一的 correlationid,以帮助稍后关联模型的输入和输出。 如果提供自定义 ID,则所记录数据的 correlationid 字段将包含提供的自定义 ID 的值。

  1. 首先完成上一部分中的步骤,然后将以下行添加到评分脚本,导入 azureml.ai.monitoring.context 包:

    from azureml.ai.monitoring.context import BasicCorrelationContext
    
  2. 在评分脚本中,实例化 BasicCorrelationContext 对象,并传入要为该行记录的 id。 建议此 id 是系统中的唯一 ID,以便唯一标识 Blob 存储中的每个记录行。 将此对象作为参数传递到 collect() API 调用:

      # create a context with a custom unique id
      artificial_context = BasicCorrelationContext(id='test')
    
      # collect inputs data, store correlation_context
      context = inputs_collector.collect(input_df, artificial_context)
    
  3. 确保将上下文传入 outputs_collector,以便对模型输入和输出记录相同的唯一 ID,并且稍后可以轻松关联它们:

      # collect outputs data, pass in context so inputs and outputs data can be correlated later
      outputs_collector.collect(output_df, context)
    

以下代码是记录自定义唯一 ID 的完整评分脚本 (score.py) 的示例。

import pandas as pd
import json
from azureml.ai.monitoring import Collector
from azureml.ai.monitoring.context import BasicCorrelationContext

def init():
  global inputs_collector, outputs_collector, inputs_outputs_collector

  # instantiate collectors with appropriate names, make sure align with deployment spec
  inputs_collector = Collector(name='model_inputs')                    
  outputs_collector = Collector(name='model_outputs')

def run(data): 
  # json data: { "data" : {  "col1": [1,2,3], "col2": [2,3,4] } }
  pdf_data = preprocess(json.loads(data))
  
  # tabular data: {  "col1": [1,2,3], "col2": [2,3,4] }
  input_df = pd.DataFrame(pdf_data)

  # create a context with a custom unique id
  artificial_context = BasicCorrelationContext(id='test')

  # collect inputs data, store correlation_context
  context = inputs_collector.collect(input_df, artificial_context)

  # perform scoring with pandas Dataframe, return value is also pandas Dataframe
  output_df = predict(input_df) 

  # collect outputs data, pass in context so inputs and outputs data can be correlated later
  outputs_collector.collect(output_df, context)
  
  return output_df.to_dict()
  
def preprocess(json_data):
  # preprocess the payload to ensure it can be converted to pandas DataFrame
  return json_data["data"]

def predict(input_df):
  # process input and return with outputs
  ...
  
  return output_df

收集数据以监视模型性能

如果要使用收集的数据进行模型性能监视,请务必在此类数据可用时,每个记录的行都有唯一的 correlationid,可用于将数据与地面真实数据相关联。 数据收集器将为每个记录的行自动生成唯一的 correlationid,并将此自动生成的 ID 包含在 JSON 对象的 correlationid 字段中。 有关 JSON 架构的详细信息,请参阅存储 blob 存储中收集的数据

如果要将自己的唯一 ID 用于记录生产数据,建议将此 ID 记录为 Pandas 数据帧中的单独列,因为数据收集器会批处理彼此靠近的请求。 通过将 correlationid 记录为单独的列,它可以在下游随时用于与地面真实数据集成。

更新依赖项

需要先使用基础映像 mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04 和适当的 Conda 依赖项创建环境,才能使用更新的评分脚本创建部署。 因此,可以使用以下 YAML 中的规范构建环境。

channels:
  - conda-forge
dependencies:
  - python=3.8
  - pip=22.3.1
  - pip:
      - azureml-defaults==1.38.0
      - azureml-ai-monitoring~=0.1.0b1
name: model-env

更新部署 YAML

接下来,你需要创建部署 YAML。 若要创建部署 YAML,需要包括 data_collector 属性,并为之前通过自定义日志记录 Python SDK 实例化的 Collector 对象 model_inputsmodel_outputs 启用数据收集:

data_collector:
  collections:
    model_inputs:
      enabled: 'True'
    model_outputs:
      enabled: 'True'

以下代码是用于托管联机终结点部署的综合部署 YAML 的示例。 你应该根据自己的方案来更新部署 YAML。 有关如何为推理数据日志记录设置部署 YAML 格式的更多示例,请参阅 Azure 模型数据收集器示例

$schema: https://azuremlschemas.azureedge.net/latest/managedOnlineDeployment.schema.json
name: blue
endpoint_name: my_endpoint
model: azureml:iris_mlflow_model@latest
environment:
  image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
  conda_file: model/conda.yaml
code_configuration:
  code: scripts
  scoring_script: score.py
instance_type: Standard_F2s_v2
instance_count: 1
data_collector:
  collections:
    model_inputs:
      enabled: 'True'
    model_outputs:
      enabled: 'True'

(可选)可以为 data_collector 调整以下附加参数:

  • data_collector.rolling_rate:对存储中的数据进行分区的速率。 从以下值中进行选择:MinuteHourDayMonthYear
  • data_collector.sampling_rate:要收集的数据的百分比,以十进制比率表示。 例如,值 1.0 表示收集 100% 的数据。
  • data_collector.collections.<collection_name>.data.name:要与收集的数据一起注册的数据资产的名称。
  • data_collector.collections.<collection_name>.data.path:完整的 Azure 机器学习数据存储路径,其中收集的数据应注册为数据资产。
  • data_collector.collections.<collection_name>.data.version:要与 Blob 存储中收集的数据一起注册的数据资产的版本。

将数据收集到自定义 Blob 存储容器

可以按照以下步骤使用数据收集器将生产推理数据收集到自定义 Blob 存储容器:

  1. 将存储容器连接到 Azure 机器学习数据存储。 有关将存储容器连接到 Azure 机器学习数据存储的详细信息,请参阅创建数据存储

  2. 检查 Azure 机器学习终结点是否具有写入数据存储目标所需的权限。

    数据收集器支持系统分配的托管标识 (SAMI) 和用户分配的托管标识 (UAMI)。 将标识添加到终结点。 使用 Blob 存储容器(用作数据目标)将角色 Storage Blob Data Contributor 分配给此标识。 若要了解如何在 Azure 中使用托管标识,请参阅将 Azure 角色分配给托管标识

  3. 更新部署 YAML 以在每个集合中包含 data 属性。

    • 必需的 data.name 参数,指定要向收集的数据注册的数据资产的名称
    • 必需的 data.path 参数,指定连接到你的 Azure Blob 存储容器的完整的 Azure 机器学习数据存储路径
    • 可选的 data.version 参数,指定数据资产的版本(默认为 1)

    以下 YAML 配置显示了如何在每个集合中包含 data 属性的示例。

    data_collector:
      collections:
        model_inputs:
          enabled: 'True'
          data: 
            name: my_model_inputs_data_asset
            path: azureml://datastores/workspaceblobstore/paths/modelDataCollector/my_endpoint/blue/model_inputs
            version: 1
        model_outputs:
          enabled: 'True'
          data: 
            name: my_model_outputs_data_asset
            path: azureml://datastores/workspaceblobstore/paths/modelDataCollector/my_endpoint/blue/model_outputs 
            version: 1
    

    注意

    还可以使用 data.path 参数来指向不同 Azure 订阅中的数据存储,方法是提供遵循以下格式的路径:azureml://subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore_name>/paths/<path>

使用数据收集创建部署

在启用自定义日志记录的情况下部署模型:

$ az ml online-deployment create -f deployment.YAML

有关如何格式化部署 YAML 以使用托管联机终结点进行数据收集的详细信息,请参阅 CLI (v2) 托管联机部署 YAML 架构

执行有效负载日志记录

除了使用提供的 PythonSDK 自定义日志记录外,你还可以直接收集请求和响应 HTTP 有效负载数据,而无需扩展评分脚本 (score.py)。

  1. 要启用有效负载日志记录,请在部署 YAML 中使用名称 requestresponse

    $schema: http://azureml/sdk-2-0/OnlineDeployment.json
    
    endpoint_name: my_endpoint 
    name: blue 
    model: azureml:my-model-m1:1 
    environment: azureml:env-m1:1 
    data_collector:
       collections:
           request:
               enabled: 'True'
           response:
               enabled: 'True'
    
  2. 在启用有效负载日志记录的情况下部署模型:

    $ az ml online-deployment create -f deployment.YAML
    

使用有效负载日志记录时,不能保证收集的数据为表格格式。 因此,如果你希望在模型监控中使用收集的有效负载数据,则需要提供一个预处理组件来使数据表格化。 如果想要体验无缝模型监控,我们建议你使用自定义日志 Python SDK

使用部署时,收集的数据会流向工作区 Blob 存储。 以下 JSON 代码是收集的 HTTP 请求的示例

{"specversion":"1.0",
"id":"19790b87-a63c-4295-9a67-febb2d8fbce0",
"source":"/subscriptions/d511f82f-71ba-49a4-8233-d7be8a3650f4/resourceGroups/mire2etesting/providers/Microsoft.MachineLearningServices/workspaces/mirmasterenvws/onlineEndpoints/localdev-endpoint/deployments/localdev",
"type":"azureml.inference.request",
"datacontenttype":"application/json",
"time":"2022-05-25T08:59:48Z",
"data":{"data": [  [1,2,3,4,5,6,7,8,9,10], [10,9,8,7,6,5,4,3,2,1]]},
"path":"/score",
"method":"POST",
"contentrange":"bytes 0-59/*",
"correlationid":"f6e806c9-1a9a-446b-baa2-901373162105","xrequestid":"f6e806c9-1a9a-446b-baa2-901373162105"}

以下 JSON 代码是另一个收集的 HTTP 响应的示例

{"specversion":"1.0",
"id":"bbd80e51-8855-455f-a719-970023f41e7d",
"source":"/subscriptions/d511f82f-71ba-49a4-8233-d7be8a3650f4/resourceGroups/mire2etesting/providers/Microsoft.MachineLearningServices/workspaces/mirmasterenvws/onlineEndpoints/localdev-endpoint/deployments/localdev",
"type":"azureml.inference.response",
"datacontenttype":"application/json",
"time":"2022-05-25T08:59:48Z",
"data":[11055.977245525679, 4503.079536107787],
"contentrange":"bytes 0-38/39",
"correlationid":"f6e806c9-1a9a-446b-baa2-901373162105","xrequestid":"f6e806c9-1a9a-446b-baa2-901373162105"}

在 blob 存储中存储收集的数据

数据收集允许将生产推理数据记录到所选的 Blob 存储目标。 数据目标设置可在 collection_name 级别进行配置。

Blob 存储输出/格式

  • 默认情况下,收集的数据存储在工作区 Blob 存储的以下路径中:azureml://datastores/workspaceblobstore/paths/modelDataCollector

  • Blob 中的最终路径将附加 {endpoint_name}/{deployment_name}/{collection_name}/{yyyy}/{MM}/{dd}/{HH}/{instance_id}.jsonl

  • 文件中的每一行都是一个 JSON 对象,表示已记录的单个推理请求/响应。

注意

collection_name 引用数据集合名称(例如 model_inputsmodel_outputs)。 instance_id 是标识记录的数据分组的唯一 ID。

收集的数据遵循以下 JSON 架构。 可从 data关键值中获得收集的数据,并且系统提供附加元数据。

{"specversion":"1.0",
"id":"725aa8af-0834-415c-aaf5-c76d0c08f694",
"source":"/subscriptions/636d700c-4412-48fa-84be-452ac03d34a1/resourceGroups/mire2etesting/providers/Microsoft.MachineLearningServices/workspaces/mirmasterws/onlineEndpoints/localdev-endpoint/deployments/localdev",
"type":"azureml.inference.inputs",
"datacontenttype":"application/json",
"time":"2022-12-01T08:51:30Z",
"data":[{"label":"DRUG","pattern":"aspirin"},{"label":"DRUG","pattern":"trazodone"},{"label":"DRUG","pattern":"citalopram"}],
"correlationid":"3711655d-b04c-4aa2-a6c4-6a90cbfcb73f","xrequestid":"3711655d-b04c-4aa2-a6c4-6a90cbfcb73f",
"modelversion":"default",
"collectdatatype":"pandas.core.frame.DataFrame",
"agent":"monitoring-sdk/0.1.2",
"contentrange":"bytes 0-116/117"}

提示

显示换行符只是为了便于阅读。 在收集的.jsonl 文件中,不会有任何换行符。

存储大型有效负载

如果数据的有效负载大于 4 MB,则 {endpoint_name}/{deployment_name}/request/.../{instance_id}.jsonl 路径中包含的 {instance_id}.jsonl 文件中将有一个事件指向原始文件路径,该文件的路径应该如下:blob_url/{blob_container}/{blob_path}/{endpoint_name}/{deployment_name}/{rolled_time}/{instance_id}.jsonl。 收集的数据将存在于此路径中。

存储二进制数据

对于收集的二进制数据,我们直接显示原始文件,并将 instance_id 作为文件名。 根据 rolling_rate,二进制数据放置在与请求源组路径相同的文件夹中。 以下示例反映数据字段中的路径。 格式为 JSON,显示换行符只是为了便于阅读:

{
"specversion":"1.0",
"id":"ba993308-f630-4fe2-833f-481b2e4d169a",
"source":"/subscriptions//resourceGroups//providers/Microsoft.MachineLearningServices/workspaces/ws/onlineEndpoints/ep/deployments/dp",
"type":"azureml.inference.request",
"datacontenttype":"text/plain",
"time":"2022-02-28T08:41:07Z",
"data":"https://masterws0373607518.blob.core.chinacloudapi.cn/modeldata/mdc/%5Byear%5D%5Bmonth%5D%5Bday%5D-%5Bhour%5D_%5Bminute%5D/ba993308-f630-4fe2-833f-481b2e4d169a",
"path":"/score?size=1",
"method":"POST",
"contentrange":"bytes 0-80770/80771",
"datainblob":"true"
}

数据收集器批处理

如果在彼此的短时间间隔内发送请求,数据收集器会将它们一起批处理到同一 JSON 对象中。 例如,如果运行脚本将示例数据发送到终结点,并且部署已启用数据收集,则某些请求可以一起批处理,具体取决于它们之间的时间间隔。 如果使用数据收集与 Azure 机器学习模型监视,模型监视服务将单独处理每个请求。 但是,如果希望每个记录的数据行都有自己的唯一 correlationid,则可以将 correlationid 作为列包含在使用数据收集器记录的 pandas 数据帧中。 有关如何在 pandas 数据帧中将唯一 correlationid 作为列包含在内的详细信息,请参阅收集数据以监视模型性能

下面是两个已记录的请求的示例,这些请求一起批处理:

{"specversion":"1.0",
"id":"720b8867-54a2-4876-80eb-1fd6a8975770",
"source":"/subscriptions/79a1ba0c-35bb-436b-bff2-3074d5ff1f89/resourceGroups/rg-bozhlinmomoignite/providers/Microsoft.MachineLearningServices/workspaces/momo-demo-ws/onlineEndpoints/credit-default-mdc-testing-4/deployments/main2",
"type":"azureml.inference.model_inputs",
"datacontenttype":"application/json",
"time":"2024-03-05T18:16:25Z",
"data":[{"LIMIT_BAL":502970,"AGE":54,"BILL_AMT1":308068,"BILL_AMT2":381402,"BILL_AMT3":442625,"BILL_AMT4":320399,"BILL_AMT5":322616,"BILL_AMT6":397534,"PAY_AMT1":17987,"PAY_AMT2":78764,"PAY_AMT3":26067,"PAY_AMT4":24102,"PAY_AMT5":-1155,"PAY_AMT6":2154,"SEX":2,"EDUCATION":2,"MARRIAGE":2,"PAY_0":0,"PAY_2":0,"PAY_3":0,"PAY_4":0,"PAY_5":0,"PAY_6":0},{"LIMIT_BAL":293458,"AGE":35,"BILL_AMT1":74131,"BILL_AMT2":-71014,"BILL_AMT3":59284,"BILL_AMT4":98926,"BILL_AMT5":110,"BILL_AMT6":1033,"PAY_AMT1":-3926,"PAY_AMT2":-12729,"PAY_AMT3":17405,"PAY_AMT4":25110,"PAY_AMT5":7051,"PAY_AMT6":1623,"SEX":1,"EDUCATION":3,"MARRIAGE":2,"PAY_0":-2,"PAY_2":-2,"PAY_3":-2,"PAY_4":-2,"PAY_5":-1,"PAY_6":-1}],
"contentrange":"bytes 0-6794/6795",
"correlationid":"test",
"xrequestid":"test",
"modelversion":"default",
"collectdatatype":"pandas.core.frame.DataFrame",
"agent":"azureml-ai-monitoring/0.1.0b4"}

在工作室 UI 中查看数据

要从工作室 UI 查看 Blob 存储中收集的数据,请执行以下操作:

  1. 转到 Azure 机器学习工作区中的“数据”选项卡:

    屏幕截图突出显示了 Azure 机器学习工作区中的“数据”页面

  2. 导航到“数据存储”并选择你的 workspaceblobstore(默认):

    屏幕截图突出显示了 AzureML 工作区中的“数据源”页面

  3. 使用“浏览”菜单查看收集的生产数据:

    截图突出显示了数据存储中数据的树结构

收集 MLflow 模型的数据

如果你将 MLflow 模型部署到 Azure 机器学习联机终结点,则可以通过在工作室 UI 中进行一次切换来启用生产推理数据收集。 如果数据收集已打开,Azure 机器学习使用自定义日志记录代码自动检测你的评分脚本,以确保将生产数据记录到你的工作区 Blob 存储中。 然后,模型监视器可以使用数据来监视生产环境中的 MLflow 模型的性能。

在配置模型的部署时,可以启用生产数据收集。 在“部署”选项卡下,对“数据收集(预览版)”选择“启用”

启用数据收集后,生产推理数据将记录到你的 Azure 机器学习工作区 Blob 存储中,并将创建两个数据资产,名称分别为 <endpoint_name>-<deployment_name>-model_inputs<endpoint_name>-<deployment_name>-model_outputs。 在生产环境中使用部署时,这些数据资产会实时更新。 然后,模型监视器可以使用数据资产来监视生产环境中的模型的性能。