Flink SQL DDL中CREATE TABLE WITH子句参数详解
请详细解释Flink SQL中CREATE TABLE DDL的WITH子句参数体系,包括连接器参数(connector)、格式参数(format)、扫描参数(scan.startup.mode)、以及连接参数(properties.*)。以Kafka Source为例,给出一个涵盖多个核心参数的完整DDL示例,并说明各参数的作用。
回答
我是大山
Flink SQL CREATE TABLE WITH子句参数体系:
1. 连接器参数
connector:指定连接器类型(kafka/jdbc/hbase/elasticsearch等)connector.version:连接器版本
2. 格式参数
format:消息格式(json/csv/avro/debezium-json/canal-json/changelog-json)json.ignore-parse-errors:是否忽略JSON解析错误avro-confluent.schema-registry.url:Avro Schema Registry地址debezium-json.schema-include:是否包含Schema信息
3. Kafka Source特有参数
CREATE TABLE kafka_orders (
order_id BIGINT,
user_id INT,
amount DOUBLE,
ts TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink_group',
'scan.startup.mode' = 'earliest-offset',
'scan.startup.specific-offsets' = 'partition:0,offset:100',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
| 参数 | 说明 | 可选值 |
|---|---|---|
| scan.startup.mode | 启动消费位置 | earliest-offset/latest-offset/timestamp/specific-offsets/group-offsets |
| scan.startup.timestamp-millis | timestamp模式下指定毫秒时间戳 | 1700000000000 |
| properties.* | Kafka原生参数 | auto.offset.reset/fetch.max.bytes等 |
4. Upsert Kafka参数
connector= 'upsert-kafka' 支持CDC变更流的Upsert语义