Fork/Join框架简介

在之前Kafka改造业务流程的文章中有提到改造流程就是fork-join模型的对调版,但当时只是借用了fork-join框架的分离和归并的思想,没有涉及到Java中对Fork/Join框架的实现。

Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

工作流程如下图:

“任务窃取”机制

Fork/Join本质上也是一种生产者与消费者的实现,但与ThreadPoolExecutor不同的是,Fork/Join内部有多个任务队列,当调用提交任务方法(submit、invoke)时,ForkJoinPool会按照一定的规则将任务提交到一个任务队列中,而提交后的任务在执行的过程中新创建的子任务也会工作线程对应的工作队列中。

虽然工作线程与工作队列是一一对应的,但是如果工作线程对应的工作队列中的任务空了,此时空闲的工作线程可以窃取其他工作队列中的任务,帮其他工作线程分担压力,但如果两个工作线程同时到一个队列里获取任务会导致线程间的竞争,并且需要队列同步机制来保证任务获取的可见性和原子性。

为了降低线程间竞争ForkJoinPool使用双端队列,原本工作线程和来窃取任务的线程分别从不同端来获取任务以减少数据竞争。

工作流程如下图(来自极客时间):

  • 工作窃取机制优点:

充分利用线程进行并行计算,减少了线程间的竞争。

  • 工作窃取机制缺点:

该机制会消耗了更多的系统资源,因为需要创建多个线程和多个双端队列。并且如果双端队列里只有一个任务时,还是会存在数据竞争。

Fork/Join框架的设计

首先,分割任务。需要一个fork类来把大任务分割成子任务,如果子任务还是很大,还需要不停地分割,直到分割出的子任务足够小。

其次,执行任务并合并结果。分割的子任务分别放在双端队列里,然后一些工作线程分别从双端队列里获取任务执行。如果子任务执行完成,就将结果都统一放在一个队列里,然后启动一个线程(join)从队列里拿数据,然后合并这些数据。

Fork/Join框架相关的类有两个:

  • ForkJoinTask:使用ForkJoin框架前,必须先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,ForkJoinTask类提供了以下两个子类。
    • RecursiveAction:递归方式处理,用于没有返回结果的任务。
    • RecursiveTask:递归方式处理,用于有返回结果的任务。
  • ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。

任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。如果工作线程的队列里暂时没有任务,会采用任务窃取机制。

Fork/Join框架的使用

下面代码是使用Fork/Join 这个并行框架计算斐波那契数列,首先创建一个分治任务的线程池以及计算斐波那契数列的分治任务,然后调用分治任务线程池的invoke()方法启动分治任务。fork()方法相当于启动了一个异步任务。

static void main(String[] args){
  // 创建分治任务线程池
  ForkJoinPool fjp = new ForkJoinPool(4);
  // 创建分治任务
  Fibonacci fib = new Fibonacci(30);
  // 启动分治任务
  Integer result = fjp.invoke(fib);
  // 输出结果
  System.out.println(result);
}
// 递归任务
static class Fibonacci extends RecursiveTask<Integer>{
  final int n;
  Fibonacci(int n){this.n = n;}
  protected Integer compute(){
    if (n <= 1)
      return n;
    Fibonacci f1 = new Fibonacci(n - 1);
    // 创建子任务
    f1.fork();
    Fibonacci f2 = new Fibonacci(n - 2);
    // 等待子任务结果,并合并结果
    return f2.compute() + f1.join();
  }
}

Fork/Join框架的异常处理

ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。

if(task.isCompletedAbnormally()) {
    System.out.println(task.getException());
}

如果任务被取消了,getException方法返回CancellationException。如果任务没有完成或者没有抛出异常getException方法返回null。

Fork/Join框架的实现原理

  • ForkJoinTask的fork方法实现原理

当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    } else {
        ForkJoinPool.common.externalPush(this);
    }

    return this;
}

workQueue.push(this)方法把当前任务存放在WorkQueue队列里。然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。

final void push(ForkJoinTask<?> task) {
    int s = this.top;
    ForkJoinTask<?>[] a = this.array;
    if (this.array != null) {
        int m;
        int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
        U.putOrderedObject(a, (long)j, task);
        int n;
        if ((n = (this.top = s + 1) - this.base) <= 2) {
            ForkJoinPool p = this.pool;
            if (this.pool != null) {
                p.signalWork(this);
            }
        } else if (n >= m) {
            this.growArray();
        }
    }
}
  • ForkJoinTask的join方法实现原理

join方法的主要作用是阻塞当前线程并等待获取结果。

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

join()方法调用doJoin()根据返回值状态来判断返回什么结果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)。

private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
    ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
    wt.pool.awaitJoin(w, this, 0L) :
    externalAwaitDone();
}

doJoin()方法首先通过查看任务的状态,看任务是否已经执行完成;
如果执行完成,则直接返回任务状态;
如果没有执行完,则从任务数组里取出任务并执行;
如果任务顺利执行完成,则设置任务状态为NORMAL;
如果出现异常,则记录异常,并将任务状态设置为EXCEPTIONAL。

参考

最后修改日期: 2019年10月4日

作者

留言

撰写回覆或留言

发布留言必须填写的电子邮件地址不会公开。