CDC同步工具Canal原理与实践
请详细介绍阿里巴巴Canal的工作原理(MySQL Binlog解析、GTID模式、HA机制),以及Canal在大数据场景中的典型应用(MySQL→Kafka→Flink实时同步)。Canal的高可用如何实现?如何处理Binlog堆积和性能问题?
回答
专业代码师
1. Canal工作原理
MySQL Master → Binlog → Canal Server → MQ(Kafka/RocketMQ)
↓
Canal Client/Adapter
核心流程:
1. Canal伪装为MySQL Slave,发送dump协议
2. MySQL推送Binlog事件(ROW格式)
3. Canal解析Binlog为Entry(Insert/Update/Delete)
4. 序列化后发送到MQ
5. Client消费MQ消息恢复数据变更
GTID模式
# canal.properties
canal.instance.mysql.master.gtid.enable = true
canal.instance.gtid.consume = true
# 基于GTID断点续传,不受Binlog文件名变化影响
2. 高可用机制
Canal Server HA:
- 多Server组成集群
- 使用ZooKeeper选主
- 主Server消费Binlog,备机Standby
- 故障时自动切换(秒级)
配置:
canal.zkServers = zk1:2181,zk2:2181,zk3:2181
canal.instance.global.mode = cluster
3. 典型应用
MySQL → Canal → Kafka → Flink → Hudi/Iceberg
# 数据格式(JSON)
{
"database": "shop",
"table": "orders",
"type": "UPDATE", # INSERT/UPDATE/DELETE
"ts": 1704067200000,
"data": {"id": 1, "status": "PAID"},
"old": {"status": "INIT"}
}
4. Binlog堆积处理
# 排查:
# 1. 查看Canal延迟
binlog.position = mysql-bin.001234:567890
current_position = mysql-bin.001235:123456
# 延迟 = 当前 - 消费位置
# 解决方案:
# 1. 增加Canal并行度
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 16
# 2. 调整Batch Size
canal.instance.transaction.size = 2048 # 每次解析事务数
# 3. 增大MQ缓冲
canal.mq.producerBatchSize = 16384
5. 注意事项
- Binlog格式必须为ROW
- 确保
server-id不与现有MySQL Slave冲突 - 监控Canal延迟(
delayedTime指标) - 定期清理MQ中堆积消息