高并发专题 - 线程池
在当今计算机的CPU计算速度非常快的情况下,为了能够充分利用CPU性能提高程序运行效率我们在程序中使用了线程。但是在高并发情况下会频繁的创建和销毁线程,这样就变相的阻碍了程序的执行速度,所以为了管理线程资源和减少线程创建以及销毁的性能消耗就引入了线程池。
# 1. 什么场景下适合使用线程池
当服务器接收到大量任务时,如果使用线程池可以大量减少线程的创建与销毁次数,从而提升程序执行效率。
在实际开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理。
# 2. 线程池参数介绍以及特点
参数名 | 类型 | 含义 |
---|---|---|
corePoolSize | int | 核心线程数 |
maxPoolSize | int | 最大线程数 |
keepAliveTime | long | 保持存活时间 |
workQueue | BlockingQueue | 任务存储队列 |
threadFactory | ThreadFactory | 当线程池需要新的线程的时候,会使用threadFactory来生成新的线程 |
Handler | RejectedExecutionHandler | 由于线程池无法接收你所提交的的任务时的拒绝策略 |
# 2.1 corePoolSize 和 maxPoolSize
corePoolSize:线程池在创建完时,里面并没有线程,只有当任务到来时再去创建线程。
maxPoolSize:线程池可能会在核心线程数的基础上额外增加一些线程,但是线程数量的上限是maxPoolSize。
# 2.1.1 添加线程的规则
当线程数量小于 corePoolSize 即使线程没有在执行任务,也会创建新的线程。
如果线程数量等于(或大于)corePoolSize,但小于 maxPoolSize 则将任务放入队列。
如果队列已满,并且线程数小于 maxPoolSize,则创建新的线程运行任务。
如果队列已满,并且线程数大于或等于 maxPoolSize,则拒绝该任务。
# 2.1.2 增减线程的特点
- 将 corePoolSize 和 maxPoolSize 设置为相同的值,那么就会创建固定大小的线程池。
- 线程池希望保持更少的线程数,并且只有在负载变得很大时才会增加它。
- 如果将线程池的maxPoolSize参数设置为很大的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
- 只有在队列满了的时候才会去创建大于 corePoolSize 的线程,所以如果使用了无界队列(如:LinkedBlockingQueue)就不会创建到超过 corePoolSize 的线程数。
# 2.2 keepAliveTime
如果线程池当前的线程数大于 corePoolSize,那么如果多余的线程的空闲时间大于 keepAliveTime,它们就会被终止。
keepAliveTime 参数的使用可以减少线程数过多,冗余时的资源消耗。
# 2.3 threadFactory
新的线程由 ThreadFactory 创建,默认使用 Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的 NORM_PRIORITY 优先级并且都不是守护线程。如果自己指定 ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等。通常情况下直接使用 defaultThreadFactory 就行。
# 2.4 workQueue
直接交接(SynchronousQueue):任务不多时,只需要用队列进行简单的任务中转,这种队列无法存储任务,在使用这种队列时,需要将 maxPoolSize 设置的大一点。
无界队列(LinkedBlockingQueue):如果使用无界队列当作 workQueue,将 maxQueue 设置的多大都没有用,使用无界队列的优点是可以防止流量突增,缺点是如果处理任务的速度跟不上提交任务的速度,这样就会导致无界队列中的任务越来越多,从而导致OOM异常。
有界队列(ArrayBlockingQueue):使用有界队列可以设置队列大小,让线程池的 maxPoolSize 有意义。
# 3. 线程池应该手动创建还是自动创建
手动创建更好,因为这样可以让我们更加了解线程池的运行规则,避免资源耗尽的风险。
# 3.1 直接调用JDK封装好的线程池会带来的问题
# 3.1.1 newFixedThreadPool
// 新建线程池
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(new CustomThread());
// newFixedThreadPool方法源码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, newLinkedBlockingQueue<Runnabl>());
}
2
3
4
5
6
7
8
newFixedThreadPool 线程池通过传入相同的 corePoolSize 和 maxPoolSize 可以保证线程数量固定,0L 的 keepAliveTime 表示时刻被销毁,workQueue 使用的是无界队列。
这样潜在的问题就是当处理任务的速度赶不上任务提交的速度的时候,就可能会让大量任务堆积在workQueue中,从而引发OOM异常。
# 3.1.2 newSingleThreadExecutor
// 新建线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Task());
// newSingleThreadExecutor 方法源码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable()));
}
2
3
4
5
6
7
8
9
从源码可以看出 newSingleThreadExecutor 和 newFixedThreadPool 基本类似,不同的只是 corePoolSize 和 maxPoolSize 的值,所以 newSingleThreadExecutor 也存在内存溢出问题。
# 3.1.3 newCachedThreadPool
// 新建线程池
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new Task());
// newCachedThreadPool 方法源码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
2
3
4
5
6
7
8
9
newCachedThreadPool也被称为可缓存线程池,它是一个无界线程池,具有自动回收多余线程的功能。
newCachedThreadPool的maxPoolSize设置的值为Integer.MAX_VALUE,所以可能会导致线程被无限创建,最终导致OOM异常。
# 3.1.4 newScheduledThreadPool
// 新建线程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
scheduledExecutorService.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);
// newScheduledThreadPoo 方法源码
public static ScheduledExecutorService newScheduledThreadPoo(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
2
3
4
5
6
7
8
9
10
11
12
13
该线程池支持周期性任务的执行
# 3.2 线程池里的线程数量设置多少比较合适
CPU密集型(加密、计算hash等):最佳线程数设置为CPU核心数的1——2倍。
耗时I/O型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍,以JVM监控显示繁忙情况为依据,保证线程空闲可以衔接上。参考Brain Goezt推荐的计算方法:线程数 = CPU核心数 × (1 + 平均等待时间 / 平均工作时间)
# 3.3 对比各线程池的特点
- FixedThreadPool:通过手动传入 corePoolSize 和 maxPoolSize ,以固定的线程数来执行任务
- SingleThreadExecutor:corePoolSize 和 maxPoolSize 默认都是1,全程只以1条线程执行任务
- CachedThreadPool:它没有需要维护的核心线程数,每当需要线程的时候就进行创建,因为它的线程存活时间是60秒,所以它也凭借着这个参数实现了自动回收的功能。
- ScheduledThreadPool:这个线程池可以执行定时任务,corePoolSize 是通过手动传入的,它的 maxPoolSize 为 Integer.MAX_VALUE,并且具有自动回收线程的功能。
# 3.3.1 为什么 FixedThreadPool 和 SingleThreadExecutor 的 Queue 是 LinkedBlockingQueue?
因为这两个线程池的核心线程数和最大线程数都是相同的,也就无法预估任务量,所以需要在自身进行改进,就使用了无界队列。
# 3.3.2 为什么 CachedThreadPool 使用的 Queue 是 SynchronousQueue?
因为缓存线程池的最大线程数是“无上限”的,每当任务来的时候直接创建线程进行执行就好了,所以不需要使用队列来存储任务。这样避免了使用队列进行任务的中转,提高了执行效率。
# 3.3.3 为什么 ScheduledThreadPool 使用延迟队列 DelayedWorkQueue?
因为 ScheduledThreadPool 是延迟任务线程池,所以使用延迟队列有利于对执行任务的时间做延迟。
# 3.3.4 JDK1.8中加入的workStealingPool
workStealingPool适用于执行产生子任务的环境,例如进行二叉树的遍历。
workStealingPool具有窃取能力。
使用时最好不要加锁,而且不保证执行顺序。
# 3. 停止线程池的正确方法
shutdown:调用了shutdown()方法不一定会立即停止,这个方法仅仅是初始整个关闭过程。因为线程池中的线程有可能正在运行,并且队列中也有待处理的任务,不可能说停就停。所以每当调用该方法时,线程池会把正在执行的任务和队列中等待的任务都执行完毕再关闭,并且在此期间如果接收到新的任务会被拒绝。
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.shutdown();
// isShutdown:可以用于判断线程池是否被shutdown了
executorService.isShutdown();
// isTerminated:可以判断线程是否被完全终止了
executorService.isTerminated();
// awaitTermination:传入等待时间,等待时间达到时判断是否停止了,主要用于检测。
executorService.awaitTermination(3L, TimeUnit.SECONDS);
// shutdownNow:调用了这个方法时,线程池会立即终止,并返回没有被处理完的任务。如果需要继续执行这里的任务可以再次让线程池执行这些返回的任务。
executorService.shutdownNow();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 4. 任务太多,怎么拒绝
# 4.1 拒绝的时机
当Executor关闭时,新提交的任务会被拒绝。
以及Executor对最大线程数和工作队列容量使用有限边界并且已经饱和时。
# 4.2 拒绝策略
- AbortPolicy(中断策略):直接抛出异常进行拒绝
- DiscardPolicy(丢弃策略):不会得到通知,默默的抛弃掉任务
- DiscardOldestPolicy(丢弃最老的):由于队列中存储了很多任务,这个策略会丢弃在队列中存在时间最久的任务。
- CallerRunsPolicy:比如主线程给线程池提交任务,但是线程池已经满了,在这种策略下会让提交任务的线程去执行。
总结:第四种拒绝策略相对于前三种更加“机智”一些,可以避免前面三种策略产生的损失。在第四种策略下可以降低提交的速度,达到负反馈的效果。
# 5. 线程池实现原理
# 5.1 线程池组成部分
- 线程池管理器
- 工作线程
- 任务队列
- 任务
# 5.2 线程池实现任务复用的原理
总结:核心原理就是获取到task,如果task不为空就调用run()方法,这样就实现了线程的复用,达到让相同的线程执行不同任务的目的。
# 5.3 线程池状态
- RUNNING:接受新任务并处理排队任务
- SHUTDOWN:不接受新的任务但是处理排队任务
- STOP:不接受新的任务,也不处理排队的任务,并中断正在执行的任务,就是调 shutdownNow() 带来的效果
- TIDYING:中文意思是整洁,意思就是说任务都已经终止,workerCount 为零时线程会转换到 TIDYING 状态,并将运行 terminate() 钩子方法
- TERMINATED:terminate() 运行完成
# 5.4 使用线程池的注意点
- 避免任务的堆积(堆积容易产生内存溢出)
- 避免线程数过多增加(缓存线程池会导致线程数过度增加)
- 排查线程泄漏(线程已经执行完毕却无法被回收)