适用范围:Python SDK azure-ai-ml v2(最新版)
本文介绍如何使用 Azure 机器学习 Python SDK v2 生成 Azure 机器学习管道 ,以完成包含三个步骤的图像分类任务:准备数据、训练图像分类模型以及为模型评分。 机器学习管道使用速度、可移植性和重用性优化工作流,以便你可以专注于机器学习,而不是基础结构和自动化。
示例管道训练小型 Keras 卷积神经网络,以对 Fashion MNIST 数据集中的图像进行分类。 管道如下所示:
在本文中,你将完成以下任务:
- 为管道作业准备输入数据。
- 创建三个组件来准备数据、训练图像并为模型评分。
- 从组件生成管道。
- 获取对具有计算的工作区的访问权限。
- 提交管道作业。
- 查看组件和训练的神经网络的输出。
- (可选)注册组件,以便在工作区中进一步重复使用和共享。
如果没有 Azure 订阅,请在开始操作前先创建一个免费帐户。 立即试用免费版或付费版 Azure 机器学习。
先决条件
一个 Azure 机器学习工作区。 如果没有资源,请完成 “创建资源”教程。
已安装 Azure 机器学习 Python SDK v2 的 Python 环境。 有关安装说明,请参阅 入门。 此环境用于定义和控制 Azure 机器学习资源,与运行时用于训练的环境分开。
示例存储库的克隆。
若要运行训练示例,请先克隆示例存储库并转到
sdk
目录:git clone --depth 1 https://github.com/Azure/azureml-examples cd azureml-examples/sdk
启动交互式 Python 会话
本文使用 Azure 机器学习 Python SDK 创建和控制 Azure 机器学习管道。 本文基于假设你将在 Python REPL 环境或 Jupyter 笔记本中以交互方式运行代码片段。
本文基于 image_classification_keras_minist_convnet.ipynb 笔记本,可在 Azure 机器学习示例存储库的目录中找到sdk/python/jobs/pipelines/2e_image_classification_keras_minist_convnet
该笔记本。
导入所需的库
导入本文所需的所有 Azure 机器学习库:
# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component
为管道作业准备输入数据
需要为图像分类管道准备输入数据。
Fashion MNIST 是一个时尚图像数据集,分为 10 个类。 每个图像都是 28 x 28 灰度图像。 有 60,000 个训练图像和 10,000 个测试图像。 作为图像分类问题,时尚 MNIST 比经典 MNIST 手写数字数据库更具挑战性。 它以与原始手写数字数据库相同的压缩二进制形式分发。
通过定义 Input
,可以创建对数据源位置的引用。 数据会保留在其现有位置,因此不会产生额外的存储成本。
创建用于生成管道的组件
图像分类任务可以拆分为三个步骤:准备数据、训练模型和对模型评分。
Azure 机器学习组件是一个自包含的代码片段,可完成机器学习管道中的一个步骤。 在本文中,你将为图像分类任务创建三个组件:
- 为其准备训练和测试数据。
- 使用训练数据训练神经网络进行图像分类。
- 使用测试数据为模型评分。
对于每个组件,需要完成以下步骤:
准备包含执行逻辑的 Python 脚本。
定义组件的接口。
添加组件的其他元数据,包括运行时环境和用于运行组件的命令。
下一部分介绍如何通过两种方式创建组件。 对于前两个组件,请使用 Python 函数。 对于第三个组件,请使用 YAML 定义。
创建数据准备组件
此管道中的第一个组件将压缩数据文件 fashion_ds
转换为两个 .csv 文件,一个用于训练,另一个用于评分。 使用 Python 函数定义此组件。
如果您正在使用 Azure 机器学习示例库中的示例,文件夹 prep
中已经有源文件。 此文件夹包含两个用于构造组件的文件: prep_component.py
用于定义组件,以及 conda.yaml
定义组件的运行时环境。
使用 Python 函数定义组件
command_component()
通过使用函数作为修饰器,可以轻松定义组件的接口、它的元数据以及从 Python 函数运行的代码。 修饰的每个 Python 函数将转换为管道服务可以处理的单个静态规范 (YAML)。
# Converts MNIST-formatted files at the passed-in input path to training data output path and test data output path
import os
from pathlib import Path
from mldesigner import command_component, Input, Output
@command_component(
name="prep_data",
version="1",
display_name="Prep Data",
description="Convert data to CSV file, and split to training and test data",
environment=dict(
conda_file=Path(__file__).parent / "conda.yaml",
image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
),
)
def prepare_data_component(
input_data: Input(type="uri_folder"),
training_data: Output(type="uri_folder"),
test_data: Output(type="uri_folder"),
):
convert(
os.path.join(input_data, "train-images-idx3-ubyte"),
os.path.join(input_data, "train-labels-idx1-ubyte"),
os.path.join(training_data, "mnist_train.csv"),
60000,
)
convert(
os.path.join(input_data, "t10k-images-idx3-ubyte"),
os.path.join(input_data, "t10k-labels-idx1-ubyte"),
os.path.join(test_data, "mnist_test.csv"),
10000,
)
def convert(imgf, labelf, outf, n):
f = open(imgf, "rb")
l = open(labelf, "rb")
o = open(outf, "w")
f.read(16)
l.read(8)
images = []
for i in range(n):
image = [ord(l.read(1))]
for j in range(28 * 28):
image.append(ord(f.read(1)))
images.append(image)
for image in images:
o.write(",".join(str(pix) for pix in image) + "\n")
f.close()
o.close()
l.close()
前面的代码使用修饰器定义具有显示名称 Prep Data
的 @command_component
组件:
name
是组件的唯一标识符。version
是组件的当前版本。 一个组件可以有多个版本。display_name
是 UI 组件的友好显示名称。 它并不唯一。description
通常描述组件可以完成的任务。environment
指定组件的运行时环境。 此组件的环境指定 Docker 映像并引用conda.yaml
该文件。该文件
conda.yaml
包含用于组件的所有包:name: imagekeras_prep_conda_env channels: - defaults dependencies: - python=3.7.11 - pip=20.0 - pip: - mldesigner==0.1.0b4
prepare_data_component
函数定义input_data
的一个输入,并定义training_data
和test_data
的两个输出。input_data
是输入数据路径。training_data
和test_data
是训练数据和测试数据的输出数据路径。该组件将数据从
input_data
.csv 转换为training_data
训练数据,并将test_data
.csv 转换为测试数据。
这是组件在工作室 UI 中的外观:
- 组件是管道图形中的一个块。
input_data
、training_data
和test_data
是组件的端口,用于连接到其他组件进行数据流传送。
现在,你已为 Prep Data
组件准备所有源文件。
创建模型训练组件
在本部分中,你将创建一个组件,用于在 Python 函数中训练图像分类模型,就像对 Prep Data
组件一样。
由于训练逻辑更为复杂,因此你将训练代码放在单独的 Python 文件中。
此组件的源文件位于 train
Azure 机器学习示例存储库中的文件夹中。 此文件夹包含三个用于构造组件的文件:
train.py
包含用于训练模型的逻辑。train_component.py
定义组件的接口并导入在其中train.py
的函数。conda.yaml
定义组件的运行时环境。
获取包含逻辑的脚本
该文件 train.py
包含一个正常的 Python 函数,用于为图像分类训练 Keras 神经网络执行逻辑。 若要查看代码,请参阅 GitHub 上的 train.py 文件。
使用 Python 函数定义组件
定义训练函数后,可以在 @command_component
Azure 机器学习 SDK v2 中使用,将函数包装为可在 Azure 机器学习管道中使用的组件:
import os
from pathlib import Path
from mldesigner import command_component, Input, Output
@command_component(
name="train_image_classification_keras",
version="1",
display_name="Train Image Classification Keras",
description="train image classification with keras",
environment=dict(
conda_file=Path(__file__).parent / "conda.yaml",
image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
),
)
def keras_train_component(
input_data: Input(type="uri_folder"),
output_model: Output(type="uri_folder"),
epochs=10,
):
# avoid dependency issue, execution logic is in train() func in train.py file
from train import train
train(input_data, output_model, epochs)
前面的代码通过使用@command_component
定义了一个组件,其显示名称是Train Image Classification Keras
。
- 该
keras_train_component
函数定义一个输入,input_data
用于源训练数据,一个输入,epochs
指定训练期间要使用的纪元数,以及一个输出,output_model
指定模型文件的输出路径。epochs
的默认值为 10。 此组件的逻辑来自train()
train.py 中的函数。
训练模型组件的配置比准备数据组件要复杂一些。 conda.yaml
如下所示:
name: imagekeras_train_conda_env
channels:
- defaults
dependencies:
- python=3.8
- pip=20.2
- pip:
- mldesigner==0.1.0b12
- azureml-mlflow==1.50.0
- tensorflow==2.7.0
- numpy==1.21.4
- scikit-learn==1.0.1
- pandas==1.3.4
- matplotlib==3.2.2
- protobuf==3.20.0
现在,你已准备好组件 Train Image Classification Keras
的所有源文件。
创建模型评分组件
在本部分中,你将创建一个组件,以便通过 YAML 规范和脚本为训练的模型评分。
如果您正在使用 Azure 机器学习示例库中的示例,文件夹 score
中已经有源文件。 此文件夹包含三个用于构造组件的文件:
score.py
包含组件的源代码。score.yaml
定义组件的接口和其他详细信息。conda.yaml
定义组件的运行时环境。
获取包含逻辑的脚本
该文件 score.py
包含执行训练模型逻辑的普通 Python 函数:
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.utils import to_categorical
from keras.callbacks import Callback
from keras.models import load_model
import argparse
from pathlib import Path
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import mlflow
def get_file(f):
f = Path(f)
if f.is_file():
return f
else:
files = list(f.iterdir())
if len(files) == 1:
return files[0]
else:
raise Exception("********This path contains more than one file*******")
def parse_args():
# setup argparse
parser = argparse.ArgumentParser()
# add arguments
parser.add_argument(
"--input_data", type=str, help="path containing data for scoring"
)
parser.add_argument(
"--input_model", type=str, default="./", help="input path for model"
)
parser.add_argument(
"--output_result", type=str, default="./", help="output path for model"
)
# parse args
args = parser.parse_args()
# return args
return args
def score(input_data, input_model, output_result):
test_file = get_file(input_data)
data_test = pd.read_csv(test_file, header=None)
img_rows, img_cols = 28, 28
input_shape = (img_rows, img_cols, 1)
# Read test data
X_test = np.array(data_test.iloc[:, 1:])
y_test = to_categorical(np.array(data_test.iloc[:, 0]))
X_test = (
X_test.reshape(X_test.shape[0], img_rows, img_cols, 1).astype("float32") / 255
)
# Load model
files = [f for f in os.listdir(input_model) if f.endswith(".h5")]
model = load_model(input_model + "/" + files[0])
# Log metrics of the model
eval = model.evaluate(X_test, y_test, verbose=0)
mlflow.log_metric("Final test loss", eval[0])
print("Test loss:", eval[0])
mlflow.log_metric("Final test accuracy", eval[1])
print("Test accuracy:", eval[1])
# Score model using test data
y_predict = model.predict(X_test)
y_result = np.argmax(y_predict, axis=1)
# Output result
np.savetxt(output_result + "/predict_result.csv", y_result, delimiter=",")
def main(args):
score(args.input_data, args.input_model, args.output_result)
# run script
if __name__ == "__main__":
# parse args
args = parse_args()
# call main function
main(args)
中的 score.py
代码采用三个命令行参数: input_data
、 input_model
和 output_result
。 程序使用输入数据为输入模型评分,然后输出结果。
通过 YAML 定义组件
在本部分中,你将了解如何以有效的 YAML 组件规范格式创建组件规范。 此文件指定以下信息:
- 元数据。 名称、显示名称、版本、类型等。
- 接口。 输入和输出。
- 命令、代码和环境。 用于运行组件的命令、代码和环境。
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command
name: score_image_classification_keras
display_name: Score Image Classification Keras
inputs:
input_data:
type: uri_folder
input_model:
type: uri_folder
outputs:
output_result:
type: uri_folder
code: ./
command: python score.py --input_data ${{inputs.input_data}} --input_model ${{inputs.input_model}} --output_result ${{outputs.output_result}}
environment:
conda_file: ./conda.yaml
image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
name
是组件的唯一标识符。 其显示名称为Score Image Classification Keras
。- 此组件有两个输入和一个输出。
- 源代码路径在
code
节中定义。 在云中运行组件时,该路径中的所有文件都将作为组件的快照上传。 - 该
command
节指定要在组件运行时执行的命令。 - 该
environment
部分包含 Docker 映像和 conda YAML 文件。 源文件位于示例存储库中。
现在,你拥有模型评分组件的所有源文件。
加载组件以生成管道
可以导入由 Python 函数定义的数据准备组件和模型训练组件,就像普通 Python 函数一样。
以下代码分别从prep_component.py
文件夹中的文件prep
和train_component
文件夹中的文件train
导入prepare_data_component()
和keras_train_component()
函数。
%load_ext autoreload
%autoreload 2
# load component function from component python file
from prep.prep_component import prepare_data_component
from train.train_component import keras_train_component
# print hint of components
help(prepare_data_component)
help(keras_train_component)
可以使用函数 load_component()
加载 YAML 定义的分数组件。
# load component function from yaml
keras_score_component = load_component(source="./score/score.yaml")
构建您的管道
已创建并加载所有组件和输入数据以生成管道。 现在可以将它们组合成管道:
注意
若要使用 无服务器计算,请添加到 from azure.ai.ml.entities import ResourceConfiguration
文件的顶部。
然后替换:
- 带
default_compute=cpu_compute_target
的default_compute="serverless"
。 - 带
train_node.compute = gpu_compute_target
的train_node.resources = "ResourceConfiguration(instance_type="Standard_NC6s_v3",instance_count=2)
。
# define a pipeline containing 3 nodes: Prepare data node, train node, and score node
@pipeline(
default_compute=cpu_compute_target,
)
def image_classification_keras_minist_convnet(pipeline_input_data):
"""E2E image classification pipeline with keras using python sdk."""
prepare_data_node = prepare_data_component(input_data=pipeline_input_data)
train_node = keras_train_component(
input_data=prepare_data_node.outputs.training_data
)
train_node.compute = gpu_compute_target
score_node = keras_score_component(
input_data=prepare_data_node.outputs.test_data,
input_model=train_node.outputs.output_model,
)
# create a pipeline
pipeline_job = image_classification_keras_minist_convnet(pipeline_input_data=mnist_ds)
此管道具有默认计算 cpu_compute_target
。 如果未指定特定节点的计算,该节点将在默认计算上运行。
此管道具有管道级输入 pipeline_input_data
。 在提交管道作业时,可以为管道输入设置一个值。
管道包含三个节点:prepare_data_node
和train_node
score_node
。
input_data
的prepare_data_node
使用的值为pipeline_input_data
。train_node
的input_data
是prepare_data_node
的training_data
输出。score_node
的input_data
是prepare_data_node
的test_data
输出,input_model
是train_node
的output_model
。由于
train_node
训练 CNN 模型,因此你可以将其计算指定为gpu_compute_target
. 这样做可以提高训练性能。
提交管道作业
现在已经构建了管道,可以将作业提交到工作区。 若要提交作业,首先需要连接到工作区。
获取工作区访问权限
配置凭据
您将使用 DefaultAzureCredential
来访问工作区。 DefaultAzureCredential
应该能够处理大多数 Azure SDK 身份验证方案。
如果 DefaultAzureCredential
不起作用,请参阅 此配置凭据示例 和 标识包。
try:
credential = DefaultAzureCredential()
# Check if given credential can get token successfully.
credential.get_token("https://management.azure.com/.default")
except Exception as ex:
# Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
credential = InteractiveBrowserCredential()
获取具有计算的工作区的句柄
创建用于 MLClient
管理 Azure 机器学习服务的对象。 如果使用 无服务器计算,则无需创建这些计算。
# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)
# Retrieve an already attached Azure Machine Learning Compute.
cpu_compute_target = "cpu-cluster"
print(ml_client.compute.get(cpu_compute_target))
gpu_compute_target = "gpu-cluster"
print(ml_client.compute.get(gpu_compute_target))
重要
此代码片段要求工作区配置 JSON 文件保存在当前目录或其父目录中。 若要详细了解如何创建工作区,请参阅创建工作区资源。 有关将配置保存到文件的详细信息,请参阅 “创建工作区配置文件”。
将管道作业提交到工作区
现在,你已拥有工作区的句柄,可以提交管道作业了:
pipeline_job = ml_client.jobs.create_or_update(
pipeline_job, experiment_name="pipeline_samples"
)
pipeline_job
前面的代码将此图像分类管道作业提交到名为pipeline_samples
的实验。 如果实验不存在,它会自动创建试验。 pipeline_input_data
使用 fashion_ds
。
提交试验的调用很快就完成了。 它生成如下所示的输出:
实验 | 名称 | 类型 | 状态 | 详细信息页 |
---|---|---|---|---|
pipeline_samples |
sharp_pipe_4gvqx6h1fb | 管道 | 准备 | Azure 机器学习工作室链接。 |
您可以选择该链接来监视管道的运行。 或者,可以通过运行以下代码阻止它,直到它完成:
# wait until the job completes
ml_client.jobs.stream(pipeline_job.name)
重要
首次管道运行大约需要 15 分钟。 下载所有依赖项,创建 Docker 映像,并预配并创建 Python 环境。 再次运行管道所花费的时间会大幅减少,因为会重复使用这些资源,而无需再次创建。 但是,管道的总运行时取决于脚本的工作负荷以及每个管道步骤中运行的进程。
在 UI 中检查输出并调试管道
可以选择 Link to Azure Machine Learning studio
,这是管道的作业详细信息页面。 你将看到管道图:
可以通过右键单击组件来检查每个组件的日志和输出,或选择组件以打开其详细信息窗格。 若要详细了解如何在 UI 中调试管道,请参阅 使用 Azure 机器学习工作室调试管道故障。
(可选)将组件注册到工作区
在上一部分中,你将使用三个组件生成管道来完成图像分类任务。 还可以将组件注册到工作区,以便可以在工作区中共享和重复使用它们。 以下示例演示如何注册数据准备组件:
try:
# try get back the component
prep = ml_client.components.get(name="prep_data", version="1")
except:
# if not exists, register component using following code
prep = ml_client.components.create_or_update(prepare_data_component)
# list all components registered in workspace
for c in ml_client.components.list():
print(c)
您可以使用 ml_client.components.get()
根据名称和版本获取已注册的组件。 可用于 ml_client.components.create_or_update()
注册以前从 Python 函数或 YAML 加载的组件。
后续步骤
- 有关如何使用机器学习 SDK 生成管道的更多示例,请参阅示例存储库。
- 有关使用工作室 UI 提交和调试管道的信息,请参阅 在 Azure 机器学习工作室中使用组件创建和运行机器学习管道。
- 有关使用 Azure 机器学习 CLI 创建组件和管道的信息,请参阅 通过 Azure 机器学习 CLI 使用组件创建和运行机器学习管道。
- 有关使用批处理终结点将管道部署到生产环境的信息,请参阅 如何使用批处理终结点部署管道。