forkjointask就是forkjoinpool里面的每一个任务。他主要有两个子类:recursiveactionrecursivetask。然后通过fork()方法去分配任务执行任务,通过join()方法汇总任务结果,

这就是整个过程的运用。他有两个子类,使用这两个子类都可以实现我们的任务分配和计算。

(1)recursiveaction 一个递归无结果的forkjointask(没有返回值)

(2)recursivetask 一个递归有结果的forkjointask(有返回值)

forkjoinpool:中含有一个workqueues队列;

workqueues:由forkjointask数组和workerthread和指向forkjoinpool的引用;

forkjointask数组负责存放程序提交给forkjoinpool的任务,而workerthread数组负责执行这些任务,forkjoinpool的引用是为了当forkjointask数组中的任务处理完之后再次获取任务交给workerthread进行处理。整个结构大致如下图:

知识点扩展:java并发fork-join框架原理解析

1、什么是foirk/join框架

fork/join框架是java7提供用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

2、什么是并行流与顺序流2.1 什么是并行流?

并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。

2.2 工作窃取模式

某个线程从其他队列里窃取任务来执行,

3、使用fork/join框架

/**
 *  累加运算测试
 */
public class forkjoincalculate extends recursivetask<long> {
 
    /**
     * 
     */
    private static final long serialversionuid = 7125244951292834932l;
     
    private long start;// 起始值
    private long end;// 结束值
    private static final long threshold = 10000l;// 临界值
 
    @override
    protected long compute() {
        long length = end - start;
         
        if(length <= threshold) {
            long sum = 0l;
             
            for (long i = start; i <= end; i++) {
                sum += i;
            }
             
            return sum;
        }else {
            long middle = (start + end) / 2;// 中间值
             
            forkjoincalculate left = new forkjoincalculate(start, middle);// 0-50000000
            left.fork();// 拆分子任务,同时压入线程队列
            forkjoincalculate right = new forkjoincalculate(middle + 1, end);// 50000001-100000000
            right.fork();// 拆分子任务,同时压入线程队列
             
            return left.join() + right.join();// 汇总任务结果
        }
    }
     
    public forkjoincalculate() {
    }
 
    public forkjoincalculate(long start, long end) {
        this.start = start;
        this.end = end;
    }
}

4、java8中的并行流和顺序流

4.1顺序流

/**
     * java8的顺序流
     */
    @test
    public void test3() {
        instant start = instant.now();// java8中新时间日期api
         
        longstream.rangeclosed(0, 10000000000l)
                    .sequential()// 顺序流
                    .reduce(0, long::sum);
         
        instant end = instant.now();
         
        // 5780
        system.out.println("耗费时间为:" + duration.between(start, end).tomillis());// java8中新时间日期api
    }

4.2步行流

/**
     * java8的并行流
     */
    @test
    public void test4() {
        instant start = instant.now();// java8中新时间日期api
         
        longstream.rangeclosed(0, 10000000000l)
                    .parallel()// 并行流
                    .reduce(0, long::sum);
         
        instant end = instant.now();
         
        // 2392
        system.out.println("耗费时间为:" + duration.between(start, end).tomillis());// java8中新时间日期api
    }

到此这篇关于java中fork-join的原理解析的文章就介绍到这了,更多相关java中fork-join原理内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!