news 2026/5/30 1:56:40

线程池版流水线模式 技术笔记

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
线程池版流水线模式 技术笔记

一、模式核心思想

流水线模式本质是任务分阶段串行处理,把一个完整业务任务拆分成多道独立工序(本例拆分为 TaskA、TaskB、TaskC 三个阶段)。
每个阶段由独立线程池负责消费处理,上一阶段处理完成后,自动把任务提交给下一阶段线程池,形成生产者→阶段A→阶段B→阶段C→任务收尾的流水线链路。
// 核心特点:工序解耦、各阶段线程池独立管控、任务异步串行流转、支持高并发持续生产任务

二、整体结构分层

  1. 任务实体类 Task:封装业务数据 + 各阶段业务处理逻辑
  2. 阶段任务处理类 TaskA/TaskB/TaskC:实现 Runnable,绑定对应业务阶段,处理完自动投递下一线程池
  3. 主线程启动类 Main:初始化多组独立线程池、任务队列、完成任务集合;启动生产者线程持续生成任务;启动监控线程观测队列积压与任务计算正确性

三、核心代码实现

1. 任务实体类 Task

封装任务核心变量 num,提供三段流水线处理方法,每个方法模拟业务耗时。

packageduoxiancheng.xq0529;publicclassTask{// 任务核心运算数据intnum;// 流水线第一阶段处理publicvoidtaskA(){try{// 模拟业务处理耗时500msThread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 阶段A业务逻辑:数值+20num+=20;}// 流水线第二阶段处理publicvoidtaskB(){try{// 模拟业务处理耗时500msThread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 阶段B业务逻辑:数值*10num*=10;}// 流水线第三阶段处理publicvoidtaskC(){try{// 模拟业务处理耗时500msThread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 阶段C业务逻辑:数值平方num*=num;}}

// 备注:三个方法严格串行依赖,必须按 A→B→C 顺序执行,最终预期结果:(0+20)*10 再平方 = 40000

2. 阶段任务处理器

2.1 第一阶段 TaskA

执行完 taskA 后,将任务交给第二阶段线程池

packageduoxiancheng.xq0529;importjava.util.ArrayList;importjava.util.concurrent.Executor;// 流水线第一阶段任务publicclassTaskAimplementsRunnable{ExecutorexecutorB;// 绑定第二阶段线程池ExecutorexecutorC;// 绑定第三阶段线程池ArrayList<Task>taskDoneList;// 存放最终完成的任务Tasktask;// 当前待处理任务publicTaskA(ExecutorexecutorB,ExecutorexecutorC,ArrayList<Task>taskDoneList,Tasktask){this.executorB=executorB;this.executorC=executorC;this.taskDoneList=taskDoneList;this.task=task;}@Overridepublicvoidrun(){// 执行第一阶段业务逻辑task.taskA();// 流转:提交任务给第二阶段线程池executorB.execute(newTaskB(task,executorC,taskDoneList));}}
2.2 第二阶段 TaskB

执行完 taskB 后,将任务交给第三阶段线程池

packageduoxiancheng.xq0529;importjava.util.ArrayList;importjava.util.concurrent.Executor;// 流水线第二阶段任务classTaskBimplementsRunnable{Tasktask;ExecutorexecutorC;ArrayList<Task>taskDoneList;publicTaskB(Tasktask,ExecutorexecutorC,ArrayList<Task>taskDoneList){this.task=task;this.executorC=executorC;this.taskDoneList=taskDoneList;}@Overridepublicvoidrun(){// 执行第二阶段业务逻辑task.taskB();// 流转:提交任务给第三阶段线程池executorC.execute(newTaskC(task,taskDoneList));}}
2.3 第三阶段 TaskC

最后阶段处理完毕,把任务加入完成集合

packageduoxiancheng.xq0529;importjava.util.ArrayList;// 流水线第三阶段任务classTaskCimplementsRunnable{Tasktask;ArrayList<Task>taskDoneList;publicTaskC(Tasktask,ArrayList<Task>taskDoneList){this.task=task;this.taskDoneList=taskDoneList;}@Overridepublicvoidrun(){// 执行第三阶段业务逻辑task.taskC();// 整个流水线处理完成,加入完成列表taskDoneList.add(task);}}

// 备注:各阶段通过构造器传递下一级线程池与任务上下文,实现任务自动流转,无需手动阻塞等待

3. 主线程工厂启动类 Main

初始化三级线程池、队列,启动生产者持续造任务,启动监控线程观测运行状态。

packageduoxiancheng.xq0529;importjava.util.ArrayList;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.Executor;importjava.util.concurrent.ThreadPoolExecutor;importjava.util.concurrent.TimeUnit;publicclassMain{publicstaticvoidmain(String[]args){// 为每个阶段独立定义阻塞队列,容量20ArrayBlockingQueue<Runnable>queueA=newArrayBlockingQueue<>(20);ArrayBlockingQueue<Runnable>queueB=newArrayBlockingQueue<>(20);ArrayBlockingQueue<Runnable>queueC=newArrayBlockingQueue<>(20);// 保存所有流水线执行完成的任务ArrayList<Task>taskDoneList=newArrayList<>();// 初始化三级独立线程池 // 核心线程5、最大线程10、空闲超时1000msExecutorexecutorA=newThreadPoolExecutor(5,10,1000,TimeUnit.MILLISECONDS,queueA);ExecutorexecutorB=newThreadPoolExecutor(5,10,1000,TimeUnit.MILLISECONDS,queueB);ExecutorexecutorC=newThreadPoolExecutor(5,10,1000,TimeUnit.MILLISECONDS,queueC);// 生产者线程:循环不断生成新任务,提交给第一阶段线程池newThread(()->{intcount=0;while(true){try{// 控制生产速率Thread.sleep(50);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 新建任务实例Tasktask=newTask();// 提交到流水线首阶段executorA.execute(newTaskA(executorB,executorC,taskDoneList,task));count++;System.out.println("生产者已生成任务数:"+count);}}).start();// 监控线程:定时查看队列积压、完成任务数、计算正确率newThread(()->{while(true){try{// 每秒监控一次Thread.sleep(1000);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 打印三个阶段队列积压大小System.out.println("队列A大小:"+queueA.size()+" 队列B大小:"+queueB.size()+" 队列C大小:"+queueC.size());// 统计结果正确的任务数量intcorrectCount=0;for(inti=0;i<taskDoneList.size();i++){// 符合预期结果40000即为正确if(taskDoneList.get(i).num==40000){correctCount++;}}System.out.println("已完成任务总数:"+taskDoneList.size()+" 运算正确任务数:"+correctCount);}}).start();}}

// 关键备注1:每个阶段独享线程池和队列,互不阻塞,可单独调优核心线程数、队列容量
// 关键备注2:生产者无限循环生成任务,通过 Thread.sleep 控流,避免瞬间打满队列
// 关键备注3:监控线程实时观测队列积压,可用来判断线程池参数是否合理、是否有任务堆积瓶颈

四、执行流程梳理

  1. Main 初始化 A/B/C 三组线程池 + 对应阻塞队列
  2. 生产者线程循环创建 Task 对象,提交给 executorA
  3. executorA 调度执行 TaskA,完成后把任务封装为 TaskB 提交给 executorB
  4. executorB 调度执行 TaskB,完成后封装为 TaskC 提交给 executorC
  5. executorC 调度执行 TaskC,完成后将任务加入完成列表
  6. 监控线程每秒打印队列积压、完成任务数、业务计算正确数

五、技术要点总结

  1. 流水线拆分:复杂任务拆分为多阶段,单一职责,便于维护和单独扩容
  2. 线程池隔离:各阶段独立线程池,某一阶段阻塞不影响其他阶段运行
  3. 任务自动流转:通过 Runnable 构造器传递下一级线程池,实现链式投递
  4. 流量可控:生产者通过休眠控制任务生产速率,配合有界队列防止无限积压
  5. 可观测性:内置监控线程,实时查看队列水位与任务处理正确率,方便调参和排查瓶颈
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/30 1:54:25

35岁运维被优化后,我转了网络安全:这行的前景,比你想的更稳

35岁运维被优化后&#xff0c;我转了网络安全&#xff1a;这行的前景&#xff0c;比你想的更稳 一、38 岁运维老周的困境&#xff1a;不是你不够努力&#xff0c;是赛道容不下 “老经验” 了 上周跟老周吃饭&#xff0c;他掏出手机翻着招聘软件叹气&#xff1a;“投了 20 家&…

作者头像 李华
网站建设 2026/5/30 1:54:24

神经形态计算π²架构:突破AI硬件能效瓶颈

1. 神经形态计算的互连革命&#xff1a;π架构深度解析在AI硬件加速器领域&#xff0c;一个长期被忽视的事实正逐渐浮出水面&#xff1a;当系统规模扩展到脑级复杂度时&#xff0c;超过90%的能耗并非来自计算单元&#xff0c;而是消耗在数据传输过程中。传统冯诺伊曼架构中&…

作者头像 李华
网站建设 2026/5/30 1:53:58

开源矢量网络分析仪校准精度挑战与LibreVNA的误差修正解决方案

开源矢量网络分析仪校准精度挑战与LibreVNA的误差修正解决方案 【免费下载链接】LibreVNA 100kHz to 6GHz 2 port USB based VNA 项目地址: https://gitcode.com/gh_mirrors/li/LibreVNA 在射频工程实践中&#xff0c;矢量网络分析仪的校准精度直接决定了测量结果的可靠…

作者头像 李华
网站建设 2026/5/30 1:53:47

告别闪退!Mac Monterey/M1芯片安装AccessClient堡垒机最全避坑指南

Mac全系兼容实战&#xff1a;AccessClient堡垒机安装与闪退终极解决方案每次打开AccessClient准备跳转服务器时&#xff0c;那个熟悉的闪退画面是否让你血压飙升&#xff1f;特别是当你使用的是搭载M1芯片的新款MacBook&#xff0c;或者已经升级到Monterey、Ventura系统时&…

作者头像 李华
网站建设 2026/5/30 1:52:56

基于Raspberry Pi Pico的智能植物生长监控系统DIY教程

1. 项目概述&#xff1a;为什么我们需要一个智能植物生长监控系统&#xff1f;作为一名长期在嵌入式系统和物联网领域折腾的爱好者&#xff0c;我发现在家庭种植和园艺这件事上&#xff0c;很多朋友都面临着相似的困境&#xff1a;精心挑选的植物&#xff0c;买回来时生机勃勃&…

作者头像 李华