1. 为什么使用线程池
线程池是作为程序员必备的技能,合理的使用线程池也是必须的,但是在实际开发过程中,使用线程池的参数都是复制粘贴来的,也不会去考察线程池设计是否符合实际使用场景,是否合理,这篇文章不会深入去说线程池的原理,但是基本的配置参数说明还是有的,希望能对大家开发有用。
为什么要使用线程池:
- 降低资源的消耗,降低线程创建和销毁时对资源的消耗;
- 提高响应时间(需要一个新线程执行任务,基本都是创建、执行、销毁,对应的时间分别是:T1、T2、T3,如果每次都是这样操作,是很浪费时间和对计算机资源的浪费,使用线程池后就可以免去T1和T3的时间,大大提高了效率;
- 提高线程的可管理性。
2. 线程的基本参数分析
先看一下ThreadPoolExecutor
线程池的基础类,构造方法如下:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor
类中有多个构造方法,但是都会在内部调用此构造方法,也就是最终的构造方法,这里就看它了。这个构造方法的参数有corePoolSize
、maximumPoolSize
、keepAliveTime
、unit
、workQueue
、threadFactory
、handler
,下面就依次看一下这些参数的含义和用法。
2.1 corePoolSize(核心线程数)
表示线程池的核心线程数量,当线程池中的线程数量小于这个值得时候,再有新任务进来时,会直接创建新的线程去执行,当线程数量达到corePoolSize时,就会将溢出的任务放到workQueue中,等待执行。
这里需要注意的是,很多小伙伴都认为设置这个参数后,线程池启动就会创建corePoolSize数量的线程,并放在线程池中,等待使用,但事实并非如此,而是有任务进来的时候才会创建线程,初始的线程池是空的,如果需要线程池拥有corePoolSize数量的线程被创建,那就在初始化的时候调用prestartAllCoreThreads()
方法。
2.2 maximumPoolSize(最大线程数)
表示允许最大线程数,上一个参数中说多余的任务会放到workQueue中,当workQueue也满了以后,就会判断当前线程池中线程数量是不是小于maximumPoolSize,如果小于,那就接着创建线程,执行任务。
2.3 keepAliveTime(最大存活时间)
线程空闲下来后存活的时间,这个存活时间只对线程池中线程数量超过corePoolSize的时候才会生效。也就是说,当线程池中的线程数量小于corePoolSize,空闲的线程是不会根据这个时间销毁,而是一直存在于线程池中。
2.4 unit(时间单位)
很简单,就是指定keepAliveTime的时间单位。
2.5 workQueue(工作缓存队列)
保存任务的阻塞队列,是在线程数达到corePoolSize后,新进来的任务会被保存到workQueue中,这里使用的是有界的队列(BlockingQueue<Runnable> workQueue
)。
2.6 threadFactory(线程制造工厂)
采用工厂模式,用于创建线程的工厂,并给线程设定线程名。
2.7 handler(饱和策略)
JDK中给出的RejectedExecutionHandler实现类有四个。
- AbortPolicy:直接抛出异常(默认)
- CallerRunsPolicy:用调用者所在的线程来执行任务
- DiscardOldestPolicy:丢弃阻塞队列中最老的任务,队列中最靠前的任务
- DiscardPolicy:直接忽略当前添加进来的任务
但是实际应用中这四种方式都不会很使用,后两种放弃任务应该是没有使用的,无论任务的重要性,一般都线上项目都不会忍受任务的丢失。那么都不是很好用只能自己来制定符合实际要求的策略,这里很简单,只要实现RejectedExecutionHandler
接口类,实现其中唯一的一个接口方法rejectedExecution()
即可。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
3. 线程池的使用
线程池的使用也就是线程池的创建和任务的提交等。线程池的创建这里就不去说了,只要知道参数的含义,也就很容易创建一个线程池出来。
3.1 提交线程任务
主要的两种提交任务的方式有两种,一种是有返回值,一种是无返回值的。
void execute(Runnable command) 不需要返回(源码)
Future<T> submit(Callable<T> task)需要返回(源码)
execute方法是在ThreadPoolExecutor中实现的,只支持Runnable实现的任务,submit是在ThreadPoolExecutor父类AbstractExecutorService
中实现的,可以支持Callable实现方式也支持Runnable实现方式。
3.2 关闭线程池
shutdown():设置线程池的关闭状态,并关闭未执行的任务线程
shutdownNow():设置线程池的关闭状态,并关闭正在执行或者暂停的任务线程
3.3 执行线程池过程分析
3.3.1 execute()
看一下execute方法的源代码:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//判断线程池中线程小于corePoolSize
if (addWorker(command, true)) //创建新线程,执行任务
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//向队列中推入任务
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//如果上面推入队列失败
reject(command); //执行饱和策略
}
从上面的源代码可以看的出来当使用execute方法加入任务后的一系列操作。在这个方法上面也给出了对应的解释、代码中也给出了关键位置的注释,这里就不多说。
3.3.2 submit()
看一下submit的源代码:
/*----submit方法 Runnable实现方式----*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
//看下面的newTaskFor方法
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);//调用execute方法执行任务
return ftask;
}
//使用了FutureTask包装了任务对象
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/*----submit方法 Callable实现方式----*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
//看下面的newTaskFor方法
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);//调用execute方法执行任务
return ftask;
}
//使用了FutureTask包装了任务对象
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
4. 线程池工作队列
4.1 ArrayBlockingQueue
有界队列策略,可以在创建队列的时候指定队列长度,创建时会根据队列长度开辟一段连续的内存,容量设置需谨慎,过大浪费空间,过小易添加失败。
4.2 LinkedBlockingQueue
有界队列策略,可以在创建队列的时候指定队列长度,队列创建时不会开辟大量内存空间,根据节点增加动态的拓展,存储空间非连续。
4.3 DelayQueue
无界队列策略,元素都有一个过期时间,队列内部也是根据过期时间进行排序,从小到大排列,只有过期的元素才会被取出并执行。
4.4 SynchronousQueue
直接提交策略,工作队列本身是不存储元素,相当于通道,插入的数据立即就会被移除并处理,处理速度很快,但是存在数据过多导致线程创建过多问题。
4.5 PriorityBlockingQueue
优先级策略,无边界队列,添加元素根据优先级排序,优先执行级别高的元素。
5. JDK给出的几种线程池实现
5.1 ThreadPoolExecutor
一般的线程池构建类,所有相关的参数都是可以自定义的,一般都是直接使用默认即可。
其执行流程图如下:
5.2 FixedThreadPoolExecutor
创建固定线程数量的线程池(corePoolSize、maximumPoolSize值相同),适用于负载较重的服务器。使用了无界队列LinkedBlockingQueue
。
其执行流程图如下:
5.3 SingleThreadPoolExecutor
创建单个线程,适用需要保证顺序执行任务,不会有多个线程活动。这个流程图就比较简单啦,流程图省略。
5.4 CachedThreadPoolExecutor
会根据需要来创建新线程的,适用于执行很多短周期的任务,短时间可能创建线程量很大,消耗大量资源。使用的是无界队列SynchronousQueue
。
5.5 WorkStealingPool(JDK7)
基于ForkJoin实现
5.6 ScheduledThreadPoolExecutor
需要定期执行周期任务。都是使用无界队列DelayQueue
。newScheduledThreadPoolExecutor
:可以包含多个线程,线程执行周期任务,适度控制后台线程数量。newSingleScheduledThreadPoolExecutor
:只包含单个线程,值需要到单个线程执行周期任务,保证顺序的执行各个任务。
看一下ScheduledThreadPoolExecutor
执行流程图:
主要执行方法
- schedule():只执行一次,任务可以延时执行。
- scheduleAtFixedRate:提交固定时间间隔(上一个任务和下一个任务开始的头之间的时间间隔)
- scheduleAtFixedDelay:提交固定时间延迟间隔执行(上一个任务执行结束到下个任务执行开始之间的时间间隔)
注意:(可以自己试验一下,下面的如有错误,欢迎指正)
1、scheduleAtFixedRate出现超时后,后一个任务会根据情况是否立即执行。
如设定了时间间隔是60秒,如果第一个任务执行了80秒,后一个任务就不会在60秒的时候执行,而是80秒的时候执行。如果第二个任务执行了20秒,那么第三个任务的执行就是在80秒的基础上加60秒,那就是140秒的时候执行。这里只要理解间隔时间60秒是指下一个任务的开始和上一个任务的开始时间间隔是60秒就可以理解了。
2、当任务中出现异常的时候,需要将异常捕捉,否则任务无法进行下去,会影响后面的任务执行。因为这个时候任务会被阻塞。6. 线程池异常处理
异步操作使用线程池,当异步操作出现异常的时候是不会抛出给主线程的,那应该怎么处理,看一下下面几种解决方案。
6.1 直接在异步处理逻辑中添加try……catch
捕获
最为简单,也是最为常用的方式,由异步逻辑本身来捕获异常并加上处理逻辑。
public static void main(String[] args) {
//创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
20,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100));
executor.execute(() -> {
System.out.println("executor submit");
try {
int a = 1 / 0;
} catch (Exception e) {
System.out.println("异步线程自行捕获异常,并作相应的处理逻辑");
}
});
}
如果主线程依赖子线程的处理结果,这样就不行啦,需要使用下面这种方式。
6.2 使用submit提交异步操作,根据Future返回获取异常
这种方式有一个问题是Future的get方式是阻塞的,这样主线程会一直等待子线程执行完成,如果只是为了获取异常信息,且业务逻辑不是强要求获取子线程执行情况的不建议使用。因为这样阻塞就失去了本身多线程并发处理的初衷。
代码示例:
public static void main(String[] args) {
//创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
20,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100));
Future<?> future = executor.submit(() -> {
System.out.println("executor submit");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int a = 1 / 0;
});
try {
future.get();//阻塞
} catch (Exception e) {
System.out.println("get方法拦截异常信息:" + e);
}
}
6.3 线程设置uncaughtExceptionHandler
在ThreadFactory里面将创建的新线程时设置uncaughtExceptionHandler参数,此时就不可以使用原有默认DefaultThreadFactory,需要自定义,自定义代码可以直接使用DefaultThreadFactory内的逻辑,加上对uncaughtExceptionHandler的设值即可。
public class ThreadPoolConfig {
public static void main(String[] args) {
//创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
20,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new CustomThreadFactory());
//模拟线程内出现异常
executor.execute(() -> {
System.out.println("直接执行,没有延迟时间……");
int a = 1 / 0;
});
//正常业务逻辑
System.out.println("正常逻辑结束");
}
//自定义线程工厂:CustomThreadFactory
private static class CustomThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
CustomThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
//线程内添加UncaughtExceptionHandler处理逻辑
t.setUncaughtExceptionHandler((t1, e) -> {
System.out.println(namePrefix + threadNumber + " 线程异常处理逻辑");
System.out.println(namePrefix + threadNumber + " 出现的异常信息:" + e);
});
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
6.4 重写ThreadPoolExecutor中的afterExecute方法
ThreadPoolExecutor本身自带的afterExecute方法是没有任何实现逻辑的,需要继承ThreadPoolExecutor并且重写afterExecute方法来实现异常的处理动作。
//重写ThreadPoolExecutor中的afterExecute方法
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("自定义ThreadPoolExecutor,处理异常信息:" + t);
}
}
public static void main(String[] args) {
CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10,
20,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100));
executor.execute(() -> {
System.out.println("重写ThreadPoolExecutor中的afterExecute方法");
int a = 1 / 0;
});
}
7. 合理配置线程池
7.1 任务的性质
- 计算密集型:加密,大数据分析等
- IO密集型:读取文件,数据库连接,网络通讯等
- 混合型:计算型密集型和IO密集型的混合
7.2 线程池配置
对于计算密集型:
最大推荐线程数适当小一点,推荐为机器的CPU核心数+1,这里如何获取机器的CPU核心数,可以使用下面的方法:
Runtime.getRuntime().availableProcessors();
对于IO密集型:
最大推荐线程数适当大一点,机器CPU核心数*2
对于混合型:
尽量拆分出计算密集型和IO密集型,特别是IO密集型约等于计算机密集型,拆分后效率会更高。如果是IO密集型远大于计算密集型,拆分意义不大。
8. 总结
在实际项目中,很少有用到JDK给出的几种线程池,因为他们大多都使用了无界的任务队列,这样会存在一个很大的安全隐患,当阻塞的任务过多的时候,就会导致内存溢出。
更适用的是ThreadPoolExecutor,可以直接配置对应的参数,也可以指定需要的饱和策略,是整个线程是的更利于掌控,更符合实际的需求。