使用 MongoDB 扩展命令来管理存储在适用于 MongoDB 的 Azure Cosmos DB 中的数据

适用对象: MongoDB

以下文档包含特定于 Azure Cosmos DB for MongoDB 的自定义操作命令。 这些命令可用来创建和获取特定于 Azure Cosmos DB 容量模型的数据库资源。

通过使用 Azure Cosmos DB for MongoDB,可以享受 Azure Cosmos DB 的共享优势。 这些优势包括但不限于:

  • 多区域分布
  • 自动分片
  • 高可用性
  • 延迟保证
  • 静态加密
  • 备份

你可以在享受这些优势的同时,保留对现有 MongoDB 应用程序的投资。 你可以通过任何开源 MongoDB 客户端驱动程序与 Azure Cosmos DB for MongoDB 进行通信。 Azure Cosmos DB for MongoDB 符合 MongoDB 线路协议的要求,因此支持使用现有的客户端驱动程序。

MongoDB 协议支持

Azure Cosmos DB for MongoDB 与 MongoDB Server 版本 4.0、3.6 和 3.2 兼容。 有关详细信息,请查看版本 4.03.63.2 中支持的功能和语法。

以下扩展命令将通过数据库请求来创建和修改特定于 Azure Cosmos DB 的资源:

创建数据库

“创建数据库”扩展命令可创建新的 MongoDB 数据库。 可以从 use database 命令设置的数据库上下文使用数据库名称。 下表描述了该命令中的参数:

字段 类型​​ 说明
customAction string 自定义命令的名称。 值必须是 CreateDatabase
offerThroughput int 对数据库设置的预配吞吐量。 此参数是可选的。
autoScaleSettings Object 对于自动缩放模式,此字段是必需的。 此对象包含与自动缩放容量模式关联的设置。 你可以设置 maxThroughput 值,该值描述了集合可以动态增加到的最大请求单位数量。

输出

如果此命令成功,它将返回以下响应:

{ "ok" : 1 }

有关输出中的参数,请参阅自定义命令的默认输出

示例:创建数据库

若要创建一个使用所有默认值的名为 "test" 的数据库,请使用以下命令:

use test
db.runCommand({customAction: "CreateDatabase"});

此命令将创建一个没有数据库级吞吐量的数据库。 该操作意味着,此数据库中的集合需要指定你需要使用的吞吐量。

示例:创建具有吞吐量的数据库

若要创建名为 "test" 的数据库并将数据库级别预配吞吐量指定为 1000 RU,请使用以下命令:

use test
db.runCommand({customAction: "CreateDatabase", offerThroughput: 1000 });

此命令将创建数据库并为其设置吞吐量。 除非创建集合时指定了特定的吞吐量级别,否则此数据库中的所有集合都将共享所设置的吞吐量。

示例:创建具有自动扩缩吞吐量的数据库

若要创建名为 "test" 的数据库,并在数据库级别指定 20,000 RU/秒的自动缩放最大吞吐量,请使用以下命令:

use test
db.runCommand({customAction: "CreateDatabase", autoScaleSettings: { maxThroughput: 20000 } });

更新数据库

“更新数据库”扩展命令可更新与指定的数据库相关联的属性。 如需将数据库从预配吞吐量更改为自动扩缩吞吐量,只能在 Azure 门户中进行操作,反之亦然。 下表描述了该命令中的参数:

字段 类型​​ 说明
customAction string 自定义命令的名称。 值必须是 UpdateDatabase
offerThroughput int 当数据库使用数据库级别吞吐量时你要在数据库上设置的新预配吞吐量
autoScaleSettings Object 对于自动缩放模式,此字段是必需的。 此对象包含与自动缩放容量模式关联的设置。 你可以设置 maxThroughput 值,该值描述了数据库可动态增加到的最大请求单位数量。

此命令使用在会话上下文中指定的数据库。 此数据库与你在 use <database> 命令中使用的数据库相同。 目前,无法使用此命令更改数据库名称。

输出

如果此命令成功,它将返回以下响应:

{ "ok" : 1 }

有关输出中的参数,请参阅自定义命令的默认输出

示例:更新与数据库关联的预配吞吐量

若要将名为 "test" 的数据库的预配吞吐量更新为 1200 RU,请使用以下命令:

use test
db.runCommand({customAction: "UpdateDatabase", offerThroughput: 1200 });

示例:更新与数据库关联的自动扩缩吞吐量

若要将名为 "test" 的数据库的预配吞吐量更新为 20,000 RU,或将其转换为自动缩放吞吐量级别,请使用以下命令:

use test
db.runCommand({customAction: "UpdateDatabase", autoScaleSettings: { maxThroughput: 20000 } });

获取数据库

“获取数据库”扩展命令返回数据库对象。 数据库名称取自该命令所针对的数据库上下文。

{
  customAction: "GetDatabase"
}

下表描述了该命令中的参数:

字段 类型​​ 说明
customAction string 自定义命令的名称。 值必须是 GetDatabase

输出

如果该命令成功,则响应包含带有以下字段的文档:

字段 类型​​ 说明
ok int 响应的状态。 1 == 成功。 0 == 失败。
database string 数据库的名称。
provisionedThroughput int 当数据库使用手动数据库级别吞吐量时在数据库上设置的预配吞吐量
autoScaleSettings Object 如果数据库使用的是自动扩缩模式,此对象将包含与数据库关联的容量参数。 maxThroughput 值描述了数据库可动态增加到的最大请求单位数量。

如果该命令失败,则返回默认的自定义命令响应。 有关输出中的参数,请参阅自定义命令的默认输出

示例:获取数据库

若要获取名为 "test" 的数据库的数据库对象,请使用以下命令:

use test
db.runCommand({customAction: "GetDatabase"});

如果数据库没有关联的吞吐量,输出将是:

{ "database" : "test", "ok" : 1 }

如果数据库具有关联的数据库级别手动吞吐量,则输出将显示 provisionedThroughput 值:

{ "database" : "test", "provisionedThroughput" : 20000, "ok" : 1 }

如果数据库具有关联的数据库级别自动缩放吞吐量,则输出将显示 provisionedThroughput,其中描述了数据库的最小 RU/秒,以及包含 maxThroughput(描述了数据库的最大 RU/秒)的 autoScaleSettings 对象。

{
        "database" : "test",
        "provisionedThroughput" : 2000,
        "autoScaleSettings" : {
                "maxThroughput" : 20000
        },
        "ok" : 1
}

创建集合

“创建集合”扩展命令可创建新的 MongoDB 集合。 数据库名称将在 use database 命令设置的数据库上下文中使用。 CreateCollection 命令的格式如下:

{
  customAction: "CreateCollection",
  collection: "<Collection Name>",
  shardKey: "<Shard key path>",
  // Replace the line below with "autoScaleSettings: { maxThroughput: (int) }" to use Autoscale instead of Provisioned Throughput. Fill the required Autoscale max throughput setting.
  offerThroughput: (int) // Provisioned Throughput enabled with required throughput amount set.
  indexes: [{key: {_id: 1}, name: "_id_1"}, ... ] // Optional indexes (3.6+ accounts only).
}

下表描述了该命令中的参数:

字段 类型 需要 说明
customAction string 必须 自定义命令的名称。 值必须是 CreateCollection
collection string 必须 集合的名称。 不允许使用特殊字符或空格。
offerThroughput int 可选 要在数据库上设置的预配吞吐量。 如果未提供此参数,则默认为最小值(400 请求单位/秒)。 *若要指定超过 10,000 RU/秒的吞吐量,需要使用 shardKey 参数。
shardKey string 对于具有较大吞吐量的集合,此字段是必需的 分片集合的分片键的路径。 如果在 offerThroughput 中设置的 RU/秒超过 10,000,则需要此参数。 如果指定了此参数,则插入的所有文档都需要此键和值。
autoScaleSettings Object 对于自动缩放模式,此字段是必需的 此对象包含与自动缩放容量模式关联的设置。 你可以设置 maxThroughput 值,该值描述了集合可以动态增加到的最大请求单位数量。
indexes Array (可选)配置索引。 仅 3.6+ 帐户支持此参数。 当存在时,需要在 _id 上建立索引。 数组中的每个条目必须包含由一个或多个字段组成的键和一个名称,并且可以包含索引选项。 例如,若要为字段 ab 创建复合唯一索引,请使用此输入:{key: {a: 1, b: 1}, name:"a_1_b_1", unique: true}

输出

返回默认的自定义命令响应。 有关输出中的参数,请参阅自定义命令的默认输出

示例:使用最低配置创建集合

若要创建使用名称 "testCollection" 和默认值的新集合,请使用以下命令:

use test
db.runCommand({customAction: "CreateCollection", collection: "testCollection"});

此操作将生成一个未分片且吞吐量为 400 请求单位/秒的全新固定集合,并自动创建基于 _id 字段的索引。 这种类型的配置也适用于通过 insert() 函数创建新集合的情况。 例如:

use test
db.newCollection.insert({});

示例:创建未分片的集合

若要创建名为 "testCollection" 且预配吞吐量为 1000 请求单位的未分片集合,请使用以下命令:

use test
db.runCommand({customAction: "CreateCollection", collection: "testCollection", offerThroughput: 1000});

你可以创建 offerThroughput 最高为 10,000 RU/秒的集合,不需要指定分片键。 对于吞吐量更大的集合,请参阅下一部分。

示例:创建分片集合

若要创建名为 "testCollection"、预配吞吐量为 11,000 RU 且具有 shardkey 属性“a.b”的分片集合,请使用以下命令:

use test
db.runCommand({customAction: "CreateCollection", collection: "testCollection", offerThroughput: 11000, shardKey: "a.b" });

此命令现在需要 shardKey 参数,因为 offerThroughput 中指定的 RU/秒超过 10,000。

示例:创建未分片的自动扩缩集合

若要创建名为 'testCollection' 且使用设置为 4,000 RU/秒的自动缩放吞吐量容量的未分片集合,请使用以下命令:

use test
db.runCommand({ 
    customAction: "CreateCollection", collection: "testCollection", 
    autoScaleSettings:{
      maxThroughput: 4000
    } 
});

对于 autoScaleSettings.maxThroughput 值,你可以指定一个从 4,000 请求单位/秒到 10,000 请求单位/秒的范围,而无需使用分片键。 若要实现更高的自动缩放吞吐量,需要指定 shardKey 参数。

示例:创建分片的自动扩缩集合

若要创建名为 'testCollection'、分片键名为 'a.b' 且使用设置为 20,000 RU/秒的自动缩放吞吐量容量的分片集合,请使用以下命令:

use test
db.runCommand({customAction: "CreateCollection", collection: "testCollection", shardKey: "a.b", autoScaleSettings: { maxThroughput: 20000 }});

更新集合

“更新集合”扩展命令可更新与指定的集合相关联的属性。 如需将集合从预配吞吐量更改为自动扩缩吞吐量,只能在 Azure 门户中进行操作,反之亦然。

{
  customAction: "UpdateCollection",
  collection: "<Name of the collection that you want to update>",
  // Replace the line below with "autoScaleSettings: { maxThroughput: (int) }" if using Autoscale instead of Provisioned Throughput. Fill the required Autoscale max throughput setting. Changing between Autoscale and Provisioned throughput is only supported in the Azure Portal.
  offerThroughput: (int) // Provisioned Throughput enabled with required throughput amount set.
  indexes: [{key: {_id: 1}, name: "_id_1"}, ... ] // Optional indexes (3.6+ accounts only).
}

下表描述了该命令中的参数:

字段 类型​​ 说明
customAction string 自定义命令的名称。 值必须是 UpdateCollection
collection string 集合的名称。
offerThroughput int 要对集合设置的预配吞吐量。
autoScaleSettings Object 对于自动缩放模式,此字段是必需的。 此对象包含与自动缩放容量模式关联的设置。 maxThroughput 值描述了集合可动态增加到的最大请求单位数量。
indexes Array (可选)配置索引。 仅 3.6+ 帐户支持此参数。 当存在时,指定的索引集(包括删除索引)将替代集合的现有索引。 需要在 _id 上建立索引。 数组中的每个条目必须包含由一个或多个字段组成的键和一个名称,并且可以包含索引选项。 例如,若要在字段 a 和 b 上创建复合唯一索引,请使用此条目:{key: {a: 1, b: 1}, name: "a_1_b_1", unique: true}

输出

返回默认的自定义命令响应。 有关输出中的参数,请参阅自定义命令的默认输出

示例:更新与集合关联的预配吞吐量

若要将名为 "testCollection" 的集合的预配吞吐量更新为 1200 RU,请使用以下命令:

use test
db.runCommand({customAction: "UpdateCollection", collection: "testCollection", offerThroughput: 1200 });

获取集合

“获取集合”自定义命令返回集合对象。

{
  customAction: "GetCollection",
  collection: "<Name of the collection>"
}

下表描述了该命令中的参数:

字段 类型​​ 说明
customAction string 自定义命令的名称。 值必须是 GetCollection
collection string 集合的名称。

输出

如果该命令成功,则响应包含带有以下字段的文档

字段 类型​​ 说明
ok int 响应的状态。 1 == 成功。 0 == 失败。
database string 数据库的名称。
collection string 集合的名称。
shardKeyDefinition document 用作分片键的索引规范文档。 此字段是一个可选的响应参数。
provisionedThroughput int 要对集合设置的预配吞吐量。 此字段是一个可选的响应参数。
autoScaleSettings Object 如果数据库使用的是自动扩缩模式,此对象将包含与数据库关联的容量参数。 maxThroughput 值描述了集合可动态增加到的最大请求单位数量。

如果该命令失败,则返回默认的自定义命令响应。 有关输出中的参数,请参阅自定义命令的默认输出

示例:获取集合

若要获取名为 "testCollection" 的集合的集合对象,请使用以下命令:

use test
db.runCommand({customAction: "GetCollection", collection: "testCollection"});

如果集合具有关联的吞吐量容量,则它将包括 provisionedThroughput 值,且输出为:

{
        "database" : "test",
        "collection" : "testCollection",
        "provisionedThroughput" : 400,
        "ok" : 1
}

如果集合具有关联的自动扩缩吞吐量,则它将包括带有 maxThroughput 参数的 autoScaleSettings 对象,该参数定义的是集合可动态增加到的最大吞吐量。 此外,它还将包括 provisionedThroughput 值,该值定义的是当集合中没有请求时此集合将减少到的最小吞吐量:

{
        "database" : "test",
        "collection" : "testCollection",
        "provisionedThroughput" : 1000,
        "autoScaleSettings" : {
            "maxThroughput" : 10000
        },
        "ok" : 1
}

如果集合共享数据库级别吞吐量(不管是自动缩放模式还是手动模式),则输出将是:

{ "database" : "test", "collection" : "testCollection", "ok" : 1 }
{
        "database" : "test",
        "provisionedThroughput" : 2000,
        "autoScaleSettings" : {
            "maxThroughput" : 20000
        },
        "ok" : 1
}

并行化更改流

大规模使用更改流时,最好均匀分布负载。 以下命令将返回一个或多个更改流恢复标记,其中每个标记都对应于单个物理分片/分区中的数据(多个逻辑分片/分区可存在于一个物理分区中)。 每个恢复标记都可导致 watch() 仅从该物理分片/分区返回数据。

通过在每个恢复标记(每个标记一个线程)上使用 db.collection.watch(),可有效地扩缩更改流。

{
        customAction: "GetChangeStreamTokens", 
        collection: "<Name of the collection>", 
        startAtOperationTime: "<BSON Timestamp>" // Optional. Defaults to the time the command is run.
} 

示例:获取流标记

运行自定义命令,获取每个物理分片/分区的恢复标记。

use test
db.runCommand({customAction: "GetChangeStreamTokens", collection: "<Name of the collection>"})

为从 GetChangeStreamTokens 自定义命令返回的每个恢复标记运行 watch() 线程/进程。 下面是一个线程的示例。

db.test_coll.watch([{ $match: { "operationType": { $in: ["insert", "update", "replace"] } } }, { $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1 } }], 
{fullDocument: "updateLookup", 
resumeAfter: { "_data" : BinData(0,"eyJWIjoyLCJSaWQiOiJQeFVhQUxuMFNLRT0iLCJDb250aW51YXRpb24iOlt7IkZlZWRSYW5nZSI6eyJ0eXBlIjoiRWZmZWN0aXZlIFBhcnRpdGlvbiBLZXkgUmFuZ2UiLCJ2YWx1ZSI6eyJtaW4iOiIiLCJtYXgiOiJGRiJ9fSwiU3RhdGUiOnsidHlwZSI6ImNvbnRpbndkFLbiIsInZhbHVlIjoiXCIxODQ0XCIifX1dfQ=="), "_kind" : NumberInt(1)}})

resumeAfter 字段中的文档(值)表示恢复标记。 watch() 命令将返回自 GetChangeStreamTokens 自定义命令运行以来从该物理分区插入、更新或替换的所有文档的游标。 下面是返回的数据的示例。

{
  "_id": {
    "_data": BinData(0,
    "eyJWIjoyLCJSaWQiOiJQeFVhQUxuMFNLRT0iLCJDfdsfdsfdsft7IkZlZWRSYW5nZSI6eyJ0eXBlIjoiRWZmZWN0aXZlIFBhcnRpdGlvbiBLZXkgUmFuZ2UiLCJ2YWx1ZSI6eyJtaW4iOiIiLCJtYXgiOiJGRiJ9fSwiU3RhdGUiOnsidHlwZSI6ImNvbnRpbnVhdGlvbiIsInZhbHVlIjoiXCIxOTgwXCIifX1dfQ=="),
    "_kind": 1
  },
  "fullDocument": {
    "_id": ObjectId("60da41ec9d1065b9f3b238fc"),
    "name": John,
    "age": 6
  },
  "ns": {
    "db": "test-db",
    "coll": "test_coll"
  },
  "documentKey": {
    "_id": ObjectId("60da41ec9d1065b9f3b238fc")
  }
}

返回的每个文档都包含一个恢复标记(每一页的恢复标记都是相同的)。 如果线程/进程停止,应存储此恢复标记并重复使用。 此恢复标记将从中断的位置继续,并仅从该物理分区接收数据。

自定义命令的默认输出

如果未指定,则自定义响应包含带有以下字段的文档:

字段 类型​​ 说明
ok int 响应的状态。 1 == 成功。 0 == 失败。
code int 仅当命令失败时才返回(即 ok == 0)。 包含 MongoDB 错误代码。 此字段是一个可选的响应参数。
errMsg string 仅当命令失败时才返回(即 ok == 0)。 包含用户友好的错误消息。 此字段是一个可选的响应参数。

例如:

{ "ok" : 1 }

后续步骤

接下来,可以继续学习以下 Azure Cosmos DB 概念: