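Great Expectations: Automated Data Quality Monitoring and Pipeline Integration
How can Great Expectations (GX) be integrated into an ETL data pipeline to monitor data quality automatically? Design a complete GX workflow: define an Expectation Suite → integrate a Validator into the ETL job → write failing records to a Bad Records Store → trigger alerts. Also explain how GX Cloud (the SaaS edition) differs from the open-source edition, and how sampling the DataFrame can speed up validation on large datasets.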
Answer
Professional Coder
Integrating Great Expectations into a data quality pipeline:
1. End-to-end workflow design:
ETL Data Pipeline
│
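├─ Step 1: Read data (from Kafka/HDFS)
├─ Step 2: GX validation (Validator)
│ ├─ ✅ Pass → write to target table
│ ├─ ⚠️ Warning → write to target table + send alert
│ └─ ❌ Fail → write to Bad Records Store + block the pipeline
├─ Step 3: Alerting (DingTalk/Slack/Email)
└─ Step 4: Generate Data Docs (quality report)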
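The pass/warn/fail split in Step 2 needs a severity per rule, which GX itself does not define. One common convention (an assumption here, not a GX feature) is to store it in each expectation's `meta`, e.g. `meta={"severity": "critical"}` when the expectation is added, and route on the dict returned by `results.to_json_dict()`. The sketch below assumes that dict layout from the 0.x API; `route`, `failed_expectations` and the `"severity"` key are hypothetical names.

```python
from typing import Any, Dict, List

PASS, WARN, FAIL = "pass", "warn", "fail"


def failed_expectations(result: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the expectation_config of every expectation that did not succeed."""
    return [
        r.get("expectation_config") or {}
        for r in result.get("results", [])
        if not r.get("success", False)
    ]


def route(result: Dict[str, Any]) -> str:
    """Map a GX validation result dict to 'pass', 'warn' or 'fail'.

    Hypothetical convention: a failed expectation whose meta has
    severity == "critical" blocks the pipeline; any other failure is a warning.
    """
    failed = failed_expectations(result)
    if not failed:
        return PASS
    if any((cfg.get("meta") or {}).get("severity") == "critical" for cfg in failed):
        return FAIL
    return WARN


# Usage: a dict shaped like results.to_json_dict(); the values are made up.
example = {
    "success": False,
    "results": [
        {"success": True,
         "expectation_config": {"expectation_type": "expect_column_values_to_not_be_null",
                                "kwargs": {"column": "order_id"},
                                "meta": {"severity": "critical"}}},
        {"success": False,
         "expectation_config": {"expectation_type": "expect_column_values_to_be_in_set",
                                "kwargs": {"column": "status"},
                                "meta": {"severity": "warning"}}},
    ],
}
print(route(example))  # warn
```

Keeping severity in `meta` means it is saved with the suite, so the routing rule travels with the rules instead of living in pipeline code.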
2. Code implementation:
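# GX 0.x API (BatchRequest + get_validator); GX 1.x replaced these calls.
import great_expectations as gx
from great_expectations.core.batch import BatchRequest
from pyspark.sql import functions as F
# 1. Create the Expectation Suite (assumes datasource "spark_ds" is already configured)
context = gx.get_context()
context.add_or_update_expectation_suite(expectation_suite_name="orders_quality_suite")
batch_request = BatchRequest(
    datasource_name="spark_ds",
    data_connector_name="default_inferred_data_connector_name",
    data_asset_name="orders",
)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_quality_suite",
)
# 2. Add quality rules
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=1000000)
validator.expect_column_values_to_be_in_set("status", ["pending", "paid", "cancelled"])
validator.expect_column_pair_values_to_be_equal("currency", "currency_code")
validator.save_expectation_suite(discard_failed_expectations=False)  # persist the suite
# 3. Run validation
results = validator.validate()
# 4. Failure handling: GX reports which expectations failed but does not write
#    bad records, so re-apply the same rules to the batch DataFrame (Spark engine).
if not results.success:
    df = validator.active_batch.data.dataframe
    bad_records = df.filter(
        F.col("order_id").isNull()
        | ~F.col("amount").between(0, 1000000)
        | ~F.col("status").isin("pending", "paid", "cancelled")
        | ~F.col("currency").eqNullSafe(F.col("currency_code"))
    )
    bad_records.write.mode("append").format("parquet").save("/data/bad_records/orders")
    alert.send(f"Data quality check failed: {bad_records.count()} bad records")  # hypothetical alert client
    # Optional: raise RuntimeError("Blocking the pipeline")
# 5. Build Data Docs
context.build_data_docs()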
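GX tells you which expectations failed, but the Bad Records Store is easier to triage if every stored row also says which rule it broke. The sketch below is a plain-Python illustration on lists of dicts (standing in for DataFrame rows) that tags each bad row with its failed rules and a run id, then builds a one-line alert. `RULES`, `_failed_rules`, `_run_id`, `split_bad_records`, `alert_text` and `post_to_slack` are all hypothetical names; the only external format assumed is a Slack incoming webhook, which accepts a JSON body like `{"text": "..."}`.

```python
import json
import urllib.request
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]

# Hypothetical row-level predicates mirroring orders_quality_suite; each returns
# True when a row VIOLATES the rule. Missing values are skipped for the range and
# set rules, matching how GX column-map expectations treat nulls by default.
RULES: Dict[str, Callable[[Row], bool]] = {
    "order_id_not_null": lambda r: r.get("order_id") is None,
    "amount_between_0_1000000": lambda r: (
        r.get("amount") is not None and not 0 <= r["amount"] <= 1_000_000
    ),
    "status_in_set": lambda r: (
        r.get("status") is not None
        and r["status"] not in {"pending", "paid", "cancelled"}
    ),
    "currency_matches_code": lambda r: r.get("currency") != r.get("currency_code"),
}


def split_bad_records(rows: List[Row], run_id: str) -> Tuple[List[Row], List[Row]]:
    """Split rows into (good, bad); each bad row records which rules it failed."""
    good: List[Row] = []
    bad: List[Row] = []
    for row in rows:
        failed = [name for name, violates in RULES.items() if violates(row)]
        if failed:
            bad.append({**row, "_failed_rules": failed, "_run_id": run_id})
        else:
            good.append(row)
    return good, bad


def alert_text(suite_name: str, bad: List[Row]) -> str:
    """One-line summary: total bad rows plus a per-rule failure count."""
    counts: Dict[str, int] = {}
    for row in bad:
        for name in row["_failed_rules"]:
            counts[name] = counts.get(name, 0) + 1
    detail = ", ".join(f"{name}={n}" for name, n in sorted(counts.items()))
    return f"[{suite_name}] {len(bad)} bad records ({detail})"


def post_to_slack(webhook_url: str, text: str) -> None:
    """POST the alert text to a Slack incoming webhook."""
    req = urllib.request.Request(
        webhook_url,
        data=json.dumps({"text": text}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        resp.read()
```

In Spark the same predicates would become column expressions (one boolean column per rule), so the tagged rows can be appended to the Parquet Bad Records Store in a single pass. A DingTalk robot webhook expects a different JSON body, so only the payload in `post_to_slack` would change.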
3. Large-volume optimization (sampled validation):
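# Validate a 10% random sample for a quick check.
# Assumption: "spark_ds" also defines a RuntimeDataConnector named
# "default_runtime_data_connector_name" whose batch identifier is "run_id".
from great_expectations.core.batch import RuntimeBatchRequest
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
full_df = spark.read.parquet("/data/ods/orders")  # hypothetical source path
sampled_df = full_df.sample(withReplacement=False, fraction=0.1, seed=42)
validator = context.get_validator(
    batch_request=RuntimeBatchRequest(
        datasource_name="spark_ds",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="orders_sample",
        runtime_parameters={"batch_data": sampled_df},
        batch_identifiers={"run_id": "orders_sample_10pct"},
    ),
    expectation_suite_name="orders_quality_suite",
)
# Sampling suits range/format/distribution rules; keep exact rules (uniqueness,
# primary-key not-null, row counts) on the full data, since a sample can miss rare violations.
# The Onboarding Data Assistant (context.assistants.onboarding.run) can profile the sampled
# batch to propose a starter suite; it generates expectations but does not sample by itself.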
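How much a sample can miss is easy to quantify. Spark's `DataFrame.sample(withReplacement=False, fraction=p)` keeps each row independently with probability p, so a rule violated by k rows is seen in the sample with probability 1 - (1 - p)^k. The sketch below computes that and the smallest k detectable at a chosen confidence; the function names are hypothetical.

```python
import math


def detection_probability(fraction: float, bad_rows: int) -> float:
    """P(at least one of `bad_rows` violating rows is kept by a Bernoulli sample).

    Each row is kept independently with probability `fraction`, so every bad
    row is missed with probability (1 - fraction).
    """
    if not 0.0 < fraction <= 1.0:
        raise ValueError("fraction must be in (0, 1]")
    return 1.0 - (1.0 - fraction) ** bad_rows


def min_bad_rows_detectable(fraction: float, confidence: float) -> int:
    """Smallest k with 1 - (1 - fraction)**k >= confidence."""
    if not 0.0 < fraction <= 1.0:
        raise ValueError("fraction must be in (0, 1]")
    if not 0.0 < confidence < 1.0:
        raise ValueError("confidence must be in (0, 1)")
    if fraction == 1.0:
        return 1  # full scan: a single bad row is always seen
    # 1 - (1-p)^k >= c  <=>  k >= ln(1-c) / ln(1-p)   (both logs are negative)
    return math.ceil(math.log(1.0 - confidence) / math.log(1.0 - fraction))


for k in (1, 10, 50):
    print(f"10% sample, {k} bad rows -> P(detect) = {detection_probability(0.1, k):.3f}")
print("bad rows needed for 95% detection at 10%:", min_bad_rows_detectable(0.1, 0.95))
```

With a 10% sample, a rule broken by 10 rows is caught only about 65% of the time, and roughly 29 bad rows are needed for 95% detection. This is why exact rules such as primary-key checks should stay on the full data.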
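4. GX Cloud vs. open source:
| Feature | Open source | GX Cloud |
|------|--------|---------|
| Deployment | Self-hosted, runs inside your own pipeline | SaaS, hosted by GX |
| Collaboration | Suites and results shared via version control or shared stores (e.g. S3) | Built-in team workspace |
| Data sources | Anything the Pandas/Spark/SQL execution engines can read | Limited to the sources GX Cloud supports (varies by plan) |
| Alerting | Checkpoint Actions configured in code (e.g. SlackNotificationAction, EmailAction, PagerdutyAlertAction) | Built-in notifications configured in the web UI |
| Management UI | Static Data Docs site | Web UI dashboard |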