CodeWalk

实时数仓ODS→DWD数据清洗与过滤

作者:我是大山 · 2026-05-30 12:55

在FlinkSQL实时数仓中,ODS层到DWD层的数据清洗与过滤通常包含哪些操作?请说明如何实现脏数据过滤、字段脱敏、维度补充、数据标准化等常见DWD加工逻辑,以及如何处理维度变化(SCD Type 2)。

回答

我是大山

1. DWD层典型处理

脏数据过滤

-- Dirty-data filter: keep only orders that have a key, whose amount lies in
-- the open sanity range (0, 100000000), and whose status is an accepted one.
-- A NULL amount or status makes the predicate UNKNOWN, so such rows are dropped too.
INSERT INTO dwd_valid_orders
SELECT *
FROM ods_orders
WHERE status IN ('PAID', 'SHIPPED', 'COMPLETED')
  AND 0 < amount
  AND amount < 100000000  -- upper sanity threshold
  AND order_id IS NOT NULL;

字段脱敏

INSERT INTO dwd_user_safe
SELECT
  user_id,
  -- Mask the middle 4 digits of the first 11-digit run: keep 3, hide 4, keep 4.
  -- Flink SQL string literals do not treat backslash as an escape character,
  -- so the regex takes a single backslash ('\d'). Writing '\\d' would make the
  -- regex match a literal backslash, and the phone number would pass through unmasked.
  REGEXP_REPLACE(phone, '(\d{3})\d{4}(\d{4})', '$1****$2') AS phone,
  -- Keep the first 3 and the last 4 characters and replace the middle with 11 '*'.
  -- In Flink, CONCAT returns NULL if any argument is NULL, so a NULL id_card stays NULL.
  -- NOTE(review): SUBSTRING(id_card, -4) assumes a negative start counts from
  -- the end of the string; confirm on the target Flink version.
  CONCAT(SUBSTRING(id_card, 1, 3), '***********',
         SUBSTRING(id_card, -4)) AS id_card
FROM ods_users;

维度补充

-- Real-time dimension enrichment (lookup join): for each order, look up the
-- user and product dimension rows as of the order's processing time.
-- NOTE(review): o.proc_time is presumably declared as PROCTIME() on ods_orders;
-- FOR SYSTEM_TIME AS OF in a lookup join requires a processing-time attribute.
INSERT INTO dwd_order_detail
SELECT
  o.*,
  u.user_name,
  u.user_level,
  u.city,
  -- Select a product attribute so the dim_product lookup actually contributes
  -- to the output instead of being a per-record lookup with no effect.
  -- NOTE(review): assumes dim_product exposes product_name and that
  -- dwd_order_detail has a matching trailing column; confirm both schemas.
  p.product_name
FROM ods_orders AS o
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u
  ON o.user_id = u.user_id
LEFT JOIN dim_product FOR SYSTEM_TIME AS OF o.proc_time AS p
  ON o.product_id = p.product_id;

2. SCD Type 2(缓慢变化维,Slowly Changing Dimension)

-- Uses Flink CDC + a changelog stream as the source of product changes.
-- SCD Type 2 product dimension: the composite primary key lets several rows
-- (versions) exist per product_id, distinguished by effective_date.
-- Column order matters: the INSERT ... SELECT that loads this table is positional.
-- NOTE(review): the connector options in WITH (...) are elided; presumably an
-- upsert-capable sink is intended, so confirm before use.
CREATE TABLE dim_product_scd2 (
  product_id INT,
  product_name STRING,
  price DECIMAL(10,2),
  effective_date TIMESTAMP(3),   -- version start; loaded from the CDC op_ts
  expiration_date TIMESTAMP(3),  -- version end; far-future value for the current version
  is_current BOOLEAN,            -- flag for the latest version of a product
  PRIMARY KEY (product_id, effective_date) NOT ENFORCED  -- Flink does not validate the key; NOT ENFORCED is mandatory
) WITH (...);

-- SCD2 load: for every change captured from cdc_products, write a new
-- version that is marked current and open-ended.
-- NOTE(review): this statement only inserts the new version. The previous
-- version's row has a different effective_date, and therefore a different
-- primary key, and nothing here updates it. Its expiration_date and
-- is_current are never closed out. Expiring old versions needs a separate
-- step that remembers the prior effective_date per product_id. TODO.
INSERT INTO dim_product_scd2
SELECT
  product_id,
  product_name,
  price,
  op_ts AS effective_date,
  -- Open-ended upper bound for the current version. Calcite/Flink TIMESTAMP
  -- literals must include a time part ('yyyy-MM-dd HH:mm:ss'); a date-only
  -- literal such as TIMESTAMP '9999-12-31' is rejected by the parser.
  TIMESTAMP '9999-12-31 00:00:00' AS expiration_date,
  TRUE AS is_current
FROM cdc_products;

3. 数据标准化

-- Standardization: normalize event time to a single time zone, map numeric
-- status codes to readable names, and extract the source from ext_info JSON.
INSERT INTO dwd_events
SELECT
  event_id,
  -- Time-zone normalization: UTC -> Asia/Shanghai.
  -- NOTE(review): Flink's CONVERT_TZ works on datetime strings
  -- ('yyyy-MM-dd HH:mm:ss'); assumes raw_time is such a string.
  CONVERT_TZ(raw_time, 'UTC', 'Asia/Shanghai') AS event_time,
  -- Enum normalization; NULL and any unmapped code fall through to 'UNKNOWN'.
  CASE
    WHEN status_code = 0  THEN 'INIT'
    WHEN status_code = 1  THEN 'SUCCESS'
    WHEN status_code = -1 THEN 'FAILED'
    ELSE 'UNKNOWN'
  END AS status,
  -- Pull the top-level "source" field out of the JSON payload.
  JSON_VALUE(ext_info, '$.source') AS source
FROM ods_events;