CodeWalk

Flink SQL DDL: CREATE TABLE WITH Clause Parameters Explained

Author: 我是大山 · 2026-05-30 12:55

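Please explain in detail the parameter system of the WITH clause in Flink SQL's CREATE TABLE DDL, including connector parameters (connector), format parameters (format), scan parameters (scan.startup.mode), and connection parameters (properties.*). Using a Kafka source as the example, give a complete DDL that covers several core parameters, and explain what each parameter does.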

Answer

我是大山

The parameter system of the Flink SQL CREATE TABLE WITH clause:

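1. Connector parameters

  • connector: specifies the connector type (kafka, jdbc, hbase, elasticsearch, etc.). For some connectors the actual identifier carries a version suffix, for example hbase-2.2 or elasticsearch-7.
  • connector.version: the connector version. This key belongs to the legacy option style (connector.type / connector.version) used in Flink 1.10 and earlier; with the current 'connector' = 'kafka' style it is not set.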
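
As a rough illustration of how the connector value decides which other options the WITH clause accepts, here is a minimal sketch of a JDBC-backed table. The table name, columns, URL, and credentials are hypothetical placeholders and do not come from the original answer.

```sql
-- Hypothetical dimension table backed by MySQL through the JDBC connector.
CREATE TABLE user_dim (
  user_id INT,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',                     -- selects the JDBC connector
  'url' = 'jdbc:mysql://mysql:3306/shop',   -- placeholder JDBC URL
  'table-name' = 'users',                   -- placeholder table in the external database
  'username' = 'flink',                     -- placeholder credentials
  'password' = '******'
);
```

Options such as url and table-name only make sense for jdbc, just as topic and scan.startup.mode only make sense for kafka.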

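2. Format parameters

  • format: the message format (json, csv, avro, debezium-json, canal-json, changelog-json). Note that changelog-json is provided by the Flink CDC connectors project rather than bundled with Flink itself.
  • json.ignore-parse-errors: whether to skip fields and rows that fail JSON parsing instead of failing the job.
  • avro-confluent.schema-registry.url: the URL of the Confluent Schema Registry for the avro-confluent format (newer Flink releases name this option avro-confluent.url).
  • debezium-json.schema-include: whether the Debezium messages include schema information.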
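
To make the format-prefixed options more concrete, the following is a minimal sketch of a Kafka table that reads Debezium change events. The table name, topic, and group id are assumptions chosen for illustration.

```sql
-- Hypothetical changelog source: each Kafka record is a Debezium JSON envelope.
CREATE TABLE orders_cdc (
  order_id BIGINT,
  user_id INT,
  amount DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver.shop.orders',               -- placeholder topic name
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flink_cdc_group',      -- placeholder consumer group
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'false',       -- messages carry only the payload, no schema
  'debezium-json.ignore-parse-errors' = 'true'    -- skip malformed records instead of failing
);
```

Format options are always prefixed with the format name, so switching 'format' also means switching the prefix of the related keys.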

3. Kafka source-specific parameters

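CREATE TABLE kafka_orders (
  order_id BIGINT,
  user_id INT,
  amount DOUBLE,
  -- Kafka record timestamp, exposed as a metadata column
  ts TIMESTAMP(3) METADATA FROM 'timestamp',
  -- Event-time attribute that tolerates up to 5 seconds of out-of-order data
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',                           -- connector type
  'topic' = 'orders',                              -- topic to read from
  'properties.bootstrap.servers' = 'kafka:9092',   -- passed through to the Kafka client
  'properties.group.id' = 'flink_group',           -- consumer group id
  'scan.startup.mode' = 'earliest-offset',         -- start from the earliest offset
  -- 'scan.startup.specific-offsets' = 'partition:0,offset:100',
  -- (the option above only takes effect when scan.startup.mode = 'specific-offsets')
  'format' = 'json',                               -- value format
  'json.fail-on-missing-field' = 'false',          -- missing fields become NULL
  'json.ignore-parse-errors' = 'true'              -- skip records that fail to parse
);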
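
| Parameter | Description | Allowed values / example |
|---|---|---|
| scan.startup.mode | Where consumption starts | earliest-offset / latest-offset / timestamp / specific-offsets / group-offsets |
| scan.startup.timestamp-millis | Epoch timestamp in milliseconds, used in timestamp mode | 1700000000000 |
| properties.* | Native Kafka client parameters | auto.offset.reset, fetch.max.bytes, etc. |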
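
The startup modes that need a companion option can be sketched as follows. This is only an illustration under assumed names: the two table names, the group ids, the offsets for partition 1, and the fetch.max.bytes value are hypothetical.

```sql
-- Start from the first record whose timestamp is >= the given epoch milliseconds.
CREATE TABLE kafka_orders_from_ts (
  order_id BIGINT,
  user_id INT,
  amount DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flink_group_ts',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1700000000000',
  'format' = 'json'
);

-- Start from explicit per-partition offsets; entries are separated by ';'.
CREATE TABLE kafka_orders_from_offsets (
  order_id BIGINT,
  user_id INT,
  amount DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flink_group_offsets',
  'properties.fetch.max.bytes' = '10485760',   -- forwarded to the Kafka consumer with the 'properties.' prefix stripped
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:100;partition:1,offset:250',
  'format' = 'json'
);
```

In both cases the companion option is only read when scan.startup.mode selects the matching mode.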

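4. Upsert Kafka parameters

  • connector = 'upsert-kafka': supports upsert semantics for CDC change streams. The table must declare a PRIMARY KEY, and the formats are configured through key.format and value.format rather than a single format option.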
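
A minimal sketch of an upsert-kafka table, assuming a hypothetical user_order_totals topic that is fed from the kafka_orders table defined earlier:

```sql
-- Hypothetical sink: one row per user_id, later records overwrite earlier ones.
CREATE TABLE user_order_totals (
  user_id INT,
  total_amount DOUBLE,
  PRIMARY KEY (user_id) NOT ENFORCED             -- becomes the Kafka message key
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_order_totals',                 -- placeholder topic name
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',                         -- serialization of the primary key
  'value.format' = 'json'                        -- serialization of the row
);

-- The aggregation emits updates per user; upsert-kafka writes them as upserts keyed by user_id.
INSERT INTO user_order_totals
SELECT user_id, SUM(amount) AS total_amount
FROM kafka_orders
GROUP BY user_id;
```

Writing the same aggregation into a plain kafka table would be rejected, because that connector is append-only and cannot consume an updating result.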