1、 线程池-高级篇

概述 :

提到池,大家应该能想到的就是水池。水池就是一个容器,在该容器中存储了很多的水。那么什么是线程池呢?线程池也是可以看做成一个池子,在该池子中存储很多个线程。

线程池存在的意义:

系统创建一个线程的成本是比较高的,因为它涉及到与操作系统交互,当程序中需要创建大量生存期很短暂的线程时,频繁的创建和销毁线程对系统的资源消耗有可能大于业务处理是对系

统资源的消耗,这样就有点”舍本逐末”了。针对这一种情况,为了提高性能,我们就可以采用线程池。线程池在启动的时,会创建大量空闲线程,当我们向线程池提交任务的时,线程池就

会启动一个线程来执行该任务。等待任务执行完毕以后,线程并不会死亡,而是再次返回到线程池中称为空闲状态。等待下一次任务的执行。

线程池的设计思路 :

1.1.线程的使用 我们知道实现多线程的方法有两种:

1,继承Thread类;

2,实现Runnable接口;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
 public static void main(String[] args) {

//继承Thread
MyThread myThread1 = new MyThread();
myThread1.setName("myThread1_");
myThread1.start();

MyThread myThread2 = new MyThread();
myThread2.setName("myThread2_");
myThread2.start();

//实现Runnable
MyRunnable myRunnable1 = new MyRunnable();
Thread thread1 = new Thread(myRunnable1);
thread1.setName("myRunnable1_");
thread1.start();

Thread thread2 = new Thread(myRunnable1);
thread2.setName("myRunnable2_");
thread2.start();
System.out.println(Thread.currentThread().getName()+"_"+System.currentTimeMillis());
}
}
class MyThread extends Thread{

@Override
public void run() {
System.out.println(Thread.currentThread().getName()+System.currentTimeMillis());
}
}


class MyRunnable implements Runnable{

@Override
public void run() {
System.out.println(Thread.currentThread().getName()+System.currentTimeMillis());

}

}

实现多线程的这两种方式之间有什么区别?

1:继承Tread类,实例化一个线程时,调用start()也只能启动一个线程。实例化多个线程实例,每个实例调用 start()可以启动多个线程,但线程中的资源也是多份的。

2:实现Runnable接口只需要实例化一个线程类就可以创建多个线程,并且多个线程共享同一份资源

3:继承了Tread类后不能同时继承其它类了,而实现了Runnable接口后还可实现其它接口和继承其它类。

4:Thread本身也是Runnable接口的一个实现类。

1.2.java线程池框架Executor

1.2.1.为什么引入线程池框架

​ 1:new Thread()的缺点是耗费性能,调用new Thread()创建的线程缺乏管理,被称为野线程,而且可以无限 制创建,之间相互竞争,会导致过多占用系统资源导致系统瘫痪,不利于扩展,比如如定时执行、定期执行、线程 中断等

​ 2:采用线程池的优点是:重用存在的线程,减少对象创建、消亡的开销,性能佳,可有效控制最大并发线程 数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞,提供定时执行、定期执行、单线程、并发数控制 等功能

1.2.2.Executor和ExecutorService

1:Executor接口定义为:
1
2
3
4
5
public interface Executor {

void execute(Runnable command);

}

​ 执行已提交的Runnable 任务对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用 的细节、调度等)分离开来的方法

2:ExecutorService接口定义为:

​ ExecutorService其实是一个更通用的线程池接口,它可以接收任务,然后根据配置来分配线程,并控制其调 度,可以对线程统一管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface ExecutorService extends Executor{
//关闭线程池,不再接收新的任务,但是会将当前线程池中的所有 任务执行完毕后再关闭
void shutdown();
//立即关闭,不再接收新的任务,抛弃线程池中还未执行的任务并中断所有正在执行的任务,返回等待执行的
任务列表
List<Runnable> shutdownNow();
//判断线程池是否关闭
boolean isShutdown();
//线程池是否终止
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
// 提交任务,任务是一个实现了Callable的类,该类中返回任务结果,该方法返回Future,可以通过Future
获得任务结果
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
//提交任务,任务是实现了Runnable的类,该类中的run方法是void因此无法返回结果,该方法仍然会返回
Future,但是通过Future得不到结果
Future<?> submit(Runnable task);
// 提交任务集合,返回Future集合
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) ;
// 提交任务集合,返回其中一个任务的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
}

核心方法说明:

​ execute(Runnable)

execute(Runnable) 方法接受一个java.lang.Runable对象的实例,并异步执行

1
2
3
4
5
6
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
}
);

其中executorService是ExecutorService接口的一个实现类对象,这种方式不能获得Runnable执行的结果,如 果有这种需要,需要要使用Callable

​ 自从Java 1.5开始,就提供了Callable和Future,通过 它们可以在任务执行完毕之后得到任务执行结果。

submit(Callable)

submit(Callable)方法与submit(Runnable)方法相似,除了接收的参数有所不同。Callable实例非常类似于 Runnable,不同的是Callable.call()方法可以返回一个结果,Runnable.run()方法不能返回一个结果

1
2
3
4
5
6
7
8
9
10
11
//Callable的声明
public interface Callable<V> {
V call() throws Exception;
}
Future future = executorService.submit(new Callable<String>(){
public String call() throws Exception {
System.out.println("Asynchronous Callable");
return "Callable Result";
}
});
System.out.println("future.get() = " + future.get());
submit(Runnable)

submit(Runnable) 方法也可以接收一个Runnable接口的具体实现,并返回一个Future对象。Future对象可以 用来检测Runable是否执行完成。

1
2
3
4
5
6
    Future future = executorService.submit(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
future.get(); //如果任务正常完成则该方法返回null

此方法仍然不能获得Runnable的执行结果!

shutdown()/shutdownNow()

​ 使用完ExecutorService后,你应该关闭它,使得线程不能持续运行。例如,你的应用程序从main()方法开始 并且你的主线程退出应用程序,这时如果存在激活状态的ExecutorService,你的应用程序将仍然会保持运行。 ExecutorService中激活的线程会阻止JVM关闭。

​ 为了终止ExecutorService中的线程,你需要调用shutdown()方法。ExecutorService不会立即关闭,但是它也 不会接受新的任务,直到它里面的所有线程都执行完毕,ExecutorService才会关闭。所有提交到ExecutorService 中的任务会在调用shutdown()方法之前被执行。

​ 如果你想立即关闭ExecutorService,你可以调用shutdownNow()方法。这将会尝试立即停止所有正在执行的任 务,并且忽略所有提交的但未被处理的任务。对于正在执行的任务是不能确定的,也许它们停止了,也行它们执行 直到结束

1.2.3.ThreadPoolExecutor及参数

ThreadPoolExecutor是ExecutorService接口的实现,jdk中提供的executor框架结构图如图所示:

image

​ 其中AbstractExecutorService是一个抽象类,它的作用是对ExecutorService接口中执行方法有默认实现, ThreadPoolExecutor继承AbstractExecutorService。

​ 首先来看如何创建一个ThreadPoolExecutor,下面是ThreadPoolExecutor常用的一个构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private volatile boolean allowCoreThreadTimeOut;
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
//常用的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
//核心的构造函数,其他构造函数都是调用该构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}

线程池核心参数介绍:

image

corePoolSize:核心线程数量

​ 1:线程池刚创建时,线程数量为0,当每次执行execute添加新的任务时会在线程池创建一个新的线程,直到 线程数量达到corePoolSize为止。

​ 2:核心线程会一直存活,即使没有任务需要执行,当线程数小于核心线程数时,即使有线程空闲,线程池也 会优先创建新线程处理

​ 3:设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭

workQueue:阻塞队列

​ 1:当线程池正在运行的线程数量已经达到corePoolSize,那么再通过execute添加新的任务则会被加 workQueue队列中,在队列中排队等待执行,而不会立即执行。

​ 一般来说,这里的阻塞队列有以下几种选择: ArrayBlockingQueue;

​ LinkedBlockingQueue;

​ SynchronousQueue;

maximumPoolSize:最大线程数

​ 1:当池中的线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务

​ 2:当池中的线程数=maximumPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常

keepAliveTime:线程空闲时间

​ 1:当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize

​ 2:如果allowCoreThreadTimeout=true,则会直到线程数量=0

threadFactory:线程工厂,主要用来创建线程 rejectedExecutionHandler:任务拒绝处理器,两种情况会拒绝处理任务

​ 1:当线程数已经达到maxPoolSize,且队列已满,会拒绝新任务

​ 2:当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown() 和线程池真正shutdown之间提交任务,会拒绝新任务

​ 3:当拒绝处理任务时线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是 AbortPolicy,另外在ThreadPoolExecutor类有几个内部实现类来处理这类情况

​ ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出 RejectedExecutionException异常。 ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

​ ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常

​ ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

1.2.4.execute方法的执行流程

​ 提交一个任务到线程池,线程池具体是如何处理的呢?

​ ThreadPoolExecutor中对execute(Runnable)方法就是具体的实现,方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

图解如下:

image

​ 对于可以返回任务结果的submit(Callable)方法又是如何实现的呢?

​ submit(Callable)方法是在ThreadPoolExecutor的父类AbstractExecutorService中实现的,

1
2
3
4
5
6
7
8
9
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

​ 可以发现submit底层还是调用的execute,submit方法需要传入的是一个实现了Callable的任务类,

​ 而 execute方法需要传入一个实现了Runnable的类,其中newTaskFor方法就是通过callable构造RunnableFuture的 子类,

其中他们的关系是:

1
2
3
4
5
6
7
8
9
10
11
12
public interface Runnable {
public abstract void run();
}
public interface Callable<V> {
V call() throws Exception;
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
public class FutureTask<V> implements RunnableFuture<V> {
...
}

1.2.5.Executors线程池工厂

​ Executors提供了一些创建线程池的工具方法

​ 1:Executors.newSingleThreadExecutor()

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

​ corePoolSize和maximumPoolSize都为1,也就是创建了一个固定大小是1的线程池,workQueue是new LinkedBlockingQueue < Runnable >()是一种无界阻塞队列,队列的大小是Integer.MAX_VALUE,可以认为是队列 的大小不限制。

​ 由此可以得出通过该方法创建的线程池,每次只能同时运行一个线程,当有多个任务同时提交时,那也要一个 一个排队执行

​ 2:Executors.newFixedThreadPool(int nThreads)

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

创建了一个固定大小的线程池,可以指定同时运行的线程数量为nThreads。

​ 3:Executors.newCachedThreadPool()

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

​ 构造一个缓冲功能的线程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE, keepAliveTime=60s,以及一个无容量的阻塞队列 SynchronousQueue,因此任务提交之后,将会创建新的线程执 行;线程空闲超过60s将会销毁

​ 4:Executors.newScheduledThreadPool(int corePoolSize)

1
2
3
4
5
6
7
8
9
10
11
12
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}

​ 构造有定时功能的线程池,配置corePoolSize,无界延迟阻塞队列DelayedWorkQueue; maximumPoolSize=Integer.MAX_VALUE,由于DelayedWorkQueue是无界队列,所以这个值是没有意义的

1
2
3
4
5
6
7
知识小贴士:注意:阿里巴巴编程规约中对于并发处理有几条强制要求如下:
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方法让写的同学更加
明确线程池的运行规则,规避资源耗尽的风险;对于Executors返回线程池对象的弊端有:
FixedThreadPool和SingleThreadPool允许请求的任务等待队列长度为Integer.MAX_VALUE,可能会堆积大量
的请求任务,从而导致OOM
CachedThreadPool和ScheduledThreadPool允许创建的最大线程数为Integer.MAX_VALUE,可能会创建大量
的线程,从而导致OOM

​ 从阿里的编码规约来看推荐我们自主的创建ThreadPoolExecutor,那对于线程池最核心的几个参数应该如何 选取呢?线程池参数不能胡乱制定否则对服务的性能影响很大,需要根据任务的性质来决定,我们主要参考以下两 个方面

​ 1: I/O密集型:

​ CPU使用率较低,程序中会存在大量I/O操作占据时间,导致线程空余时间出来,线程个 数为CPU核数的两倍。当其中的线程在IO操作的时候,其他线程可以继续用CPU,提高了CPU的利用率

​ 2:CPU密集型:

​ CPU使用率较高(也就是一些复杂运算,逻辑处理),所以线程数一般只需要CPU核数的线程就可以了。 这一 类型的在开发中多出现的一些业务复杂计算和逻辑处理过程中。线程个数为CPU核数。这几个线程可以并行执行, 不存在线程切换到开销,提高了CPU的利用率的同时也减少了切换线程导致的性能损耗

1.3.SpringBoot中线程池的使用

1.3.1.Spring对线程池的封装

​ 多线程并发处理起来通常比较麻烦,如果使用spring容器事情就好办了多了。spring封装了Java的多线程的实 现,我们只需要关注于并发事物的流程以及一些并发负载量等特性。

​ TaskExecutor接口

1
2
3
public interface TaskExecutor extends Executor {
void execute(Runnable task);
}

​ 继承自java.util.concurrent.Executor接口,设计之初是为了其他Spring组件提供一个线程池的抽象,根据线 程池的配置接受一个任务并执行。

TaskExecutor接口的实现类简要说明

1:SimpleAsyncTaskExecutor类

这个实现不重用任何线程,或者说它每次调用都启动一个新线程,性能消耗比较严重。

2:ConcurrentTaskExecutor 类

Java SE 5.0引入了ThreadPoolExecutor、ScheduledThreadPoolExecutor。Spring 2.x借助 ConcurrentTaskExecutor和ThreadPoolTaskExecutor能够通过IoC配置形式自定义它们暴露的各个属性。很少需 要使用ConcurrentTaskExecutor,有另一个备选, ThreadPoolTaskExecutor类

3:ThreadPoolTaskExecutor 类

ThreadPoolTaskExecutor内部对ThreadPoolExecutor进行了包装,同时提供能够通过IOC的形式来配置线程 池的各个参数,比较常用

4:ThreadPoolTaskScheduler类

ThreadPoolTaskScheduler内部对ScheduledThreadPoolExecutor进行了包装,除了能执行异步任务外支持 定时/延迟任务的执行,属于一种高级特性。

1.3.2.ThreadPoolTaskExecutor

​ SpringBoot默认情况下帮我们自动配置了ThreadPoolTaskExecutor到IOC容器中,我们需要的时候直接注入 使用即可,

​ 1:在测试包com.chongba.executor下创建测试类ThreadPoolTaskExecutorTest,添加测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ScheduleApplication.class)
public class ThreadPoolTaskExecutorTest {
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Test
public void test(){
//向线程池中提交100个任务
for(int i=0;i<100;i++){
threadPoolTaskExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("ThreadPoolTaskExecutor test
"+Thread.currentThread().getName());
}
});
}
//ThreadPoolTaskExecutor支持对线程池参数的ioc配置
System.out.println("核心线程数:"+threadPoolTaskExecutor.getCorePoolSize());
System.out.println("最大线程数:"+threadPoolTaskExecutor.getMaxPoolSize());
System.out.println("线程等待超时时间:"+threadPoolTaskExecutor.getKeepAliveSeconds());
System.out.println("当前活跃的线程数:"+threadPoolTaskExecutor.getActiveCount());
System.out.println("线程池内线程的名称前
缀:"+threadPoolTaskExecutor.getThreadNamePrefix());
}
}

测试结果:

1
核心线程数:8 最大线程数:2147483647 线程等待超时时间:60 当前活跃的线程数:5 线程池内线程的名称 前缀:task- ThreadPoolTaskExecutor test task-4 ThreadPoolTaskExecutor test task-5 ThreadPoolTaskExecutor test task-7 ThreadPoolTaskExecutor test task-6 ......................................
配置ThreadPoolTaskExecutor参数

​ 如果我们不想要SpringBoot帮我们默认配置的线程池参数,我们可以自行配置,ThreadPoolTaskExecutor支 持对线程池核心参数的重新配置

​ 在启动类ScheduleApplication中,向容器中添加参数重新配置后的ThreadPoolTaskExecutor实例,需要借助 两个注解:

@Configuration

@Bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    @Bean
public ThreadPoolTaskExecutor mythreadpool(){
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
//设置核心线程数
threadPool.setCorePoolSize(5);
//设置最大线程数
threadPool.setMaxPoolSize(100);
//设置线程超时等待时间
threadPool.setKeepAliveSeconds(60);
//设置任务等待队列的大小
threadPool.setQueueCapacity(100);
//设置线程池内线程的名称前缀---阿里编码规约推荐----出错了方便调试
threadPool.setThreadNamePrefix("myThreadPool-");
//设置任务拒绝策略
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//直接初始化
threadPool.initialize();
return threadPool;
}

回到测试类ThreadPoolTaskExecutorTest中再次测试test方法,运行结果如下

1
核心线程数:5 最大线程数:100 线程等待超时时间:60 当前活跃的线程数:5 线程池内线程的名称前 缀:myThreadPool- ThreadPoolTaskExecutor test myThreadPool-1 ThreadPoolTaskExecutor test myThreadPool-1

1.3.3.SpringBoot中的异步调用

​ SpringBoot对多线程执行异步任务有了更深入的封装,我们只需要关注任务的执行流程和逻辑,不用关心如何 将任务提交给线程池以及任务如何调用,这就需要用到@Async注解

​ 1:定义任务方法

​ 创建包com.chongba.schedule.async,在该包下创建AsyncTask类,注意需要将该类放入容器中,然后在该 类中定义一个方法,在方法上添加@Async注解表明需要异步执行,最后在配置类上开启异步任务。

1
2
3
4
5
6
7
8
@Component
public class AsyncTask {
@Async
public void myAsync(){
System.out.println("spring boot async task test " +
Thread.currentThread().getName());
}
}

​ 2:在启动类上添加@EnableAsync注解

​ 3:回到测试类ThreadPoolTaskExecutorTest中编写测试方法test2,然后测试

1
2
3
4
5
6
7
8
@Autowired
private AsyncTask asyncTask;
@Test
public void test2(){
for(int i=0;i<100;i++){
asyncTask.myAsync();
}
}

结果如下:

image

​ 我们发现此时SpringBoot采用的是SimpleAsyncTaskExecutor线程池,我们可以将线程池更换为自己定义的线 程池,在@Async注解上指定要使用的线程池

1
2
3
4
5
6
7
8
@Component
public class AsyncTask {
@Async("mythreadpool")
public void myAsync(){
System.out.println("spring boot async task test " +
Thread.currentThread().getName());
}
}

1.4.线程池监控

​ 在前面的案例中,我们获取并打印了线程池的核心参数信息,但如果要对线程池的运行状况加以监控,我们就 需要对ThreadPoolTaskExecutor进行扩展,当有任务提交到线程池中执行的时候自动的打印线程池的运行参数状 况。

​ 1:在com.chongba.schedule.service包下创建VisiableThreadPool类,对ThreadPoolTaskExecutor进行扩展

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
public class VisiableThreadPool extends ThreadPoolTaskExecutor {
private void logs(String info){
//收集线程池运行状况数据信息
//线程名称前缀
String prefix = this.getThreadNamePrefix();
//任务总数
long taskCount = this.getThreadPoolExecutor().getTaskCount();
//已完成的任务数
long completedTaskCount = this.getThreadPoolExecutor().getCompletedTaskCount();
//当前正在执行任务的线程数
int activeCount = this.getThreadPoolExecutor().getActiveCount();
//任务等待队列中任务数
int queueSize = this.getThreadPoolExecutor().getQueue().size();
log.info("{},{},taskCout={},completedTaskCount={},activeCount={},queueSize={}",
prefix,info,taskCount,completedTaskCount,activeCount,queueSize);
}
@Override
public void execute(Runnable task) {
super.execute(task);
logs("do execute ");
}
@Override
public <T> Future<T> submit(Callable<T> task) {
Future<T> future = super.submit(task);
logs("do submit");
return future;
}
}

常用监控状态:

​ getThreadNamePrefix:线程池内线程名称前缀,方便定位

​ getTaskCount:线程池已经执行的和未执行的任务总数;

​ getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;

​ getActiveCount:当前线程池中正在执行任务的线程数量。 queueSize:缓冲队列大小。

​ 2:在启动类/配置类中将VisiableThreadPool添加到容器中,比如在启动类ScheduleApplication中,添加如 下代码

1
2
3
4
5
6
7
8
9
10
11
12
@Bean("visiableThreadPool")
public ThreadPoolTaskExecutor visiableThreadPool(){
ThreadPoolTaskExecutor visiableThreadPool = new VisiableThreadPool();
visiableThreadPool.setCorePoolSize(10);
visiableThreadPool.setMaxPoolSize(1000);
visiableThreadPool.setKeepAliveSeconds(60);
visiableThreadPool.setQueueCapacity(1000);
visiableThreadPool.setThreadNamePrefix("visiableThreadPool-");
visiableThreadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
visiableThreadPool.initialize();
return visiableThreadPool;
}

​ 3:回到测试类ThreadPoolTaskExecutorTest编写测试方法testVisiable(),只向线程池中提交任务,查看控制 台日志输出

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void testVisable(){
for(int i=0;i<1000;i++){
threadPoolTaskExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("visiableThreadPool test-
"+Thread.currentThread().getName());
}
});
}
}

会发现执行报错!!!!!

​ 原因是因为我们在启动类中向容器中放入了两个ThreadPoolTaskExecutor的bean,在我们的 ThreadPoolTaskExecutorTest中都是采用的自动注入,因此我们需要修改注入方式,将自动注入修改为按名称注 入

1
2
3
// @Autowired
@Resource(name = "visiableThreadPool")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;

https://1902756969.github.io/Hexo/2023/02/25/多线程/线程池高级篇/
作者
发布于
2023年2月25日
许可协议