news 2026/4/5 20:29:41

PyFlink Table API Data Types DataType 是什么、UDF 类型声明怎么写、Python / Pandas 类型映射一文搞懂

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
PyFlink Table API Data Types DataType 是什么、UDF 类型声明怎么写、Python / Pandas 类型映射一文搞懂

1. DataType 是什么:逻辑类型,不等于物理存储类型

DataType 描述的是表生态里一个值的逻辑类型(Logical Type),比如BIGINTVARCHARDECIMAL(10,2)ROW<...>

关键理解点:

  • DataType 只定义“是什么类型”,不暗示它在网络传输或存储时怎么编码(那是物理表示层的事情)。
  • PyFlink 中所有预定义类型都在pyflink.table.types,并且推荐用DataTypes工具类来构造。

典型写法:

frompyflink.table.typesimportDataTypes DataTypes.BIGINT()DataTypes.STRING()DataTypes.ROW([DataTypes.FIELD("id",DataTypes.BIGINT()),DataTypes.FIELD("name",DataTypes.STRING())])

在写 UDF 时,你常会看到两种声明方式:

  • 用字符串:result_type='ROW<id BIGINT, data STRING>'
  • 用 DataTypes:result_type=DataTypes.ROW([...])

两者都行;工程里建议 DataTypes 写法更安全(IDE 友好,拼写不易错)。

2. DataType 与 Python 类型映射:UDF 参数/返回值怎么落地

当你在 UDF 声明了 DataType,Flink 会做两件事:

1)把输入列值转换成对应的 Python 对象传给你
2)要求你的返回值类型匹配你声明的 DataType(否则运行时会报类型/序列化问题)

你给的映射表可以直接当“速查表”。

2.1 标量 UDF:DataType → Python Type

Data TypePython Type
BOOLEANbool
TINYINT / SMALLINT / INT / BIGINTint
FLOAT / DOUBLEfloat
VARCHARstr
VARBINARYbytes
DECIMALdecimal.Decimal
DATEdatetime.date
TIMEdatetime.time
TimestampTypedatetime.datetime
LocalZonedTimestampTypedatetime.datetime
INTERVAL YEAR TO MONTHint(注意:pandas 不支持)
INTERVAL DAY TO SECONDdatetime.timedelta(注意:pandas 不支持)
ARRAYlist
MULTISETlist(Not Supported Yet in pandas)
MAPdict(Not Supported Yet in pandas)
ROWpyflink.common.Row

2.2 向量化 pandas UDF:输入/输出是 pandas.Series(元素类型由 DataType 决定)

文档里的核心点是:

对于 vectorized Python UDF,输入类型和输出类型是pandas.Series,Series 里每个元素类型对应你声明的 DataType。

映射中 pandas type 这一列很关键:

Data TypePandas Type
BOOLEANnumpy.bool_
INTnumpy.int32
BIGINTnumpy.int64
FLOATnumpy.float32
DOUBLEnumpy.float64
VARCHARstr
VARBINARYbytes
DECIMALdecimal.Decimal
DATE / TIME / Timestampdatetime.*
ARRAYnumpy.ndarray
ROWdict

注意:表里也写了很多Not Supported Yet,比如MAP/MULTISET/INTERVAL在 pandas UDF 场景还不支持或不完整,生产里尽量绕开或提前验证。

3. 复合类型在 UDF 中的使用:ARRAY / MAP / ROW

这是工程里最容易踩坑的部分:复合类型传到 Python 侧会变成什么?

3.1 ARRAY:Python 侧是 list(pandas 侧是 ndarray)

  • 标量 UDF:拿到的是list
  • pandas UDF:单元格可能是numpy.ndarray

示例(标量 UDF):

frompyflink.table.udfimportudffrompyflink.table.typesimportDataTypes@udf(input_types=[DataTypes.ARRAY(DataTypes.INT())],result_type=DataTypes.INT())defarray_len(arr):returnlen(arr)ifarrisnotNoneelse0

3.2 MAP:Python 侧是 dict(pandas UDF 暂不支持)

MAP 在 Python 标量 UDF 中是dict,但 pandas UDF 不支持(表里明确写了 Not Supported Yet),如果你需要向量化处理 MAP,通常要先在 SQL/Table 层把 MAP 展开/转换成基础列。

3.3 ROW:Python 侧是 Row(pandas UDF 侧是 dict)

  • 标量 UDF:ROW →pyflink.common.Row
  • pandas UDF:ROW →dict

这也是为什么你在 Row-based Operations 里看到两种写法:

  • def func2(r: Row) -> Row: ...
  • pandas 模式下DataFrame里取列,再拼DataFrame返回

4. 为什么“声明类型”很重要:类型决定运行时转换与序列化

在 PyFlink 里,很多问题的根源都是“类型不明确”:

  • 你写 UDF 返回了 Pythonint,但声明的是STRING
  • 你返回 Row 的字段数量/顺序与声明的ROW<...>不一致
  • pandas UDF 返回列的 dtype 跟 DataType 冲突
  • 你用了 pandas UDF 但传了 MAP/INTERVAL 等 pandas 不支持类型

最稳的做法是:

  • UDF 明确写input_types/result_type(不要只靠推断)
  • 复合类型尽量用DataTypes.ROW/ARRAY/...明确结构
  • pandas UDF 场景优先用基础数值/字符串/时间类型

5. 给你一套“UDF 类型声明模板”(生产更稳)

5.1 标量 UDF:建议用 DataTypes 写清楚

frompyflink.table.udfimportudffrompyflink.table.typesimportDataTypes@udf(input_types=[DataTypes.BIGINT(),DataTypes.STRING()],result_type=DataTypes.ROW([DataTypes.FIELD("id",DataTypes.BIGINT()),DataTypes.FIELD("data",DataTypes.STRING())]))defenrich(id_,data):frompyflink.commonimportRowreturnRow(id_,data+"_x")

5.2 pandas UDF:牢记输入输出是 Series/DataFrame(元素类型由 DataType 控制)

importpandasaspdfrompyflink.table.udfimportudffrompyflink.table.typesimportDataTypes@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id",DataTypes.BIGINT()),DataTypes.FIELD("data",DataTypes.STRING())]),func_type="pandas")defenrich_vec(df:pd.DataFrame)->pd.DataFrame:returnpd.concat([df["id"],df["data"]+"_x"],axis=1)

如果你接下来要写 CSDN 系列文章,我建议你把这一篇作为“类型基础篇”,下一篇可以直接承接你前面写的 Row-based Operations:把每种算子(map/flat_map/aggregate/flat_aggregate)里涉及到的result_typeinput_types、ROW 扁平化、pandas 模式的 dtype 坑,都用本文的类型映射做解释,会非常连贯。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/31 3:03:25

硅基流动API在智能客服中的实战应用

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 开发一个基于硅基流动API的简易智能客服demo。功能要求&#xff1a;1. 使用Flask搭建Web接口 2. 集成硅基流动的自然语言理解API 3. 实现常见问题自动回复 4. 包含对话上下文管理 …

作者头像 李华
网站建设 2026/3/28 23:16:39

智能相册进阶:用万物识别构建个性化图像搜索引擎

智能相册进阶&#xff1a;用万物识别构建个性化图像搜索引擎 作为一名摄影爱好者和技术开发者&#xff0c;我经常面临一个痛点&#xff1a;手机和硬盘里堆积如山的家庭照片难以有效管理。传统的相册应用只能按时间或地点分类&#xff0c;而我想实现更智能的搜索——比如快速找到…

作者头像 李华
网站建设 2026/3/31 15:31:11

仅限内部分享:MCP加密系统中不对外公开的4种密钥管理技巧

第一章&#xff1a;MCP加密系统安全概述 MCP&#xff08;Multi-layer Cryptographic Protocol&#xff09;加密系统是一种专为高安全性通信环境设计的多层加密协议&#xff0c;广泛应用于金融、国防及云计算领域。该系统通过结合对称加密、非对称加密与哈希验证机制&#xff0c…

作者头像 李华
网站建设 2026/4/2 3:33:16

掌握这3个MCP实验工具,效率提升300%不是梦

第一章&#xff1a;掌握MCP实验工具的核心价值MCP&#xff08;Modular Control Platform&#xff09;实验工具是一套专为自动化系统开发与测试设计的集成化环境&#xff0c;广泛应用于工业控制、嵌入式研发和算法验证场景。其核心价值在于提供模块化架构、实时数据反馈和可扩展…

作者头像 李华
网站建设 2026/3/27 15:19:23

中药方剂成分解释:Hunyuan-MT-7B避免直译产生误解

中药方剂翻译为何不能靠“字面直译”&#xff1f;Hunyuan-MT-7B 的破局之道 在中医药走向国际的进程中&#xff0c;一个看似简单却长期困扰从业者的问题浮出水面&#xff1a;如何准确翻译“黄芪”“当归”这类中药名称&#xff1f; 如果交给普通翻译工具&#xff0c;“黄芪”可…

作者头像 李华
网站建设 2026/4/2 14:07:30

万物识别模型更新:无缝切换新旧版本的技巧

万物识别模型更新&#xff1a;无缝切换新旧版本的技巧 作为一名AI产品经理&#xff0c;我经常面临一个棘手问题&#xff1a;每次更新识别模型版本时&#xff0c;服务都会中断一段时间。这不仅影响用户体验&#xff0c;还可能造成业务损失。经过多次实践&#xff0c;我总结出一套…

作者头像 李华