Databricks SDK for R

注意

本文介绍 Databricks Labs 提供的 Databricks SDK for R,它处于试验状态。 若要提供反馈、提出问题和报告问题,请使用 GitHub 上 Databricks SDK for R 存储库中的“问题”选项卡。

本文介绍如何使用 Databricks SDK for R 在 Azure Databricks 工作区中自动执行 Azure Databricks 操作。本文补充了 Databricks SDK for R 文档

注意

Databricks SDK for R 不支持 Azure Databricks 帐户中的操作自动化。 若要调用帐户级操作,请使用其他 Databricks SDK,例如:

开始之前

在开始使用 Databricks SDK for R 之前,开发计算机必须满足以下要求:

  • 你要自动化的目标 Azure Databricks 工作区的 Azure Databricks 个人访问令牌

    注意

    Databricks SDK for R 仅支持 Azure Databricks 个人访问令牌身份验证。

  • R 或者 R 兼容的集成开发环境 (IDE)。 Databricks 建议使用 RStudio Desktop,并在本文的说明中使用它。

开始使用 Databricks SDK for R

  1. 使你的 Azure Databricks 工作区 URL 和个人访问令牌可用于你的 R 项目的脚本。 例如,可以将以下内容添加到 R 项目的 .Renviron 文件中。 将 <your-workspace-url> 替换为每工作区 URL,例如 https://adb-1234567890123456.7.databricks.azure.cn。 将 <your-personal-access-token> 替换为你的 Azure Databricks 个人访问令牌,例如 dapi12345678901234567890123456789012

    DATABRICKS_HOST=<your-workspace-url>
    DATABRICKS_TOKEN=<your-personal-access-token>
    

    要创建 Azure Databricks 个人访问令牌,请执行以下操作:

    1. 在 Azure Databricks 工作区中,单击顶部栏中的 Azure Databricks 用户名,然后从下拉列表中选择“设置”
    2. 单击“开发人员”。
    3. 在“访问令牌”旁边,单击“管理”。
    4. 单击“生成新令牌”。
    5. (可选)输入有助于将来识别此令牌的注释,并将令牌的默认生存期更改为 90 天。 若要创建没有生存期的令牌(不建议),请将“生存期(天)”框留空(保留空白)。
    6. 单击“生成” 。
    7. 将显示的令牌复制到安全位置,然后单击“完成”。

    注意

    请务必将复制的令牌保存到安全的位置。 请勿与他人共享复制的令牌。 如果丢失了复制的令牌,你将无法重新生成完全相同的令牌, 而必须重复此过程来创建新令牌。 如果丢失了复制的令牌,或者认为令牌已泄露,Databricks 强烈建议通过单击“访问令牌”页上令牌旁边的垃圾桶(撤销)图标立即从工作区中删除该令牌。

    如果你无法在工作区中创建或使用令牌,可能是因为工作区管理员已禁用令牌或未授予你创建或使用令牌的权限。 请与工作区管理员联系,或参阅以下主题:

    有关提供 Azure Databricks 工作区 URL 和个人访问令牌的其他方法,请参阅 GitHub 中 Databricks SDK for R 存储库中的身份验证

    重要

    请勿将 .Renviron 文件添加到版本控制系统,因为这有公开敏感信息的风险,例如 Azure Databricks 个人访问令牌。

  2. 安装 Databricks SDK for R 包。 例如,在 RStudio Desktop 中,在“控制台”视图(“视图 > 将焦点移动到控制台”)中,运行以下命令,一次一个:

    install.packages("devtools")
    library(devtools)
    install_github("databrickslabs/databricks-sdk-r")
    

    注意

    Databricks SDK for R 包在 CRAN 上不可用。

  3. 添加代码以引用 Databricks SDK for R 并列出 Azure Databricks 工作区中的所有群集。 例如,在项目的 main.r 文件中,代码可能如下所示:

    require(databricks)
    
    client <- DatabricksClient()
    
    list_clusters(client)[, "cluster_name"]
    
  4. 运行脚本。 例如,在 RStudio Desktop 中,在项目的 main.r 文件处于活动状态的脚本编辑器中,单击“源”>“源”或“带回显的源”

  5. 此时会显示群集列表。 例如,在 RStudio Desktop 中,它位于控制台视图中。

代码示例

以下代码示例演示如何使用 Databricks SDK for R 创建和删除群集以及创建作业。

创建群集

此代码示例使用指定的 Databricks Runtime 版本和群集节点类型创建群集。 此群集有一个工作器,群集在空闲 15 分钟后自动终止。

require(databricks)

client <- DatabricksClient()

response <- create_cluster(
  client = client,
  cluster_name = "my-cluster",
  spark_version = "12.2.x-scala2.12",
  node_type_id = "Standard_DS3_v2",
  autotermination_minutes = 15,
  num_workers = 1
)

# Get the workspace URL to be used in the following results message.
get_client_debug <- strsplit(client$debug_string(), split = "host=")
get_host <- strsplit(get_client_debug[[1]][2], split = ",")
host <- get_host[[1]][1]

# Make sure the workspace URL ends with a forward slash.
if (endsWith(host, "/")) {
} else {
  host <- paste(host, "/", sep = "")
}

print(paste(
  "View the cluster at ",
  host,
  "#setting/clusters/",
  response$cluster_id,
  "/configuration",
  sep = "")
)

永久删除群集

此代码示例从工作区中永久删除具有指定群集 ID 的群集。

require(databricks)

client <- DatabricksClient()

cluster_id <- readline("ID of the cluster to delete (for example, 1234-567890-ab123cd4):")

delete_cluster(client, cluster_id)

创建作业

此代码示例创建一个可用于在指定群集上运行指定笔记本的 Azure Databricks 作业。 此代码运行时,它会从控制台的用户获取现有笔记本的路径、现有群集 ID 和相关作业设置。

require(databricks)

client <- DatabricksClient()

job_name <- readline("Some short name for the job (for example, my-job):")
description <- readline("Some short description for the job (for example, My job):")
existing_cluster_id <- readline("ID of the existing cluster in the workspace to run the job on (for example, 1234-567890-ab123cd4):")
notebook_path <- readline("Workspace path of the notebook to run (for example, /Users/someone@example.com/my-notebook):")
task_key <- readline("Some key to apply to the job's tasks (for example, my-key):")

print("Attempting to create the job. Please wait...")

notebook_task <- list(
  notebook_path = notebook_path,
  source = "WORKSPACE"
)

job_task <- list(
  task_key = task_key,
  description = description,
  existing_cluster_id = existing_cluster_id,
  notebook_task = notebook_task
)

response <- create_job(
  client,
  name = job_name,
  tasks = list(job_task)
)

# Get the workspace URL to be used in the following results message.
get_client_debug <- strsplit(client$debug_string(), split = "host=")
get_host <- strsplit(get_client_debug[[1]][2], split = ",")
host <- get_host[[1]][1]

# Make sure the workspace URL ends with a forward slash.
if (endsWith(host, "/")) {
} else {
  host <- paste(host, "/", sep = "")
}

print(paste(
  "View the job at ",
  host,
  "#job/",
  response$job_id,
  sep = "")
)

日志记录

可以使用常用 logging 包来记录消息。 此包支持多个日志记录级别和自定义日志格式。 可以使用此包将消息记录到控制台或文件。 若要记录消息,请执行以下操作:

  1. 安装 logging 包。 例如,在 RStudio Desktop 中的“控制台”视图中(“视图”>“将焦点移动到控制台”),运行以下命令:

    install.packages("logging")
    library(logging)
    
  2. 启动日志记录包,设置记录消息的位置,并设置日志记录级别。 例如,以下代码将所有 ERROR 消息记录到 results.log 文件中。

    basicConfig()
    addHandler(writeToFile, file="results.log")
    setLevel("ERROR")
    
  3. 根据需要记录消息。 例如,如果代码无法进行身份验证或列出可用群集的名称,则以下代码会记录任何错误。

    require(databricks)
    require(logging)
    
    basicConfig()
    addHandler(writeToFile, file="results.log")
    setLevel("ERROR")
    
    tryCatch({
      client <- DatabricksClient()
    }, error = function(e) {
      logerror(paste("Error initializing DatabricksClient(): ", e$message))
      return(NA)
    })
    
    tryCatch({
      list_clusters(client)[, "cluster_name"]
    }, error = function(e) {
      logerror(paste("Error in list_clusters(client): ", e$message))
      return(NA)
    })
    

测试

若要测试代码,可以使用 R 测试框架(如 testthat)。 若要在不调用 Azure Databricks REST API 终结点或更改 Azure Databricks 帐户或工作区的状态的情况下在模拟条件下测试代码,可以使用 R 模拟库(如 mockery)。

例如,给定以下名为 helpers.r 的文件,其中包含返回有关新群集的信息的 createCluster 函数:

library(databricks)

createCluster <- function(
  databricks_client,
  cluster_name,
  spark_version,
  node_type_id,
  autotermination_minutes,
  num_workers
) {
  response <- create_cluster(
    client = databricks_client,
    cluster_name = cluster_name,
    spark_version = spark_version,
    node_type_id = node_type_id,
    autotermination_minutes = autotermination_minutes,
    num_workers = num_workers
  )
  return(response)
}

给定以下名为 main.R 的文件,用于调用 createCluster 函数:

library(databricks)
source("helpers.R")

client <- DatabricksClient()

# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = createCluster(
  databricks_client = client,
  cluster_name = "my-cluster",
  spark_version = "<spark-version>",
  node_type_id = "<node-type-id>",
  autotermination_minutes = 15,
  num_workers = 1
)

print(response$cluster_id)

以下名为 test-helpers.py 的文件测试 createCluster 函数是否返回预期的响应。 此测试不会在目标工作区中创建群集,而是模拟 DatabricksClient 对象,定义模拟对象的设置,然后将模拟对象传递给 createCluster 函数。 然后,测试将检查函数是否返回新的模拟群集的预期 ID。

# install.packages("testthat")
# install.pacakges("mockery")
# testthat::test_file("test-helpers.R")
lapply(c("databricks", "testthat", "mockery"), library, character.only = TRUE)
source("helpers.R")

test_that("createCluster mock returns expected results", {
  # Create a mock response.
  mock_response <- list(cluster_id = "abc123")

  # Create a mock function for create_cluster().
  mock_create_cluster <- mock(return_value = mock_response)

  # Run the test with the mock function.
  with_mock(
    create_cluster = mock_create_cluster,
    {
      # Create a mock Databricks client.
      mock_client <- mock()

      # Call the function with the mock client.
      # Replace <spark-version> with the target Spark version string.
      # Replace <node-type-id> with the target node type string.
      response <- createCluster(
        databricks_client = mock_client,
        cluster_name = "my-cluster",
        spark_version = "<spark-version>",
        node_type_id = "<node-type-id>",
        autotermination_minutes = 15,
        num_workers = 1
      )

      # Check that the function returned the correct mock response.
      expect_equal(response$cluster_id, "abc123")
    }
  )
})

其他资源

有关详细信息,请参阅: