Fork-Join内部实现原理分析


在做一个长的任务的时候,需要消耗的时间很长,但是这个时候主流程又要等待这个长任务执行结束后才能执行。如这里这个长任务可能是从数据库中查出报表数据,计算封装出可以写入到报表的数据,主流程在等待往报表中写数据,但是长任务中迟迟计算不出报表数据,因此这里就需要考虑多线程,如果直接创建一个子线程,主线程还是需要等待这个子线程执行结束,这个方案不可选。基本思路就是常见多个线程同时对数据进行计算,然后将计算的结果封装到一起返回给主流程。
使用Fork-Join工具就是一个很好的解决方案,Fork-Join是在jdk1.7的时候发布出来,由Doug Lea大师开发,采用的是分而治之的思想(如二分法),将一个大任务按照一定的规则分隔成多个子任务,交给多个线程执行,然后再将子任务执行的结果合并,返回最终的结果。

1. Fork-Join

1.1 Fork-Join核心类

ForkJoinPool:任务池
ForkJoinTask:任务类
RecursiveTask:继承任务类ForkJoinTask,执行后有返回值
RecursiveAction:继承任务类ForkJoinTask,执行后没有返回值
在做任务分隔的时候,创建的类必须继承RecursiveTaskRecursiveAction其中一个,重写compute方法,具体的分隔算法和数据计算的逻辑都是在此方法中实现

1.1 基本实现范式

  • Fork-Join执行后会创建一个执行队列;
  • compute方法中根据实现的分隔规则,分隔出多个子任务,一个子任务对应一个工作线程,并被加入到执行队列中;
  • 执行队列中的任务,正常得到执行结果;
  • 如果需要返回结果,会将工作线程的执行结果汇合到一起得到最终结果返回。

    2. 场景模拟

    通过计算一个长数组的总和值来模拟(数组长度为7000)。这里这个长数组可以映射成一个大的数据量。
    分隔任务实现类:
import java.util.concurrent.RecursiveTask;
// 继承RecursiveTask类
public class MyForkTask extends RecursiveTask<Integer> {

    private static final Integer THRESHOLD = 1000;//表示最长长度为1000,设置阈值
    private int[] arr;//需要计算的数据源
    private int fromIndex;//计算开始角标
    private int toIndex;//计算结束角标

    public MyForkTask(int[] arr, int fromIndex, int toIndex) {
        this.arr = arr;
        this.fromIndex = fromIndex;
        this.toIndex = toIndex;
    }

	//重写compute方法
    @Override
    protected Integer compute() {
        if (toIndex - fromIndex < THRESHOLD) {
            //执行计算操作业务逻辑
            int sum = 0;
            for (int i = fromIndex; i <= toIndex; i++) {
                try {
                    Thread.sleep(1); //模拟执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                sum += arr[i];
            }
            System.out.println(Thread.currentThread().getName() + "==>执行计算逻辑!");
            return sum;
        } else {
            //执行分割子任务操作逻辑
            int mid = (fromIndex + toIndex) / 2;
            MyForkTask leftFork = new MyForkTask(arr, fromIndex, mid);
            MyForkTask rightFork = new MyForkTask(arr, mid + 1, toIndex);
            invokeAll(leftFork, rightFork);//可以是两个或者是多个任务
            return leftFork.join() + rightFork.join();
        }
    }
}

测试类代码:

import java.util.Random;
import java.util.concurrent.ForkJoinPool;

public class ForkTest {

    public static void main(String[] args) throws InterruptedException {
        int[] arr = generateArr(7000);
        forkJoinTest(arr);//使用Fork-Join方式实现
        normalTest(arr);//使用正常的for循环计算方式实现
    }

    private static void forkJoinTest(int[] arr) {
        ForkJoinPool pool = new ForkJoinPool();
        MyForkTask task = new MyForkTask(arr, 0, arr.length - 1);
        long start = System.currentTimeMillis();
        Integer forkResult = pool.invoke(task); //====代码块一
        System.out.println("执行结果:" + forkResult + ",ForkJoin执行时间==>" + (System.currentTimeMillis() - start) + "ms");
    }

    private static void normalTest(int[] arr) throws InterruptedException {
        int sum = 0;
        long start = System.currentTimeMillis();
        for (int i : arr) {
            Thread.sleep(1);//模拟执行时间
            sum += i;
        }
        System.out.println("执行结果:" + sum + ",Normal执行时间==>" + (System.currentTimeMillis() - start) + "ms");
    }

    // 生成数组
    private static int[] generateArr(int length) {
        int[] arr = new int[length];
        for (int i = 0; i < length; i++) {
            arr[i] = new Random().nextInt(length * 3);
        }
        return arr;
    }
}
/*
执行结果:
ForkJoinPool-1-worker-2==>执行计算逻辑!
ForkJoinPool-1-worker-6==>执行计算逻辑!
ForkJoinPool-1-worker-3==>执行计算逻辑!
ForkJoinPool-1-worker-0==>执行计算逻辑!
ForkJoinPool-1-worker-5==>执行计算逻辑!
ForkJoinPool-1-worker-1==>执行计算逻辑!
ForkJoinPool-1-worker-4==>执行计算逻辑!
ForkJoinPool-1-worker-7==>执行计算逻辑!
执行结果:74059425,ForkJoin执行时间==>1098ms
执行结果:74059425,Normal执行时间==>9480ms
*/

看一下测试类中代码块一的内容,将任务交给ForkJoinPool来执行。从执行的结果来看,当使用Fork-Join方式执行的时候,整体的执行时间要小于普通循环计算方式。性能上的提高也可见一斑。

3. Fork-Join执行源码分析

3.1 ForkJoinPool的invoke方法

public <T> T invoke(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task.join();
}

进入invoke方法中,首先是判断任务是否存在,然后执行externalPush方法,最后的join方法是让当前任务加入到主线程中,需要等当前任务执行结束才会执行主线程。看一下externalPush方法代码如下:

final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; WorkQueue q; int m;
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    // if判断逻辑
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            U.putIntVolatile(q, QLOCK, 0);
            if (n <= 1)
                signalWork(ws, q);
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    externalSubmit(task);
}

这里的if判断后的内部逻辑是不会执行的,因为这里workQueues是空的,所以会直接进入externalSubmit方法。

private void externalSubmit(ForkJoinTask<?> task) {
    int r;                                    // initialize caller's probe
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();
    }
    for (;;) {
        WorkQueue[] ws; WorkQueue q; int rs, m, k;
        boolean move = false;
        if ((rs = runState) < 0) {
            tryTerminate(false, false);     // help terminate
            throw new RejectedExecutionException();
        }
        else if ((rs & STARTED) == 0 ||     // initialize
                 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            rs = lockRunState();
            try {
                if ((rs & STARTED) == 0) {
                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                                           new AtomicLong());
                    // create workQueues array with size a power of two
                    int p = config & SMASK; // ensure at least 2 slots
                    int n = (p > 1) ? p - 1 : 1;
                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                    workQueues = new WorkQueue[n];
                    ns = STARTED;
                }
            } finally {
                unlockRunState(rs, (rs & ~RSLOCK) | ns);
            }
        }
        else if ((q = ws[k = r & m & SQMASK]) != null) {
            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                ForkJoinTask<?>[] a = q.array;
                int s = q.top;
                boolean submitted = false; // initial submission or resizing
                try {                      // locked version of push
                    if ((a != null && a.length > s + 1 - q.base) ||
                        (a = q.growArray()) != null) {
                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                        U.putOrderedObject(a, j, task);
                        U.putOrderedInt(q, QTOP, s + 1);
                        submitted = true;
                    }
                } finally {
                    U.compareAndSwapInt(q, QLOCK, 1, 0);
                }
                if (submitted) {
                    signalWork(ws, q);
                    return;
                }
            }
            move = true;                   // move on failure
        }
        else if (((rs = runState) & RSLOCK) == 0) { // create new queue
            q = new WorkQueue(this, null);
            q.hint = r;
            q.config = k | SHARED_QUEUE;
            q.scanState = INACTIVE;
            rs = lockRunState();           // publish index
            if (rs > 0 &&  (ws = workQueues) != null &&
                k < ws.length && ws[k] == null)
                ws[k] = q;                 // else terminated
            unlockRunState(rs, rs & ~RSLOCK);
        }
        else
            move = true;                   // move if busy
        if (move)
            r = ThreadLocalRandom.advanceProbe(r);
    }
}

这段代码很长,但是基本核心逻辑就是如下:

  • 创建工作队列workQueues
  • 把当前任务加入到工作队列
  • 最后调用signalWork(ws,q)唤醒工作

进入signalWork方法,看具体的实现:

final void signalWork(WorkQueue[] ws, WorkQueue q) {
    long c; int sp, i; WorkQueue v; Thread p;
    while ((c = ctl) < 0L) {                       // too few active
        if ((sp = (int)c) == 0) {                  // no idle workers
            if ((c & ADD_WORKER) != 0L)            // too few workers
                tryAddWorker(c);
            break;
        }
        if (ws == null)                            // unstarted/terminated
            break;
        if (ws.length <= (i = sp & SMASK))         // terminated
            break;
        if ((v = ws[i]) == null)                   // terminating
            break;
        int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
        int d = sp - v.scanState;                  // screen CAS
        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;                      // activate v
            if ((p = v.parker) != null)
                U.unpark(p);
            break;
        }
        if (q != null && q.base == q.top)          // no more work
            break;
    }
}

这里面的核心方法是tryAddWorker(c),就是尝试将工作线程向队列中添加。方法中会调用createWorker方法去创建工作线程,其实到这里,MyForkTask类中的compute方法还没有执行。createWorkder代码如下:

private boolean createWorker() {
    ForkJoinWorkerThreadFactory fac = factory;
    Throwable ex = null;
    ForkJoinWorkerThread wt = null;
    try {
        if (fac != null && (wt = fac.newThread(this)) != null) {
            wt.start();
            return true;
        }
    } catch (Throwable rex) {
        ex = rex;
    }
    deregisterWorker(wt, ex);
    return false;
}

createWorker中使用到ForkJoinWorkerThreadFactory,也就是创建工作线程的工厂,将当前的ForkJoinPool放到工厂中,工厂类在newThread内就会去执行之前添加进来的任务MyForkTask内的compute方法,这里面会将compute内创建出来的子任务放到新的工作线程中,添加到workQueues中。待虽有任务都添加完成后,createWorker方法中会通过wt.start()执行队列中的任务。

4. 回首场景模拟代码

4.1 先看看join的用法

join()方法在线程中,是将就绪状态的线程变为可执行状态,如果在主线程中,使用子线程对象调用join()方法,主线程就会处于阻塞状态,待子线程执行结束后才会继续执行主线程的内容。也可以理解为子线程加入到主线程称为主线程的一部分。

4.2 结合源码回顾模拟代码

ForkJoinPoolinvoke方法中,里面也有task.join()出现,这个就是为什么外部调用invoke方法后的逻辑阻塞的原因。同样的在MyForkTask类中,在invoke执行后,会执行left.join()right.join()方法,这个就是讲left和right的执行结果返回给当前的线程。说的可能会有点抽象、难懂。

图解梳理:

  • task线程执行join加入了主线程,主线程需要等待task执行结束;
  • task的left和right也执行了join,这个时候task就被阻塞了,需要等left和right执行结束;
  • 同理left和right又会创建新的left和right,同样执行join方法;

根据这样分析可以看出,最终除了最底层的子线程执行外,高层的线程都会在阻塞状态,一直等待子线程执行结束,逐层向上递交执行结果,最后执行结果会被递交给主线程。这个方式是不是感觉和递归很像,的确这里是利用了递归的思想。

5. 和递归的比较

递归是在一个线程中执行,递归的方法都会被压入到同一个线程栈中,然后再依次执行被弹出,整个过程都是线性的,执行效率有一定的局限性,如果执行的数据量比较大的话,都堆在一个线程中,也会显得很冗余,同时可能导致栈溢出。Fork-Join利用了递归的思想,但是这里使用的多个线程实现,每次递归都会fork出新的线程,效率肯定要比普通的递归方式要高很多,但是也会造成线程创建太多的问题。因此这里要去合理的设置分隔子任务的阈值来控制线程的个数,同时也能提高效率。


文章作者: 程序猿洞晓
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 程序猿洞晓 !
评论
 上一篇
Spring cloud Eureka实现思路和源码解析 Spring cloud Eureka实现思路和源码解析
现在很多公司都在使用Spring Boot和Spring Cloud构建微服务,目前来说这些技术在国内虽然应用很多,但是相关的中文版资料不是很多,关于内部原理分析的文章也是少之又少。现在团队开始准备使用Spring Cloud,最近也在研究相关的知识点,参考了很多博客,写了这篇文章,让我们一起来探讨Spring Cloud的组件Eureka的实现原理。
2018-07-02
下一篇 
Spring中Lazy、Scope注解对IOC容器Bean初始化的影响分析 Spring中Lazy、Scope注解对IOC容器Bean初始化的影响分析
面试的时候总是会遇到各种Spring主要功能点的问题,因为Spring对于java来说太重要。如Spring的IOC容器、动态代理、事务、切面编程等等。后期再更新文章的时候我们会慢慢讨论这些东西,这里现在我们先看其中一个功能点IOC容器,其实也不算是说IOC容器,主要的重点是放在Spring注解对IOC容器初始化的影响,也是在面试中最常说漏的。
2018-06-27
  目录