实时数仓ODS→DWD数据清洗与过滤
在FlinkSQL实时数仓中,ODS层到DWD层的数据清洗与过滤通常包含哪些操作?请说明如何实现脏数据过滤、字段脱敏、维度补充、数据标准化等常见DWD加工逻辑,以及如何处理维度变化(SCD Type 2)。
回答
我是大山
1. DWD层典型处理
脏数据过滤
-- Dirty-data filter: only orders with a key, a plausible amount and an
-- accepted status are forwarded to the DWD layer.
INSERT INTO dwd_valid_orders
SELECT *
FROM ods_orders
WHERE status IN ('PAID', 'SHIPPED', 'COMPLETED')
  AND order_id IS NOT NULL
  -- Threshold filter: amount must be strictly positive and below 100,000,000.
  -- A NULL amount fails both comparisons, so it is dropped too.
  AND amount > 0
  AND amount < 100000000;
字段脱敏
-- Field masking: hide the middle part of phone and ID-card numbers before
-- user records reach the DWD layer.
INSERT INTO dwd_user_safe
SELECT
    user_id,
    -- Keep the first 3 and last 4 digits of an 11-digit run; mask the middle 4.
    -- [0-9] is used instead of \d: backslash is not an escape character in
    -- Flink's default SQL string literals, so '\\d' would reach the regex
    -- engine as \\d (a literal backslash followed by 'd') and never match.
    REGEXP_REPLACE(phone, '([0-9]{3})[0-9]{4}([0-9]{4})', '$1****$2') AS phone,
    -- Keep the first 3 and the last 4 characters with 11 '*' in between
    -- (3 + 11 + 4 = 18 characters). SUBSTRING(id_card, -4) relies on Flink's
    -- negative-start semantics (counted from the end of the string).
    -- CONCAT returns NULL when any argument is NULL, so a NULL id_card stays NULL.
    -- NOTE(review): values shorter than 7 characters would expose overlapping
    -- characters; consider a CHAR_LENGTH guard if such inputs are possible.
    CONCAT(
        SUBSTRING(id_card, 1, 3),
        '***********',
        SUBSTRING(id_card, -4)
    ) AS id_card
FROM ods_users;
维度补充
-- Real-time dimension enrichment via Lookup Join.
-- FOR SYSTEM_TIME AS OF o.proc_time looks up each dimension row as of the
-- time the order is processed; presumably ods_orders declares
-- proc_time AS PROCTIME() (TODO confirm against its DDL).
-- LEFT JOIN keeps orders whose user/product has no dimension row; the
-- dimension columns are then NULL.
INSERT INTO dwd_order_detail
SELECT
    o.*,
    u.user_name,
    u.user_level,
    u.city,
    -- The dim_product lookup previously projected nothing, costing one lookup
    -- per order for no output.
    -- NOTE(review): product_name is assumed to exist on dim_product and on
    -- dwd_order_detail; confirm both schemas.
    p.product_name
FROM ods_orders AS o
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u
    ON o.user_id = u.user_id
LEFT JOIN dim_product FOR SYSTEM_TIME AS OF o.proc_time AS p
    ON o.product_id = p.product_id;
2. SCD Type 2(缓慢变化维度,Slowly Changing Dimension)
-- Built on Flink CDC + a changelog stream.
-- Type-2 slowly changing dimension: a product_id may own several version
-- rows; the composite key (product_id, effective_date) identifies one version.
CREATE TABLE dim_product_scd2 (
    product_id       INT,
    product_name     STRING,
    price            DECIMAL(10,2),
    -- Validity interval of this version.
    effective_date   TIMESTAMP(3),
    expiration_date  TIMESTAMP(3),
    -- Intended to flag the latest version of a product.
    is_current       BOOLEAN,
    PRIMARY KEY (product_id, effective_date) NOT ENFORCED
) WITH (...);
-- Append one new version row for every change read from cdc_products.
-- The new version starts at op_ts (presumably the CDC operation timestamp
-- metadata column; confirm) and is open-ended: it carries the sentinel
-- expiration 9999-12-31 00:00:00 and is_current = TRUE.
-- NOTE(review): this statement does NOT expire the previous version. Older
-- rows of the same product_id keep is_current = TRUE and the sentinel
-- expiration_date. A separate step must upsert them with
-- expiration_date = <new version's op_ts> and is_current = FALSE, or both
-- fields must be derived at query time (e.g. LEAD over effective_date in a
-- batch job).
INSERT INTO dim_product_scd2
SELECT
    product_id,
    product_name,
    price,
    op_ts AS effective_date,
    -- TIMESTAMP literals need a time part ('yyyy-MM-dd HH:mm:ss'); the
    -- date-only form TIMESTAMP '9999-12-31' is rejected by the parser.
    TIMESTAMP '9999-12-31 00:00:00' AS expiration_date,
    TRUE AS is_current
FROM cdc_products;
3. 数据标准化
-- Standardization into dwd_events: time zone, status enum, JSON field.
INSERT INTO dwd_events
SELECT
    event_id,
    -- Unify the time zone: shift raw_time from UTC to Asia/Shanghai.
    -- NOTE(review): Flink's CONVERT_TZ takes a 'yyyy-MM-dd HH:mm:ss' STRING
    -- and returns a STRING, so event_time is a string here, not a TIMESTAMP.
    -- Confirm raw_time's type and the sink column type.
    CONVERT_TZ(raw_time, 'UTC', 'Asia/Shanghai') AS event_time,
    -- Enum standardization: numeric codes to labels; any other code,
    -- including NULL, maps to 'UNKNOWN'.
    CASE
        WHEN status_code = 0  THEN 'INIT'
        WHEN status_code = 1  THEN 'SUCCESS'
        WHEN status_code = -1 THEN 'FAILED'
        ELSE 'UNKNOWN'
    END AS status,
    -- JSON parsing: extract the "source" attribute from ext_info.
    JSON_VALUE(ext_info, '$.source') AS source
FROM ods_events;