流分析中的机器学习工作室(经典)集成

重要

对 Azure 机器学习工作室(经典)的支持将于 2024 年 8 月 31 日结束。 建议在该日期之前转换到 Azure 机器学习

自 2021 年 12 月 1 日起,无法创建新的机器学习工作室(经典)资源(工作区和 Web 服务计划)。 在 2024 年 8 月 31 日之前,可继续使用现有的机器学习工作室(经典)试验和 Web 服务。 有关详细信息,请参阅:

机器学习工作室(经典)文档即将停用,并且将来可能不会更新。

Azure 流分析支持用于调用 Azure 机器学习工作室(经典)终结点的用户定义函数 (UDF)。 流分析 REST API 库介绍了此功能的 REST API 支持。

本文提供了在流分析中成功实现此功能所需的补充信息。 此外,还附赠了一个教程

概述:机器学习工作室(经典)术语

机器学习工作室(经典)提供一个协作型拖放式工具,该工具可用于根据数据生成、测试和部署预测分析解决方案。 可以使用机器学习工作室(经典)与这些机器学习资源进行交互:

  • 工作区:将所有其他机器学习资源保存在一起以进行管理和控制的容器。
  • 试验:数据科学家创建的测试,用于利用数据集和训练机器学习模型。
  • 终结点:用于以特征作为输入、应用指定的机器学习模型并返回已评分输出的对象。
  • 评分 Web 服务:终结点集合。

每个终结点都具有批处理执行和同步执行的 API。 流分析使用同步执行。 在机器学习工作室(经典)中,将特定的服务命名为请求/响应服务

流分析作业所需的机器学习工作室(经典)资源

出于流分析作业处理的目的,如要成功执行,请求/响应终结点、API 密钥和 Swagger 定义均为必要项。 流分析提供附加的终结点,用于构造 swagger 终结点的 URL、查找接口并向用户返回默认 UDF 定义。

通过 REST API 配置流分析和机器学习工作室(经典)UDF

使用 REST API,可配置作业来调用机器学习工作室(经典)函数:

  1. 创建流分析作业。
  2. 定义输入。
  3. 定义输出。
  4. 创建 UDF。
  5. 编写用于调用 UDF 的流分析转换。
  6. 启动作业。

创建具有基本属性的 UDF

例如,以下示例代码创建了绑定到机器学习工作室(经典)终结点的名为 newudf 的标量 UDF。 endpoint 值(服务 URI)位于所选服务的 API 帮助页。 apiKey 值位于服务的主页。

PUT : /subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>?api-version=<apiVersion>

示例请求正文:

    {
        "name": "newudf",
        "properties": {
            "type": "Scalar",
            "properties": {
                "binding": {
                    "type": "Microsoft.MachineLearning/WebService",
                    "properties": {
                        "endpoint": "https://ussouthcentral.services.azureml.net/workspaces/f80d5d7a77fb4b46bf2a30c63c078dca/services/b7be5e40fd194258796fb402c1958eaf/execute ",
                        "apiKey": "replacekeyhere"
                    }
                }
            }
        }
    }

为默认 UDF 调用 RetrieveDefaultDefinition 终结点

创建主干 UDF 后,需要 UDF 的完整定义。 RetrieveDefaultDefinition 终结点可帮助获取绑定到机器学习工作室(经典)终结点的标量函数的默认定义。

以下有效负载要求获取绑定到工作室(经典)终结点的标量函数的默认 UDF 定义。 该负载未指定实际终结点,因为 PUT 请求已提供。

如果请求明确提供终结点,流分析会从请求调用终结点。 否则,流分析会使用最初引用的终结点。 在这里,UDF 采用单个字符串参数(一个句子)并返回一个 string 类型的输出,该输出指示该句子的 Sentiment 标签。

POST : /subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>/RetrieveDefaultDefinition?api-version=<apiVersion>

示例请求正文:

    {
        "bindingType": "Microsoft.MachineLearning/WebService",
        "bindingRetrievalProperties": {
            "executeEndpoint": null,
            "udfType": "Scalar"
        }
    }

此请求的输出类似于以下示例:

    {
        "name": "newudf",
        "properties": {
            "type": "Scalar",
            "properties": {
                "inputs": [{
                    "dataType": "nvarchar(max)",
                    "isConfigurationParameter": null
                }],
                "output": {
                    "dataType": "nvarchar(max)"
                },
                "binding": {
                    "type": "Microsoft.MachineLearning/WebService",
                    "properties": {
                        "endpoint": "https://ussouthcentral.services.azureml.net/workspaces/f80d5d7a77ga4a4bbf2a30c63c078dca/services/b7be5e40fd194258896fb602c1858eaf/execute",
                        "apiKey": null,
                        "inputs": {
                            "name": "input1",
                            "columnNames": [{
                                "name": "tweet",
                                "dataType": "string",
                                "mapTo": 0
                            }]
                        },
                        "outputs": [{
                            "name": "Sentiment",
                            "dataType": "string"
                        }],
                        "batchSize": 10
                    }
                }
            }
        }
    }

根据响应修补 UDF

现在,必须使用上一个响应修补 UDF。

PATCH : /subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>?api-version=<apiVersion>

请求正文(输出来自 RetrieveDefaultDefinition):

    {
        "name": "newudf",
        "properties": {
            "type": "Scalar",
            "properties": {
                "inputs": [{
                    "dataType": "nvarchar(max)",
                    "isConfigurationParameter": null
                }],
                "output": {
                    "dataType": "nvarchar(max)"
                },
                "binding": {
                    "type": "Microsoft.MachineLearning/WebService",
                    "properties": {
                        "endpoint": "https://ussouthcentral.services.azureml.net/workspaces/f80d5d7a77ga4a4bbf2a30c63c078dca/services/b7be5e40fd194258896fb602c1858eaf/execute",
                        "apiKey": null,
                        "inputs": {
                            "name": "input1",
                            "columnNames": [{
                                "name": "tweet",
                                "dataType": "string",
                                "mapTo": 0
                            }]
                        },
                        "outputs": [{
                            "name": "Sentiment",
                            "dataType": "string"
                        }],
                        "batchSize": 10
                    }
                }
            }
        }
    }

实现流分析转换以调用 UDF

针对每个输入事件查询 UDF(此处名为 scoreTweet)并将该事件的响应写入到输入:

    {
        "name": "transformation",
        "properties": {
            "streamingUnits": null,
            "query": "select *,scoreTweet(Tweet) TweetSentiment into blobOutput from blobInput"
        }
    }

获取帮助

如需进一步的帮助,请尝试 Azure 流分析的 Microsoft Q&A 页

后续步骤