教程:生成用于图像分类的 Azure 机器学习管道
注意
有关使用 SDK v2 生成管道的教程,请参阅教程:在 Jupyter Notebook 中通过 Python SDK v2 使用用于生产 ML 工作流的 ML 管道。
本教程介绍如何生成 Azure 机器学习管道来准备数据和训练机器学习模型。 机器学习管道可以优化工作流以提高其速度、可移植性和可重用性,使你能够将工作重心放在机器学习上,而不必关注基础结构和自动化。
该示例将训练一个小型 Keras 卷积神经网络,以对 Fashion MNIST 数据集中的图像进行分类。
在本教程中,请完成以下任务:
- 配置工作区
- 创建试验来保存工作
- 预配 ComputeTarget 以执行该工作
- 创建用于存储压缩数据的数据集
- 创建管道步骤以准备要训练的数据
- 定义执行训练的运行时环境
- 创建管道步骤以定义神经网络并执行训练
- 通过管道步骤撰写管道
- 在试验中运行管道
- 查看步骤的输出和经训练的神经网络
- 注册模型供进一步使用
如果没有 Azure 订阅,请在开始前创建一个试用版订阅。 立即尝试试用版订阅。
先决条件
- 如果还没有 Azure 机器学习工作区,请完成创建帮助入门的资源。
- 已在其中安装
azureml-core
和azureml-pipeline
包的 Python 环境。 此环境用于定义和控制 Azure 机器学习资源,独立于运行时用于训练的环境。
重要
目前,与 azureml-pipeline
兼容的最新 Python 版本是 Python 3.8。 如果在安装 azureml-pipeline
包时遇到困难,请确保 python --version
是兼容版本。 有关说明,请参阅 Python 虚拟环境管理器(venv
、conda
等)的文档。
启动交互式 Python 会话
本教程使用适用于 Azure 机器学习的 Python SDK 创建和控制 Azure 机器学习管道。 本教程假定你将在 Python REPL 环境或 Jupyter 笔记本中以交互方式运行代码片段。
- 本教程基于 Azure 机器学习示例 存储库的
python-sdk/tutorial/using-pipelines
目录中的image-classification.ipynb
笔记本。 步骤本身的源代码位于keras-mnist-fashion
子目录中。
导入类型
导入本教程所需的所有 Azure 机器学习类型:
import os
import azureml.core
from azureml.core import (
Workspace,
Experiment,
Dataset,
Datastore,
ComputeTarget,
Environment,
ScriptRunConfig
)
from azureml.data import OutputFileDatasetConfig
from azureml.core.compute import AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline
# check core SDK version number
print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
Azure 机器学习 SDK 版本应为 1.37 或更高版本。 如果不是,请使用 pip install --upgrade azureml-core
进行升级。
配置工作区
从现有的 Azure 机器学习工作区创建工作区对象。
workspace = Workspace.from_config()
为管道创建基础结构
创建一个 Experiment
对象来保存管道运行的结果:
exp = Experiment(workspace=workspace, name="keras-mnist-fashion")
创建一个 ComputeTarget
,表示管道将在其上运行的计算机资源。 即使在基于 CPU 的计算机上,本教程中使用的简单神经网络也只需几分钟即可完成训练。 如果要使用 GPU 进行训练,请将 use_gpu
设置为 True
。 预配计算目标通常需要大约五分钟。
use_gpu = False
# choose a name for your cluster
cluster_name = "gpu-cluster" if use_gpu else "cpu-cluster"
found = False
# Check if this compute target already exists in the workspace.
cts = workspace.compute_targets
if cluster_name in cts and cts[cluster_name].type == "AmlCompute":
found = True
print("Found existing compute target.")
compute_target = cts[cluster_name]
if not found:
print("Creating a new compute target...")
compute_config = AmlCompute.provisioning_configuration(
vm_size= "STANDARD_NC6" if use_gpu else "STANDARD_D2_V2"
# vm_priority = 'lowpriority', # optional
max_nodes=4,
)
# Create the cluster.
compute_target = ComputeTarget.create(workspace, cluster_name, compute_config)
# Can poll for a minimum number of nodes and for a specific timeout.
# If no min_node_count is provided, it will use the scale settings for the cluster.
compute_target.wait_for_completion(
show_output=True, min_node_count=None, timeout_in_minutes=10
)
# For a more detailed view of current AmlCompute status, use get_status().print(compute_target.get_status().serialize())
备注
GPU 可用性取决于 Azure 订阅的配额和 Azure 容量。 请参阅管理和增大 Azure 机器学习资源的配额。
为 Azure 存储的数据创建数据集
Fashion-MNIST 是一个时尚图像数据集,包含 10 个类别。 每张图像都是 28x28 的灰度图像,有 60,000 张训练图像和 10,000 张测试图像。 作为图像分类问题,Fashion-MNIST 比经典 MNIST 手写数字数据库更难。 它以与原始手写数字数据库相同的压缩二进制形式分发。
若要创建引用基于 Web 的数据的 Dataset
,请运行:
data_urls = ["https://data4mldemo6150520719.blob.core.chinacloudapi.cn/demo/mnist-fashion"]
fashion_ds = Dataset.File.from_files(data_urls)
# list the files referenced by fashion_ds
print(fashion_ds.to_path())
此代码将快速完成。 基础数据保留在 data_urls
数组中指定的 Azure 存储资源中。
创建数据准备管道步骤
此管道的第一步是将 fashion_ds
的压缩数据文件转换为你自己的工作区中的数据集,其中包含可供训练使用的 CSV 文件。 向工作区注册后,你的协作者可以访问此数据进行自己的分析、训练等
datastore = workspace.get_default_datastore()
prepared_fashion_ds = OutputFileDatasetConfig(
destination=(datastore, "outputdataset/{run-id}")
).register_on_complete(name="prepared_fashion_ds")
上述代码指定了一个基于管道步骤输出的数据集。 基础已处理文件将放入工作区的默认数据存储的 blob 存储中,位于 destination
中指定的路径。 数据集将在名为 prepared_fashion_ds
的工作区中注册。
创建管道步骤的源
到目前为止,执行的代码已创建并控制了 Azure 资源。 现在,可以编写在域中执行第一步的代码。
如果按照 Azure 机器学习示例存储库中的示例进行操作,则源文件已作为 keras-mnist-fashion/prepare.py
提供。
如果是从头开始操作,请创建名为 keras-mnist-fashion/
的子目录。 创建一个新文件,将以下代码添加到其中,并将文件命名为 prepare.py
。
# prepare.py
# Converts MNIST-formatted files at the passed-in input path to a passed-in output path
import os
import sys
# Conversion routine for MNIST binary format
def convert(imgf, labelf, outf, n):
f = open(imgf, "rb")
l = open(labelf, "rb")
o = open(outf, "w")
f.read(16)
l.read(8)
images = []
for i in range(n):
image = [ord(l.read(1))]
for j in range(28 * 28):
image.append(ord(f.read(1)))
images.append(image)
for image in images:
o.write(",".join(str(pix) for pix in image) + "\n")
f.close()
o.close()
l.close()
# The MNIST-formatted source
mounted_input_path = sys.argv[1]
# The output directory at which the outputs will be written
mounted_output_path = sys.argv[2]
# Create the output directory
os.makedirs(mounted_output_path, exist_ok=True)
# Convert the training data
convert(
os.path.join(mounted_input_path, "mnist-fashion/train-images-idx3-ubyte"),
os.path.join(mounted_input_path, "mnist-fashion/train-labels-idx1-ubyte"),
os.path.join(mounted_output_path, "mnist_train.csv"),
60000,
)
# Convert the test data
convert(
os.path.join(mounted_input_path, "mnist-fashion/t10k-images-idx3-ubyte"),
os.path.join(mounted_input_path, "mnist-fashion/t10k-labels-idx1-ubyte"),
os.path.join(mounted_output_path, "mnist_test.csv"),
10000,
)
prepare.py
中的代码采用两个命令行参数:第一个分配给 mounted_input_path
,第二个分配给 mounted_output_path
。 如果该子目录不存在,则调用 os.makedirs
会创建该目录。 然后,程序将转换训练和测试数据,并将逗号分隔的文件输出到 mounted_output_path
。
指定管道步骤
返回用于指定管道的 Python 环境,运行以下代码为准备代码创建 PythonScriptStep
:
script_folder = "./keras-mnist-fashion"
prep_step = PythonScriptStep(
name="prepare step",
script_name="prepare.py",
# On the compute target, mount fashion_ds dataset as input, prepared_fashion_ds as output
arguments=[fashion_ds.as_named_input("fashion_ds").as_mount(), prepared_fashion_ds],
source_directory=script_folder,
compute_target=compute_target,
allow_reuse=True,
)
对 PythonScriptStep
的调用指定在运行管道步骤时:
script_folder
目录中的所有文件都上传到compute_target
- 在这些上传的源文件中,将运行文件
prepare.py
fashion_ds
和prepared_fashion_ds
数据集将装载在compute_target
上,并显示为目录fashion_ds
文件的路径将是prepare.py
的第一个参数。 在prepare.py
中,此参数分配给mounted_input_path
prepared_fashion_ds
的路径将是prepare.py
的第二个参数。 在prepare.py
中,此参数分配给mounted_output_path
- 因为
allow_reuse
是True
,所以在其源文件或输入更改之前,它不会重新运行 - 此
PythonScriptStep
将被命名为prepare step
模块化和重用是管道的主要优势。 Azure 机器学习可自动确定源代码或数据集更改。 如果 allow_reuse
为 True
,则将重用不受影响的步骤的输出,而不会再次重新运行这些步骤。 如果某个步骤依赖于 Azure 机器学习外部可能发生变化的数据源(例如,包含销售数据的 URL),请将 allow_reuse
设置为 False
,在每次运行管道时都运行管道步骤。
创建训练步骤
数据从压缩格式转换为 CSV 文件后,可用于训练卷积神经网络。
创建训练步骤的源
使用较大的管道时,最佳做法是将每个步骤的源代码放在单独的目录(src/prepare/
、src/train/
等)中,但对于本教程,只需在同一 keras-mnist-fashion/
源目录中使用或创建文件 train.py
。
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.layers.normalization import BatchNormalization
from keras.utils import to_categorical
from keras.callbacks import Callback
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from azureml.core import Run
# dataset object from the run
run = Run.get_context()
dataset = run.input_datasets["prepared_fashion_ds"]
# split dataset into train and test set
(train_dataset, test_dataset) = dataset.random_split(percentage=0.8, seed=111)
# load dataset into pandas dataframe
data_train = train_dataset.to_pandas_dataframe()
data_test = test_dataset.to_pandas_dataframe()
img_rows, img_cols = 28, 28
input_shape = (img_rows, img_cols, 1)
X = np.array(data_train.iloc[:, 1:])
y = to_categorical(np.array(data_train.iloc[:, 0]))
# here we split validation data to optimiza classifier during training
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=13)
# test data
X_test = np.array(data_test.iloc[:, 1:])
y_test = to_categorical(np.array(data_test.iloc[:, 0]))
X_train = (
X_train.reshape(X_train.shape[0], img_rows, img_cols, 1).astype("float32") / 255
)
X_test = X_test.reshape(X_test.shape[0], img_rows, img_cols, 1).astype("float32") / 255
X_val = X_val.reshape(X_val.shape[0], img_rows, img_cols, 1).astype("float32") / 255
batch_size = 256
num_classes = 10
epochs = 10
# construct neuron network
model = Sequential()
model.add(
Conv2D(
32,
kernel_size=(3, 3),
activation="relu",
kernel_initializer="he_normal",
input_shape=input_shape,
)
)
model.add(MaxPooling2D((2, 2)))
model.add(Dropout(0.25))
model.add(Conv2D(64, (3, 3), activation="relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Conv2D(128, (3, 3), activation="relu"))
model.add(Dropout(0.4))
model.add(Flatten())
model.add(Dense(128, activation="relu"))
model.add(Dropout(0.3))
model.add(Dense(num_classes, activation="softmax"))
model.compile(
loss=keras.losses.categorical_crossentropy,
optimizer=keras.optimizers.Adam(),
metrics=["accuracy"],
)
# start an Azure ML run
run = Run.get_context()
class LogRunMetrics(Callback):
# callback at the end of every epoch
def on_epoch_end(self, epoch, log):
# log a value repeated which creates a list
run.log("Loss", log["loss"])
run.log("Accuracy", log["accuracy"])
history = model.fit(
X_train,
y_train,
batch_size=batch_size,
epochs=epochs,
verbose=1,
validation_data=(X_val, y_val),
callbacks=[LogRunMetrics()],
)
score = model.evaluate(X_test, y_test, verbose=0)
# log a single value
run.log("Final test loss", score[0])
print("Test loss:", score[0])
run.log("Final test accuracy", score[1])
print("Test accuracy:", score[1])
plt.figure(figsize=(6, 3))
plt.title("Fashion MNIST with Keras ({} epochs)".format(epochs), fontsize=14)
plt.plot(history.history["accuracy"], "b-", label="Accuracy", lw=4, alpha=0.5)
plt.plot(history.history["loss"], "r--", label="Loss", lw=4, alpha=0.5)
plt.legend(fontsize=12)
plt.grid(True)
# log an image
run.log_image("Loss v.s. Accuracy", plot=plt)
# create a ./outputs/model folder in the compute target
# files saved in the "./outputs" folder are automatically uploaded into run history
os.makedirs("./outputs/model", exist_ok=True)
# serialize NN architecture to JSON
model_json = model.to_json()
# save model JSON
with open("./outputs/model/model.json", "w") as f:
f.write(model_json)
# save model weights
model.save_weights("./outputs/model/model.h5")
print("model saved in ./outputs/model folder")
ML 开发人员应熟悉这些代码的大部分内容:
- 数据已分区为用于训练的训练集和验证集,以及用于最终评分的单独测试子集
- 输入形状为 28x28x1(仅为 1,因为输入是灰度),一个批中将包含 256 个输入,共有 10 个类
- 训练循环数为 10
- 该模型有三个卷积层,包括最大池化和随机失活,后跟全连接层和 softmax 头
- 该模型适合 10 个循环,然后进行评估
- 模型体系结构写入
outputs/model/model.json
,权重写入outputs/model/model.h5
不过,某些代码特定于 Azure 机器学习。 run = Run.get_context()
检索包含当前服务上下文的 Run
对象。 train.py
源使用此 run
对象通过其名称检索输入数据集(替代 prepare.py
中通过脚本参数数组 argv
检索数据集的代码)。
run
对象还用于在每个循环结束时记录训练进度,并在训练结束时记录损失和准确度随时间变化的图表。
创建训练管道步骤
训练步骤的配置比准备步骤稍微复杂一些。 准备步骤仅使用标准 Python 库。 更常见的是,需要修改运行源代码的运行时环境。
创建具有以下内容的文件 conda_dependencies.yml
:
dependencies:
- python=3.7
- pip:
- azureml-core
- azureml-dataset-runtime
- keras==2.4.3
- tensorflow==2.4.3
- numpy
- scikit-learn
- pandas
- matplotlib
Environment
类表示运行机器学习任务的运行时环境。 将上述规范与训练代码相关联:
keras_env = Environment.from_conda_specification(
name="keras-env", file_path="./conda_dependencies.yml"
)
train_cfg = ScriptRunConfig(
source_directory=script_folder,
script="train.py",
compute_target=compute_target,
environment=keras_env,
)
创建训练步骤本身使用的代码类似于用于创建准备步骤的代码:
train_step = PythonScriptStep(
name="train step",
arguments=[
prepared_fashion_ds.read_delimited_files().as_input(name="prepared_fashion_ds")
],
source_directory=train_cfg.source_directory,
script_name=train_cfg.script,
runconfig=train_cfg.run_config,
)
创建并运行管道
现在,你已指定数据输入和输出并创建了管道的步骤,可以将它们组合到管道中并运行管道:
pipeline = Pipeline(workspace, steps=[prep_step, train_step])
run = exp.submit(pipeline)
你创建的 Pipeline
对象在 workspace
中运行,由指定的准备和训练步骤组成。
备注
此管道有一个简单的依赖项关系图:训练步骤依赖于准备步骤,准备步骤依赖于 fashion_ds
数据集。 生产管道通常具有更复杂的依赖项。 步骤可能依赖于多个上游步骤,早期步骤中的源代码更改可能会产生深远的影响,等等。 Azure 机器学习会为你跟踪这些问题。 你只需传入 steps
数组,Azure 机器学习会负责计算执行图。
对 submit
和 Experiment
的调用很快完成,并生成类似于以下内容的输出:
Submitted PipelineRun 5968530a-abcd-1234-9cc1-46168951b5eb
Link to Azure Machine Learning Portal: https://studio.ml.azure.cn/runs/abc-xyz...
可以通过打开链接来监视管道运行,也可以通过运行以下代码来阻止管道运行,直到管道运行完成:
run.wait_for_completion(show_output=True)
重要
首次管道运行需要大约 15 分钟。 必须下载所有依赖项、创建 Docker 映像,并预配和创建 Python 环境。 再次运行管道所花费的时间会大幅减少,因为会重复使用这些资源,而无需再次创建。 但是,管道的总运行时间取决于脚本的工作负荷,以及每个管道步骤中运行的进程数。
管道完成后,可以检索在训练步骤中记录的指标:
run.find_step_run("train step")[0].get_metrics()
如果对指标感到满意,可以在工作区中注册模型:
run.find_step_run("train step")[0].register_model(
model_name="keras-model",
model_path="outputs/model/",
datasets=[("train test data", fashion_ds)],
)
清理资源
如果你打算运行其他 Azure 机器学习教程,请不要完成本部分。
停止计算实例
如果使用了计算实例,请在不使用 VM 时将其停止,以降低成本。
在工作区中选择“计算”。
从列表中选择计算实例的名称。
选择“停止” 。
准备好再次使用服务器时,选择“启动” 。
删除所有内容
如果不打算使用已创建的资源,请删除它们,以免产生任何费用:
- 在 Azure 门户的左侧菜单中选择“资源组”。
- 在资源组列表中,选择创建的资源组。
- 选择“删除资源组”。
- 输入资源组名称。 然后选择“删除”。
还可保留资源组,但请删除单个工作区。 显示工作区属性,然后选择“删除”。
后续步骤
在本教程中,你使用了以下类型:
Workspace
代表你的 Azure 机器学习工作区。 它包含:- 包含管道训练运行结果的
Experiment
- 延迟加载 Fashion-MNIST 数据存储中保存的数据的
Dataset
- 表示运行管道步骤的计算机的
ComputeTarget
- 运行管道步骤的运行时环境
Environment
- 将
PythonScriptStep
步骤组合成一个整体的Pipeline
- 对训练过程满意后注册的
Model
- 包含管道训练运行结果的
Workspace
对象包含对本教程中未使用的其他资源(笔记本、终结点等)的引用。 有关详细信息,请参阅什么是 Azure 机器学习工作区?。
OutputFileDatasetConfig
将运行的输出提升为基于文件的数据集。 有关数据集和处理数据的详细信息,请参阅如何访问数据。
有关计算目标和环境的详细信息,请参阅什么是 Azure 机器学习中的计算目标?和什么是 Azure 机器学习环境?
ScriptRunConfig
将 ComputeTarget
和 Environment
与 Python 源文件相关联。 PythonScriptStep
采用该 ScriptRunConfig
并定义其输入和输出,在此管道中这是由 OutputFileDatasetConfig
生成的文件数据集。
有关如何使用机器学习 SDK 生成管道的更多示例,请参阅示例存储库。