李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
42.并发编程之自定义线程池
Leefs
2022-11-16 PM
526℃
0条
[TOC] ### 线程池模型架构 + 自定义线程池包括:Thread Pool(线程池)+ Blocking Queue(阻塞队列) ![42.并发编程之自定义线程池01.jpg](https://lilinchao.com/usr/uploads/2022/11/2985369599.jpg) **图例分析** 图中内容表示,三个消费线程或者说是核心线程 `t1`、`t2`、`t3` 通过poll方法从阻塞队列中执行任务,main线程不断地往阻塞队列中put任务task,如果核心线程处于忙碌状态,task就放进阻塞队列中。 ### 实现步骤 **步骤1:自定义拒绝策略接口** ```java @FunctionalInterface // 拒绝策略 interface RejectPolicy
{ void reject(BlockingQueue
queue, T task); } ``` **作用** 当核心线程都被占用,并且阻塞队列中的任务也满时,就会触发拒绝策略。 简单理解就是,任务实在太多了,分配给线程池的所有线程都参与进来处理任务,但还是处理不过来,同时存放任务的缓存队列空间也满了,剩下来的任务该如何进行处理呢,这个时候就涉及到拒绝策略。 **拒绝策略的方式**: + 死等 + 带超时等待 + 让调用者放弃任务执行 + 让调用者抛出异常 + 让调用者自己执行任务等等 **步骤2:自定义任务队列** ```java //自定义任务队列 @Slf4j(topic = "c.BlockingQueue") class BlockingQueue
{ // 1.任务队列 private Deque
queue = new ArrayDeque<>(); // 2.锁 (防止多个线程获取同一个任务) private ReentrantLock lock = new ReentrantLock(); // 3.生产者条件变量 (当阻塞队列满了以后,生产者线程等待) private Condition fullWaitSet = lock.newCondition(); // 4.消费者条件变量 (当阻塞队列为空以后,消费者线程等待) private Condition emptyWaitSet = lock.newCondition(); // 5.容量 private int capcity; public BlockingQueue(int capcity) { this.capcity = capcity; } // 带超时阻塞获取 public T poll(long timeout, TimeUnit unit){ lock.lock(); try { // 将timeout统一转换为 纳秒 long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { // 返回值是剩余时间 if (nanos <= 0){ return null; } //返回值是:等待时间-经过的时间 nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } //移除队列中的第一个元素 T t = queue.removeFirst(); //唤醒生产者线程继续生产 fullWaitSet.signal(); return t; } finally { lock.unlock(); } } // 阻塞获取 public T task() { lock.lock(); try { while (queue.isEmpty()) { try { //当任务队列为空,消费者就没有任务可以消费,那么就进入等待的状态 emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //此时任务队列不为空,取出任务队列当中队头的任务返回 T t = queue.removeFirst(); //当从任务队列当中取出一个任务的时候,任务队列就有空位了,就可以唤醒因为队列满了而等待的生产者 fullWaitSet.signal(); return t; } finally { lock.unlock(); } } // 阻塞添加 public void put(T task) { lock.lock(); try { // 队列已满 while (queue.size() == capcity) { try { log.debug("等待加入任务队列{}...",task); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列{}",task); //当有空位的时候,将新的任务放到队列的尾部 queue.addLast(task); //添加完新的元素之后,需要唤醒等待当中的消费者队列,因为有新的任务进队列 emptyWaitSet.signal(); } finally { lock.unlock(); } } // 带超时时间阻塞添加 public boolean offer(T task,long timeout, TimeUnit timeUnit) { lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.size() == capcity) { try { if (nanos <= 0) { return false; } log.debug("等待加入任务队列 {} ...", task); nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列{}",task); // 向队列中添加元素 queue.addLast(task); // 唤醒消费者线程 emptyWaitSet.signal(); return true; } finally { lock.unlock(); } } // 获取阻塞队列的大小 public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } // 带自定义拒绝策略的的添加 public void tryPut(RejectPolicy
rejectPolicy,T task) { lock.lock(); try { // 判断队列是否满 if(queue.size() == capcity) { // 执行拒绝策略的方法 rejectPolicy.reject(this,task); } else { // 有空闲 log.debug("加入任务队列{}",task); queue.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } } ``` **说明** + **queue(队列)**:生产者创建的任务都放在queue中 ,`Deque`是一个双向链表,比`LinkedList`效率要高,当然这里也可以用`LinkedList`。 + **lock(锁)**:为了防止多个线程获取同一个任务,本次使用的是`ReentrantLock` 锁,目的是因为它可以提供两个条件变量(集合)`fullWaitSet` 和`emptyWaitSet` 。 + **fullWaitSet(生产者条件变量)**:当任务队列queue满的时候,生产者线程将要进入`fullWaitSet` 阻塞状态,不能再进行生产。 + **emptyWaitSet(消费者条件变量)**:当queue为空的时候,消费者线程(t1、t2、t3)就无任务可以进行消费任务,同理也应该阻塞,进入`emptyWaitSet` 。 + **capcity(任务容量)**:初始化创建任务队列的容量,与上面两个不一样,这个是放任务的,上面两个是放线程的。 + **BlockingQueue(int capcity)**:构造方法,用来初始化任务队列的容量。 + **poll(long timeout, TimeUnit unit)**:核心线程带有超时的获取任务的方法 如果任务是空的就阻塞,而且是带有超时的阻塞,如果获取任务成功,说明queue获取后就不是满的状态了,所以应该唤醒`fullWaitSet` 中阻塞的生产者线程,让生产者线程继续生产任务。 + **offer(T task, long timeout, TimeUnit timeUnit)** :生产者线程用于向队列queue中添加任务的方法 这个方法也是带有阻塞,如果queue是满的,就不应该添加任务,main线程就应该阻塞,这里也是使用了超时阻塞,原因和上面一样,不想让它阻塞太久,任务添加不进就不添加,一直阻塞势必消耗CPU资源。 + **size()**:获取当前任务的数量 + **tryPut(RejectPolicy rejectPolicy, T task)**:调用生产者提供的拒绝策略,它的调用时机是,核心线程用完了(t1、t2、t3都忙),如果任务队列满了,就执行拒绝策略,如果没满就放任务队列中。 **步骤3:自定义线程池** ```java @Slf4j(topic = "c.ThreadPool") class ThreadPool { // 任务队列 private BlockingQueue
taskQueue; // 线程集合 private HashSet
workers = new HashSet<>(); // 核心线程数 private int coreSize; // 获取任务时的超时时间 private long timeout; private TimeUnit timeUnit; private RejectPolicy
rejectPolicy; // 执行任务 public void execute(Runnable task) { // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行 // 如果任务数超过 coreSize 时,加入任务队列暂存 synchronized (workers) { if(workers.size() < coreSize) { Worker worker = new Worker(task); // 交给工作线程 log.debug("新增 worker{}, {}", worker, task); workers.add(worker); // 将工作线程交给工作线程队列 worker.start(); // 启动 } else { // taskQueue.put(task); // 1) 死等 // 2) 带超时等待 // 3) 让调用者放弃任务执行 // 4) 让调用者抛出异常 // 5) 让调用者自己执行任务 //拒绝策略,即到底如何处理多余的任务,交由创建线程池的创建者选择 taskQueue.tryPut(rejectPolicy, task); } } } public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy
rejectPolicy) { this.coreSize = coreSize; //核心线程数 this.timeout = timeout; //获取任务的超时时间 this.timeUnit = timeUnit; //转换时间器 this.taskQueue = new BlockingQueue<>(queueCapcity); //阻塞队列 this.rejectPolicy = rejectPolicy; //拒绝策略,在构建线程池的时候定义 } // 工作线程 class Worker extends Thread{ // 任务 private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { // 执行任务 // 1) 当 task 不为空,执行任务 // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行 // while(task != null || (task = taskQueue.take()) != null) { while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) { try { log.debug("正在执行...{}", task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null; } } //如果都没有任务了,那么该工作线程被移除 synchronized (workers) { log.debug("worker 被移除{}", this); workers.remove(this); } } } } ``` **说明** - **taskQueue**:任务队列,这个队列中有封装好的取任务和添加任务的方法,以及线程的阻塞队列等属性。 - **workers** :存放工作线程,也就是核心线程。 - **coreSize**:定义核心线程的数量。 - **timeout和timeUnit**:任务的超时时间,下面调用之前方法传参用 - **rejectPolicy**:传参用 - **execute**:线程池向外提供的执行方法 + `ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,` `RejectPolicy rejectPolicy)`:创建线程池的初始化方法 + **class Worker**:线程实体,它的逻辑是先执行当前任务,如果当前任务执行结束后从任务队列中取任务执行。 **步骤4:测试** 按照定义的决策策略分别进行演示。 **(1)死等** > 初始化核心线程数是1,超时取任务的时间是1秒,任务队列容量是1,拒绝策略是死等 ```java @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ // 死等 queue.put(task); task.run(); }); for (int i = 0; i < 3; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } } ``` **运行结果** ``` 16:15:56.232 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:15:56.236 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:15:56.236 [main] DEBUG c.BlockingQueue - 等待加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2... 16:15:56.236 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e ``` **结果分析** - 首先创建了一个核心线程,开始执行第一个任务; - 由于任务执行周期长,核心线程一直处于忙碌状态,因此第二个任务到来时,放入任务队列等待核心线程进入空闲状态; - 当再来第三个任务时,此时任务队列已满,同时核心线程中的第一个任务也未执行完毕,此时主线程就进入到阻塞队列`fullWaitSet`一直死等,等待queue有位置。 **(2)带超时等待** > 每个任务的执行周期是1秒,拒绝策略是main线程等待1.5秒,如果还添加不进去任务,不再进行添加 ```java @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ // 1. 死等 // queue.put(task); // 2. 带超时等待 queue.offer(task, 1500, TimeUnit.MILLISECONDS); task.run(); }); for (int i = 0; i < 3; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } } ``` **运行结果** ``` 16:25:30.918 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:25:30.922 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:25:30.922 [main] DEBUG c.BlockingQueue - 等待加入任务队列 com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 ... 16:25:30.922 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:25:31.923 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:25:31.923 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 16:25:31.923 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:25:32.924 [main] DEBUG c.CustomThreadPoolDemo - 2 16:25:32.925 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:25:32.925 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 16:25:33.925 [Thread-0] DEBUG c.CustomThreadPoolDemo - 2 16:25:34.927 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main] ``` 可以看到三个任务都被执行了。 > 如果将主线程的拒绝策略改成0.5秒呢? ``` 16:28:37.554 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:28:37.557 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:28:37.558 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:28:37.558 [main] DEBUG c.BlockingQueue - 等待加入任务队列 com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 ... 16:28:38.558 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:28:39.059 [main] DEBUG c.CustomThreadPoolDemo - 2 16:28:39.059 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:28:40.059 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:28:41.060 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main] ``` 可以发现第三个任务没有执行。 **(3)让调用者放弃任务执行** > 直接让main线程打印一下任务,不执行任何添加操作 ```java @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ // 3. 让调用者放弃任务执行 log.debug("放弃{}", task); }); for (int i = 0; i < 3; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } } ``` **运行结果** ``` 16:34:53.012 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:34:53.016 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:34:53.016 [main] DEBUG c.CustomThreadPoolDemo - 放弃com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 16:34:53.016 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:34:54.018 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:34:54.018 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:34:55.018 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:34:56.019 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main] ``` **(4)让调用者抛出异常** ```java @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ // 4. 让调用者抛出异常 throw new RuntimeException("任务执行失败 " + task); }); for (int i = 0; i < 4; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } } ``` **运行结果** ``` 16:46:30.854 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:46:30.857 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f Exception in thread "main" java.lang.RuntimeException: 任务执行失败 com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 at com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo.lambda$main$0(CustomThreadPoolDemo.java:29) at com.lilinchao.concurrent.demo_05.BlockingQueue.tryPut(CustomThreadPoolDemo.java:270) at com.lilinchao.concurrent.demo_05.ThreadPool.execute(CustomThreadPoolDemo.java:78) at com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo.main(CustomThreadPoolDemo.java:35) 16:46:30.859 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:46:31.859 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:46:31.859 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:46:32.860 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:46:33.861 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main] ``` **结果分析** 从结果可以看出,核心线程执行第一到来的任务,将第二个任务加入到阻塞队列中,当第三个任务再来时,直接抛出异常,同时不再尝试添加之后的任务。 (5)让调用者自己执行任务 ```java @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ // 5. 让调用者自己执行任务 task.run(); }); for (int i = 0; i < 4; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } } ``` **运行结果** ``` 16:52:05.328 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:52:05.331 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:52:05.332 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:52:06.332 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:52:06.332 [main] DEBUG c.CustomThreadPoolDemo - 2 16:52:07.333 [main] DEBUG c.CustomThreadPoolDemo - 3 16:52:07.333 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:52:08.333 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:52:09.334 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main] ``` **结果分析** 从结果可以看出,后面再来的两个线程直接由主线程进行执行。
标签:
并发编程
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/2580.html
上一篇
41.并发编程之final详解
下一篇
43.ThreadPoolExecutor线程池状态和构造方法
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Spring
Spark
设计模式
Elastisearch
JavaWeb
Shiro
Redis
Map
Git
Elasticsearch
Typora
Beego
Jenkins
NIO
Golang基础
GET和POST
稀疏数组
Hadoop
递归
哈希表
Filter
CentOS
Java阻塞队列
Http
Tomcat
查找
Linux
BurpSuite
二叉树
Spark Streaming
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞