news 2026/5/29 23:23:44

PyFlink JAR、Python 包、requirements、虚拟环境、模型文件,远程集群怎么一次搞定?

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
PyFlink JAR、Python 包、requirements、虚拟环境、模型文件,远程集群怎么一次搞定?

1. 先记住一条总原则:混用 DataStream + Table 时,用 DataStream API 配依赖

文档强调了一句非常关键的话:

如果一个 Job 里混用了 Python DataStream API 和 Python Table API,建议通过 DataStream API去指定依赖,这样两边都能生效。

也就是:

  • 纯 Table:table_env.get_config()/table_env.add_python_*
  • 混用:优先StreamExecutionEnvironmentadd_jars / add_python_file / set_python_requirements / add_python_archive / set_python_executable

2. JAR 依赖:pipeline.jars vs pipeline.classpaths vs add_jars vs add_classpaths

2.1 Table API 方式

A)pipeline.jars:上传到集群(最常用)

  • 只能file://本地路径
  • 会把 JAR上传到集群
table_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

Windows 示例(注意还是 file:///):

table_env.get_config().set("pipeline.jars","file:///E:/my/jar/path/connector.jar;file:///E:/my/jar/path/json.jar")

B)pipeline.classpaths:不上传,只加到 classpath(要求集群也能访问同路径)

  • 你必须保证 client、cluster 都能访问这些 URL(比如共享盘、同目录、分发好了)
table_env.get_config().set("pipeline.classpaths","file:///opt/flink/jars/connector.jar;file:///opt/flink/jars/json.jar")

一句话:

  • 你不想折腾分发:用pipeline.jars
  • 你已经把 jar 管理好并且集群路径一致:用pipeline.classpaths

2.2 DataStream API 方式(混用场景首选)

A)add_jars(...):上传到集群

env.add_jars("file:///my/jar/path/connector1.jar","file:///my/jar/path/connector2.jar")

B)add_classpaths(...):加到 client + cluster classpath(同样要求可达)

env.add_classpaths("file:///opt/flink/jars/connector1.jar","file:///opt/flink/jars/connector2.jar")

2.3 提交参数--jarfile的限制

  • 只支持一个 jar,所以多个依赖通常要求你自己打fat jar / uber jar

3. Python 依赖:三种层级(文件/目录、requirements、归档环境)

3.1 python.files / add_python_file:带“代码/包”到 PYTHONPATH

适合:

  • 你的 UDF 写在my_udf.py
  • 或者你有一坨自研包目录my_pkg/
  • 或者你打好了*.whl / *.egg(zip 本质)

Table API:

table_env.add_python_file("/path/to/my_udf.py")table_env.add_python_file("/path/to/my_pkg/")# 目录也可以

DataStream API:

env.add_python_file("/path/to/my_udf.py")env.add_python_file("/path/to/my_pkg/")

等价的还有:

  • 配置python.files
  • 提交参数-pyfs / --pyFiles

关键点:这些会被加到Python UDF worker 的 PYTHONPATH

3.2 requirements.txt / set_python_requirements:让集群 pip 安装第三方依赖

适合:

  • numpy/pandas/requests/sklearn 这种 pip 依赖
  • 你希望 Flink 在 worker 上自动安装

Table API:

table_env.set_python_requirements(requirements_file_path="/path/to/requirements.txt",requirements_cache_dir="cached_dir"# 可选)

DataStream API:

env.set_python_requirements(requirements_file_path="/path/to/requirements.txt",requirements_cache_dir="cached_dir")

离线安装(集群没网)怎么做?

文档给了关键命令:

pip download -d cached_dir -r requirements.txt --no-binary :all:

然后把这个cached_dir作为requirements_cache_dir传进去,Flink 会上传它用于离线安装。

硬要求(很容易忽视):

  • pip >= 20.3
  • setuptools >= 37.0.0
  • cached_dir 里的包必须匹配集群平台与 Python 版本(比如 manylinux、glibc、cp310/cp311)

等价的还有:

  • 配置python.requirements
  • 提交参数-pyreq / --pyRequirements

3.3 python.archives / add_python_archive:带“环境/数据/模型文件”并自动解压

适合:

  • 你要带模型、词典、数据文件
  • 你要带一个完整虚拟环境(venv/conda 打包)

Table API:

table_env.add_python_archive("/path/to/py_env.zip","myenv")

DataStream API:

env.add_python_archive("/path/to/py_env.zip","myenv")

在 UDF 里访问(相对路径):

defmy_udf():withopen("myenv/py_env/data/data.txt")asf:...

如果没写 target_dir:

table_env.add_python_archive("/path/to/py_env.zip")# UDF 内访问:open("py_env.zip/py_env/data/data.txt")

支持格式:

  • zip 系(zip/jar/whl/egg…)
  • tar 系(tar/tar.gz/tgz…)

注意:如果 archive 里是虚拟环境,一定要和集群平台一致。

4. Python 解释器:worker 端与 client 端是两回事

这是很多人线上翻车的根本原因:
client 侧需要 Python 来解析/编译 UDF;cluster 侧 worker 需要 Python 来执行 UDF。

4.1 worker 端 Python:python.executable / set_python_executable

Table API:

table_env.get_config().set_python_executable("/path/to/python")

DataStream API:

env.set_python_executable("/path/to/python")

解释器放在 archive 里(推荐“自带环境”打法)

env.add_python_archive("/path/to/py_env.zip","venv")env.set_python_executable("venv/py_env/bin/python")

注意:如果指向 archive 内路径,用相对路径,别写绝对路径。

等价方式:

  • 配置python.executable
  • 提交参数-pyexec / --pyExecutable

4.2 client 端 Python:python.client.executable / --pyClientExecutable

client 端用于“编译阶段解析 Python UDF”。你可以:

  • 直接激活你本地的 venv:source my_env/bin/activate
  • 或通过配置/参数指定:python.client.executable/-pyclientexec/PYFLINK_CLIENT_EXECUTABLE

5. 在 Java/SQL 里用 Python UDF:依赖还是走 Python 那套配置

你可以在 Java Table API 或纯 SQL 里注册 Python UDF,例如:

tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python");

但 Python 依赖依然要通过这些配置/参数提供:

  • python.files
  • python.archives
  • python.requirements
  • python.executable
  • python.client.executable

6. 一套工程化推荐组合(拿去就能用)

场景A:集群能上网(最省事)

  • JAR:pipeline.jarsenv.add_jars
  • Python:set_python_requirements(requirements.txt)
  • 自研代码:add_python_file(my_udf.py / my_pkg/)
  • 模型文件:add_python_archive(model.zip, "model")

场景B:集群没网(企业内网最常见)

  • requirements 离线缓存:pip download -d cached_dir -r requirements.txt
  • 代码依赖:add_python_file(...)
  • 模型/数据:add_python_archive(...)
  • 如果集群 Python 环境不可控:直接把 venv 打包进 archive,再set_python_executable("venv/.../python")

场景C:你要“零环境依赖”的可移植作业(最稳)

  • 把 venv 打包成 zip(或 conda pack)
  • add_python_archive(venv.zip, "venv")
  • set_python_executable("venv/.../python")
  • 第三方包不再走 requirements(除非你愿意让它再装一遍)

7. 最常见的坑清单(提前避雷)

  • 混用 Table + DataStream,你只在 Table 侧配了依赖,DataStream 侧 UDF 找不到包
    → 混用就统一用StreamExecutionEnvironment

  • pipeline.classpaths指向本地路径,集群节点根本访问不到
    → 不确定就用pipeline.jars / add_jars上传

  • 离线 cached_dir 的 wheel/源码包与集群平台/Python 版本不匹配
    → 必须按集群环境构建缓存(Linux x86_64 + cp310/cp311)

  • worker 端 Python 和 client 端 Python 版本不一致,Arrow/Pandas 相关依赖经常爆炸
    → 统一版本,或者用 archive 自带解释器

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/28 17:52:55

基于大数据的图书推荐系统的设计与实现

前言基于Python的图书推荐系统是结合大数据处理、机器学习算法与Web开发技术,为用户提供个性化图书推荐服务的智能平台。其核心在于通过分析用户行为数据与图书特征,利用协同过滤、深度学习等算法生成精准推荐,同时借助爬虫技术获取多源数据&…

作者头像 李华
网站建设 2026/5/28 20:44:16

DeepSeek V4即将发布:编程能力碾压GPT和Claude,AI开发者必备收藏

DeepSeek将于2月中旬发布V4模型,据报道其编程能力可能超越GPT和Claude。作为2023年成立的中国AI公司,DeepSeek凭借低成本高效率的模型引领了AI平民化进程。其突破性在于训练部署成本远低于竞争对手,推动了效率型大模型蒸馏算法创新。尽管在新…

作者头像 李华
网站建设 2026/5/29 1:00:23

大模型Function Calling实战指南:从原理到代码,让AI更强大

本文详解大模型函数调用(Function Calling)技术,包括核心概念、与ReACT的区别、工具定义格式及应用场景。通过Python代码示例展示如何让大模型执行计算任务,获取更准确结果。Function Calling使大模型能与外部服务交互,适用于API调用、数据库…

作者头像 李华
网站建设 2026/5/28 18:35:04

Java web

一、Java Web 到底是什么?你可以把 Java Web 理解为 “用 Java 语言开发网页 / 网站 / 后台系统的技术体系”,小到个人博客、企业官网,大到电商平台(比如京东)、金融系统(银行 APP 后台)&#x…

作者头像 李华
网站建设 2026/5/29 0:09:25

为什么“零向量”必须是函数而不能是数?

为什么函数空间是“无限维”的?❓ 什么叫“基函数”?❓ 为什么傅里叶级数本质是“向量分解”?❓ 为什么“零向量”必须是函数而不能是数?❓ 在别的函数空间里,零向量长什么样?❓ 为什么向量空间的这些公理这…

作者头像 李华