通过数据移动库传输数据
注意
本文将指导如何使用“Azure 存储数据移动”库的版本 2.0.XX。 版本 2.0.XX 目前处于维护模式,库仅接收有关数据完整性和安全问题的修补程序。 不会添加新的功能或特性,库将不支持新的存储服务版本。
新式数据移动库的 Beta 版本目前正在开发中。 有关详细信息,请参阅 GitHub 上的适用于 .NET 的 Azure 存储数据移动常用客户端库。
Azure 存储数据移动库是一个高性能的跨平台开源库,用于上传、下载和复制 Blob 与文件。 数据移动库提供用于 .NET 的 Azure 存储客户端库中不能提供的便利方法。 通过这些方法,可设置并行操作数目、跟踪传输进度、轻松恢复已取消的传输等。
此库还使用 .NET Core,这意味着,可以在构建适用于 Windows、Linux 和 macOS 的 .NET 应用时使用它。 若要了解有关 .NET Core 的详细信息,请参阅 .NET Core 文档。 此库也适用于面向 Windows 的传统 .NET Framework 应用。
本文档演示如何创建在 Windows、Linux 和 macOS 上运行的 .NET Core 控制台应用程序并执行以下方案:
- 将文件和目录上传到 Blob 存储。
- 定义传输数据时使用的并行操作数目。
- 跟踪数据传输进度。
- 恢复已取消的数据传输。
- 将文件从 URL 复制到 Blob 存储。
- 从 Blob 存储复制到 Blob 存储。
必备条件
安装
- 请参阅 .NET Core 安装指南,安装 .NET Core SDK。 选择环境时,请选择命令行选项。
- 通过命令行创建项目的目录。 导航到此目录,并键入
dotnet new console -o <sample-project-name>
创建 C# 控制台项目。 - 在 Visual Studio Code 中打开此目录。 通过在 Windows 中命令行上键入
code .
可快速完成此步骤。 - 从 Visual Studio Code Marketplace 安装 C# 扩展。 重新启动 Visual Studio Code。
- 此时,应会出现两条提示。 其中一条提示指出要“添加所需的资产用于生成和调试。”选择“是”。另一条提示指出要还原未解析的依赖项。 选择“还原”。
- 修改
.vscode
下的launch.json
,将外部终端用作控制台。 此设置应为"console": "externalTerminal"
- 可以使用 Visual Studio Code 调试 .NET Core 应用程序。 点击
F5
运行应用程序,并验证设置是否正常运行。 此时会看到“Hello World!”输出到控制台。
将数据移动库添加到项目
- 将最新版本的数据移动库添加到
<project-name>.csproj
文件的dependencies
节。 在编写本文时,最新的版本是"Microsoft.Azure.Storage.DataMovement": "0.6.2"
- 此时应会显示一条提示,指出要还原项目。 选择“还原”按钮。 也可以从命令行还原项目,在项目的根目录中键入
dotnet restore
命令即可。
修改 <project-name>.csproj
:
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Storage.DataMovement" Version="0.6.2" />
</ItemGroup>
</Project>
设置应用程序框架
第一项操作是为应用程序设置代码框架。 此代码提示我们输入存储帐户名和帐户密钥,并使用这些凭据创建 CloudStorageAccount
对象。 此对象用来与所有传输方案中的存储帐户交互。 该代码还会提示我们选择要执行的传输操作类型。
修改 Program.cs
:
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Azure.Storage.DataMovement;
namespace DMLibSample
{
public class Program
{
public static void Main()
{
Console.WriteLine("Enter Storage account name:");
string accountName = Console.ReadLine();
Console.WriteLine("\nEnter Storage account key:");
string accountKey = Console.ReadLine();
string storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=" + accountName + ";AccountKey=" + accountKey + ";EndpointSuffix=core.chinacloudapi.cn";
CloudStorageAccount account = CloudStorageAccount.Parse(storageConnectionString);
ExecuteChoice(account);
}
public static void ExecuteChoice(CloudStorageAccount account)
{
Console.WriteLine("\nWhat type of transfer would you like to execute?\n1. Local file --> Azure Blob\n2. Local directory --> Azure Blob directory\n3. URL (e.g. Amazon S3 file) --> Azure Blob\n4. Azure Blob --> Azure Blob");
int choice = int.Parse(Console.ReadLine());
if(choice == 1)
{
TransferLocalFileToAzureBlob(account).Wait();
}
else if(choice == 2)
{
TransferLocalDirectoryToAzureBlobDirectory(account).Wait();
}
else if(choice == 3)
{
TransferUrlToAzureBlob(account).Wait();
}
else if(choice == 4)
{
TransferAzureBlobToAzureBlob(account).Wait();
}
}
public static async Task TransferLocalFileToAzureBlob(CloudStorageAccount account)
{
}
public static async Task TransferLocalDirectoryToAzureBlobDirectory(CloudStorageAccount account)
{
}
public static async Task TransferUrlToAzureBlob(CloudStorageAccount account)
{
}
public static async Task TransferAzureBlobToAzureBlob(CloudStorageAccount account)
{
}
}
}
重要
此代码示例使用连接字符串授权对你的存储帐户的访问。 此配置作为示例之用。 在应用程序代码中,应谨慎使用连接字符串和帐户访问密钥。 如果帐户访问密钥丢失或意外放置在不安全的位置,服务可能会变得易受攻击。 具有访问密钥的任何人都可以授权针对存储帐户的请求,并且实际上有权访问所有数据。
将本地文件上传到 blob
将 GetSourcePath
和 GetBlob
方法添加到 Program.cs
:
public static string GetSourcePath()
{
Console.WriteLine("\nProvide path for source:");
string sourcePath = Console.ReadLine();
return sourcePath;
}
public static CloudBlockBlob GetBlob(CloudStorageAccount account)
{
CloudBlobClient blobClient = account.CreateCloudBlobClient();
Console.WriteLine("\nProvide name of Blob container:");
string containerName = Console.ReadLine();
CloudBlobContainer container = blobClient.GetContainerReference(containerName);
container.CreateIfNotExistsAsync().Wait();
Console.WriteLine("\nProvide name of new Blob:");
string blobName = Console.ReadLine();
CloudBlockBlob blob = container.GetBlockBlobReference(blobName);
return blob;
}
修改 TransferLocalFileToAzureBlob
方法:
public static async Task TransferLocalFileToAzureBlob(CloudStorageAccount account)
{
string localFilePath = GetSourcePath();
CloudBlockBlob blob = GetBlob(account);
Console.WriteLine("\nTransfer started...");
await TransferManager.UploadAsync(localFilePath, blob);
Console.WriteLine("\nTransfer operation complete.");
ExecuteChoice(account);
}
此代码提示我们输入本地文件的路径、新的或现有容器的名称,以及新 Blob 的名称。 TransferManager.UploadAsync
方法使用此信息执行上传。
点击 F5
运行应用程序。 可通过使用 Microsoft Azure 存储资源管理器查看存储帐户,来验证是否已发生上传。
设置并行操作数目
数据移动库提供设置并行操作数目的功能,以提高数据传输吞吐量。 默认情况下,数据移动库将并行操作数目设置为计算机上核心数的 8 倍。
请注意,在低带宽环境中,大量的并发操作可能会使网络连接变得紊乱,实质性地阻碍操作的全面完成。 应该试运行一下此设置,根据可用网络带宽确定哪种设置最合适。
在此示例中,添加允许我们设置并行操作数的代码。 同时添加用于计量完成传输所需时间的代码。
将 SetNumberOfParallelOperations
方法添加到 Program.cs
:
public static void SetNumberOfParallelOperations()
{
Console.WriteLine("\nHow many parallel operations would you like to use?");
string parallelOperations = Console.ReadLine();
TransferManager.Configurations.ParallelOperations = int.Parse(parallelOperations);
}
将 ExecuteChoice
方法修改为使用 SetNumberOfParallelOperations
:
public static void ExecuteChoice(CloudStorageAccount account)
{
Console.WriteLine("\nWhat type of transfer would you like to execute?\n1. Local file --> Azure Blob\n2. Local directory --> Azure Blob directory\n3. URL (e.g. Amazon S3 file) --> Azure Blob\n4. Azure Blob --> Azure Blob");
int choice = int.Parse(Console.ReadLine());
SetNumberOfParallelOperations();
if(choice == 1)
{
TransferLocalFileToAzureBlob(account).Wait();
}
else if(choice == 2)
{
TransferLocalDirectoryToAzureBlobDirectory(account).Wait();
}
else if(choice == 3)
{
TransferUrlToAzureBlob(account).Wait();
}
else if(choice == 4)
{
TransferAzureBlobToAzureBlob(account).Wait();
}
}
将 TransferLocalFileToAzureBlob
方法修改为使用计时器:
public static async Task TransferLocalFileToAzureBlob(CloudStorageAccount account)
{
string localFilePath = GetSourcePath();
CloudBlockBlob blob = GetBlob(account);
Console.WriteLine("\nTransfer started...");
Stopwatch stopWatch = Stopwatch.StartNew();
await TransferManager.UploadAsync(localFilePath, blob);
stopWatch.Stop();
Console.WriteLine("\nTransfer operation completed in " + stopWatch.Elapsed.TotalSeconds + " seconds.");
ExecuteChoice(account);
}
跟踪传输进度
可通过创建 TransferContext
对象来跟踪传输过程中传输的进度。 TransferContext
对象采用两种形式:用于单个文件传输的 SingleTransferContext
,以及用于目录传输的 DirectoryTransferContext
。
将 GetSingleTransferContext
和 GetDirectoryTransferContext
方法添加到 Program.cs
:
public static SingleTransferContext GetSingleTransferContext(TransferCheckpoint checkpoint)
{
SingleTransferContext context = new SingleTransferContext(checkpoint);
context.ProgressHandler = new Progress<TransferStatus>((progress) =>
{
Console.Write("\rBytes transferred: {0}", progress.BytesTransferred );
});
return context;
}
public static DirectoryTransferContext GetDirectoryTransferContext(TransferCheckpoint checkpoint)
{
DirectoryTransferContext context = new DirectoryTransferContext(checkpoint);
context.ProgressHandler = new Progress<TransferStatus>((progress) =>
{
Console.Write("\rBytes transferred: {0}", progress.BytesTransferred );
});
return context;
}
将 TransferLocalFileToAzureBlob
方法修改为使用 GetSingleTransferContext
:
public static async Task TransferLocalFileToAzureBlob(CloudStorageAccount account)
{
string localFilePath = GetSourcePath();
CloudBlockBlob blob = GetBlob(account);
TransferCheckpoint checkpoint = null;
SingleTransferContext context = GetSingleTransferContext(checkpoint);
Console.WriteLine("\nTransfer started...\n");
Stopwatch stopWatch = Stopwatch.StartNew();
await TransferManager.UploadAsync(localFilePath, blob, null, context);
stopWatch.Stop();
Console.WriteLine("\nTransfer operation completed in " + stopWatch.Elapsed.TotalSeconds + " seconds.");
ExecuteChoice(account);
}
恢复已取消的传输
数据移动库提供的另一个功能是恢复已取消的传输。 接下来,添加一些代码,使我们可通过键入 c
暂时取消传输,在 3 秒后恢复传输。
修改 TransferLocalFileToAzureBlob
方法:
public static async Task TransferLocalFileToAzureBlob(CloudStorageAccount account)
{
string localFilePath = GetSourcePath();
CloudBlockBlob blob = GetBlob(account);
TransferCheckpoint checkpoint = null;
SingleTransferContext context = GetSingleTransferContext(checkpoint);
CancellationTokenSource cancellationSource = new CancellationTokenSource();
Console.WriteLine("\nTransfer started...\nPress 'c' to temporarily cancel your transfer...\n");
Stopwatch stopWatch = Stopwatch.StartNew();
Task task;
ConsoleKeyInfo keyinfo;
try
{
task = TransferManager.UploadAsync(localFilePath, blob, null, context, cancellationSource.Token);
while(!task.IsCompleted)
{
if(Console.KeyAvailable)
{
keyinfo = Console.ReadKey(true);
if(keyinfo.Key == ConsoleKey.C)
{
cancellationSource.Cancel();
}
}
}
await task;
}
catch(Exception e)
{
Console.WriteLine("\nThe transfer is canceled: {0}", e.Message);
}
if(cancellationSource.IsCancellationRequested)
{
Console.WriteLine("\nTransfer will resume in 3 seconds...");
Thread.Sleep(3000);
checkpoint = context.LastCheckpoint;
context = GetSingleTransferContext(checkpoint);
Console.WriteLine("\nResuming transfer...\n");
await TransferManager.UploadAsync(localFilePath, blob, null, context);
}
stopWatch.Stop();
Console.WriteLine("\nTransfer operation completed in " + stopWatch.Elapsed.TotalSeconds + " seconds.");
ExecuteChoice(account);
}
到目前为止,checkpoint
值已设置为 null
。 现在,如果取消传输,则可以检索传输的最后一个检查点,并在传输上下文中使用这个新的检查点。
将本地目录传输到 Blob 存储
通过数据移动库,可传输文件的目录及其所有子目录,如以下示例所示。
首先,将 GetBlobDirectory
方法添加到 Program.cs
:
public static CloudBlobDirectory GetBlobDirectory(CloudStorageAccount account)
{
CloudBlobClient blobClient = account.CreateCloudBlobClient();
Console.WriteLine("\nProvide name of Blob container. This can be a new or existing Blob container:");
string containerName = Console.ReadLine();
CloudBlobContainer container = blobClient.GetContainerReference(containerName);
container.CreateIfNotExistsAsync().Wait();
CloudBlobDirectory blobDirectory = container.GetDirectoryReference("");
return blobDirectory;
}
然后修改 TransferLocalDirectoryToAzureBlobDirectory
:
public static async Task TransferLocalDirectoryToAzureBlobDirectory(CloudStorageAccount account)
{
string localDirectoryPath = GetSourcePath();
CloudBlobDirectory blobDirectory = GetBlobDirectory(account);
TransferCheckpoint checkpoint = null;
DirectoryTransferContext context = GetDirectoryTransferContext(checkpoint);
CancellationTokenSource cancellationSource = new CancellationTokenSource();
Console.WriteLine("\nTransfer started...\nPress 'c' to temporarily cancel your transfer...\n");
Stopwatch stopWatch = Stopwatch.StartNew();
Task task;
ConsoleKeyInfo keyinfo;
UploadDirectoryOptions options = new UploadDirectoryOptions()
{
Recursive = true
};
try
{
task = TransferManager.UploadDirectoryAsync(localDirectoryPath, blobDirectory, options, context, cancellationSource.Token);
while(!task.IsCompleted)
{
if(Console.KeyAvailable)
{
keyinfo = Console.ReadKey(true);
if(keyinfo.Key == ConsoleKey.C)
{
cancellationSource.Cancel();
}
}
}
await task;
}
catch(Exception e)
{
Console.WriteLine("\nThe transfer is canceled: {0}", e.Message);
}
if(cancellationSource.IsCancellationRequested)
{
Console.WriteLine("\nTransfer will resume in 3 seconds...");
Thread.Sleep(3000);
checkpoint = context.LastCheckpoint;
context = GetDirectoryTransferContext(checkpoint);
Console.WriteLine("\nResuming transfer...\n");
await TransferManager.UploadDirectoryAsync(localDirectoryPath, blobDirectory, options, context);
}
stopWatch.Stop();
Console.WriteLine("\nTransfer operation completed in " + stopWatch.Elapsed.TotalSeconds + " seconds.");
ExecuteChoice(account);
}
此方法与用于上传单个文件的方法有几处区别。 现在,我们要使用 TransferManager.UploadDirectoryAsync
以及前面创建的 getDirectoryTransferContext
方法。 此外,要将 options
值提供给上传操作,指出我们要在上传内容中包含子目录。
将文件从 URL 复制到 Blob
现在,让我们添加代码,将文件从 URL 复制到 Azure Blob。
修改 TransferUrlToAzureBlob
:
public static async Task TransferUrlToAzureBlob(CloudStorageAccount account)
{
Uri uri = new Uri(GetSourcePath());
CloudBlockBlob blob = GetBlob(account);
TransferCheckpoint checkpoint = null;
SingleTransferContext context = GetSingleTransferContext(checkpoint);
CancellationTokenSource cancellationSource = new CancellationTokenSource();
Console.WriteLine("\nTransfer started...\nPress 'c' to temporarily cancel your transfer...\n");
Stopwatch stopWatch = Stopwatch.StartNew();
Task task;
ConsoleKeyInfo keyinfo;
try
{
task = TransferManager.CopyAsync(uri, blob, CopyMethod.ServiceSideAsyncCopy, null, context, cancellationSource.Token);
while(!task.IsCompleted)
{
if(Console.KeyAvailable)
{
keyinfo = Console.ReadKey(true);
if(keyinfo.Key == ConsoleKey.C)
{
cancellationSource.Cancel();
}
}
}
await task;
}
catch(Exception e)
{
Console.WriteLine("\nThe transfer is canceled: {0}", e.Message);
}
if(cancellationSource.IsCancellationRequested)
{
Console.WriteLine("\nTransfer will resume in 3 seconds...");
Thread.Sleep(3000);
checkpoint = context.LastCheckpoint;
context = GetSingleTransferContext(checkpoint);
Console.WriteLine("\nResuming transfer...\n");
await TransferManager.CopyAsync(uri, blob, CopyMethod.ServiceSideAsyncCopy, null, context, cancellationSource.Token);
}
stopWatch.Stop();
Console.WriteLine("\nTransfer operation completed in " + stopWatch.Elapsed.TotalSeconds + " seconds.");
ExecuteChoice(account);
}
此功能的一个重要用例是将数据从另一个云服务移到 Azure。 如果你拥有一个可用于访问资源的 URL,则可使用 TransferManager.CopyAsync
方法轻松将该资源移入 Azure Blob。 此方法还引入了 CopyMethod 参数。 下表显示了此参数的可用选项:
成员名称 | 值 | 说明 |
---|---|---|
SyncCopy | 0 | 将数据从源下载到内存,并将数据从内存上传到目标。 目前只可用于在两个不同的 Azure 存储资源之间复制。 |
ServiceSideAsyncCopy | 1 | 将开始复制请求发送到 Azure 存储以允许其执行复制;监视复制操作进度,直到复制完成。 |
ServiceSideSyncCopy | 2 | 复制每个区块的内容,其中包含“从 URL 放置块”、“从 URL 追加块”或“从 URL 放置页”。 |
复制 Blob
数据移动库提供的另一个功能是在两个不同的 Azure 存储资源之间复制。
修改 TransferAzureBlobToAzureBlob
方法:
public static async Task TransferAzureBlobToAzureBlob(CloudStorageAccount account)
{
CloudBlockBlob sourceBlob = GetBlob(account);
CloudBlockBlob destinationBlob = GetBlob(account);
TransferCheckpoint checkpoint = null;
SingleTransferContext context = GetSingleTransferContext(checkpoint);
CancellationTokenSource cancellationSource = new CancellationTokenSource();
Console.WriteLine("\nTransfer started...\nPress 'c' to temporarily cancel your transfer...\n");
Stopwatch stopWatch = Stopwatch.StartNew();
Task task;
ConsoleKeyInfo keyinfo;
try
{
task = TransferManager.CopyAsync(sourceBlob, destinationBlob, CopyMethod.SyncCopy, null, context, cancellationSource.Token);
while(!task.IsCompleted)
{
if(Console.KeyAvailable)
{
keyinfo = Console.ReadKey(true);
if(keyinfo.Key == ConsoleKey.C)
{
cancellationSource.Cancel();
}
}
}
await task;
}
catch(Exception e)
{
Console.WriteLine("\nThe transfer is canceled: {0}", e.Message);
}
if(cancellationSource.IsCancellationRequested)
{
Console.WriteLine("\nTransfer will resume in 3 seconds...");
Thread.Sleep(3000);
checkpoint = context.LastCheckpoint;
context = GetSingleTransferContext(checkpoint);
Console.WriteLine("\nResuming transfer...\n");
await TransferManager.CopyAsync(sourceBlob, destinationBlob, CopyMethod.SyncCopy, null, context, cancellationSource.Token);
}
stopWatch.Stop();
Console.WriteLine("\nTransfer operation completed in " + stopWatch.Elapsed.TotalSeconds + " seconds.");
ExecuteChoice(account);
}
在本示例中,我们将 TransferManager.CopyAsync
中的布尔参数设置为 CopyMethod.SyncCopy
,表示我们要执行同步复制。 此配置意味着,先将资源下载到本地计算机,然后将其上传到 Azure Blob。 同步复制选项能够很好地确保复制操作以一致的速度进行。 相比之下,异步服务器复制的速度取决于服务器上的可用网络带宽,而这种带宽可能会有波动。 不过,同步复制可能会产生额外的数据传出费用,而异步复制则不会。 在与源存储帐户所在的同一区域的 Azure 虚拟机中,建议使用同步复制,避免产生数据传出费用。
数据移动应用程序现已完成。 GitHub 上提供了完整的代码示例。
后续步骤
提示
使用 Azure 存储资源管理器管理 Azure Blob 存储资源。 Azure 存储资源管理器 是 Microsoft 免费提供的独立应用,可用于管理 Azure Blob 存储资源。 使用 Azure 存储资源管理器可以直观地创建、读取、更新和删除 Blob 容器与 Blob,以及管理对 Blob 容器和 Blob 的访问。