在交互式开发期间从 Azure 云存储访问数据

适用范围:Python SDK azure-ai-ml v2(最新版)

机器学习项目通常从探索性数据分析 (EDA)、数据预处理(清理、特征工程)开始,并包括构建 ML 模型原型来验证假设。 此原型制作项目阶段具有高度交互性,适合在 Jupyter 笔记本或带有 Python 交互式控制台的 IDE 中进行开发。 在本文中,将学习以下内容:

  • 如同在文件系统中一样从 Azure 机器学习数据存储 URI 访问数据。
  • 使用 mltable Python 库将数据具现到 Pandas 中。
  • 使用 mltable Python 库将 Azure 机器学习数据资产具现到 Pandas 中。
  • 使用 azcopy 实用工具通过显式下载具体化数据。

先决条件

提示

本文中的指南介绍了交互式开发期间的数据访问。 它适用于可以运行 Python 会话的任何主机。 这可能包括你的本地计算机、云 VM、GitHub Codespace 等。我们建议使用 Azure 机器学习计算实例 - 一个完全托管且预配置的云工作站。 有关详细信息,请访问创建 Azure 机器学习计算实例

重要

确保在 Python 环境中安装了最新的 azure-fsspecmltableazure-ai-ml Python 库:

pip install -U azureml-fsspec==1.3.1 mltable azure-ai-ml

最新的 azure-fsspec 包版本可能会随时间而变化。 有关 azure-fsspec 包的详细信息,请访问此资源

从数据存储 URI 访问数据,如文件系统

Azure 机器学习数据存储是对现有 Azure 存储帐户的一种称呼。 创建和使用数据存储的优势包括:

  • 可使用常见易用的 API 与不同的存储类型(Blob/Files/ADLS)进行交互。
  • 在团队运营中更轻松地发现有用的数据存储。
  • 支持基于凭据(例如 SAS 令牌)和基于标识(使用 Microsoft Entra ID 或托管标识)访问数据。
  • 对于基于凭据的访问,连接信息受到保护,避免脚本中的密钥泄露。
  • 在工作室 UI 中浏览数据和复制/粘贴数据存储 URI。

数据存储 URI 是一个统一资源标识符,它是对 Azure 存储帐户中存储位置(路径)的引用。 数据存储 URI 采用以下格式:

# Azure Machine Learning workspace details:
subscription = '<subscription_id>'
resource_group = '<resource_group>'
workspace = '<workspace>'
datastore_name = '<datastore>'
path_on_datastore = '<path>'

# long-form Datastore uri format:
uri = f'azureml://subscriptions/{subscription}/resourcegroups/{resource_group}/workspaces/{workspace}/datastores/{datastore_name}/paths/{path_on_datastore}'.

这些数据存储 URI 是文件系统规范 (fsspec) 的已知实现:本地、远程和嵌入式文件系统与字节存储的统一 Python 式接口。 首先,请使用 pip 来安装 azureml-fsspec 包及其依赖项 azureml-dataprep 包。 然后,可以使用 Azure 机器学习数据存储 fsspec 实现。

Azure 机器学习数据存储 fsspec 实现会自动处理 Azure 机器学习数据存储使用的凭据/标识直通。 你可以在计算实例上避免脚本中的帐户密钥泄露和额外的登录过程。

例如,可以直接在 Pandas 中使用数据存储 URI。 此示例演示了如何读取 CSV 文件:

import pandas as pd

df = pd.read_csv("azureml://subscriptions/<subid>/resourcegroups/<rgname>/workspaces/<workspace_name>/datastores/<datastore_name>/paths/<folder>/<filename>.csv")
df.head()

提示

为了免于记忆数据存储 URI 格式的麻烦,你可以使用以下步骤从工作室 UI 复制并粘贴数据存储 URI:

  1. 在左侧菜单中选择“数据”,然后选择“数据存储”选项卡。
  2. 选择你的数据存储名称,然后选择“浏览”。
  3. 找到要读入 Pandas 的文件/文件夹,然后选择它旁边的省略号 (...)。 在菜单中选择“复制 URI”。 可以选择要复制到笔记本/脚本中的“数据存储 URI”。 突出显示如何复制数据存储 URI 的屏幕截图。

你还可以实例化 Azure 机器学习文件系统,以处理类似于文件系统的命令,如 lsglobexistsopen

  • ls() 方法会列出特定目录中的文件。 可以使用 ls()、ls(.)、ls (<<folder_level_1>/<folder_level_2>) 列出文件。 我们在相对路径中同时支持“.”和“..”。
  • 方法 glob() 支持“*”和“**”通配。
  • 方法 exists() 返回一个布尔值,该值指示当前根目录中是否存在指定的文件。
  • open() 方法会返回一个类似文件的对象,该对象可以传递给任何其他应该使用 Python 文件的库。 代码还可以使用此对象,就像它是一个普通的 Python 文件对象一样。 这些类似于文件的对象遵循 with 上下文的用法,如以下示例中所示:
from azureml.fsspec import AzureMachineLearningFileSystem

# instantiate file system using following URI
fs = AzureMachineLearningFileSystem('azureml://subscriptions/<subid>/resourcegroups/<rgname>/workspaces/<workspace_name>/datastore*s*/datastorename')

fs.ls() # list folders/files in datastore 'datastorename'

# output example:
# folder1
# folder2
# file3.csv

# use an open context
with fs.open('./folder1/file1.csv') as f:
    # do some process
    process_file(f)

通过 AzureMachineLearningFileSystem 上传文件

from azureml.fsspec import AzureMachineLearningFileSystem
# instantiate file system using following URI
fs = AzureMachineLearningFileSystem('azureml://subscriptions/<subid>/resourcegroups/<rgname>/workspaces/<workspace_name>/datastores/<datastorename>/paths/')

# you can specify recursive as False to upload a file
fs.upload(lpath='data/upload_files/crime-spring.csv', rpath='data/fsspec', recursive=False, **{'overwrite': 'MERGE_WITH_OVERWRITE'})

# you need to specify recursive as True to upload a folder
fs.upload(lpath='data/upload_folder/', rpath='data/fsspec_folder', recursive=True, **{'overwrite': 'MERGE_WITH_OVERWRITE'})

lpath 是本地路径,rpath 是远程路径。 如果你在 rpath 中指定的文件夹尚不存在,我们将为你创建相应文件夹。

我们支持三种“覆盖”模式:

  • APPEND:如果目标路径中已有同名的文件,APPEND 将保留原始文件
  • FAIL_ON_FILE_CONFLICT:如果目标路径中存在同名的文件,FAIL_ON_FILE_CONFLICT 将引发错误
  • MERGE_WITH_OVERWRITE:如果目标路径中存在同名的文件,MERGE_WITH_OVERWRITE 将用新文件覆盖现有文件

通过 AzureMachineLearningFileSystem 下载文件

# you can specify recursive as False to download a file
# downloading overwrite option is determined by local system, and it is MERGE_WITH_OVERWRITE
fs.download(rpath='data/fsspec/crime-spring.csv', lpath='data/download_files/, recursive=False)

# you need to specify recursive as True to download a folder
fs.download(rpath='data/fsspec_folder', lpath='data/download_folder/', recursive=True)

示例

这些示例演示了在常见方案中使用文件系统规范的情况。

将单个 CSV 文件读取到 Pandas 中

你可以将 单个 CSV 文件读取到 Pandas 中,如下所示:

import pandas as pd

df = pd.read_csv("azureml://subscriptions/<subid>/resourcegroups/<rgname>/workspaces/<workspace_name>/datastores/<datastore_name>/paths/<folder>/<filename>.csv")

将包含 CSV 文件的文件夹读取到 Pandas 中

Pandas read_csv() 方法不支持读取 CSV 文件的文件夹。 若要处理此问题,请对 csv 路径进行 glob 操作,并使用 Pandas concat() 方法将它们连接到数据帧。 下一个代码示例展示了如何使用 Azure 机器学习文件系统实现这种串联:

import pandas as pd
from azureml.fsspec import AzureMachineLearningFileSystem

# define the URI - update <> placeholders
uri = 'azureml://subscriptions/<subid>/resourcegroups/<rgname>/workspaces/<workspace_name>/datastores/<datastore_name>'

# create the filesystem
fs = AzureMachineLearningFileSystem(uri)

# append csv files in folder to a list
dflist = []
for path in fs.glob('/<folder>/*.csv'):
    with fs.open(path) as f:
        dflist.append(pd.read_csv(f))

# concatenate data frames
df = pd.concat(dflist)
df.head()

将 CSV 文件读入 Dask

此示例展示了如何将 CSV 文件读取到 Dask 数据帧中:

import dask.dd as dd

df = dd.read_csv("azureml://subscriptions/<subid>/resourcegroups/<rgname>/workspaces/<workspace_name>/datastores/<datastore_name>/paths/<folder>/<filename>.csv")
df.head()

将包含 Parquet 文件的文件夹读取到 Pandas 中

在 ETL 过程中,Parquet 文件通常会被写入一个文件夹中,然后该文件夹便可以发出与 ETL 相关的文件,例如进度、提交等。本例显示了从 ETL 过程创建的文件(这些文件以 _ 开头),这些文件接着可生成 Parquet 数据文件。

显示 parquet ETL 进程的屏幕截图。

在这些应用场景中,你将只读取文件夹中的 Parquet 文件,而忽略 ETL 过程文件。 此代码示例显示了 glob 模式如何只能读取文件夹中的 Parquet 文件:

import pandas as pd
from azureml.fsspec import AzureMachineLearningFileSystem

# define the URI - update <> placeholders
uri = 'azureml://subscriptions/<subid>/resourcegroups/<rgname>/workspaces/<workspace_name>/datastores/<datastore_name>'

# create the filesystem
fs = AzureMachineLearningFileSystem(uri)

# append parquet files in folder to a list
dflist = []
for path in fs.glob('/<folder>/*.parquet'):
    with fs.open(path) as f:
        dflist.append(pd.read_parquet(f))

# concatenate data frames
df = pd.concat(dflist)
df.head()

从 Azure Databricks 文件系统 (dbfs) 访问数据

文件系统规范 (fsspec) 具有一系列已知实现,包括 Databricks 文件系统 (dbfs)。

若要访问 dbfs 资源中的数据,则需要:

获取这些值后,必须在计算实例上为 PAT 令牌创建一个环境变量:

export ADB_PAT=<pat_token>

然后,可以访问 Pandas 中的数据,如以下示例所示:

import os
import pandas as pd

pat = os.getenv(ADB_PAT)
path_on_dbfs = '<absolute_path_on_dbfs>' # e.g. /folder/subfolder/file.csv

storage_options = {
    'instance':'adb-<some-number>.<two digits>.databricks.azure.cn', 
    'token': pat
}

df = pd.read_csv(f'dbfs://{path_on_dbfs}', storage_options=storage_options)

使用 pillow 读取图像

from PIL import Image
from azureml.fsspec import AzureMachineLearningFileSystem

# define the URI - update <> placeholders
uri = 'azureml://subscriptions/<subid>/resourcegroups/<rgname>/workspaces/<workspace_name>/datastores/<datastore_name>'

# create the filesystem
fs = AzureMachineLearningFileSystem(uri)

with fs.open('/<folder>/<image.jpeg>') as f:
    img = Image.open(f)
    img.show()

PyTorch 自定义数据集示例

在此示例中,你将创建一个 PyTorch 自定义数据集来处理图像。 我们假设存在 CSV 格式的批注文件,其整体结构如下:

image_path, label
0/image0.png, label0
0/image1.png, label0
1/image2.png, label1
1/image3.png, label1
2/image4.png, label2
2/image5.png, label2

子文件夹将根据所带标签存储这些图像:

/
└── 📁images
    ├── 📁0
    │   ├── 📷image0.png
    │   └── 📷image1.png
    ├── 📁1
    │   ├── 📷image2.png
    │   └── 📷image3.png
    └── 📁2
        ├── 📷image4.png
        └── 📷image5.png

自定义 PyTorch 数据集类必须实现三个函数:__init____len____getitem__,如下所示:

import os
import pandas as pd
from PIL import Image
from torch.utils.data import Dataset

class CustomImageDataset(Dataset):
    def __init__(self, filesystem, annotations_file, img_dir, transform=None, target_transform=None):
        self.fs = filesystem
        f = filesystem.open(annotations_file)
        self.img_labels = pd.read_csv(f)
        f.close()
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        f = self.fs.open(img_path)
        image = Image.open(f)
        f.close()
        label = self.img_labels.iloc[idx, 1]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

然后,你可以将数据集进行实例化,如下所示:

from azureml.fsspec import AzureMachineLearningFileSystem
from torch.utils.data import DataLoader

# define the URI - update <> placeholders
uri = 'azureml://subscriptions/<subid>/resourcegroups/<rgname>/workspaces/<workspace_name>/datastores/<datastore_name>'

# create the filesystem
fs = AzureMachineLearningFileSystem(uri)

# create the dataset
training_data = CustomImageDataset(
    filesystem=fs,
    annotations_file='/annotations.csv', 
    img_dir='/<path_to_images>/'
)

# Prepare your data for training with DataLoaders
train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)

使用 mltable 库将数据具体化为 Pandas

mltable 库也可以帮助访问云存储中的数据。 使用 mltable 将数据读取到 Pandas 中时,将采用以下常规格式:

import mltable

# define a path or folder or pattern
path = {
    'file': '<supported_path>'
    # alternatives
    # 'folder': '<supported_path>'
    # 'pattern': '<supported_path>'
}

# create an mltable from paths
tbl = mltable.from_delimited_files(paths=[path])
# alternatives
# tbl = mltable.from_parquet_files(paths=[path])
# tbl = mltable.from_json_lines_files(paths=[path])
# tbl = mltable.from_delta_lake(paths=[path])

# materialize to Pandas
df = tbl.to_pandas_dataframe()
df.head()

支持的路径

mltable 库支持从不同的路径类型读取表格数据:

位置 示例
本地计算机上的路径 ./home/username/data/my_data
公共 http (s) 服务器上的路径 https://raw.githubusercontent.com/pandas-dev/pandas/main/doc/data/titanic.csv
Azure 存储上的路径 wasbs://<container_name>@<account_name>.blob.core.chinacloudapi.cn/<path>
abfss://<file_system>@<account_name>.dfs.core.chinacloudapi.cn/<path>
一个长格式 Azure 机器学习数据存储 azureml://subscriptions/<subid>/resourcegroups/<rgname>/workspaces/<wsname>/datastores/<name>/paths/<path>

备注

mltable 处理 Azure 存储和 Azure 机器学习数据存储上的路径的用户凭据直通。 如果你无权访问底层存储上的数据,那么你将无法访问这些数据。

文件、文件夹和 glob

mltable 支持从以下对象读取:

  • 文件,例如 abfss://<file_system>@<account_name>.dfs.core.chinacloudapi.cn/my-csv.csv
  • 文件夹,例如 abfss://<file_system>@<account_name>.dfs.core.chinacloudapi.cn/my-folder/
  • glob 模式,例如 abfss://<file_system>@<account_name>.dfs.core.chinacloudapi.cn/my-folder/*.csv
  • 文件、文件夹和/或 glob 模式的组合

mltable 的灵活性支持将数据从本地和云存储资源的组合以及文件/文件夹/glob 的组合具体化为单个数据帧。 例如:

path1 = {
    'file': 'abfss://filesystem@account1.dfs.core.chinacloudapi.cn/my-csv.csv'
}

path2 = {
    'folder': './home/username/data/my_data'
}

path3 = {
    'pattern': 'abfss://filesystem@account2.dfs.core.chinacloudapi.cn/folder/*.csv'
}

tbl = mltable.from_delimited_files(paths=[path1, path2, path3])

支持的文件格式

mltable 支持以下文件格式:

  • 带分隔符的文本(例如 CSV 文件):mltable.from_delimited_files(paths=[path])
  • Parquet: mltable.from_parquet_files(paths=[path])
  • Delta:mltable.from_delta_lake(paths=[path])
  • JSON 行格式:mltable.from_json_lines_files(paths=[path])

示例

读取 CSV 文件

使用特定详细信息更新此代码片段中的占位符 (<>):

import mltable

path = {
    'file': 'abfss://<filesystem>@<account>.dfs.core.chinacloudapi.cn/<folder>/<file_name>.csv'
}

tbl = mltable.from_delimited_files(paths=[path])
df = tbl.to_pandas_dataframe()
df.head()

读取文件夹中的 Parquet 文件

此示例演示了 mltable 如何使用 glob 模式(例如通配符)来确保仅读取 Parquet 文件。

使用特定详细信息更新此代码片段中的占位符 (<>):

import mltable

path = {
    'pattern': 'abfss://<filesystem>@<account>.dfs.core.chinacloudapi.cn/<folder>/*.parquet'
}

tbl = mltable.from_parquet_files(paths=[path])
df = tbl.to_pandas_dataframe()
df.head()

读取数据资产

本部分介绍了如何访问 Pandas 中的 Azure 机器学习数据资产。

表资产

如果你以前在 Azure 机器学习中创建了一个表资产(mltable 或 V1 TabularDataset),可以使用以下代码将其加载到 Pandas 中:

import mltable
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(credential=DefaultAzureCredential())
data_asset = ml_client.data.get(name="<name_of_asset>", version="<version>")

tbl = mltable.load(f'azureml:/{data_asset.id}')
df = tbl.to_pandas_dataframe()
df.head()

文件资产

如果你注册了一个文件资产(例如,一个 CSV 文件),可以使用以下代码将该资产读取到 Pandas 数据帧中:

import mltable
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(credential=DefaultAzureCredential())
data_asset = ml_client.data.get(name="<name_of_asset>", version="<version>")

path = {
    'file': data_asset.path
}

tbl = mltable.from_delimited_files(paths=[path])
df = tbl.to_pandas_dataframe()
df.head()

文件夹资产

如果你注册了一个文件夹资产(uri_folder 或 V1 FileDataset,例如一个包含 CSV 文件的文件夹),可以使用以下代码将该资产读取到 Pandas 数据帧中:

import mltable
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(credential=DefaultAzureCredential())
data_asset = ml_client.data.get(name="<name_of_asset>", version="<version>")

path = {
    'folder': data_asset.path
}

tbl = mltable.from_delimited_files(paths=[path])
df = tbl.to_pandas_dataframe()
df.head()

有关使用 Pandas 读取和处理大量数据的说明

提示

Pandas 不是为处理大型数据集而设计的。 Pandas 只能处理计算实例内存可容纳的数据。

对于大型数据集,建议使用 Azure 机器学习托管的 Spark。 这会提供 PySpark Pandas API

在纵向扩展到远程异步作业之前,你可能需要快速迭代大型数据集的较小子集。 mltable 提供内置功能用于通过 take_random_sample 方法获取大型数据的样本:

import mltable

path = {
    'file': 'https://raw.githubusercontent.com/pandas-dev/pandas/main/doc/data/titanic.csv'
}

tbl = mltable.from_delimited_files(paths=[path])
# take a random 30% sample of the data
tbl = tbl.take_random_sample(probability=.3)
df = tbl.to_pandas_dataframe()
df.head()

还可以使用以下操作获取大型数据的子集:

使用 azcopy 实用工具下载数据

使用 azcopy 实用工具将数据下载到主机(本地计算机、云 VM、Azure 机器学习计算实例等)的本地 SSD 并使用本地文件系统。 预安装在 Azure 机器学习计算实例上的 azcopy 实用工具将处理数据下载。 如果你未使用 Azure 机器学习计算实例或 Data Science Virtual Machine (DSVM),则可能需要安装 azcopy。 有关详细信息,请访问 azcopy

注意

我们不建议将数据下载到计算实例上的 /home/azureuser/cloudfiles/code 位置。 此位置旨在用于存储笔记本和代码项目,而非数据。 在训练时,从此位置读取数据会产生很高的性能开销。 相反,我们建议将数据存储在 home/azureuser(计算节点的本地 SSD)中。

打开终端并创建一个新目录,例如:

mkdir /home/azureuser/data

使用以下命令登录到 azcopy:

azcopy login

接下来,可以使用存储 URI 复制数据

SOURCE=https://<account_name>.blob.core.chinacloudapi.cn/<container>/<path>
DEST=/home/azureuser/data
azcopy cp $SOURCE $DEST

后续步骤