数据质量监控体系设计与实现
请设计一个大数据数据质量监控体系,涵盖数据完整性、准确性、一致性、及时性和唯一性五个维度。如何实现自动化监控?如何定义数据质量SLA指标?当数据质量异常时如何告警和自动补偿?
回答
编译有声
1. 五维监控体系
| 维度 | 指标 | 监控SQL示例 |
|---|---|---|
| 完整性 | 空值率 | SUM(CASE WHEN col IS NULL THEN 1 ELSE 0 END)/COUNT(*) |
| 准确性 | 数据范围 | WHERE amount BETWEEN 0 AND 10^8 |
| 一致性 | 跨表校验 | SELECT a.id FROM a LEFT JOIN b ON a.id=b.id WHERE b.id IS NULL |
| 及时性 | 延迟时间 | SELECT MAX(proctime-event_time) FROM stream |
| 唯一性 | 主键重复 | SELECT pk, COUNT(*) FROM t GROUP BY pk HAVING COUNT(*) > 1 |
2. 自动化监控架构
数据质量平台
├── 规则配置模块 → YAML/JSON规则定义
├── 调度引擎 → Airflow/DolphinScheduler定时触发
├── 执行引擎 → Spark SQL / Flink SQL
├── 结果存储 → MySQL / ES
└── 告警通道 → 邮件/钉钉/飞书/Slack
规则示例:
rules:
- name: "订单表空值检测"
table: ods_orders
type: COMPLETENESS
column: order_id
threshold: 0.01 # 空值率≤1%
severity: P0
schedule: "0 */1 * * *" # 每小时
3. SLA指标定义
ODS层:完整性≥99.9%,延迟≤5min
DWD层:准确性≥99.99%,一致性=100%
DWS层:及时性≤1min
ADS层:可用性≥99.99%
4. 异常告警与补偿
告警分级:
- P0(严重):主表数据丢失 → 电话告警 → 立即停服修复
- P1(高):数据延迟超过2h → 即时通讯告警
- P2(中):轻微空值率超标 → 邮件告警
- P3(低):趋势预警 → 周报
自动补偿:
# 补偿策略
if anomaly_type == "LOST_DATA":
trigger_backfill(date_range, source="kafka")
elif anomaly_type == "DELAY":
resume_checkpoint()
elif anomaly_type == "DUPLICATE":
run_dedup_pipeline()
5. 技术实现
- Flink CEP检测实时数据质量异常
- Apache Griffin / Deequ 开源框架
- 自定义Spark/Flink作业做定时校验