简介
Java7引入的线程池实现,适合于计算可以被递归执行的任务,并且这些任务都是是计算密集型的,不会有IO阻塞。
ForkJoinPool中有两个关键点:
- 分治法:将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立、并且与原问题性质相同,求出子问题的解后,将这些解合并,就可以得出原问题的解,例如二分法。
- 工作窃取:当某个线程的任务队列中没有任务时,从其他线程的任务队列中获取任务来执行,以充分利用工作线程的窃取能力,减少由于线程获取不到任务而造成的空闲浪费。在工作窃取机制中,每个线程都有自己的双端队列存储任务,线程从自身队列的尾部获取任务,此时是后进先出,如果当前线程空闲,会从其他线程的任务队列的头部获取任务,此时是先进先出,获取任务均是通过CAS操作实现的。
ForkJoinPool基于工作窃取算法,能高效利用多核处理器,提升并发性能。
入门案例
案例1:求start到end之间的和
需求:
/** * 求x到y之间的和 */publicclassSumTaskextendsRecursiveTask<BigInteger>{privatestaticfinalLongTHRESHOLD=102400000L;privatefinalIntegerstart;privatefinalIntegerend;publicSumTask(Integerstart,Integerend){this.start=start;this.end=end;}@OverrideprotectedBigIntegercompute(){if(end-start<THRESHOLD){BigIntegerresult=BigInteger.ZERO;longlen=end-start+1;longs=start;for(longi=0;i<len;i++){result=result.add(BigInteger.valueOf(s));s++;}returnresult;}else{intmid=start+(end-start)/2;SumTaskt1=newSumTask(start,mid);SumTaskt2=newSumTask(mid+1,end);invokeAll(t1,t2);BigIntegerjoin=t1.join();BigIntegerjoin1=t2.join();returnjoin.add(join1);}}}publicclassSumTaskTest{publicstaticvoidmain(String[]args){longstartTime=System.currentTimeMillis();ForkJoinPoolcommonPool=ForkJoinPool.commonPool();BigIntegerresult=commonPool.invoke(newSumTask(1,Integer.MAX_VALUE));longexecuteTime=System.currentTimeMillis()-startTime;System.out.println("结果 = "+result.toString()+" 耗时 = "+executeTime+"ms");}}案例2:斐波那契数列
斐波那契数列,第一项是0还是1? 第一项为0和第一项为1,都符合定义,都可以正确计算。最初斐波那契在《计算之书》(1202年)中以兔子繁殖为例提出数列时,起始项为 1、1。 随着数学体系的发展,第一项为0被引入进来。通常如果没有特殊说明,第一项通常是0
importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.RecursiveTask;publicclassFibonacciTaskextendsRecursiveTask<Long>{privatestaticfinalIntegerTHRESHOLD=10;privatefinallongn;publicFibonacciTask(Longn){if(n==null){thrownewIllegalArgumentException("参数 n 不可为null");}if(n<0){thrownewIllegalArgumentException("参数 n 不可小于0");}this.n=n;}@OverrideprotectedLongcompute(){if(n<=THRESHOLD){returncomputeSequentially();}List<FibonacciTask>list=newArrayList<>();list.add(newFibonacciTask(n-1));list.add(newFibonacciTask(n-2));invokeAll(list);returnlist.get(0).join()+list.get(1).join();}/** * 计算斐波那契数列 * @return 成员变量n对应的数列值 */privateLongcomputeSequentially(){if(n<=1){returnn;}longa=0,b=1;for(longi=2;i<n+1;i++){longtmp=b;b=a+b;a=tmp;}returnb;}}importjava.util.concurrent.ForkJoinPool;publicclassFibonacciTaskTest{publicstaticvoidmain(String[]args){ForkJoinPoolcommonPool=ForkJoinPool.commonPool();longstartTime=System.currentTimeMillis();FibonacciTaskfibonacciTask=newFibonacciTask(100L);Longresult=commonPool.invoke(fibonacciTask);longexecuteTime=System.currentTimeMillis()-startTime;System.out.println(executeTime+"ms, result = "+result);}}案例3:无返回值的异步任务,归并排序
publicclassMergeSortTask2<TextendsComparable<T>>extendsRecursiveAction{privatestaticfinalIntegerTHRESHOLD=100000;privatefinalT[]arr;privatefinalintstart;privatefinalintend;publicMergeSortTask2(T[]arr){this.arr=arr;start=0;end=arr.length-1;}privateMergeSortTask2(T[]arr,intstart,intend){this.arr=arr;this.start=start;this.end=end;}@Overrideprotectedvoidcompute(){if(end-start<=THRESHOLD){mergeSort(arr,start,end);}else{intmid=calMid(start,end);MergeSortTask2<T>task1=newMergeSortTask2<>(arr,start,mid);MergeSortTask2<T>task2=newMergeSortTask2<>(arr,mid+1,end);// 先提交task1到线程池,再在当前线程计算task2,然后再阻塞地等待task1的结果task1.fork();task2.compute();task1.join();merge(arr,start,mid,end);}}/** * 计算start和end的中间值 */privateintcalMid(intstart,intend){returnstart+(end-start)/2;}// 归并排序privatevoidmergeSort(T[]arr,intleft,intright){if(left==right){return;}intmid=calMid(left,right);mergeSort(arr,left,mid);mergeSort(arr,mid+1,right);merge(arr,left,mid,right);}@SuppressWarnings("unchecked")privatevoidmerge(T[]arr,intstart,intmid,intend){intlen=end-start+1;T[]tmpArr=(T[])newComparable[len];inti=start;intj=mid+1;intk=0;while(i<=mid&&j<=end){if(arr[i].compareTo(arr[j])<=0){tmpArr[k++]=arr[i++];}else{tmpArr[k++]=arr[j++];}}while(i<=mid){tmpArr[k++]=arr[i++];}while(j<=end){tmpArr[k++]=arr[j++];}System.arraycopy(tmpArr,0,arr,start,len);}}测试:
@Testpublicvoidtest3(){Integer[]arr=buildIntegerArr(n);// 100000000longstartTime=System.currentTimeMillis();MergeSortTask2<Integer>integerMergeSortTask=newMergeSortTask2<>(arr);integerMergeSortTask.invoke();// 1亿大小的数组,使用ForkJoinPool排序,花费 98438ms// System.out.println("arr = " + Arrays.toString(arr));System.out.println("result = "+(System.currentTimeMillis()-startTime)+"ms");}/** * 构建待排序的数组 */privatestaticInteger[]buildIntegerArr(intlimit){longstartTime=System.currentTimeMillis();Integer[]arr=newInteger[limit];Randomrandom=newRandom();for(inti=0;i<limit;i++){arr[i]=random.nextInt(limit*10);}System.out.println("构建测试"+limit+"大小的数组,花费 "+(System.currentTimeMillis()-startTime)+"ms");returnarr;}案例4:ForkJoinPool 状态监控
/** * 打印ForkJoinPool的状态 */publicstaticvoidprintCommonPoolStatus(ForkJoinPoolcommonPool){if(commonPool==null){return;}System.out.println("----------------------");System.out.println("并行度 = "+commonPool.getParallelism()+"\n"+"工作线程数 = "+commonPool.getPoolSize()+"\n"+"活跃的线程数 = "+commonPool.getActiveThreadCount()+"\n"+"运行中的线程数 = "+commonPool.getRunningThreadCount()+"\n"+"排队提交的数量 = "+commonPool.getQueuedSubmissionCount()+"\n"+"排队任务的数量 = "+commonPool.getQueuedTaskCount()+"\n"+"当前所有线程是否空闲中 = "+commonPool.isQuiescent()+"\n"+"偷窃任务数 = "+commonPool.getStealCount()+"\n"+"是否异步模式 = "+commonPool.getAsyncMode()+"\n"+"是否还有未执行的任务 = "+commonPool.hasQueuedSubmissions()+"\n"+"线程工厂 = "+commonPool.getFactory()+"\n"+"异常处理器 = "+commonPool.getUncaughtExceptionHandler());System.out.println("======================");}提交任务后,另起一个线程,循环调用这个方法,打印线程池的状态
基本使用
ForkJoinPool,自带一个公共线程池,线程数是CPU核数减1,如果用户向ForkJoinPool中提交任务时,没有指定线程池,就是案例中的那种写法,就使用默认线程池。
向ForkJoinPool中提交任务时指定线程池:
ForkJoinPoolforkJoinPool=newForkJoinPool(4);forkJoinPool.execute(integerMergeSortTask);ForkJoinPool中执行的任务,需要继承RecursiveTask或RecursiveAction,一个是有返回值的任务,一个是无返回值的任务,任务本身负责自身的拆分,直到拆分到一定阈值再执行。这里就是把单线程执行的计算任务拆分为多线程可以执行的,最后再合并它的结果。同时基于工作窃取机制,确保执行任务时所有线程都不会空闲,除非数据倾斜导致某些任务的计算量比较大,否则所有线程都可以保证一个很好的效率,好的一点是如何拆分数据取决于用户,他可以在拆分时避免数据倾斜。
向线程池中提交task,或者调用task本身的invoke方法,都可以把任务提交到线程池。