Skip to content

ThreadPool线程池

1. 线程池简介

它的主要特点为:

  • 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
  • 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
  • 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

2. 线程池参数说明

2.1 常用参数

  • corePoolSize 线程池的核心线程数
  • maximumPoolSize 能容纳的最大线程数
  • keepAliveTime 空闲线程存活时间
  • unit 存活的时间单位
  • workQueue 存放提交但未执行任务的队列
  • threadFactory 创建线程的工厂类
  • handler 等待队列满后的拒绝策略

2.2 拒绝策略

线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize-核心线程数,也即最小的线程数。workQueue-阻塞队列。
maximumPoolSize-最大线程数当提交任务数大于corePoolSize的时候,会优先将任务放到workQueue阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到maximumPoolSize最大线程数配置。此时再多余的任务,则会触发线程池的拒绝策略了。
总结起来,也就是一句话,当提交的任务数大于(workQueue.size()+maximumPoolSize),就会触发线程池的拒绝策略,拒绝策略如下:

  • CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大。
  • AbortPolicy: 丢弃任务,并抛出拒绝执行RejectedExecutionException异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
  • DiscardPolicy: 直接丢弃,其他啥都没有。
  • DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列workQueue中最老的一个任务,并将新任务加入。

3. 线程池的创建

参考阿里开发手册的话,需要摒弃通过Executors创建线程池的方式,而是使用ThreadPoolExecutor的构造函数。 Alt text

java
public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2,   // 核心线程数目
            3,  // 最大线程数量
            10L, // 存活时间 15秒
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(3),   // 阻塞等待队列
            Executors.defaultThreadFactory(),   // 创建线程工厂
            new ThreadPoolExecutor.DiscardOldestPolicy());   // 拒绝策略
    System.out.println("请求前-----当前线程池中线程总数目: "+ executor.getPoolSize());
    for (int i = 0; i < 6; i++) {
        int finalI = i + 1;
        executor.execute(() -> {
            System.out.println("客户" + finalI + "来办理业务, " + Thread.currentThread().getName() + "处理请求");
        });
    }
    System.out.println("请求中------当前线程池中线程总数目: "+ executor.getPoolSize());
    TimeUnit.SECONDS.sleep(10);
    System.out.println("当前线程池中线程总数目: "+ executor.getPoolSize());
    executor.shutdown();
    System.out.println("结束-----当前线程池中线程总数目: "+ executor.getPoolSize());
}

运行结果:
Alt text 可以看出线程是在execute()方法执行才创建线程的,shutdown()方法会把所有线程在线程池中清理掉。

4. 线程池底层工作原理

Alt text

  1. 在创建了线程池后,线程池中的线程数为
  2. 当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
    • 如果正在运行的线程数量<corePoolSize,那么马上创建线程运行这个任务(比如银行去办理业务,窗口还有3个可以直接办);
    • 如果正在运行的线程数量≥corePoolSize,那么将这个任务放入阻塞队列(窗口满了,在银行的板凳坐着等);
    • 如果这个时候队列满了且正在运行的线程数量<maximumPoolSize, 那么会创建非核心线程立刻运行这个任务(银行窗口和等候的板凳都满了,额外的工作人员拉你去机器上处理)
    • 如果队列满了且正在运行的线程数量≥maximumPoolSize,那么线程池会启动饱和拒绝策略来执行(银行处于饱和状态,银行工作人员建议你去其他分行办)。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行
  4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
    • 如果当前运行的线程数>corePoolSize,那么这个线程就被停掉。
    • 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。

5. 注意事项

  1. 项目中创建多线程时,实际生产一般自己通过ThreadPoolExecutor的7个参数,自定义线程池。
  2. 创建线程池推荐适用ThreadPoolExecutor及其7个参数手动创建:
    • corePoolSize 线程池的核心线程数
    • maximumPoolSize 能容纳的最大线程数
    • keepAliveTime 空闲线程存活时间
    • unit 存活的时间单位
    • workQueue 存放提交但未执行任务的队列
    • threadFactory 创建线程的工厂类
    • handler 等待队列满后的拒绝策略

6. 线程池优化

如果阻塞队列满了,会进行拒绝策略执行,如果不想请求不被拒绝,常见的解决办法有:

6.1 调整阻塞队列参数

把阻塞队列设置很大,甚至使用无界队列(不推荐,毕竟服务器资源是有限的,有造成OOM的风险)

6.2 调整拒绝策略

设置拒绝策略为CallerRunsPolicy,使得任务交给调用方去执行,从而避免任务被丢弃。但后果就是导致业务线程被拖慢,没有办法去及时处理请求。所以这种拒绝策略只适用于低并发又不想丢弃任务的场景。

6.3 任务持久化

  1. 方法1:
    通过自定义拒绝策略,把需要拒绝的任务信息放到MySQL、Redis或者Kafka中,然后重新线程池的afterExecute()方法,当线程池执行完一个任务的时候,判断阻塞队列有任务不多以及空闲的线程比较多,就可以读取MySQL、Redis或者Kafka中的任务信息,然后放入线程池中的阻塞队列中再进行处理。
  2. 方法2:
    通过自定义阻塞队列,当线程从阻塞队列中取任务的时候,判断阻塞队列中任务量的多少,如果阻塞队列中任务比较少,那就从MYSQL/REDIS/MQ中去读数据,然后放入线程池中的阻塞队列中再进行处理。

7. 线程池动态调整

7.1 使用get/set方法

线程池ThreadPool提供了get/set方法,方便我们进行监控并动态调整(就是不重启机器)线程池参数:
Alt text

7.2 使用配置中心

把线程池的参数配置到配置中心,比如Appllo\Nacos, 然后让线程池去监听配置的变化,就能够通过set方法动态的调整线程池参数。
如果需要动态调整阻塞队列的大小,队列大小被final修饰,就需要重写一个阻塞队列。
Alt text

8. 线程池监控

8.1 监控的具体指标

  1. 线程维度:核心线程数、最大线程数、活跃线程数、历史最大线程数等
  2. 任务维度:总任务数、排队任务数、已完成任务数、拒绝任务数等
  3. 性能维度:任务平均耗时、线程空闲(保活)时间、线程池负载率等

ThreadPoolExecutor类中提供了获取这些指标数据的API:

  1. poo1.getcorePoosize()- 核心线程数
  2. poo1.getMaximumPoolsize()-最大线程数
  3. poo1.getActivecount()- 活跃线程数(正在执行的线程数)
  4. poo1.getQueue().size()-队列中等待的任务数
  5. pool.getcompletedTaskcount()-已完成任务数
  6. pool.getTaskCount()-总任务数
  7. pool.getLargestPoolSize()-运行的最大任务数

8.2 实时采集方式

  1. 自定义带监视功能的线程池类(继承ThreadPoolExecutor)
  2. 监控位埋点(任务执行前、任务执行后、线程池终止后)

注意

实时采集是通过在线程池预先"埋点",对每个运行的任务实时采集统计,对性能有影响

java
public class MonitorThreadPoolExecutor extends ThreadPoolExecutor {
    public MonitorThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
    }
    @Override
    protected void terminated() {
        super.terminated();
    }
}

8.3 定时采集

SpringBoot项目为演示案例:
2. CommandLineRunner, ApplicationRunner两个接口在SpringBoot应用启动后自动执行,通过实现他们的run()方法可以完成初始化逻辑、提交后台任务等。
3. CommandLineRunner -run- 循环不断的往线程池中提交任务(间隔1s/次) 4. ApplicationRunner -run- 定时采集线程池的指标数据(首次间隔5s、间隔10s/次)

8.4 总结

没有监控的线程池相当于没有消防报警的大厦;
埋点实时监控,迅速反映但对性能有影响; 定时监控,数据有延迟但降低了对性能的影响。
完善的监控体系:高峰期扩容、低谷期节能地健康运行(削峰填谷)。