ThreadPool 线程池
1. 线程池简介
线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
线程池的优势:线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
它的主要特点为:
- 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
- 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
- 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
- Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类
2. 线程池参数说明
2.1 常用参数
corePoolSize
线程池的核心线程数maximumPoolSize
能容纳的最大线程数keepAliveTime
空闲线程存活时间unit
存活的时间单位workQueue
存放提交但未执行任务的队列threadFactory
创建线程的工厂类handler
等待队列满后的拒绝策略
线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize
-核心线程数,也即最小的线程数。workQueue
-阻塞队列。maximumPoolSize
-最大线程数当提交任务数大于corePoolSize
的时候,会优先将任务放到workQueue阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到maximumPoolSize
最大线程数配置。此时再多余的任务,则会触发线程池的拒绝策略了。
总结起来,也就是一句话,当提交的任务数大于(workQueue.size()+maximumPoolSize),就会触发线程池的拒绝策略。
2.2 拒绝策略
- CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大。
- AbortPolicy: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
- DiscardPolicy: 直接丢弃,其他啥都没有。
- DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue中最老的一个任务,并将新任务加入。
3. 线程池的种类与创建
3.1 newCachedThreadPool
作用:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程.
特点:
- 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
- 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟)
- 当线程池中,没有可用线程,会重新创建一个线程
场景: 适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间较短,任务多的场景。
System.out.println("一池可扩容线程-------------------------");
ExecutorService executorService3 = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
int finalI = i + 1;
executorService3.execute(() -> {
System.out.println("客户" + finalI + "来办理业务, "+Thread.currentThread().getName()+"处理请求");
});
}
executorService3.shutdown();
运行结果:
3.2 newFixedThreadPool
作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。
特征:
- 线程池中的线程处于一定的量,可以很好的控制线程的并发量。
- 线程可以重复被使用,在显示关闭之前,都将一直存在。
- 超出一定量的线程被提交时候需在队列中等待。
场景: 适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严格限制的场景。
System.out.println("一池N线程-------------------------");
ExecutorService executorService1 = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
int finalI = i + 1;
executorService1.execute(() -> {
System.out.println("客户" + finalI + "来办理业务, "+Thread.currentThread().getName()+"处理请求");
});
}
executorService1.shutdown();
运行结果:
3.3 newSingleThreadExecutor
作用:创建一个使用单个worker线程的Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的newFixedThreadPool不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。 特征: 线程池中最多执行1个线程,之后提交的线程活动将会排在队列中以此执行。
场景: 适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个线程的场景。
System.out.println("一池一线程-------------------------");
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
int finalI = i + 1;
executorService2.execute(() -> {
System.out.println("客户" + finalI + "来办理业务, "+Thread.currentThread().getName()+"处理请求");
});
}
executorService2.shutdown();
运行结果:
3.4 newScheduleThreadPool
作用: 线程池支持定时以及周期性执行任务,创建一个 corePoolSize 为传入参数,最大线程数为整形的最大数的线程池。
特征:
- 线程池中具有指定数量的线程,即便是空线程也将保留
- 可定时或者延迟执行线程活动
System.out.println("一池调度线程-------------------------");
ScheduledExecutorService executorService4 = Executors.newScheduledThreadPool(2);
executorService4.scheduleAtFixedRate(() -> {
System.out.println(Thread.currentThread().getName()+"处理请求: "+ LocalDateTime.now());
}, 5, 5, TimeUnit.SECONDS);
executorService4.scheduleAtFixedRate(() -> {
System.out.println(Thread.currentThread().getName()+"处理请求: "+ LocalDateTime.now());
}, 5, 5, TimeUnit.SECONDS);
//executorService4.shutdown();
运行结果:
3.5 newWorkStealingPool
jdk1.8提供的线程池,底层使用的是ForkJoinPool实现,创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu核数的线程来并行执行任务。
场景: 适用于大耗时,可并行执行的场景。
4. 线程池底层工作原理
- 在创建了线程池后,线程池中的线程数为零。
- 当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
- 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务(比如银行去办理业务,窗口还有3个可以直接办);
- 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入阻塞队列(窗口满了,在银行的板凳坐着等);
- 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize, 那么会创建非核心线程立刻运行这个任务(银行窗口和等候的板凳都满了,额外的工作人员拉你去机器上处理);
- 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行(银行处于饱和状态,银行工作人员建议你去其他分行办)。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行
- 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
- 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
- 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
5. 注意事项
- 项目中创建多线程时,不会使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是FixedThreadPool和SingleThreadExecutor底层都是用 LinkedBlockingQueue实现的,这个队列最大长度为Integer.MAX_VALUE,容易导致OOM。所以实际生产一般自己通过ThreadPoolExecutor的7个参数,自定义线程池。
- 创建线程池推荐适用ThreadPoolExecutor及其7个参数手动创建
corePoolSize
线程池的核心线程数maximumPoolSize
能容纳的最大线程数keepAliveTime
空闲线程存活时间unit
存活的时间单位workQueue
存放提交但未执行任务的队列threadFactory
创建线程的工厂类handler
等待队列满后的拒绝策略
- 为什么不允许使用Executors的方式手动创建线程池,如下图
自定义线程池代码:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数目
3, // 最大线程数量
10L, // 存活时间 15秒
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3), // 阻塞等待队列
Executors.defaultThreadFactory(), // 创建线程工厂
new ThreadPoolExecutor.DiscardOldestPolicy()); // 拒绝策略
System.out.println("请求前-----当前线程池中线程总数目: "+ executor.getPoolSize());
for (int i = 0; i < 6; i++) {
int finalI = i + 1;
executor.execute(() -> {
System.out.println("客户" + finalI + "来办理业务, " + Thread.currentThread().getName() + "处理请求");
});
}
System.out.println("请求中------当前线程池中线程总数目: "+ executor.getPoolSize());
TimeUnit.SECONDS.sleep(10);
System.out.println("当前线程池中线程总数目: "+ executor.getPoolSize());
executor.shutdown();
System.out.println("结束-----当前线程池中线程总数目: "+ executor.getPoolSize());
}
运行结果: 可以看出线程是在
execute()
方法执行才创建线程的,shutdown()
方法会把所有线程在线程池中清理掉。