news 2026/4/16 0:26:57

Flink SQL 接入 Amazon Kinesis Data Streams 版本迁移、DDL、EFO/Polling、分区与常见坑一篇搞定

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink SQL 接入 Amazon Kinesis Data Streams 版本迁移、DDL、EFO/Polling、分区与常见坑一篇搞定

1. 先说现实:Flink 2.2 暂无可用 Connector

文档里已经写得很直白:

  • Flink 2.2:还没有可用的 Kinesis SQL Connector(依赖暂不可用)
  • Connector不在 Flink 二进制发行包里,上集群需要自己把 jar 带上(lib/ 或 uber-jar)

所以你在博客开头一定要强调一句:
如果你正在用 Flink 2.2,先别在 Maven 里死磕依赖坐标,短期要么等官方发布 2.2 对应版本,要么选用已有可用连接器的 Flink 版本线。

2. 版本与依赖:两套发行版 + TableFactory 冲突必踩

Kinesis SQL Connector 有两套分发包,原因是底层从旧的SourceFunction/SinkFunction迁移到新的Source/Sink接口。

关键规则:同一个 connector identifier(kinesis)在 Table API/SQL 中只能有一个 TableFactory
也就是说:你的应用依赖里只能放一个相关 artifact,否则会出现 TableFactory 名称冲突。

2.1 依赖与 identifier 对照表(你写博客建议直接贴表)

依赖Connector 版本Source connector identifier(接口)Sink connector identifier(接口)
flink-sql-connector-aws-kinesis-streams5.x+kinesis(Source)kinesis(Sink)
flink-sql-connector-aws-kinesis-streams4.x-N/A(不打包 source)kinesis(Sink)
flink-sql-connector-kinesis5.x+kinesis(Source), kinesis-legacy(SourceFunction)kinesis(Sink)
flink-sql-connector-kinesis4.x-kinesis(SourceFunction)kinesis(Sink)

一句话建议:

  • 新项目优先 5.x+ 的kinesis(新 Source/Sink)
  • 如果你依赖 metadata VIRTUAL 列(后面会说 bug),暂时可能要用kinesis-legacy

3. 从 v4.x 迁移到 v5.x:没有状态兼容,别幻想“原地升级不丢不停”

文档里明确:Table API/SQL 在 4.x 与 5.x 之间没有 state compatibility,因为底层实现变了。

迁移策略(官方建议思路):

  1. 停掉 v4.x 作业
  2. 启动 v5.x 的 kinesis 表
  3. source.init.position = AT_TIMESTAMP,并把source.init.timestamp设到“停作业时间稍早一点”
  4. 接受可能会有重复处理(re-processed records)

你写博客时可以用一句很工程化的话解释:
迁移不是“状态无缝接管”,而是“时间点对齐 + 幂等处理兜底”。

4. 快速开始:创建一张 Kinesis Source 表(Scan Source)

下面这段 DDL 是典型模板(按你的文档):

CREATETABLEKinesisTable(`user_id`BIGINT,`item_id`BIGINT,`category_id`BIGINT,`behavior`STRING,`ts`TIMESTAMP(3))PARTITIONEDBY(user_id,item_id)WITH('connector'='kinesis','stream.arn'='arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name','aws.region'='us-east-1','source.init.position'='LATEST','format'='csv');

这里有几个细节很重要:

  • stream.arn是必填(指向具体 KDS Stream)
  • format决定序列化/反序列化(Kinesis 本身只存二进制,不理解 schema)
  • PARTITIONED BY在写入(sink)时会影响分区策略(后面详解)
  • source.init.position常见就是LATESTAT_TIMESTAMP

5. Metadata VIRTUAL 列:很香,但新 Source 有 bug,得用 legacy

文档给了一个“已知问题”:

  • kinesis(新 Source)目前不支持 VIRTUAL 列
  • 需要 metadata(timestamp / shard-id / sequence-number)时,建议用kinesis-legacy

metadata 字段含义(只读 VIRTUAL):

  • timestamp:记录写入 stream 的近似时间
  • shard-id:来源 shard
  • sequence-number:shard 内序号

示例 DDL(按文档):

CREATETABLEKinesisTable(`user_id`BIGINT,`item_id`BIGINT,`category_id`BIGINT,`behavior`STRING,`ts`TIMESTAMP(3),`arrival_time`TIMESTAMP(3)METADATAFROM'timestamp'VIRTUAL,`shard_id`VARCHAR(128)NOTNULLMETADATAFROM'shard-id'VIRTUAL,`sequence_number`VARCHAR(128)NOTNULLMETADATAFROM'sequence-number'VIRTUAL)PARTITIONEDBY(user_id,item_id)WITH('connector'='kinesis-legacy','stream'='user_behavior','aws.region'='us-east-2','scan.stream.initpos'='LATEST','format'='csv');

实战建议:
如果你做的是“延迟观测、按 shard 排障、顺序诊断”,metadata 列非常有价值;否则可以先用新kinesis,等 bug 修复再切。

6. 配置项拆解:写 DDL 时你真正需要关心什么

下面按文档的 Option 分组,把最有用的部分提炼出来。

6.1 Common Options(通用)

  • connectorkinesiskinesis-legacy
  • stream.arn(新)/stream(legacy):stream 标识
  • format:csv/json/avro 等
  • aws.region:区域
  • aws.endpoint:自定义 endpoint(VPC endpoint / Localstack)
  • aws.trust.all.certificates:测试用,生产不建议开

6.2 Authentication(鉴权)

支持多种 credentials provider(AUTO/BASIC/PROFILE/ASSUME_ROLE/WEB_IDENTITY_TOKEN/CUSTOM),常用落地建议:

  • 生产环境:优先 IAM Role(或 AssumeRole)
  • 开发联调:PROFILE 或 BASIC(避免把 AK/SK 硬编码进 DDL 仓库)

6.3 Source Options(读 Kinesis)

核心关注点有 5 个:

  1. 起始位置
  • source.init.position(新)
  • scan.stream.initpos(legacy)
    常用:LATESTAT_TIMESTAMP(配合 timestamp)
  1. shard 发现频率
  • source.shard.discovery.interval(默认 10s)
  1. 读取模式:POLLING vs EFO
  • source.reader.type = POLLING | EFO
  1. POLLING 关键参数
  • source.shard.get-records.max-record-count:单次拉取上限(默认 10000)
  1. EFO 关键参数
  • source.efo.consumer.name:消费者名
  • source.efo.lifecycle = JOB_MANAGED | SELF_MANAGED
  • 以及订阅/注销超时、DescribeStreamConsumer 的指数退避重试等

直观理解:

  • POLLING 更通用,配置简单
  • EFO(Enhanced Fan-Out)更偏低延迟/高隔离,但要管理 consumer(名称、生命周期、注册/注销)

6.4 Sink Options(写 Kinesis)

写入端你最该盯紧的就三类:

  1. 分区策略
  • sink.partitioner
  • sink.partitioner-field-delimiter
  1. 背压与缓冲
  • sink.batch.max-size(默认 500)
  • sink.requests.max-inflight(默认 16)
  • sink.requests.max-buffered(默认 10000)
  • sink.flush-buffer.size(默认 5242880 bytes)
  • sink.flush-buffer.timeout(默认 5000 ms)
  1. 错误策略
  • sink.fail-on-error(默认 false):失败是否直接 fail 作业(注意:这会显著影响容错与恢复策略)

另外sink.producer.*在新体系里属于 legacy 的遗留项:能映射的会映射,不能映射的会 warn。

7. Sink 分区策略:shard 多了之后,不分区你就等着热点

Kinesis 是 shard 架构,写入时 PartitionKey 决定落到哪个 shard。SQL connector 用sink.partitioner控制 PartitionKey 的生成方式:

  • fixed:PartitionKey 由 Flink subtask index 推导(每个 subtask 基本固定打到一个分区)
  • random:随机(没有PARTITION BY时的默认)
  • 自定义 partitioner:org.mycompany.MyPartitioner

重点规则(很容易踩):

  • 如果表定义了PARTITION BY,PartitionKey 会由这些字段拼接得到
  • 这时不能再用sink.partitioner改行为,否则直接配置错误
  • 你仍然可以用sink.partitioner-field-delimiter控制拼接分隔符(比如|或空字符串)

工程建议:
写入有热点 key 的业务(比如 user_id 极度倾斜)要特别注意:PARTITION BY字段选错了,Kinesis shard 会被打爆。

8. Data Type Mapping:Kinesis 不懂结构,format 才是“schema 的灵魂”

Kinesis 存的就是 Base64 编码的二进制 blob,本身没有 schema。
你在 DDL 里写的字段类型能不能解析,全靠:

  • format = 'csv' / 'json' / 'avro' ...

建议在博客里提醒两点:

  • json 更适合 schema 演进(字段增减)
  • csv 更轻,但对字段顺序、分隔符、空值非常敏感

9. 一套可直接跑的 SQL Pipeline 模板(Source -> Sink)

下面给你一个“能复制粘贴”的结构(你可以按需改字段与 format)。

9.1 Source 表

CREATETABLEkds_source(user_idBIGINT,item_idBIGINT,category_idBIGINT,behavior STRING,tsTIMESTAMP(3))WITH('connector'='kinesis','stream.arn'='arn:aws:kinesis:us-east-1:012345678901:stream/my-source-stream','aws.region'='us-east-1','source.init.position'='LATEST','source.reader.type'='POLLING','format'='json');

9.2 Sink 表(Streaming Append / Batch Sink / Unbounded Sink 按文档能力)

CREATETABLEkds_sink(user_idBIGINT,item_idBIGINT,category_idBIGINT,behavior STRING,tsTIMESTAMP(3))PARTITIONEDBY(user_id)WITH('connector'='kinesis','stream.arn'='arn:aws:kinesis:us-east-1:012345678901:stream/my-sink-stream','aws.region'='us-east-1','sink.batch.max-size'='500','sink.requests.max-inflight'='16','sink.requests.max-buffered'='10000','sink.flush-buffer.timeout'='5000','format'='json');

9.3 写入任务

INSERTINTOkds_sinkSELECTuser_id,item_id,category_id,behavior,tsFROMkds_source;

如果你需要更强的分区控制:

  • 没有PARTITION BY:用sink.partitioner = fixed/random/custom
  • PARTITION BY:PartitionKey 由字段拼接,必要时调sink.partitioner-field-delimiter

10. 本地/测试联调:VPC Endpoint / Localstack 一把梭

如果你在测试环境不想直连 AWS,可以用aws.endpoint指向内网 endpoint 或 Localstack,同时测试场景下可配置aws.trust.all.certificates=true(生产别开)。

示例(概念写法):

WITH('aws.region'='us-east-1','aws.endpoint'='http://localhost:4566','aws.trust.all.certificates'='true')

11. 常见坑清单(写给未来的自己,也写给读者)

  1. 依赖冲突
    同时引入flink-sql-connector-aws-kinesis-streamsflink-sql-connector-kinesis会导致 TableFactory 冲突

  2. Flink 2.2 没依赖
    DDL 写得再对,jar 没有就是跑不起来

  3. v4 -> v5 没有状态兼容
    迁移靠时间点对齐,接受重复处理,业务侧做幂等

  4. metadata VIRTUAL 列新 Source 有 bug
    需要 metadata 先用kinesis-legacy

  5. shard 扩缩容与 shard discovery interval
    扩容后发现慢,先检查source.shard.discovery.interval

  6. 写入热点
    PARTITION BY字段选错、分布不均,会把某几个 shard 打到限流

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/3 5:49:25

AI智能体的开发费用

AI智能体的开发费用已经形成了一套从“开箱即用”到“深度定制”的分层体系。由于技术成熟度和工具链的完善,成本比起两年前已有显著优化,但对于企业级应用,深度集成和安全合规依然是主要的支出项。以下是AI智能体开发费用的详细构成与预估&a…

作者头像 李华
网站建设 2026/4/9 18:06:31

C++ 有哪些性能分析工具?

2026年 C 性能分析(Profiling)工具全景(基于当前社区共识与生产实践) C 性能分析工具主要分为几大类:采样型(Sampling)、插桩型(Instrumentation)、内存专用、硬件级深度…

作者头像 李华
网站建设 2026/3/22 22:45:47

如何快速配置游戏模组系统:3步实现性能最大化

如何快速配置游戏模组系统:3步实现性能最大化 【免费下载链接】HS2-HF_Patch Automatically translate, uncensor and update HoneySelect2! 项目地址: https://gitcode.com/gh_mirrors/hs/HS2-HF_Patch 游戏模组配置是提升游戏体验的关键环节,合…

作者头像 李华
网站建设 2026/4/2 10:36:08

危化品储罐状态远程监测自动告警系统方案

随着工业安全与智能化管理要求的不断提升,危化品储罐的实时监控与安全管理成为企业运营的关键环节。传统储罐监测依赖人工巡检、就地仪表读取,存在监测频率低、响应延迟、异常发现滞后等隐患,难以满足现代工业对安全、高效、合规的严格要求。…

作者头像 李华