使用 Delta Live Tables 管理数据质量

使用预期来定义对数据集内容的数据质量约束。 期望使你能够保证进入表中的数据满足数据质量要求,并为每项管道更新提供数据质量的见解。 使用 Python 修饰器或 SQL 约束子句将预期应用于查询。

什么是增量实时表期望?

期望是添加到增量实时表数据集声明的可选子句,这些子句对通过查询的每条记录应用数据质量检查。

期望由三个元素组成:

  • 一段说明,它充当唯一标识符,并可用于跟踪约束的指标。
  • 一个布尔语句,它始终根据某个规定的条件返回 true 或 false。
  • 当记录不满足期望(即布尔语句返回 false)时要执行的操作。

以下矩阵显示了可应用于无效记录的三个操作:

操作 结果
警告(默认值) 将无效记录写入目标;将失败报告为数据集的指标。
删除 在将数据写入目标之前删除无效记录;将失败报告为数据集的指标。
失败 无效记录会阻止更新成功完成。 在重新处理之前需要手动干预。

你可以通过查询增量实时表事件日志来查看数据质量指标,例如违反预期的记录数。 请参阅监视增量实时表管道

有关增量实时表数据集声明语法的完整参考,请参阅增量实时表 Python 语言参考增量实时表 SQL 语言参考

注意

  • 虽然可以在任一期望中包含多个子句,但只有 Python 支持基于多个期望定义操作。 请参阅多个期望
  • 必须使用 SQL 表达式来定义预期。 定义预期时,不能使用非 SQL 语法(如 Python 函数)。

保留无效记录

当你想要保留不符合预期的记录时,请使用 expect 运算符。 不符合预期的记录将与有效记录一起添加到目标数据集中:

Python

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

删除无效记录

使用 expect or drop 运算符可防止进一步处理无效记录。 不符合预期的记录将从目标数据集中删除:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

出现无效记录时失败

当无效记录不可接受时,使用 expect or fail 运算符在记录未通过验证时立即停止执行。 如果这项操作是表更新,系统会自动回滚事务:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

重要

如果在管道中定义了多个并行流,则单个流的失败不会导致其他流失败。

当管道因预期违规而失败时,必须在重新运行管道之前修复管道代码,以正确处理无效数据。

预期违规会修改转换的 Spark 查询计划,以跟踪检测和报告违规所需的信息。 对于许多查询,可以使用此信息来识别导致违规的输入记录。 下面是一个异常示例:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

多个预期

可以在 Python 管道中定义具有一个或多个数据质量约束的期望。 这些修饰器接受 Python 字典作为参数,其中键是预期名称,值是预期约束。

当未通过验证的记录应包含在目标数据集中时,请使用 expect_all 指定多个数据质量约束:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

当未通过验证的记录应从目标数据集中删除时,请使用 expect_all_or_drop 指定多个数据质量约束:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

当未通过验证的记录应停止管道执行时,请使用 expect_all_or_fail 指定多个数据质量约束:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

你还可以将一组预期定义为变量,并将其传递给管道中的一个或多个查询:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

隔离无效数据

以下示例将期望与临时表和视图结合使用。 此模式提供在管道更新期间通过期望检查的记录的指标,并提供一种通过不同下游路径处理有效和无效记录的方式。

注意

此示例读取 Databricks 数据集中包含的示例数据。 由于发布到 Unity Catalog 的管道不支持 Databricks 数据集,因此此示例仅适用于配置为发布到 Hive 元存储的管道。 不过,此模式也适用于已启用 Unity Catalog 的管道,但你必须从外部位置读取数据。 若要详细了解如何将 Unity Catalog 与 Delta Live Tables 配合使用,请参阅将 Unity Catalog 与 Delta Live Tables 管道配合使用

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

验证跨表的行计数

可以向管道添加一个附加表,用于定义比较两个具体化视图或流式处理表之间的行计数的预期。 该预期的结果显示在事件日志和增量实时表 UI 中。 以下示例验证 tblatblb 表之间的行计数是否相等:

CREATE OR REFRESH MATERIALIZED VIEW count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

根据 Delta Live Tables 预期执行高级验证

可使用聚合和联接查询定义具体化视图,并将这些查询的结果用作预期检查的一部分。 如果你想要执行复杂的数据质量检查,从而确保派生表包含源表中的所有记录或保证表中数值列的相等性,则此功能非常有用。 可以使用 TEMPORARY 关键字来防止将这些表发布到目标架构。

以下示例验证 report 表中是否存在所有预期记录:

CREATE TEMPORARY MATERIALIZED VIEW report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

以下示例使用聚合来确保主键的唯一性:

CREATE MATERIALIZED VIEW report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

使期望可移植且可重用

可以将数据质量规则与管道实现分开维护。

Databricks 建议将规则存储在 Delta 表中,每个规则按标记分类。 在数据集定义中使用此标记来确定要应用的规则。

以下示例创建一个名为 rules 的表来维护规则:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

以下 Python 示例基于 rules 表中存储的规则定义数据质量期望。 get_rules() 函数从 rules 表读取规则并返回一个 Python 字典,其中包含与传递给该函数的 tag 参数匹配的规则。 该字典应用于 @dlt.expect_all_*() 修饰器以强制实施数据质量约束。 例如,将从 raw_farmers_market 表中删除任何与使用 validity 标记的规则不符的记录:

注意

此示例读取 Databricks 数据集中包含的示例数据。 由于发布到 Unity Catalog 的管道不支持 Databricks 数据集,因此此示例仅适用于配置为发布到 Hive 元存储的管道。 不过,此模式也适用于已启用 Unity Catalog 的管道,但你必须从外部位置读取数据。 若要详细了解如何将 Unity Catalog 与 Delta Live Tables 配合使用,请参阅将 Unity Catalog 与 Delta Live Tables 管道配合使用

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

可以将 Python 模块创建到主规则中,例如,在笔记本所在的同一文件夹中名为 rules_module.py 的文件中创建 Python 模块,而不是创建名为 rules 的表来维护规则:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

然后,通过导入模块并将 get_rules() 函数更改为从模块中读取,而不是从 rules 表中读取来修改前面的笔记本:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )