news 2026/6/7 0:03:13

Flink CLI 从提交作业到 Savepoint/Checkpoint、再到 YARN/K8S 与 PyFlink

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink CLI 从提交作业到 Savepoint/Checkpoint、再到 YARN/K8S 与 PyFlink

1. CLI 的工作方式:它连接谁?

bin/flink会连接到 Flink 配置文件里指定的 JobManager(或你在命令里用--jobmanager host:port指定的 JM)。
前提:必须有一个可用的 Flink 部署(本地、YARN、K8S、Standalone Session 等)。

2. 提交作业(run):最常用的入口

2.1 提交一个 JAR(推荐加 --detached)

./bin/flink run\--detached\./examples/streaming/StateMachineExample.jar
  • --detached:提交完就返回,不会一直挂在终端等作业结束
  • 输出里会给出JobID(后续 list/savepoint/stop/cancel 都靠它)

把 JobID 存变量,后续更方便:

exportJOB_ID="cca7bc1061d61cf15238e92312c2fc20"

2.2 用 -D 传递配置(发布时非常关键)

run支持-D传递额外配置,例如:

./bin/flink run --detached\-Dpipeline.max-parallelism=120\./your-job.jar

这个能力对Application Mode特别重要:你可以不改flink-conf.yaml,直接在提交时把内存、并发、checkpoint 等配置传进集群。

注意:提交到已存在的 Session 集群时,一般只支持执行相关参数(execution config)生效,别指望所有参数都能“改动集群级别行为”。

3. 监控作业(list):查运行中/排队中

./bin/flink list

它会列出:

  • Running/Restarting Jobs(运行中/重启中)
  • Scheduled Jobs(已提交但尚未启动)

实战习惯:提交后第一件事 list 一下,确认 Job 状态不是立刻 FAILED/RESTARTING。

4. Savepoint:可控的“状态快照”,用于迁移/升级/回滚

4.1 创建 Savepoint

./bin/flink savepoint\$JOB_ID\/tmp/flink-savepoints
  • savepoint 目录可选:如果execution.checkpointing.savepoint-dir没配置,就必须在命令里带上
  • 成功后会返回一个 savepoint 路径(后续--fromSavepoint用它)

4.2 Savepoint 触发超时怎么办?用 detached

状态大时,客户端等待 savepoint 完成可能超时(TimeoutException)。解决方式是“触发后立刻返回”:

./bin/flink savepoint\$JOB_ID\/tmp/flink-savepoints\-detached

这会返回一个 triggerId,之后可以通过 REST API 查询该 trigger 的完成状态(CLI 文档也建议这么做)。

4.3 Dispose Savepoint:删除 savepoint 数据与元信息

./bin/flink savepoint\--dispose\/tmp/flink-savepoints/savepoint-xxx\$JOB_ID

注意一个坑:如果你的状态里有自定义 state/自定义类(尤其 RocksDB state),dispose 时可能需要提供原作业 jar,否则会 ClassNotFound:

./bin/flink savepoint\--dispose<savepointPath>\--jarfile<jarFile>

5. 手动触发 Checkpoint:更偏“运维诊断/临时保底”

./bin/flink checkpoint$JOB_ID

如果你的作业默认跑的是 incremental checkpoint,但你想强制做一次 full checkpoint:

./bin/flink checkpoint$JOB_ID--full

Checkpoint 和 Savepoint 的关键差异(实战理解版):

  • Checkpoint:系统为容错自动做(也可手动触发),更偏“持续容错”
  • Savepoint:人为控制,用于“迁移/升级/回滚/停止再启动”

6. 停作业:stop vs cancel(一个优雅,一个粗暴)

6.1 stop:优雅停止并创建最终 Savepoint(强烈推荐用于可恢复停机)

./bin/flink stop\--savepointPath /tmp/flink-savepoints\$JOB_ID

stop 的语义是“从 source 到 sink”平滑停:

  • 让 source 发最后一次 barrier,生成 savepoint
  • savepoint 成功后,source 调用 cancel() 结束

如果你要“彻底停机并清空事件时间相关的等待”,可以加--drain

./bin/flink stop\--savepointPath /tmp/flink-savepoints\--drain\$JOB_ID

--drain会发送 MAX_WATERMARK,触发 event-time timer(比如窗口)把“该出结果的都出完”。
注意:想将来从 savepoint 恢复继续跑,通常不要 drain,否则可能引入恢复后的语义问题。

6.2 cancel:直接取消(不保证状态一致性/不做最终保存)

./bin/flink cancel$JOB_ID

文档里提到--withSavepoint在 cancel 时顺便做 savepoint 这个功能已 deprecated:生产建议用 stop 来做“取消 + 最终 savepoint”。

7. 从 Savepoint 启动作业:升级/迁移的核心套路

./bin/flink run\--detached\--fromSavepoint /tmp/flink-savepoints/savepoint-xxx\./your-job.jar

如果你的新版本作业删掉了某些算子,导致 savepoint 里有“无法恢复的状态”,但你仍想启动,可以加:

./bin/flink run\--fromSavepoint<savepointPath>\--allowNonRestoredState\...

这是“兼容演进”常用开关,但它也意味着你明确接受丢弃某些旧状态。

8. CLI Actions 速查表(你每天会用到的)

  • run:提交并运行作业(JAR/PyFlink)
  • info:打印优化后的执行图(排查 SQL/Plan 很有用)
  • list:列出运行/排队作业
  • savepoint:触发/清理 savepoint
  • checkpoint:手动触发 checkpoint(含 full)
  • stop:优雅停止并生成最终 savepoint
  • cancel:直接取消

帮助命令:

./bin/flink --help ./bin/flink<action>--help

9. 选择部署目标:–target 一把梭(Session / Application)

--target会覆盖execution.target的配置。

常见组合:

YARN:

./bin/flink run --target yarn-session... ./bin/flink run --target yarn-application...

Kubernetes:

./bin/flink run --target kubernetes-session... ./bin/flink run --target kubernetes-application...

Standalone:

./bin/flink run --targetlocal... ./bin/flink run --target remote...

理解建议:

  • session:提交到已存在集群(共享 JM/TM)
  • application:提交时起一个专属集群(更适合隔离、参数化、CI/CD)

10. PyFlink 提交:不用 jar,但要管 Python 环境与依赖

10.1 基础运行

./bin/flink run --python examples/python/table/word_count.py

先确认 Python 版本 ≥ 3.9:

python --version

10.2 带依赖文件(–pyFiles)

./bin/flink run\--python your_job.py\--pyFiles file:///user.txt,hdfs:///path/username.txt

--pyFiles会加到 PYTHONPATH(客户端与远端 python worker 都能用)。

10.3 Python 里引用 Java UDF 或外部 connector(–jarfile)

./bin/flink run\--python your_job.py\--jarfile your-udf-or-connector.jar

10.4 用模块方式提交(–pyModule)

./bin/flink run\--pyModule word_count\--pyFiles examples/python/table

10.5 YARN application 模式跑 PyFlink(典型生产形态)

你可以通过-D把 JM/TM 内存、应用名、ship-files 等都带上,还能指定 venv、python 可执行文件:

./bin/flink run -t yarn-application\-Djobmanager.memory.process.size=1024m\-Dtaskmanager.memory.process.size=1024m\-Dyarn.application.name=<ApplicationName>\-Dyarn.ship-files=/path/to/shipfiles\-pyarch shipfiles/venv.zip\-pyclientexec venv.zip/venv/bin/python3\-pyexec venv.zip/venv/bin/python3\-pyfs shipfiles\-pym word_count

一个现实限制:-pyarch通过 blob server 分发,单个归档文件大小上限 2GB,超过要放到分布式文件系统再引用。

10.6 PyFlink 相关参数速记

  • --python/-py:入口脚本
  • --pyModule/-pym:入口模块(通常配合--pyFiles
  • --pyFiles/-pyfs:代码/资源文件(zip/whl/目录都行)
  • --pyArchives/-pyarch:归档(比如 venv、数据包)
  • --pyClientExecutable:提交端 python
  • --pyExecutable:远端 worker python
  • --pyRequirements:requirements.txt + 可选离线包目录
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/30 1:19:27

ctrl_logic + axis架构设计思路

一、ap_memory axilite axi_stream架构二、代码框架void param_array_top (hls::stream<ap_uint<32> >& src,hls::stream<ap_uint<32> >& dst,ap_uint<32> param_cfg[128]//axilite_ap_memory,generate bram logic ){ #pragma HLS …

作者头像 李华
网站建设 2026/6/6 22:08:54

十三、基于 GPT2 中文模型实现歌词自动续写

在自然语言生成&#xff08;NLG&#xff09;领域&#xff0c;GPT2 凭借轻量化、易部署的特性&#xff0c;成为中文场景下文本创作的优选模型之一。本文将以 “GPT2 中文歌词生成模型” 为例&#xff0c;从代码解析、核心原理到实战优化&#xff0c;手把手教你实现歌词自动续写功…

作者头像 李华
网站建设 2026/6/3 22:50:35

AI原生决策支持平台的选型指南与评估框架

AI原生决策支持平台的选型指南与评估框架关键词&#xff1a;AI原生、决策支持平台、选型评估、企业数字化、智能决策系统摘要&#xff1a;本文从企业数字化转型的实际需求出发&#xff0c;系统讲解AI原生决策支持平台的核心概念、选型逻辑与评估框架。通过生活类比、实战案例和…

作者头像 李华
网站建设 2026/5/27 23:19:26

开题报告 微信小程序 老年人健康老友上门服务

目录微信小程序老年人健康老友上门服务概述核心功能模块技术实现要点创新性与社会价值项目技术支持可定制开发之功能亮点源码获取详细视频演示 &#xff1a;文章底部获取博主联系方式&#xff01;同行可合作微信小程序老年人健康老友上门服务概述 该小程序旨在为老年人提供便捷…

作者头像 李华