news 2026/3/22 14:50:40

Java多线程(十)ForkJoinPool

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Java多线程(十)ForkJoinPool

简介

Java7引入的线程池实现,适合于计算可以被递归执行的任务,并且这些任务都是是计算密集型的,不会有IO阻塞。

ForkJoinPool中有两个关键点:

  • 分治法:将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立、并且与原问题性质相同,求出子问题的解后,将这些解合并,就可以得出原问题的解,例如二分法。
  • 工作窃取:当某个线程的任务队列中没有任务时,从其他线程的任务队列中获取任务来执行,以充分利用工作线程的窃取能力,减少由于线程获取不到任务而造成的空闲浪费。在工作窃取机制中,每个线程都有自己的双端队列存储任务,线程从自身队列的尾部获取任务,此时是后进先出,如果当前线程空闲,会从其他线程的任务队列的头部获取任务,此时是先进先出,获取任务均是通过CAS操作实现的。

ForkJoinPool基于工作窃取算法,能高效利用多核处理器,提升并发性能。

入门案例

案例1:求start到end之间的和

需求:

/** * 求x到y之间的和 */publicclassSumTaskextendsRecursiveTask<BigInteger>{privatestaticfinalLongTHRESHOLD=102400000L;privatefinalIntegerstart;privatefinalIntegerend;publicSumTask(Integerstart,Integerend){this.start=start;this.end=end;}@OverrideprotectedBigIntegercompute(){if(end-start<THRESHOLD){BigIntegerresult=BigInteger.ZERO;longlen=end-start+1;longs=start;for(longi=0;i<len;i++){result=result.add(BigInteger.valueOf(s));s++;}returnresult;}else{intmid=start+(end-start)/2;SumTaskt1=newSumTask(start,mid);SumTaskt2=newSumTask(mid+1,end);invokeAll(t1,t2);BigIntegerjoin=t1.join();BigIntegerjoin1=t2.join();returnjoin.add(join1);}}}publicclassSumTaskTest{publicstaticvoidmain(String[]args){longstartTime=System.currentTimeMillis();ForkJoinPoolcommonPool=ForkJoinPool.commonPool();BigIntegerresult=commonPool.invoke(newSumTask(1,Integer.MAX_VALUE));longexecuteTime=System.currentTimeMillis()-startTime;System.out.println("结果 = "+result.toString()+" 耗时 = "+executeTime+"ms");}}
案例2:斐波那契数列

斐波那契数列,第一项是0还是1? 第一项为0和第一项为1,都符合定义,都可以正确计算。最初斐波那契在《计算之书》(1202年)中以兔子繁殖为例提出数列时,起始项为 1、1。 随着数学体系的发展,第一项为0被引入进来。通常如果没有特殊说明,第一项通常是0

importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.RecursiveTask;publicclassFibonacciTaskextendsRecursiveTask<Long>{privatestaticfinalIntegerTHRESHOLD=10;privatefinallongn;publicFibonacciTask(Longn){if(n==null){thrownewIllegalArgumentException("参数 n 不可为null");}if(n<0){thrownewIllegalArgumentException("参数 n 不可小于0");}this.n=n;}@OverrideprotectedLongcompute(){if(n<=THRESHOLD){returncomputeSequentially();}List<FibonacciTask>list=newArrayList<>();list.add(newFibonacciTask(n-1));list.add(newFibonacciTask(n-2));invokeAll(list);returnlist.get(0).join()+list.get(1).join();}/** * 计算斐波那契数列 * @return 成员变量n对应的数列值 */privateLongcomputeSequentially(){if(n<=1){returnn;}longa=0,b=1;for(longi=2;i<n+1;i++){longtmp=b;b=a+b;a=tmp;}returnb;}}importjava.util.concurrent.ForkJoinPool;publicclassFibonacciTaskTest{publicstaticvoidmain(String[]args){ForkJoinPoolcommonPool=ForkJoinPool.commonPool();longstartTime=System.currentTimeMillis();FibonacciTaskfibonacciTask=newFibonacciTask(100L);Longresult=commonPool.invoke(fibonacciTask);longexecuteTime=System.currentTimeMillis()-startTime;System.out.println(executeTime+"ms, result = "+result);}}
案例3:无返回值的异步任务,归并排序
publicclassMergeSortTask2<TextendsComparable<T>>extendsRecursiveAction{privatestaticfinalIntegerTHRESHOLD=100000;privatefinalT[]arr;privatefinalintstart;privatefinalintend;publicMergeSortTask2(T[]arr){this.arr=arr;start=0;end=arr.length-1;}privateMergeSortTask2(T[]arr,intstart,intend){this.arr=arr;this.start=start;this.end=end;}@Overrideprotectedvoidcompute(){if(end-start<=THRESHOLD){mergeSort(arr,start,end);}else{intmid=calMid(start,end);MergeSortTask2<T>task1=newMergeSortTask2<>(arr,start,mid);MergeSortTask2<T>task2=newMergeSortTask2<>(arr,mid+1,end);// 先提交task1到线程池,再在当前线程计算task2,然后再阻塞地等待task1的结果task1.fork();task2.compute();task1.join();merge(arr,start,mid,end);}}/** * 计算start和end的中间值 */privateintcalMid(intstart,intend){returnstart+(end-start)/2;}// 归并排序privatevoidmergeSort(T[]arr,intleft,intright){if(left==right){return;}intmid=calMid(left,right);mergeSort(arr,left,mid);mergeSort(arr,mid+1,right);merge(arr,left,mid,right);}@SuppressWarnings("unchecked")privatevoidmerge(T[]arr,intstart,intmid,intend){intlen=end-start+1;T[]tmpArr=(T[])newComparable[len];inti=start;intj=mid+1;intk=0;while(i<=mid&&j<=end){if(arr[i].compareTo(arr[j])<=0){tmpArr[k++]=arr[i++];}else{tmpArr[k++]=arr[j++];}}while(i<=mid){tmpArr[k++]=arr[i++];}while(j<=end){tmpArr[k++]=arr[j++];}System.arraycopy(tmpArr,0,arr,start,len);}}

测试:

@Testpublicvoidtest3(){Integer[]arr=buildIntegerArr(n);// 100000000longstartTime=System.currentTimeMillis();MergeSortTask2<Integer>integerMergeSortTask=newMergeSortTask2<>(arr);integerMergeSortTask.invoke();// 1亿大小的数组,使用ForkJoinPool排序,花费 98438ms// System.out.println("arr = " + Arrays.toString(arr));System.out.println("result = "+(System.currentTimeMillis()-startTime)+"ms");}/** * 构建待排序的数组 */privatestaticInteger[]buildIntegerArr(intlimit){longstartTime=System.currentTimeMillis();Integer[]arr=newInteger[limit];Randomrandom=newRandom();for(inti=0;i<limit;i++){arr[i]=random.nextInt(limit*10);}System.out.println("构建测试"+limit+"大小的数组,花费 "+(System.currentTimeMillis()-startTime)+"ms");returnarr;}
案例4:ForkJoinPool 状态监控
/** * 打印ForkJoinPool的状态 */publicstaticvoidprintCommonPoolStatus(ForkJoinPoolcommonPool){if(commonPool==null){return;}System.out.println("----------------------");System.out.println("并行度 = "+commonPool.getParallelism()+"\n"+"工作线程数 = "+commonPool.getPoolSize()+"\n"+"活跃的线程数 = "+commonPool.getActiveThreadCount()+"\n"+"运行中的线程数 = "+commonPool.getRunningThreadCount()+"\n"+"排队提交的数量 = "+commonPool.getQueuedSubmissionCount()+"\n"+"排队任务的数量 = "+commonPool.getQueuedTaskCount()+"\n"+"当前所有线程是否空闲中 = "+commonPool.isQuiescent()+"\n"+"偷窃任务数 = "+commonPool.getStealCount()+"\n"+"是否异步模式 = "+commonPool.getAsyncMode()+"\n"+"是否还有未执行的任务 = "+commonPool.hasQueuedSubmissions()+"\n"+"线程工厂 = "+commonPool.getFactory()+"\n"+"异常处理器 = "+commonPool.getUncaughtExceptionHandler());System.out.println("======================");}

提交任务后,另起一个线程,循环调用这个方法,打印线程池的状态

基本使用

ForkJoinPool,自带一个公共线程池,线程数是CPU核数减1,如果用户向ForkJoinPool中提交任务时,没有指定线程池,就是案例中的那种写法,就使用默认线程池。

向ForkJoinPool中提交任务时指定线程池:

ForkJoinPoolforkJoinPool=newForkJoinPool(4);forkJoinPool.execute(integerMergeSortTask);

ForkJoinPool中执行的任务,需要继承RecursiveTask或RecursiveAction,一个是有返回值的任务,一个是无返回值的任务,任务本身负责自身的拆分,直到拆分到一定阈值再执行。这里就是把单线程执行的计算任务拆分为多线程可以执行的,最后再合并它的结果。同时基于工作窃取机制,确保执行任务时所有线程都不会空闲,除非数据倾斜导致某些任务的计算量比较大,否则所有线程都可以保证一个很好的效率,好的一点是如何拆分数据取决于用户,他可以在拆分时避免数据倾斜。

向线程池中提交task,或者调用task本身的invoke方法,都可以把任务提交到线程池。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/22 11:55:14

ET框架:重塑Unity游戏服务器开发的革命性架构

ET框架&#xff1a;重塑Unity游戏服务器开发的革命性架构 【免费下载链接】ET Unity3D 客户端和 C# 服务器框架。 项目地址: https://gitcode.com/GitHub_Trending/et/ET 在游戏开发技术快速迭代的今天&#xff0c;传统服务器架构正面临着前所未有的性能瓶颈和开发效率挑…

作者头像 李华
网站建设 2026/3/15 15:34:29

图像分割组件化设计:从单体模型到生产级可复用架构

图像分割组件化设计&#xff1a;从单体模型到生产级可复用架构 引言&#xff1a;图像分割的技术演进与现实挑战 图像分割作为计算机视觉的核心任务之一&#xff0c;已经从传统的阈值分割、边缘检测发展到如今的深度学习驱动方法。随着Transformer架构的崛起和大型基础模型的出现…

作者头像 李华
网站建设 2026/3/15 13:30:55

Emby弹幕插件:打造私人影院的弹幕互动盛宴

还在为独自观影感到乏味吗&#xff1f;emby-danmaku弹幕插件为你带来B站般的弹幕互动体验&#xff0c;让私人影院瞬间充满社交氛围。这款专为Emby设计的智能弹幕工具&#xff0c;能够从多源平台获取高质量弹幕数据&#xff0c;彻底改变你的观影方式。 【免费下载链接】dd-danma…

作者头像 李华
网站建设 2026/3/16 0:55:44

TensorFlow Serving部署实战:打造高性能在线推理服务

TensorFlow Serving部署实战&#xff1a;打造高性能在线推理服务 在今天的AI驱动型业务中&#xff0c;一个训练好的模型如果无法快速、稳定地服务于线上请求&#xff0c;其价值将大打折扣。尤其是在电商推荐、金融风控、智能客服等对响应延迟极为敏感的场景下&#xff0c;如何把…

作者头像 李华
网站建设 2026/3/15 12:53:15

突破性LLM评估实战指南:从数据验证到性能优化的完整解决方案

突破性LLM评估实战指南&#xff1a;从数据验证到性能优化的完整解决方案 【免费下载链接】deepeval The Evaluation Framework for LLMs 项目地址: https://gitcode.com/GitHub_Trending/de/deepeval 还在为LLM输出质量的不确定性而烦恼吗&#xff1f;&#x1f914; 面对…

作者头像 李华
网站建设 2026/3/22 11:57:19

ChanlunX缠论插件完整教程:5分钟掌握专业级技术分析

想要在瞬息万变的股市中精准把握买卖时机吗&#xff1f;ChanlunX缠论插件将复杂的缠中说禅理论转化为直观的可视化界面&#xff0c;让技术分析变得简单高效。这款通达信专属工具通过智能算法自动识别K线走势中的关键结构&#xff0c;即使是投资新手也能快速上手专业级分析技术。…

作者头像 李华