李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
49.Fork&Join框架介绍
Leefs
2022-11-24 PM
647℃
0条
[TOC] ### 一、概述 Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的**CPU密集型运算**。 + 所谓的**任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解**。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解。 + Fork/Join 在**分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成**,进一步提升了运算效率。 + Fork/Join 默认会创建与CPU核心数大小相同的线程池。 ### 二、Fork/Join框架介绍 **Fork/Join框架主要包含三个模块**: + 线程池:`ForkJoinPool` + 任务对象: `ForkJoinTask` (包括`RecursiveTask`、`RecursiveAction` 和 `CountedCompleter`) + 执行Fork/Join任务的线程: `ForkJoinWorkerThread` ![49.Fork&Join框架介绍01.png](https://lilinchao.com/usr/uploads/2022/11/210904555.png) **三者的关系** `ForkJoinPool`可以通过池中的`ForkJoinWorkerThread`来处理`ForkJoinTask`任务。 `ForkJoinPool` 只接收 `ForkJoinTask` 任务(在实际使用中,也可以接收 Runnable/Callable 任务,但在真正运行时,也会把这些任务封装成 `ForkJoinTask` 类型的任务),`RecursiveTask` 是 `ForkJoinTask` 的子类,是一个可以递归执行的 `ForkJoinTask`,`RecursiveAction` 是一个无返回值的 `RecursiveTask`,`CountedCompleter` 在任务完成执行后会触发执行一个自定义的钩子函数。 在实际运用中,我们一般都会继承 `RecursiveTask` 、`RecursiveAction` 或 `CountedCompleter` 来实现我们的业务需求,而不会直接继承 `ForkJoinTask` 类。 ### 三、Fork/Join框架核心思想 #### 3.1 分治思想(Divide-and-Conquer) 分治算法(Divide-and-Conquer)把任务递归的拆分为各个子任务,这样可以更好的利用系统资源,尽可能的使用所有可用的计算能力来提升应用性能。 首先看一下 Fork/Join 框架的任务运行机制如下图所示: ![49.Fork&Join框架介绍02.png](https://lilinchao.com/usr/uploads/2022/11/2765276694.png) - **fork()**:开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。 - **join()**:等待该任务的处理线程处理完毕,获得返回值。 当然并不会每个 fork 都会创建新线程, 也不是每个 join 都会造成线程被阻塞, 而是采取work-stealing 原理。 #### 3.2 work-stealing(工作窃取)算法 线程池内的所有工作线程都尝试找到并执行已经提交的任务,或者是被其他活动任务创建的子任务(如果不存在就阻塞等待)。 这种特性使得 `ForkJoinPool` 在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。尤其是构建异步模型的 `ForkJoinPool` 时,对不需要合并(join)的事件类型任务也非常适用。 在 `ForkJoinPool` 中,线程池中每个工作线程(`ForkJoinWorkerThread`)都对应一个任务队列(`WorkQueue`),工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数 mode 决定),然后以FIFO的顺序随机窃取其他队列中的任务。 **具体思路如下**: - 每个线程都有自己的一个WorkQueue,该工作队列是一个双端队列。 - 队列支持三个功能push、pop、poll - push/pop只能被队列的所有者线程调用,而poll可以被其他线程调用。 - 划分的子任务调用fork时,都会被push到自己的队列中。 - 默认情况下,工作线程从自己的双端队列获取任务并执行。 - 当自己的队列为空时,线程随机从另一个线程的队列末尾调用poll方法窃取任务 ![49.Fork&Join框架介绍03.png](https://lilinchao.com/usr/uploads/2022/11/4009308786.png) ### 四、基本使用 > 案例:计算对 `1~n` 之间的整数求和的任务 **注意**:提交给 Fork/Join 线程池的任务需要继承 `RecursiveTask`(有返回值)或 `RecursiveAction`(没有返回值) #### 通过递归来实现 ![49.Fork&Join框架介绍04.png](https://lilinchao.com/usr/uploads/2022/11/4189210341.png) **代码实现** ```java import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; /** * Created by lilinchao * Date 2022/11/24 * Description 计算对 1~n 之间的整数求和的任务 */ public class AddTaskDemo01 { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(4); System.out.println(pool.invoke(new AddTask1(5))); } } @Slf4j(topic = "c.AddTask") class AddTask1 extends RecursiveTask
{ int n; public AddTask1(int n) { this.n = n; } @Override public String toString() { return "{" + n + "}"; } @Override protected Integer compute() { // 如果 n 已经为1,可以求得结果 if ( n == 1) { log.debug(" join() {}",n); return n; } // 将任务进行拆分(fork) AddTask1 t1 = new AddTask1(n - 1); // 安排在当前任务正在运行的池中异步执行此任务(如果适用) t1.fork(); log.debug("fork() {} + {}",n,t1); // 合并(join)结果 int result = n + t1.join(); log.debug("join() {} + {} = {}", n, t1, result); return result; } } ``` **运行结果** ``` 21:50:06.238 [ForkJoinPool-1-worker-2] DEBUG c.AddTask - fork() 4 + {3} 21:50:06.238 [ForkJoinPool-1-worker-1] DEBUG c.AddTask - fork() 5 + {4} 21:50:06.238 [ForkJoinPool-1-worker-3] DEBUG c.AddTask - fork() 3 + {2} 21:50:06.242 [ForkJoinPool-1-worker-3] DEBUG c.AddTask - join() 1 21:50:06.238 [ForkJoinPool-1-worker-0] DEBUG c.AddTask - fork() 2 + {1} 21:50:06.242 [ForkJoinPool-1-worker-0] DEBUG c.AddTask - join() 2 + {1} = 3 21:50:06.243 [ForkJoinPool-1-worker-3] DEBUG c.AddTask - join() 3 + {2} = 6 21:50:06.243 [ForkJoinPool-1-worker-2] DEBUG c.AddTask - join() 4 + {3} = 10 21:50:06.243 [ForkJoinPool-1-worker-1] DEBUG c.AddTask - join() 5 + {4} = 15 15 ``` #### 优化ForkJoin 可以在计算的时候中间再次拆分任务 ![49.Fork&Join框架介绍05.png](https://lilinchao.com/usr/uploads/2022/11/3048897368.png) **代码实现** ```java import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; /** * Created by lilinchao * Date 2022/11/24 * Description 1.0 */ public class AddTaskDemo02 { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(4); System.out.println(pool.invoke(new AddTask2(1, 5))); } } @Slf4j(topic = "c.AddTask2") class AddTask2 extends RecursiveTask
{ int begin; int end; public AddTask2(int begin, int end) { this.begin = begin; this.end = end; } @Override public String toString() { return "{" + begin + "," + end + "}"; } @Override protected Integer compute() { // 当 begin == end 时, 无法再进行拆分 // 5,5 if (begin == end) { log.debug("join() {}", begin); return begin; } // 4, 5 if (end - begin == 1) { log.debug("join() {} + {} = {}", begin, end, end + begin); return end + begin; } // 1 5 int mid = (end + begin) / 2; // 3 AddTask2 t1 = new AddTask2(begin, mid); // 1,3 t1.fork(); AddTask2 t2 = new AddTask2(mid + 1, end); // 4,5 t2.fork(); log.debug("fork() {} + {} = ?", t1, t2); int result = t1.join() + t2.join(); log.debug("join() {} + {} = {}", t1, t2, result); return result; } } ``` **运行结果** ``` 21:58:03.222 [ForkJoinPool-1-worker-0] DEBUG c.AddTask2 - join() 1 + 2 = 3 21:58:03.222 [ForkJoinPool-1-worker-2] DEBUG c.AddTask2 - fork() {1,2} + {3,3} = ? 21:58:03.222 [ForkJoinPool-1-worker-1] DEBUG c.AddTask2 - fork() {1,3} + {4,5} = ? 21:58:03.222 [ForkJoinPool-1-worker-3] DEBUG c.AddTask2 - join() 4 + 5 = 9 21:58:03.226 [ForkJoinPool-1-worker-1] DEBUG c.AddTask2 - join() 3 21:58:03.226 [ForkJoinPool-1-worker-2] DEBUG c.AddTask2 - join() {1,2} + {3,3} = 6 21:58:03.226 [ForkJoinPool-1-worker-1] DEBUG c.AddTask2 - join() {1,3} + {4,5} = 15 15 ``` *附参考文章链接* *https://pdai.tech/md/java/thread/java-thread-x-juc-executor-ForkJoinPool.html*
标签:
并发编程
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/2602.html
上一篇
48.Tomcat线程池简单介绍
下一篇
50.AQS实现原理介绍
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Elastisearch
散列
Stream流
Filter
随笔
JavaScript
微服务
递归
序列化和反序列化
JavaWeb
Java
栈
CentOS
Redis
Spring
Kibana
HDFS
ClickHouse
Hadoop
正则表达式
Scala
JVM
JavaSE
Nacos
MySQL
排序
Golang基础
JavaWEB项目搭建
nginx
持有对象
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞