李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
45.ThreadPoolExecutor线程池提交和关闭方法介绍
Leefs
2022-11-20 PM
717℃
0条
[TOC] ### 一、线程池提交任务方法 线程的各种提交方式的概念: ```java // 执行任务 void execute(Runnable command); // 提交任务 task,用返回值 Future 获得任务执行结果,会有返回值
Future
submit(Callable
task); // 提交 tasks 中所有任务
List
> invokeAll(Collection extends Callable
> tasks) throws InterruptedException; // 提交 tasks 中所有任务,带超时时间,时间超时后,会放弃执行后面的任务
List
> invokeAll(Collection extends Callable
> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
T invokeAny(Collection extends Callable
> tasks) throws InterruptedException, ExecutionException; // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
T invokeAny(Collection extends Callable
> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; ``` #### submit方法 > submit方法与execute方法的区别在于: > > + **execute方法**:接收的参数是Runnable类型的参数没有返回值 > + **submit方法**:接收的是Callable类型的参数有返回值且返回值用`Future<>`接收 **示例** ```java import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; /** * Created by lilinchao * Date 2022/11/20 * Description submit方法示例 */ @Slf4j(topic = "c.ThreadPoolDemo01") public class ThreadPoolDemo01 { public static void main(String[] args) throws ExecutionException, InterruptedException { // 创建线程池 ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardOldestPolicy()); //调用submit提交任务 Future
future = threadpool.submit(new Callable
() { @Override public String call() throws Exception { log.debug("running"); Thread.sleep(1000); return "ok"; } }); //获取线程的返回结果 log.debug("{}",future.get()); } } ``` **运行结果** ``` 17:03:36.791 [pool-1-thread-1] DEBUG c.ThreadPoolDemo01 - running 17:03:37.796 [main] DEBUG c.ThreadPoolDemo01 - ok ``` **说明** 从代码中我们看到submit()提交任务中实现了Callable接口,并在睡眠了1s后返回字符串“ok”,最后在主线程中调用get()方法,打印返回值到控制台。 #### invokeAll方法 > invokeAll方法接收的是一个任务集合且有返回值,线程池中的线程执行这个任务集合。 > > **等待所有的任务执行完成后统一返回。** **示例** ```java import lombok.extern.slf4j.Slf4j; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; /** * Created by lilinchao * Date 2022/11/20 * Description invokeAll方法示例 */ @Slf4j(topic = "c.ThreadPoolDemo02") public class ThreadPoolDemo02 { public static void main(String[] args) throws InterruptedException { //创建线程池 ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardOldestPolicy()); //返回的是一个泛型为Future的集合 List
> futures = threadpool.invokeAll(Arrays.asList( () -> { log.debug("begin1"); Thread.sleep(1000); return "1"; }, () -> { log.debug("begin2"); Thread.sleep(500); return "2"; }, () -> { log.debug("begin3"); Thread.sleep(2000); return "3"; } )); //遍历输出结果 futures.forEach(f -> { try { log.debug("{}",f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } } ``` **运行结果** ``` 17:22:03.437 [pool-1-thread-1] DEBUG c.ThreadPoolDemo02 - begin1 17:22:03.437 [pool-1-thread-2] DEBUG c.ThreadPoolDemo02 - begin2 17:22:03.942 [pool-1-thread-2] DEBUG c.ThreadPoolDemo02 - begin3 17:22:05.946 [main] DEBUG c.ThreadPoolDemo02 - 1 17:22:05.948 [main] DEBUG c.ThreadPoolDemo02 - 2 17:22:05.948 [main] DEBUG c.ThreadPoolDemo02 - 3 ``` **结果分析** 从结果可以看出,任务1与任务2同时被执行,但是因为线程池的核心线程数为2 所以任务3就先放入了任务队列,之后等待任务2执行完后,线程2执行任务3,中间差了0.5s 是因为任务2执行了0.5s,最终遍历返回结果并打印。 #### invokeAny方法 > invokeAny方法提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消。 **代码示例** ```java import lombok.extern.slf4j.Slf4j; import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Created by lilinchao * Date 2022/11/20 * Description invokeAny方法示例 */ @Slf4j(topic = "c.ThreadPoolDemo03") public class ThreadPoolDemo03 { public static void main(String[] args) throws ExecutionException, InterruptedException { //创建线程池,核心线程数为3 ThreadPoolExecutor threadpool=new ThreadPoolExecutor(3, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardOldestPolicy()); String future = threadpool.invokeAny(Arrays.asList( () -> { log.debug("begin1"); Thread.sleep(1000); log.debug("end1"); return "1"; }, () -> { log.debug("begin2"); Thread.sleep(500); log.debug("end2"); return "2"; }, () -> { log.debug("begin3"); Thread.sleep(2000); log.debug("end3"); return "3"; } )); log.debug("{}",future); } } ``` **运行结果** ``` 17:34:43.143 [pool-1-thread-1] DEBUG c.ThreadPoolDemo03 - begin1 17:34:43.147 [pool-1-thread-3] DEBUG c.ThreadPoolDemo03 - begin3 17:34:43.147 [pool-1-thread-2] DEBUG c.ThreadPoolDemo03 - begin2 17:34:43.648 [pool-1-thread-2] DEBUG c.ThreadPoolDemo03 - end2 17:34:43.648 [main] DEBUG c.ThreadPoolDemo03 - 2 ``` **结果分析** 程序中设置三个核心线程数,表示三个任务可以同时运行,不需要加入到任务队列中,从结果可以看出,任务2使用0.5s时间最先执行完,并返回结果,此时整个任务队列停止等待任务1和任务3继续执行,输出结果任务2。 ### 二、关闭线程池 #### shutdown > 线程池状态变为 SHUTDOWN > > - 不会接收新任务 > - 但已提交任务会执行完 > - 此方法不会阻塞调用线程的执行,比如如果主线程此时调用了这个shutDown方法,此时并不会阻塞主线程,如果主线程 ```java void shutdown(); ``` + **源码** ```java public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(SHUTDOWN); // 仅会打断空闲线程 interruptIdleWorkers(); onShutdown(); // 扩展点 ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等) tryTerminate(); } ``` #### shutdownNow > 线程池状态变为 STOP > - 不会接收新任务 > - 会将队列中的任务返回 > - 并用 interrupt 的方式中断正在执行的任务 ```java List
shutdownNow(); ``` + **源码** ```java public List
shutdownNow() { List
tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(STOP); // 打断所有线程 interruptWorkers(); // 获取队列中剩余任务 tasks = drainQueue(); } finally { mainLock.unlock(); } // 尝试终结 tryTerminate(); return tasks; } ``` #### **其他方法**(了解) ```java // 不在 RUNNING 状态的线程池,此方法就返回 true boolean isShutdown(); // 线程池状态是否是 TERMINATED boolean isTerminated(); // 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待 // 一般task是Callable类型的时候不用此方法,因为futureTask.get方法自带等待功能。 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; ``` **代码示例** ```java import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.concurrent.*; /** * Created by lilinchao * Date 2022/11/20 * Description 1.0 */ @Slf4j(topic = "c.ThreadPoolDemo04") public class ThreadPoolDemo04 { public static void main(String[] args) throws ExecutionException, InterruptedException { ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardOldestPolicy()); Future
result1 = threadpool.submit(() -> { log.debug("task 1 running..."); Thread.sleep(1000); log.debug("task 1 finish..."); return 1; }); Future
result2 = threadpool.submit(() -> { log.debug("task 2 running..."); Thread.sleep(1000); log.debug("task 2 finish..."); return 2; }); Future
result3 = threadpool.submit(() -> { log.debug("task 3 running..."); Thread.sleep(1000); log.debug("task 3 finish..."); return 3; }); // shutdown() 部分的代码 log.debug("shutdown"); threadpool.shutdown(); threadpool.submit(()->{ log.debug("task 4 running..."); Thread.sleep(1000); log.debug("task 4 finish"); return "4"; }); threadpool.awaitTermination(3, TimeUnit.SECONDS); log.debug("other..."); // shutdownNow部分代码 // log.debug("shutdownNow"); // List
runnables = threadpool.shutdownNow(); // log.debug("other.... {}" , runnables); } } ``` **shutdown运行结果** ``` 18:03:12.408 [pool-1-thread-2] DEBUG c.ThreadPoolDemo04 - task 2 running... 18:03:12.408 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 1 running... 18:03:12.408 [main] DEBUG c.ThreadPoolDemo04 - shutdown 18:03:13.412 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 1 finish... 18:03:13.412 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 3 running... 18:03:13.412 [pool-1-thread-2] DEBUG c.ThreadPoolDemo04 - task 2 finish... 18:03:14.412 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 3 finish... 18:03:14.412 [main] DEBUG c.ThreadPoolDemo04 - other... ``` **结果分析** 可以看出在shutdown后依然可以把 shutdown之前的任务运行完毕,但是shutdown之后的任务就没有再运行了。 另外awaitTermination方法的作用是等待shutdown部分的任务运行完后主线程再运行awaitTermination方法之后的代码。 **shutdownNow运行结果** ``` 18:06:29.592 [main] DEBUG c.ThreadPoolDemo04 - shutdownNow 18:06:29.592 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 1 running... 18:06:29.592 [pool-1-thread-2] DEBUG c.ThreadPoolDemo04 - task 2 running... 18:06:29.595 [main] DEBUG c.ThreadPoolDemo04 - other.... [java.util.concurrent.FutureTask@6aa8ceb6] ``` **结果分析** 从结果可以看出在shutdownNow之后只有一个任务运行成功了,也就是别的任务都已经被打断了。
标签:
并发编程
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/2591.html
上一篇
44.Executors创建线程池方法介绍
下一篇
46.异步模式之工作线程
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
高并发
锁
Spark RDD
BurpSuite
Git
Spark SQL
Stream流
Quartz
Java编程思想
线程池
Hive
Http
RSA加解密
设计模式
前端
Redis
稀疏数组
Livy
SQL练习题
SpringBoot
JavaWEB项目搭建
SpringCloudAlibaba
Flink
Golang基础
Java工具类
Java阻塞队列
递归
Kafka
Elasticsearch
哈希表
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞