CodeWalk

Automated Data Quality Monitoring with Great Expectations and Pipeline Integration

Author: 专业代码师 · 2026-05-30 12:55

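How do you integrate Great Expectations into an ETL data pipeline to automate data quality monitoring? Please design a complete GX workflow: define an Expectation Suite → integrate a Validator into the ETL job → write failed records to a Bad Records Store → trigger alerts. Also explain how GX Cloud (the SaaS edition) differs from the open-source edition, and how DataFrame sampling can speed up validation on large data volumes.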

Answer

专业代码师

Integrating Great Expectations data quality checks into a pipeline:

1. End-to-end workflow design

ETL Data Pipeline
   │
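   ├─ Step 1: Read data (from Kafka/HDFS)
   ├─ Step 2: GX validation (Validator)
   │       ├─ ✅ Pass → write to target table
   │       ├─ ⚠️ Warning → write to target table + send alert
   │       └─ ❌ Fail → write to Bad Records Store + block the pipeline
   ├─ Step 3: Alert notification (DingTalk/Slack/Email)
   └─ Step 4: Generate Data Docs (quality report)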
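
The three branches in the diagram can be driven by a severity tag attached to each expectation. The sketch below is plain Python over a dict shaped like the output of `results.to_json_dict()`. The `severity` key inside `meta`, the `Route` enum, and the `route` function are assumptions made for illustration; they are not GX conventions.

```python
from enum import Enum


class Route(Enum):
    PASS = "write_to_target"
    WARN = "write_to_target_and_alert"
    FAIL = "quarantine_and_block"


def route(validation_result: dict) -> Route:
    """Map a validation result to one of the three pipeline branches.

    Assumes each expectation was created with meta={"severity": "warning"}
    or meta={"severity": "error"}; a missing tag is treated as "error".
    """
    failed_severities = [
        (r.get("expectation_config", {}).get("meta") or {}).get("severity", "error")
        for r in validation_result.get("results", [])
        if not r.get("success", False)
    ]
    if not failed_severities:
        return Route.PASS
    if all(s == "warning" for s in failed_severities):
        return Route.WARN
    return Route.FAIL


# Hand-written example result; a real one would come from validator.validate().
example = {
    "success": False,
    "results": [
        {"success": True, "expectation_config": {"meta": {"severity": "error"}}},
        {"success": False, "expectation_config": {"meta": {"severity": "warning"}}},
    ],
}
print(route(example))
```

Only warning-level rules failed in the example, so it lands in the "write to target table + send alert" branch. Any failed rule tagged `error`, or left untagged, sends the batch to the Bad Records Store instead.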

2. Implementation

import great_expectations as gx
from great_expectations.core.batch import BatchRequest

# 1. Create the Expectation Suite (legacy 0.x block-config API)
context = gx.get_context()
# add_or_update (GX 0.16+) keeps reruns from failing when the suite already exists
suite = context.add_or_update_expectation_suite("orders_quality_suite")

batch_request = BatchRequest(
    datasource_name="spark_ds",
    data_connector_name="default_inferred_data_connector_name",
    data_asset_name="orders"
)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_quality_suite"
)

# 2. Add quality rules
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=1000000)
validator.expect_column_values_to_be_in_set("status", ["pending", "paid", "cancelled"])
validator.expect_column_pair_values_to_be_equal("currency", "currency_code")
# Persist the rules so later runs can reuse the suite
validator.save_expectation_suite(discard_failed_expectations=False)

# 3. Run validation
results = validator.validate()

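# 4. Handle failures
if not results["success"]:
    # GX does not hand back the failing rows as a DataFrame, so re-apply the
    # rules as a Spark SQL filter on the batch that was just validated.
    df = validator.active_batch.data.dataframe
    bad_records = df.filter(
        "order_id IS NULL"
        " OR NOT (amount BETWEEN 0 AND 1000000)"
        " OR status NOT IN ('pending', 'paid', 'cancelled')"
        " OR currency <> currency_code"
    ).cache()
    bad_count = bad_records.count()  # len() does not work on a Spark DataFrame
    bad_records.write.mode("append").format("parquet").save("/data/bad_records/orders")
    # `alert` is a placeholder for your notification client (DingTalk/Slack/Email)
    alert.send(f"Data quality check failed: {bad_count} bad records")
    # Optional: raise Exception("Block the pipeline")

# 5. Build Data Docs
context.build_data_docs()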
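
The failure branch calls `alert.send(...)` on an object the snippet does not define. A minimal sketch of such an object, using only the standard library, might look like the following. `WebhookAlert` and `build_payload` are hypothetical names, and the webhook URL is something you supply; the payload shapes follow the plain-text message formats of Slack incoming webhooks and DingTalk custom robots.

```python
import json
import urllib.request


def build_payload(message: str, channel: str = "slack") -> dict:
    """Build the JSON body for a chat webhook.

    Slack incoming webhooks accept {"text": ...}; DingTalk custom robots
    expect {"msgtype": "text", "text": {"content": ...}}.
    """
    if channel == "dingtalk":
        return {"msgtype": "text", "text": {"content": message}}
    return {"text": message}


class WebhookAlert:
    """Hypothetical helper; webhook_url is supplied by the caller."""

    def __init__(self, webhook_url: str, channel: str = "slack", timeout: float = 5.0):
        self.webhook_url = webhook_url
        self.channel = channel
        self.timeout = timeout

    def send(self, message: str) -> int:
        """POST the message to the webhook and return the HTTP status code."""
        body = json.dumps(build_payload(message, self.channel)).encode("utf-8")
        request = urllib.request.Request(
            self.webhook_url,
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=self.timeout) as response:
            return response.status


# Usage (performs a real HTTP POST, so it is left commented out):
# alert = WebhookAlert("<your-webhook-url>")
# alert.send("Data quality check failed: 42 bad records")
```

Keeping the payload builder separate from the network call makes the message format easy to unit test without contacting the chat service.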

3. Optimizing for large data volumes (sampled validation)

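# Validate a ~10% random sample for a quick check.
# custom_filter_function selects whole batches, not rows, so row sampling
# goes through batch_spec_passthrough instead (legacy 0.x API).
sampled_request = BatchRequest(
    datasource_name="spark_ds",
    data_connector_name="default_inferred_data_connector_name",
    data_asset_name="orders",
    batch_spec_passthrough={
        # newer 0.x releases also accept the name without the leading underscore
        "sampling_method": "_sample_using_random",
        "sampling_kwargs": {"p": 0.1},
    },
)
validator = context.get_validator(
    batch_request=sampled_request,
    expectation_suite_name="orders_quality_suite",
)
# Alternative: sample the Spark DataFrame yourself with df.sample(fraction=0.1)
# and pass it in through a RuntimeBatchRequest.
# Note: the onboarding Data Assistant profiles existing batches to propose
# expectations; it does not replace row sampling at validation time.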
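
Sampling trades coverage for speed: a rare defect may not land in the sample at all. Assuming each row is kept independently with probability p (Bernoulli sampling, which is how Spark's `DataFrame.sample` behaves without replacement), the chance of seeing at least one of k bad rows is 1 - (1 - p)^k. The helper names below are illustrative, not part of GX.

```python
def detection_probability(fraction: float, bad_rows: int) -> float:
    """Probability that at least one of `bad_rows` rows lands in the sample."""
    if not 0.0 < fraction <= 1.0:
        raise ValueError("fraction must be in (0, 1]")
    if bad_rows < 0:
        raise ValueError("bad_rows must be >= 0")
    return 1.0 - (1.0 - fraction) ** bad_rows


def fraction_for(bad_rows: int, confidence: float = 0.99) -> float:
    """Smallest sampling fraction that catches at least one bad row
    with the given confidence, under independent row sampling."""
    if bad_rows < 1 or not 0.0 < confidence < 1.0:
        raise ValueError("need bad_rows >= 1 and 0 < confidence < 1")
    return 1.0 - (1.0 - confidence) ** (1.0 / bad_rows)


for k in (1, 10, 50):
    print(k, round(detection_probability(0.1, k), 4))
```

With a 10% sample, a single bad row is caught only 10% of the time, ten bad rows about 65% of the time, and fifty about 99.5%. This is why sampling suits distribution-level checks (ranges, value sets) better than uniqueness or not-null checks on critical keys, which usually need the full batch.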

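4. GX Cloud vs. open source:

| Feature | Open source | Cloud |
|---------|-------------|-------|
| Deployment | Self-hosted | Managed SaaS |
| Collaboration | File sharing | Team collaboration |
| Data sources | Unlimited | Limited (depends on plan) |
| Alerting | Checkpoint actions you configure yourself (e.g. Slack webhook, email) | Built-in Slack/Email/PagerDuty |
| Management UI | Static Data Docs pages | Web UI dashboard |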
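
On the alerting row: in the open-source edition, notifications are usually wired in as actions on a Checkpoint. The sketch below shows an action list in the 0.x-style dictionary format. The environment variable name `GX_SLACK_WEBHOOK` is an assumption, and the exact keys can differ between GX versions, so check them against the release you run.

```python
import os

# Action list in the shape accepted by 0.x Checkpoints (assumed version).
action_list = [
    {
        "name": "store_validation_result",
        "action": {"class_name": "StoreValidationResultAction"},
    },
    {
        "name": "update_data_docs",
        "action": {"class_name": "UpdateDataDocsAction"},
    },
    {
        "name": "notify_slack_on_failure",
        "action": {
            "class_name": "SlackNotificationAction",
            # Hypothetical variable name; keep the webhook URL out of source control.
            "slack_webhook": os.environ.get("GX_SLACK_WEBHOOK", ""),
            "notify_on": "failure",
            "renderer": {
                "module_name": "great_expectations.render.renderer.slack_renderer",
                "class_name": "SlackRenderer",
            },
        },
    },
]
```

The list would be passed as the `action_list` argument when the Checkpoint is defined, so that storing results, refreshing Data Docs, and notifying on failure all happen in one run.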