news 2026/5/30 22:46:39

Flink JSON 序列化/反序列化 Schema KafkaSource/KafkaSink + 自定义 ObjectMapper + PyFlink Row

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink JSON 序列化/反序列化 Schema KafkaSource/KafkaSink + 自定义 ObjectMapper + PyFlink Row

1. JsonDeserializationSchema:KafkaSource 中反序列化 POJO

JsonDeserializationSchema实现了 Flink 的DeserializationSchema,因此只要某个 connector 支持DeserializationSchema,你就能直接使用它。

典型用法:KafkaSource 只消费 value,反序列化成 POJO:

JsonDeserializationSchema<SomePojo>jsonFormat=newJsonDeserializationSchema<>(SomePojo.class);KafkaSource<SomePojo>source=KafkaSource.<SomePojo>builder().setValueOnlyDeserializer(jsonFormat)// ....build();

适用场景:

  • Kafka 的 value 是 JSON
  • 你希望在 DataStream 里直接拿到业务对象SomePojo

工程建议:

  • POJO 字段尽量使用包装类型(Integer/Long)应对字段缺失或 null
  • 为了兼容字段变动,可以配合 ObjectMapper 设置忽略未知字段(见第 3 节)

2. JsonSerializationSchema:KafkaSink 中序列化 POJO

写回 Kafka 时,JsonSerializationSchema实现了SerializationSchema,可用于任何支持SerializationSchema的 connector。

典型用法:KafkaSink 写 value,序列化 POJO 为 JSON:

JsonSerializationSchema<SomePojo>jsonFormat=newJsonSerializationSchema<>();KafkaSink<SomePojo>sink=KafkaSink.<SomePojo>builder().setRecordSerializer(newKafkaRecordSerializationSchemaBuilder<SomePojo>().setValueSerializationSchema(jsonFormat)// ....build()).build();

适用场景:

  • 你希望下游系统继续消费 JSON
  • 你不想自己手写 Jackson 序列化逻辑

3. 自定义 ObjectMapper:控制 Jackson 行为(非常常用)

Flink 允许你通过构造函数传入SerializableSupplier<ObjectMapper>来定制 mapper,相当于提供一个“ObjectMapper 工厂”。

你可以用它做很多工程级增强,比如:

  • 忽略未知字段(兼容上游 schema 变更)
  • 注册模块(Java 时间类型、参数名模块等)
  • 开启/关闭某些序列化特性(字段排序、空值处理等)

示例:自定义序列化 mapper,让 map key 有序,并注册模块:

JsonSerializationSchema<SomeClass>jsonFormat=newJsonSerializationSchema<>(()->newObjectMapper().enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS).registerModule(newParameterNamesModule()));

你也可以把“兼容字段变更”的设置加进去(强烈建议生产开启类似配置):

  • FAIL_ON_UNKNOWN_PROPERTIES关闭
  • JavaTimeModule 等

(这里不展开写完整 mapper 配置,你只要知道:用 supplier 你就能完全掌控 Jackson。)

4. PyFlink:Row 类型用 JsonRowSerializationSchema / JsonRowDeserializationSchema

在 PyFlink 中,Flink 内置了 Row 的 JSON Schema:

  • JsonRowDeserializationSchema
  • JsonRowSerializationSchema

这对 Python 流处理特别友好,因为 Python 侧更常操作 Row 而不是 POJO 类。

KafkaSource:JSON -> Row

row_type_info=Types.ROW_NAMED(['name','age'],[Types.STRING(),Types.INT()])json_format=JsonRowDeserializationSchema.builder()\.type_info(row_type_info)\.build()source=KafkaSource.builder()\.set_value_only_deserializer(json_format)\.build()

KafkaSink:Row -> JSON

row_type_info=Types.ROW_NAMED(['name','age'],[Types.STRING(),Types.INT()])json_format=JsonRowSerializationSchema.builder()\.with_type_info(row_type_info)\.build()sink=KafkaSink.builder()\.set_record_serializer(KafkaRecordSerializationSchema.builder().set_topic('test').set_value_serialization_schema(json_format).build())\.build()

适用场景:

  • Python 处理流数据,行结构清晰
  • Kafka 中 value 为 JSON

5. 选型建议:POJO vs ObjectNode vs Row

  • Java POJO:类型安全、IDE 友好、适合稳定 schema 的业务流
  • ObjectNode:更灵活,适合 schema 频繁变化、半结构化数据
  • PyFlink Row:Python 生态更顺手,适合表/行式处理
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/30 4:00:24

Java 25 中的虚拟线程

一、Java 25 虚拟线程核心定位虚拟线程&#xff08;Project Loom 核心特性&#xff09;自 Java 19 预览、Java 21 正式发布后&#xff0c;Java 25 并未新增颠覆性功能&#xff0c;而是聚焦调度优化、稳定性提升、场景适配&#xff0c;让虚拟线程在生产环境中更易用、更高效。二…

作者头像 李华
网站建设 2026/5/28 22:00:57

使用C#控制台批量删除 Unity目录里的 .meta文件

因为Unity会生成.meta文件,有的时候比如我 SteamingAssets里面有很多视频文件 是.mp4格式的,某些原因我需要将里面的所有视频文件改为.webm格式,那么会残留很多 .meta文件我们可以创建一个控制台,批量删除class Program {static void Main(string[] args){if (args.Length 0 |…

作者头像 李华
网站建设 2026/5/30 3:47:37

全方位CRM源码系统功能详解,完全开源,支持个性化定制

温馨提示&#xff1a;文末有资源获取方式 随着市场竞争加剧&#xff0c;企业销售团队亟需一套高效工具来管理客户关系和优化销售流程。一款专为销售团队设计的CRM客户关系管理系统源码应运而生&#xff0c;它集成了多种实用功能&#xff0c;帮助企业实现客户数据整合、商机追踪…

作者头像 李华
网站建设 2026/5/28 20:59:04

机器人仿真技术十年演进

下面给你一条专门聚焦机器人仿真&#xff08;Simulation&#xff09;的 「机器人仿真技术十年演进路线&#xff08;2025–2035&#xff09;」。 我会刻意避开“更逼真画面”“更快物理引擎”的表层叙事&#xff0c;直指仿真在机器人长期工程化与规模化中真正承担的角色变化。一…

作者头像 李华