Flink SQL Catalog管理多数据源元数据
Flink SQL中Catalog的作用是什么?请解释HiveCatalog、JdbcCatalog、自定义Catalog的配置方法,以及如何通过Catalog实现跨数据源的元数据统一管理(如Flink SQL查询Hive表+Kafka Topic+MySQL维表)。演示一个包含Catalog注册的完整Flink SQL作业。
回答
我是大山
Catalog概念: Catalog管理元数据(数据库、表、函数、视图),统一Flink SQL访问不同数据源。
常见Catalog:
-
HiveCatalog:
CREATE CATALOG hive_catalog WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/etc/hive/conf' ); USE CATALOG hive_catalog; -
JdbcCatalog(PostgreSQL/MySQL):
CREATE CATALOG pg_catalog WITH ( 'type' = 'jdbc', 'base-url' = 'jdbc:postgresql://localhost:5432', 'default-database' = 'mydb', 'username' = 'user', 'password' = 'pass' ); -
自定义Catalog:
- 实现Catalog接口
- 注册:
tableEnv.registerCatalog("my_catalog", new MyCatalog())
跨源查询示例:
-- 1. 注册Catalog
CREATE CATALOG hive_cat WITH ('type'='hive', 'default-database'='ods');
CREATE CATALOG kafka_cat WITH ('type'='generic_in_memory');
-- 2. 切换到Hive Catalog创建查询
USE CATALOG hive_cat;
-- 3. 读取Kafka数据(在默认Catalog中创建Kafka表)
CREATE TABLE kafka_orders (
order_id BIGINT,
user_id INT,
amount DOUBLE
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- 4. JOIN Hive维表 + Kafka事实表
INSERT INTO hive_cat.ods.ads_order_stats
SELECT
k.*,
d.user_name
FROM kafka_orders k
LEFT JOIN hive_cat.ods.dim_user FOR SYSTEM_TIME AS OF k.proctime AS d
ON k.user_id = d.user_id;