在做一个长的任务的时候,需要消耗的时间很长,但是这个时候主流程又要等待这个长任务执行结束后才能执行。如这里这个长任务可能是从数据库中查出报表数据,计算封装出可以写入到报表的数据,主流程在等待往报表中写数据,但是长任务中迟迟计算不出报表数据,因此这里就需要考虑多线程,如果直接创建一个子线程,主线程还是需要等待这个子线程执行结束,这个方案不可选。基本思路就是常见多个线程同时对数据进行计算,然后将计算的结果封装到一起返回给主流程。
使用Fork-Join工具就是一个很好的解决方案,Fork-Join是在jdk1.7的时候发布出来,由Doug Lea大师开发,采用的是分而治之的思想(如二分法),将一个大任务按照一定的规则分隔成多个子任务,交给多个线程执行,然后再将子任务执行的结果合并,返回最终的结果。
1. Fork-Join
1.1 Fork-Join核心类
ForkJoinPool:任务池
ForkJoinTask:任务类
RecursiveTask:继承任务类ForkJoinTask,执行后有返回值
RecursiveAction:继承任务类ForkJoinTask,执行后没有返回值
在做任务分隔的时候,创建的类必须继承RecursiveTask
、RecursiveAction
其中一个,重写compute
方法,具体的分隔算法和数据计算的逻辑都是在此方法中实现
1.1 基本实现范式
- Fork-Join执行后会创建一个执行队列;
- 在
compute
方法中根据实现的分隔规则,分隔出多个子任务,一个子任务对应一个工作线程,并被加入到执行队列中; - 执行队列中的任务,正常得到执行结果;
- 如果需要返回结果,会将工作线程的执行结果汇合到一起得到最终结果返回。
2. 场景模拟
通过计算一个长数组的总和值来模拟(数组长度为7000)。这里这个长数组可以映射成一个大的数据量。
分隔任务实现类:
import java.util.concurrent.RecursiveTask;
// 继承RecursiveTask类
public class MyForkTask extends RecursiveTask<Integer> {
private static final Integer THRESHOLD = 1000;//表示最长长度为1000,设置阈值
private int[] arr;//需要计算的数据源
private int fromIndex;//计算开始角标
private int toIndex;//计算结束角标
public MyForkTask(int[] arr, int fromIndex, int toIndex) {
this.arr = arr;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
}
//重写compute方法
@Override
protected Integer compute() {
if (toIndex - fromIndex < THRESHOLD) {
//执行计算操作业务逻辑
int sum = 0;
for (int i = fromIndex; i <= toIndex; i++) {
try {
Thread.sleep(1); //模拟执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
sum += arr[i];
}
System.out.println(Thread.currentThread().getName() + "==>执行计算逻辑!");
return sum;
} else {
//执行分割子任务操作逻辑
int mid = (fromIndex + toIndex) / 2;
MyForkTask leftFork = new MyForkTask(arr, fromIndex, mid);
MyForkTask rightFork = new MyForkTask(arr, mid + 1, toIndex);
invokeAll(leftFork, rightFork);//可以是两个或者是多个任务
return leftFork.join() + rightFork.join();
}
}
}
测试类代码:
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
public class ForkTest {
public static void main(String[] args) throws InterruptedException {
int[] arr = generateArr(7000);
forkJoinTest(arr);//使用Fork-Join方式实现
normalTest(arr);//使用正常的for循环计算方式实现
}
private static void forkJoinTest(int[] arr) {
ForkJoinPool pool = new ForkJoinPool();
MyForkTask task = new MyForkTask(arr, 0, arr.length - 1);
long start = System.currentTimeMillis();
Integer forkResult = pool.invoke(task); //====代码块一
System.out.println("执行结果:" + forkResult + ",ForkJoin执行时间==>" + (System.currentTimeMillis() - start) + "ms");
}
private static void normalTest(int[] arr) throws InterruptedException {
int sum = 0;
long start = System.currentTimeMillis();
for (int i : arr) {
Thread.sleep(1);//模拟执行时间
sum += i;
}
System.out.println("执行结果:" + sum + ",Normal执行时间==>" + (System.currentTimeMillis() - start) + "ms");
}
// 生成数组
private static int[] generateArr(int length) {
int[] arr = new int[length];
for (int i = 0; i < length; i++) {
arr[i] = new Random().nextInt(length * 3);
}
return arr;
}
}
/*
执行结果:
ForkJoinPool-1-worker-2==>执行计算逻辑!
ForkJoinPool-1-worker-6==>执行计算逻辑!
ForkJoinPool-1-worker-3==>执行计算逻辑!
ForkJoinPool-1-worker-0==>执行计算逻辑!
ForkJoinPool-1-worker-5==>执行计算逻辑!
ForkJoinPool-1-worker-1==>执行计算逻辑!
ForkJoinPool-1-worker-4==>执行计算逻辑!
ForkJoinPool-1-worker-7==>执行计算逻辑!
执行结果:74059425,ForkJoin执行时间==>1098ms
执行结果:74059425,Normal执行时间==>9480ms
*/
看一下测试类中代码块一
的内容,将任务交给ForkJoinPool来执行。从执行的结果来看,当使用Fork-Join方式执行的时候,整体的执行时间要小于普通循环计算方式。性能上的提高也可见一斑。
3. Fork-Join执行源码分析
3.1 ForkJoinPool的invoke方法
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}
进入invoke
方法中,首先是判断任务是否存在,然后执行externalPush
方法,最后的join
方法是让当前任务加入到主线程中,需要等当前任务执行结束才会执行主线程。看一下externalPush
方法代码如下:
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
int r = ThreadLocalRandom.getProbe();
int rs = runState;
// if判断逻辑
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
externalSubmit(task);
}
这里的if判断后的内部逻辑是不会执行的,因为这里workQueues
是空的,所以会直接进入externalSubmit
方法。
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
rs = lockRunState();
try {
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
int p = config & SMASK; // ensure at least 2 slots
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
else if ((q = ws[k = r & m & SQMASK]) != null) {
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
submitted = true;
}
} finally {
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
if (submitted) {
signalWork(ws, q);
return;
}
}
move = true; // move on failure
}
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
q = new WorkQueue(this, null);
q.hint = r;
q.config = k | SHARED_QUEUE;
q.scanState = INACTIVE;
rs = lockRunState(); // publish index
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
这段代码很长,但是基本核心逻辑就是如下:
- 创建工作队列
workQueues
- 把当前任务加入到工作队列
- 最后调用
signalWork(ws,q)
唤醒工作
进入signalWork
方法,看具体的实现:
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
while ((c = ctl) < 0L) { // too few active
if ((sp = (int)c) == 0) { // no idle workers
if ((c & ADD_WORKER) != 0L) // too few workers
tryAddWorker(c);
break;
}
if (ws == null) // unstarted/terminated
break;
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
if ((p = v.parker) != null)
U.unpark(p);
break;
}
if (q != null && q.base == q.top) // no more work
break;
}
}
这里面的核心方法是tryAddWorker(c)
,就是尝试将工作线程向队列中添加。方法中会调用createWorker
方法去创建工作线程,其实到这里,MyForkTask
类中的compute
方法还没有执行。createWorkder
代码如下:
private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
if (fac != null && (wt = fac.newThread(this)) != null) {
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
deregisterWorker(wt, ex);
return false;
}
createWorker
中使用到ForkJoinWorkerThreadFactory
,也就是创建工作线程的工厂,将当前的ForkJoinPool
放到工厂中,工厂类在newThread
内就会去执行之前添加进来的任务MyForkTask
内的compute
方法,这里面会将compute
内创建出来的子任务放到新的工作线程中,添加到workQueues
中。待虽有任务都添加完成后,createWorker
方法中会通过wt.start()
执行队列中的任务。
4. 回首场景模拟代码
4.1 先看看join的用法
join()
方法在线程中,是将就绪状态的线程变为可执行状态,如果在主线程中,使用子线程对象调用join()
方法,主线程就会处于阻塞状态,待子线程执行结束后才会继续执行主线程的内容。也可以理解为子线程加入到主线程称为主线程的一部分。
4.2 结合源码回顾模拟代码
在ForkJoinPool
的invoke
方法中,里面也有task.join()
出现,这个就是为什么外部调用invoke
方法后的逻辑阻塞的原因。同样的在MyForkTask
类中,在invoke
执行后,会执行left.join()
,right.join()
方法,这个就是讲left和right的执行结果返回给当前的线程。说的可能会有点抽象、难懂。
图解梳理:
- task线程执行join加入了主线程,主线程需要等待task执行结束;
- task的left和right也执行了join,这个时候task就被阻塞了,需要等left和right执行结束;
- 同理left和right又会创建新的left和right,同样执行join方法;
根据这样分析可以看出,最终除了最底层的子线程执行外,高层的线程都会在阻塞状态,一直等待子线程执行结束,逐层向上递交执行结果,最后执行结果会被递交给主线程。这个方式是不是感觉和递归很像,的确这里是利用了递归的思想。
5. 和递归的比较
递归是在一个线程中执行,递归的方法都会被压入到同一个线程栈中,然后再依次执行被弹出,整个过程都是线性的,执行效率有一定的局限性,如果执行的数据量比较大的话,都堆在一个线程中,也会显得很冗余,同时可能导致栈溢出。Fork-Join利用了递归的思想,但是这里使用的多个线程实现,每次递归都会fork出新的线程,效率肯定要比普通的递归方式要高很多,但是也会造成线程创建太多的问题。因此这里要去合理的设置分隔子任务的阈值来控制线程的个数,同时也能提高效率。