news 2026/4/20 5:20:29

DataGen Connector本地造数神器(不用 Kafka 也能把 Pipeline 跑起来)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
DataGen Connector本地造数神器(不用 Kafka 也能把 Pipeline 跑起来)

1、它到底做了什么

  • Source 并行运行:有多少个 source 并发子任务,就把Long的序列切成多少段(sub-sequence)
  • 你提供一个GeneratorFunction<Long, OUT>:把输入的 index(Long)映射成任意事件类型
  • 每个 subtask 内部有序,但全局顺序取决于并行度(parallelism)

一句话:Flink 负责发 index,你负责把 index 变成事件。

2、最小可跑示例:生成 0~999 的字符串

importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.functions.GeneratorFunction;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.connector.datagen.source.DataGeneratorSource;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;publicclassDataGenDemo{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();GeneratorFunction<Long,String>generator=index->"Number: "+index;longnumberOfRecords=1000;DataGeneratorSource<String>source=newDataGeneratorSource<>(generator,numberOfRecords,Types.STRING);DataStreamSource<String>stream=env.fromSource(source,WatermarkStrategy.noWatermarks(),"Generator Source");stream.print();env.execute("datagen-demo");}}

要点:

  • 并行度为 1 时输出是严格"Number: 0""Number: 999"顺序
  • 并行度 > 1 时:每个 subtask 内部仍然按序,但不同 subtask 的结果交织输出

3、限速:控制总吞吐(全局每秒不超过 N 条)

importorg.apache.flink.api.common.functions.GeneratorFunction;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.connector.datagen.source.DataGeneratorSource;importorg.apache.flink.connector.datagen.source.RateLimiterStrategy;GeneratorFunction<Long,String>generator=index->"Number: "+index;DataGeneratorSource<String>source=newDataGeneratorSource<>(generator,Long.MAX_VALUE,RateLimiterStrategy.perSecond(100),// 全部 source subtasks 加起来 <= 100 条/sTypes.STRING);

适用场景:

  • 你想模拟“上游流量”但又不想把本机打爆
  • 做算子性能对比、Backpressure 观察、checkpoint 行为观察

4、有界/无界:它“永远是 bounded”,但可以“看起来无界”

  • 语义上永远是 bounded(理论上会结束)
  • numberOfRecords = Long.MAX_VALUE基本等同“不会结束”(实践上像 unbounded)

建议:

  • 要跑有限数据:考虑 BATCH mode,更贴近离线回放
  • 要模拟持续输入:用Long.MAX_VALUE+ rate limit

5、容错语义:at-least-once / end-to-end exactly-once 能不能保证?

可以,但有个硬条件:

  • GeneratorFunction必须对输入 index 完全确定性
    也就是:同一个 index 永远生成同样的输出。

反例(会破坏确定性):

  • random()System.currentTimeMillis()、读外部可变配置、读网络请求结果

正确做法:

  • 用 index 推导数据(例如 hash(index) 生成用户、金额、状态)
  • 或者用固定 seed 的伪随机:new Random(index)(每个 index 固定)

6、Watermark:也可以在 Source 侧发“确定性水位线”

默认例子用noWatermarks(),但你完全可以:

  • 在生成事件里带 eventTime
  • 配合自定义WatermarkStrategy生成 deterministic watermarks
    适合做 event-time 窗口、乱序、迟到数据的测试演示。
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/17 16:20:45

【数据库】【MySQL】分区表深度解析:架构设计与大数据归档实践

MySQL 分区表深度解析&#xff1a;架构设计与大数据归档实践 MySQL 分区表通过将大表物理拆分为多个独立分区&#xff0c;实现了查询性能提升、数据管理灵活性和大数据归档三大核心价值。本文将详解 RANGE/LIST/HASH 分区原理、分区裁剪优化策略及 PB 级数据归档方案。一、分区…

作者头像 李华
网站建设 2026/4/19 20:25:09

1688接入API

1688 API 是阿里巴巴旗下 B2B 批发平台的官方开放接口&#xff0c;基于 RESTful 架构与签名认证&#xff0c;以 JSON 格式提供商品、订单、供应链等全链路数据&#xff0c;核心价值是合规高效赋能采购选品、订单履约、库存协同与分销运营&#xff0c;适配批发 / 零售 / 跨境 / …

作者头像 李华
网站建设 2026/4/17 13:55:56

HoRain云--掌握jQuery事件处理全攻略

&#x1f3ac; HoRain 云小助手&#xff1a;个人主页 ⛺️生活的理想&#xff0c;就是为了理想的生活! ⛳️ 推荐 前些天发现了一个超棒的服务器购买网站&#xff0c;性价比超高&#xff0c;大内存超划算&#xff01;忍不住分享一下给大家。点击跳转到网站。 目录 ⛳️ 推荐 …

作者头像 李华
网站建设 2026/4/14 17:09:45

基于springboot 心理咨询预约系统

心理咨询预约 目录 基于springboot vue心理咨询预约系统 一、前言 二、系统功能演示 三、技术选型 四、其他项目参考 五、代码参考 六、测试参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 基于springboot vue心理咨询预约系统 一、前言 博主介绍&…

作者头像 李华