CodeWalk

Flink SQL Catalog管理多数据源元数据

作者:我是大山 · 2026-05-30 12:55

Flink SQL中Catalog的作用是什么?请解释HiveCatalog、JdbcCatalog、自定义Catalog的配置方法,以及如何通过Catalog实现跨数据源的元数据统一管理(如Flink SQL查询Hive表+Kafka Topic+MySQL维表)。演示一个包含Catalog注册的完整Flink SQL作业。

回答

我是大山

Catalog概念: Catalog管理元数据(数据库、表、函数、视图),统一Flink SQL访问不同数据源。

常见Catalog

  1. HiveCatalog

    CREATE CATALOG hive_catalog WITH (
      'type' = 'hive',
      'default-database' = 'default',
      'hive-conf-dir' = '/etc/hive/conf'
    );
    USE CATALOG hive_catalog;
    
  2. JdbcCatalog(PostgreSQL/MySQL):

    CREATE CATALOG pg_catalog WITH (
      'type' = 'jdbc',
      'base-url' = 'jdbc:postgresql://localhost:5432',
      'default-database' = 'mydb',
      'username' = 'user',
      'password' = 'pass'
    );
    
  3. 自定义Catalog

    • 实现Catalog接口
    • 注册:tableEnv.registerCatalog("my_catalog", new MyCatalog())

跨源查询示例

-- 1. 注册Catalog
CREATE CATALOG hive_cat WITH ('type'='hive', 'default-database'='ods');
CREATE CATALOG kafka_cat WITH ('type'='generic_in_memory');

-- 2. 切换到Hive Catalog创建查询
USE CATALOG hive_cat;

-- 3. 读取Kafka数据(在默认Catalog中创建Kafka表)
CREATE TABLE kafka_orders (
  order_id BIGINT,
  user_id INT,
  amount DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);

-- 4. JOIN Hive维表 + Kafka事实表
INSERT INTO hive_cat.ods.ads_order_stats
SELECT 
  k.*,
  d.user_name
FROM kafka_orders k
LEFT JOIN hive_cat.ods.dim_user FOR SYSTEM_TIME AS OF k.proctime AS d
ON k.user_id = d.user_id;