Skip to content

Instantly share code, notes, and snippets.

@0532
Created May 26, 2017 08:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 0532/4399ceb033dca950b8583c7091dee950 to your computer and use it in GitHub Desktop.
Save 0532/4399ceb033dca950b8583c7091dee950 to your computer and use it in GitHub Desktop.
fork/join多线程使用
package com.doraemoney.test.own;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import com.doraemoney.core.utils.ThreadPoolUtils;
/**
* Created By WangLichao On 2017年5月26日.
* @Desc
* RecursiveTask 并行计算,同步有返回值
* ForkJoin框架处理的任务基本都能使用递归处理,比如求斐波那契数列等,但递归算法的缺陷是:
* 一只会只用单线程处理,
* 二是递归次数过多时会导致堆栈溢出;
* ForkJoin解决了这两个问题,使用多线程并发处理,充分利用计算资源来提高效率,同时避免堆栈溢出发生。
* 当然像求斐波那契数列这种小问题直接使用线性算法搞定可能更简单,实际应用中完全没必要使用ForkJoin框架,所以ForkJoin是核弹,是用来对付大家伙的,比如超大数组排序。
* 最佳应用场景:多核、多内存、可以分割计算再合并的计算密集型任务
*/
public class RecursiveTaskTest extends RecursiveTask<Integer> {
/**
*
*/
private static final long serialVersionUID = 6253771003381008573L;
//分片阈值
public static final int threshold = 20;
private int start;
private int end;
public RecursiveTaskTest(int start,int end) {
this.start = start;
this.end = end;
}
/**
* fork()方法:将任务放入队列并安排异步执行,一个任务应该只调用一次fork()函数,除非已经执行完毕并重新初始化。
* tryUnfork()方法:尝试把任务从队列中拿出单独处理,但不一定成功。
* join()方法:等待计算完成并返回计算结果。
* isCompletedAbnormally()方法:用于判断任务计算是否发生异常。
*/
@Override
protected Integer compute() {
int total = 0;
String threadName = Thread.currentThread().getName();
boolean flag = (end-start)<=threshold;
if(flag){
for(int i= start;i<end;i++){
total+=i;
}
System.err.println(threadName+" total:"+total);
}else{
//拆解任务
int middle = (start+end)/2;
RecursiveTaskTest leftTask = new RecursiveTaskTest(start, middle);
RecursiveTaskTest rightTask = new RecursiveTaskTest(middle, end);
//使用invokeAll 当前线程会执行也会继续分发
invokeAll(leftTask, rightTask);
// 执行子任务
//fork()方法:将任务放入队列并安排异步执行,一个任务应该只调用一次fork()函数,除非已经执行完毕并重新初始化。
//使用fork 会形成一个二分法树,上面的节点得等待下面的节点做完才可以走,会导致上面的线程挂起
// leftTask.fork();
// rightTask.fork();
int left = leftTask.join();
// System.err.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
int right = rightTask.join();
total= left+right;
System.err.println(threadName+" left+right:"+total);
}
return total;
}
static int total(int start,int end){
int total = 0;
for(int i=start;i<end;i++){
total +=i;
}
return total;
}
public static void main(String[] args) {
//占用是线程时间长的,单独抽出来使用
ForkJoinPool forkJoinPool = ThreadPoolUtils.getForkJoinPool();
// ForkJoinPool forkJoinPool = new ForkJoinPool();
long s = System.currentTimeMillis();
RecursiveTaskTest task = new RecursiveTaskTest(1, 1000);
ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
// Integer invoke = forkJoinPool.invoke(task);
try {
System.err.println(Thread.currentThread().getName()+" result: "+ submit.get() +" 耗时:" +(System.currentTimeMillis()-s));
} catch (Exception e) {
e.printStackTrace();
}
long s1 = System.currentTimeMillis();
int total = total(1,1000);
/**
* new ForkJoinPool();
* new 出来的线程池一定要shutdown
* 如果是使用共享的线程池资源,请勿关闭,其他模块使用线程池的时候,shutdown会报错
*/
forkJoinPool.shutdown();
System.err.println("total:"+total+" 耗时:"+(System.currentTimeMillis()-s1));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment