流分析中的机器学习工作室(经典)集成
重要
对 Azure 机器学习工作室(经典)的支持将于 2024 年 8 月 31 日结束。 建议在该日期之前转换到 Azure 机器学习。
自 2021 年 12 月 1 日起,无法创建新的机器学习工作室(经典)资源(工作区和 Web 服务计划)。 在 2024 年 8 月 31 日之前,可继续使用现有的机器学习工作室(经典)试验和 Web 服务。 有关详细信息,请参阅:
机器学习工作室(经典)文档即将停用,并且将来可能不会更新。
Azure 流分析支持用于调用 Azure 机器学习工作室(经典)终结点的用户定义函数 (UDF)。 流分析 REST API 库介绍了此功能的 REST API 支持。
本文提供了在流分析中成功实现此功能所需的补充信息。 此外,还附赠了一个教程。
概述:机器学习工作室(经典)术语
机器学习工作室(经典)提供一个协作型拖放式工具,该工具可用于根据数据生成、测试和部署预测分析解决方案。 可以使用机器学习工作室(经典)与这些机器学习资源进行交互:
- 工作区:将所有其他机器学习资源保存在一起以进行管理和控制的容器。
- 试验:数据科学家创建的测试,用于利用数据集和训练机器学习模型。
- 终结点:用于以特征作为输入、应用指定的机器学习模型并返回已评分输出的对象。
- 评分 Web 服务:终结点集合。
每个终结点都具有批处理执行和同步执行的 API。 流分析使用同步执行。 在机器学习工作室(经典)中,将特定的服务命名为请求/响应服务。
流分析作业所需的机器学习工作室(经典)资源
出于流分析作业处理的目的,如要成功执行,请求/响应终结点、API 密钥和 Swagger 定义均为必要项。 流分析提供附加的终结点,用于构造 swagger 终结点的 URL、查找接口并向用户返回默认 UDF 定义。
通过 REST API 配置流分析和机器学习工作室(经典)UDF
使用 REST API,可配置作业来调用机器学习工作室(经典)函数:
- 创建流分析作业。
- 定义输入。
- 定义输出。
- 创建 UDF。
- 编写用于调用 UDF 的流分析转换。
- 启动作业。
创建具有基本属性的 UDF
例如,以下示例代码创建了绑定到机器学习工作室(经典)终结点的名为 newudf 的标量 UDF。 endpoint
值(服务 URI)位于所选服务的 API 帮助页。 apiKey
值位于服务的主页。
PUT : /subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>?api-version=<apiVersion>
示例请求正文:
{
"name": "newudf",
"properties": {
"type": "Scalar",
"properties": {
"binding": {
"type": "Microsoft.MachineLearning/WebService",
"properties": {
"endpoint": "https://ussouthcentral.services.azureml.net/workspaces/f80d5d7a77fb4b46bf2a30c63c078dca/services/b7be5e40fd194258796fb402c1958eaf/execute ",
"apiKey": "replacekeyhere"
}
}
}
}
}
为默认 UDF 调用 RetrieveDefaultDefinition 终结点
创建主干 UDF 后,需要 UDF 的完整定义。 RetrieveDefaultDefinition
终结点可帮助获取绑定到机器学习工作室(经典)终结点的标量函数的默认定义。
以下有效负载要求获取绑定到工作室(经典)终结点的标量函数的默认 UDF 定义。 该负载未指定实际终结点,因为 PUT
请求已提供。
如果请求明确提供终结点,流分析会从请求调用终结点。 否则,流分析会使用最初引用的终结点。 在这里,UDF 采用单个字符串参数(一个句子)并返回一个 string
类型的输出,该输出指示该句子的 Sentiment
标签。
POST : /subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>/RetrieveDefaultDefinition?api-version=<apiVersion>
示例请求正文:
{
"bindingType": "Microsoft.MachineLearning/WebService",
"bindingRetrievalProperties": {
"executeEndpoint": null,
"udfType": "Scalar"
}
}
此请求的输出类似于以下示例:
{
"name": "newudf",
"properties": {
"type": "Scalar",
"properties": {
"inputs": [{
"dataType": "nvarchar(max)",
"isConfigurationParameter": null
}],
"output": {
"dataType": "nvarchar(max)"
},
"binding": {
"type": "Microsoft.MachineLearning/WebService",
"properties": {
"endpoint": "https://ussouthcentral.services.azureml.net/workspaces/f80d5d7a77ga4a4bbf2a30c63c078dca/services/b7be5e40fd194258896fb602c1858eaf/execute",
"apiKey": null,
"inputs": {
"name": "input1",
"columnNames": [{
"name": "tweet",
"dataType": "string",
"mapTo": 0
}]
},
"outputs": [{
"name": "Sentiment",
"dataType": "string"
}],
"batchSize": 10
}
}
}
}
}
根据响应修补 UDF
现在,必须使用上一个响应修补 UDF。
PATCH : /subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>?api-version=<apiVersion>
请求正文(输出来自 RetrieveDefaultDefinition
):
{
"name": "newudf",
"properties": {
"type": "Scalar",
"properties": {
"inputs": [{
"dataType": "nvarchar(max)",
"isConfigurationParameter": null
}],
"output": {
"dataType": "nvarchar(max)"
},
"binding": {
"type": "Microsoft.MachineLearning/WebService",
"properties": {
"endpoint": "https://ussouthcentral.services.azureml.net/workspaces/f80d5d7a77ga4a4bbf2a30c63c078dca/services/b7be5e40fd194258896fb602c1858eaf/execute",
"apiKey": null,
"inputs": {
"name": "input1",
"columnNames": [{
"name": "tweet",
"dataType": "string",
"mapTo": 0
}]
},
"outputs": [{
"name": "Sentiment",
"dataType": "string"
}],
"batchSize": 10
}
}
}
}
}
实现流分析转换以调用 UDF
针对每个输入事件查询 UDF(此处名为 scoreTweet
)并将该事件的响应写入到输入:
{
"name": "transformation",
"properties": {
"streamingUnits": null,
"query": "select *,scoreTweet(Tweet) TweetSentiment into blobOutput from blobInput"
}
}
获取帮助
如需进一步的帮助,请尝试 Azure 流分析的 Microsoft Q&A 页。