CodeWalk

数据质量监控体系设计与实现

作者:编译有声 · 2026-05-30 12:55

请设计一个大数据数据质量监控体系,涵盖数据完整性、准确性、一致性、及时性和唯一性五个维度。如何实现自动化监控?如何定义数据质量SLA指标?当数据质量异常时如何告警和自动补偿?

回答

编译有声

1. 五维监控体系

维度指标监控SQL示例
完整性空值率SUM(CASE WHEN col IS NULL THEN 1 ELSE 0 END)/COUNT(*)
准确性数据范围WHERE amount BETWEEN 0 AND 10^8
一致性跨表校验SELECT a.id FROM a LEFT JOIN b ON a.id=b.id WHERE b.id IS NULL
及时性延迟时间SELECT MAX(proctime-event_time) FROM stream
唯一性主键重复SELECT pk, COUNT(*) FROM t GROUP BY pk HAVING COUNT(*) > 1

2. 自动化监控架构

数据质量平台
├── 规则配置模块 → YAML/JSON规则定义
├── 调度引擎 → Airflow/DolphinScheduler定时触发
├── 执行引擎 → Spark SQL / Flink SQL
├── 结果存储 → MySQL / ES
└── 告警通道 → 邮件/钉钉/飞书/Slack

规则示例

rules:
  - name: "订单表空值检测"
    table: ods_orders
    type: COMPLETENESS
    column: order_id
    threshold: 0.01  # 空值率≤1%
    severity: P0
    schedule: "0 */1 * * *"  # 每小时

3. SLA指标定义

ODS层:完整性≥99.9%,延迟≤5min
DWD层:准确性≥99.99%,一致性=100%
DWS层:及时性≤1min
ADS层:可用性≥99.99%

4. 异常告警与补偿

告警分级

  • P0(严重):主表数据丢失 → 电话告警 → 立即停服修复
  • P1(高):数据延迟超过2h → 即时通讯告警
  • P2(中):轻微空值率超标 → 邮件告警
  • P3(低):趋势预警 → 周报

自动补偿

# 补偿策略
if anomaly_type == "LOST_DATA":
    trigger_backfill(date_range, source="kafka")
elif anomaly_type == "DELAY":
    resume_checkpoint()
elif anomaly_type == "DUPLICATE":
    run_dedup_pipeline()

5. 技术实现

  • Flink CEP检测实时数据质量异常
  • Apache Griffin / Deequ 开源框架
  • 自定义Spark/Flink作业做定时校验