线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和细节吗?如果直接去看jdk源码的话,可能有一定的难度,那么我们可以先通过手写一个简单的线程池框架,去掌握线程池的基本原理后,再去看jdk的线程池源码就会相对容易,而且不容易忘记。
我们都知道,线程资源的创建和销毁并不是没有代价的,甚至开销是非常高的。同时,线程也不是任意多创建的,因为活跃的线程会消耗系统资源,特别是内存,在一定的范围内,增加线程可以提高系统的吞吐率,如果超过了这个范围,反而会降低程序的执行速度。
因此,设计一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作, 达到下面的目标:
线程池的核心思想: 线程复用,同一个线程可以被重复使用,来处理多个任务。
为了实现线程池功能,需要考虑下面几个设计要点:
看了上面的设计目标和要点,是不是能立刻想到一个非常经典的设计模型——生产者消费者模型。
现在我们将我们的设计思路转换为代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
/** * <p>自定义任务队列, 用来存放任务 </p> * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 10:15 * @version: 1.0.0 */ @Slf4j(topic = "c.BlockingQueue") public class BlockingQueue<T> { // 容量 private int capcity; // 双端任务队列容器 private Deque<T> deque = new ArrayDeque<>(); // 重入锁 private ReentrantLock lock = new ReentrantLock(); // 生产者条件变量 private Condition fullWaitSet = lock.newCondition(); // 生产者条件变量 private Condition emptyWaitSet = lock.newCondition();
public BlockingQueue(int capcity) { this.capcity = capcity; }
// 阻塞的方式添加任务 public void put(T task) { lock.lock(); try { // 通过while的方式 while (deque.size() >= capcity) { log.debug("wait to add queue"); try { fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } deque.offer(task); log.debug("task add successfully"); emptyWaitSet.signal(); } finally { lock.unlock(); } }
// 阻塞获取任务 public T take() { lock.lock(); try { // 通过while的方式 while (deque.isEmpty()) { try { log.debug("wait to take task"); emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); T task = deque.poll(); log.debug("take task successfully"); // 从队列中获取元素 return task; } finally { lock.unlock(); } } } |
1.定义执行器接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
/** * <p>定义一个执行器的接口:</p> * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 12:31 * @version: 1.0.0 */ public interface Executor {
/** * 提交任务执行 * @param task 任务 */ void execute(Runnable task); } |
2.定义线程池类实现该接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
@Slf4j(topic = "c.ThreadPool") public class ThreadPool implements Executor {
/** * 任务队列 */ private BlockingQueue<Runnable> taskQueue;
/** * 核心工作线程数 */ private int coreSize;
/** * 工作线程集合 */ private Set<Worker> workers = new HashSet<>();
/** * 创建线程池 * @param coreSize 工作线程数量 * @param capcity 阻塞队列容量 */ public ThreadPool(int coreSize, int capcity) { this.coreSize = coreSize; this.taskQueue = new BlockingQueue<>(capcity); }
/** * 提交任务执行 */ @Override public void execute(Runnable task) { synchronized (workers) { // 如果工作线程数小于阈值,直接开始任务执行 if(workers.size() < coreSize) { Worker worker = new Worker(task); workers.add(worker); worker.start(); } else { // 如果超过了阈值,加入到队列中 taskQueue.put(task); } } }
/** * 工作线程,对执行的任务做了一层包装处理 */ class Worker extends Thread { private Runnable task;
public Worker(Runnable task) { this.task = task; }
@Override public void run() { // 如果任务不为空,或者可以从队列中获取任务 while (task != null || (task = taskQueue.take()) != null) { try { task.run(); } catch (Exception e) { e.printStackTrace(); } finally { // 执行完后,设置任务为空 task = null; } }
// 移除工作线程 synchronized (workers){ log.debug("remove worker successfully"); workers.remove(this); } } } } |
3.演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
@Test public void testThreadPool1() throws InterruptedException { Executor executor = new ThreadPool(2, 4); // 提交任务 for (int i = 0; i < 6; i++) { final int j = i; executor.execute(() -> { try { Thread.sleep(10); log.info("run task {}", j); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread.sleep(10); }
Thread.sleep(10000); } |
运行结果:
目前从队列中获取任务是永久阻塞等待的,可以改成阻塞一段时间没有获取任务,丢弃的策略。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
@Slf4j(topic = "c.TimeoutBlockingQueue") public class TimeoutBlockingQueue<T> { // 容量 private int capcity; // 双端任务队列容器 private Deque<T> deque = new ArrayDeque<>(); // 重入锁 private ReentrantLock lock = new ReentrantLock(); // 生产者条件变量 private Condition fullWaitSet = lock.newCondition(); // 生产者条件变量 private Condition emptyWaitSet = lock.newCondition();
public TimeoutBlockingQueue(int capcity) { this.capcity = capcity; }
// 带超时时间的获取 public T poll(long timeout, TimeUnit unit){ lock.lock(); try{ // 将 timeout 统一转换为 纳秒 long nanos = unit.toNanos(timeout); while (deque.isEmpty()){ try { if (nanos<=0){ return null; } // 返回的是剩余的等待时间,更改navos的值,使虚假唤醒的时候可以继续等待 nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); return deque.getFirst(); }finally { lock.unlock(); } }
// 带超时时间的增加 public boolean offer(T task , long timeout , TimeUnit unit){ lock.lock(); try{ // 将 timeout 统一转换为 纳秒 long nanos = unit.toNanos(timeout); while (deque.size() == capcity){ try { if (nanos<=0){ return false; } // 更新剩余需要等待的时间 nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列 {}", task); deque.addLast(task); emptyWaitSet.signal(); return true; }finally { lock.unlock(); } } } |
新加TimeoutBlockingQueue类,添加offer和poll待超时的添加和获取任务的方法。
目前的实现还是有个漏洞,无法自定义任务超出阈值的一个拒绝策略,我们可以通过利用函数式编程+策略模式去实现。
1.定义策略模式的函数式接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
/** * <p>拒绝策略的函数式接口:</p> * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 13:15 * @version: 1.0.0 */ @FunctionalInterface public interface RejectPolicy<T> {
/** * 拒绝策略的接口 * @param queue * @param task */ void reject(BlockingQueue<T> queue, T task); } |
2.添加函数式接口的调用入口
我们可以在阻塞队列添加任务新加一个api, 添加任务如果超过容量,调用函数式接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
@Slf4j(topic = "c.BlockingQueue") public class BlockingQueue<T> { ........
/** * 尝试添加任务 * @param rejectPolicy * @param task */ public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try{ // 如果队列超过容量 if (deque.size()> capcity){ log.debug("task too much, do reject"); rejectPolicy.reject(this, task); }else { deque.offer(task); emptyWaitSet.signal(); } }finally { lock.unlock(); } } } |
3.修改ThreadPool类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
@Slf4j(topic = "c.ThreadPool") public class ThreadPool implements Executor { .....
/** * 拒绝策略 */ private RejectPolicy rejectPolicy;
// 通过构造方法传入执行的拒绝策略 public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) { this.coreSize = coreSize; this.taskQueue = new BlockingQueue<>(capcity); this.rejectPolicy = rejectPolicy; }
/** * 提交任务执行 */ @Override public void execute(Runnable task) { synchronized (workers) { // 如果工作线程数小于阈值,直接开始任务执行 if(workers.size() < coreSize) { Worker worker = new Worker(task); workers.add(worker); worker.start(); } else { // 如果超过了阈值,加入到队列中 //taskQueue.put(task);
// 调用tryPut的方式 taskQueue.tryPut(rejectPolicy, task); } } }
.... } |
通过构造方法的方式传入要执行的拒绝策略
调用tryPut方法添加任务
4.演示