线程池ThreadPool浅层面原理分析


1. 为什么使用线程池

线程池是作为程序员必备的技能,合理的使用线程池也是必须的,但是在实际开发过程中,使用线程池的参数都是复制粘贴来的,也不会去考察线程池设计是否符合实际使用场景,是否合理,这篇文章不会深入去说线程池的原理,但是基本的配置参数说明还是有的,希望能对大家开发有用。
为什么要使用线程池:

  • 降低资源的消耗,降低线程创建和销毁时对资源的消耗;
  • 提高响应时间(需要一个新线程执行任务,基本都是创建、执行、销毁,对应的时间分别是:T1、T2、T3,如果每次都是这样操作,是很浪费时间和对计算机资源的浪费,使用线程池后就可以免去T1和T3的时间,大大提高了效率;
  • 提高线程的可管理性。

2. 线程的基本参数分析

先看一下ThreadPoolExecutor线程池的基础类,构造方法如下:

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

ThreadPoolExecutor类中有多个构造方法,但是都会在内部调用此构造方法,也就是最终的构造方法,这里就看它了。这个构造方法的参数有corePoolSizemaximumPoolSizekeepAliveTimeunitworkQueuethreadFactoryhandler,下面就依次看一下这些参数的含义和用法。

2.1 corePoolSize(核心线程数)

表示线程池的核心线程数量,当线程池中的线程数量小于这个值得时候,再有新任务进来时,会直接创建新的线程去执行,当线程数量达到corePoolSize时,就会将溢出的任务放到workQueue中,等待执行。
这里需要注意的是,很多小伙伴都认为设置这个参数后,线程池启动就会创建corePoolSize数量的线程,并放在线程池中,等待使用,但事实并非如此,而是有任务进来的时候才会创建线程,初始的线程池是空的,如果需要线程池拥有corePoolSize数量的线程被创建,那就在初始化的时候调用prestartAllCoreThreads()方法。

2.2 maximumPoolSize(最大线程数)

表示允许最大线程数,上一个参数中说多余的任务会放到workQueue中,当workQueue也满了以后,就会判断当前线程池中线程数量是不是小于maximumPoolSize,如果小于,那就接着创建线程,执行任务。

2.3 keepAliveTime(最大存活时间)

线程空闲下来后存活的时间,这个存活时间只对线程池中线程数量超过corePoolSize的时候才会生效。也就是说,当线程池中的线程数量小于corePoolSize,空闲的线程是不会根据这个时间销毁,而是一直存在于线程池中。

2.4 unit(时间单位)

很简单,就是指定keepAliveTime的时间单位。

2.5 workQueue(工作缓存队列)

保存任务的阻塞队列,是在线程数达到corePoolSize后,新进来的任务会被保存到workQueue中,这里使用的是有界的队列(BlockingQueue<Runnable> workQueue)。

2.6 threadFactory(线程制造工厂)

采用工厂模式,用于创建线程的工厂,并给线程设定线程名。

2.7 handler(饱和策略)

JDK中给出的RejectedExecutionHandler实现类有四个。

  • AbortPolicy:直接抛出异常(默认)
  • CallerRunsPolicy:用调用者所在的线程来执行任务
  • DiscardOldestPolicy:丢弃阻塞队列中最老的任务,队列中最靠前的任务
  • DiscardPolicy:直接忽略当前添加进来的任务

但是实际应用中这四种方式都不会很使用,后两种放弃任务应该是没有使用的,无论任务的重要性,一般都线上项目都不会忍受任务的丢失。那么都不是很好用只能自己来制定符合实际要求的策略,这里很简单,只要实现RejectedExecutionHandler接口类,实现其中唯一的一个接口方法rejectedExecution()即可。

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

3. 线程池的使用

线程池的使用也就是线程池的创建和任务的提交等。线程池的创建这里就不去说了,只要知道参数的含义,也就很容易创建一个线程池出来。

3.1 提交线程任务

主要的两种提交任务的方式有两种,一种是有返回值,一种是无返回值的。

void execute(Runnable command) 不需要返回(源码)
Future<T> submit(Callable<T> task)需要返回(源码)

execute方法是在ThreadPoolExecutor中实现的,只支持Runnable实现的任务,submit是在ThreadPoolExecutor父类AbstractExecutorService中实现的,可以支持Callable实现方式也支持Runnable实现方式。

3.2 关闭线程池

shutdown():设置线程池的关闭状态,并关闭未执行的任务线程
shutdownNow():设置线程池的关闭状态,并关闭正在执行或者暂停的任务线程

3.3 执行线程池过程分析

3.3.1 execute()

看一下execute方法的源代码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {//判断线程池中线程小于corePoolSize
        if (addWorker(command, true)) //创建新线程,执行任务
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {//向队列中推入任务
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))//如果上面推入队列失败
        reject(command); //执行饱和策略
}

从上面的源代码可以看的出来当使用execute方法加入任务后的一系列操作。在这个方法上面也给出了对应的解释、代码中也给出了关键位置的注释,这里就不多说。

3.3.2 submit()

看一下submit的源代码:

/*----submit方法 Runnable实现方式----*/
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    //看下面的newTaskFor方法
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);//调用execute方法执行任务
    return ftask;
}
//使用了FutureTask包装了任务对象
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
	return new FutureTask<T>(runnable, value);
}

/*----submit方法 Callable实现方式----*/
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    //看下面的newTaskFor方法
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);//调用execute方法执行任务
    return ftask;
}
//使用了FutureTask包装了任务对象
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  return new FutureTask<T>(callable);
}

4. 线程池工作队列

4.1 ArrayBlockingQueue

有界队列策略,可以在创建队列的时候指定队列长度,创建时会根据队列长度开辟一段连续的内存,容量设置需谨慎,过大浪费空间,过小易添加失败。

4.2 LinkedBlockingQueue

有界队列策略,可以在创建队列的时候指定队列长度,队列创建时不会开辟大量内存空间,根据节点增加动态的拓展,存储空间非连续。

4.3 DelayQueue

无界队列策略,元素都有一个过期时间,队列内部也是根据过期时间进行排序,从小到大排列,只有过期的元素才会被取出并执行。

4.4 SynchronousQueue

直接提交策略,工作队列本身是不存储元素,相当于通道,插入的数据立即就会被移除并处理,处理速度很快,但是存在数据过多导致线程创建过多问题。

4.5 PriorityBlockingQueue

优先级策略,无边界队列,添加元素根据优先级排序,优先执行级别高的元素。

5. JDK给出的几种线程池实现

5.1 ThreadPoolExecutor

一般的线程池构建类,所有相关的参数都是可以自定义的,一般都是直接使用默认即可。

其执行流程图如下:

5.2 FixedThreadPoolExecutor

创建固定线程数量的线程池(corePoolSize、maximumPoolSize值相同),适用于负载较重的服务器。使用了无界队列LinkedBlockingQueue

其执行流程图如下:

5.3 SingleThreadPoolExecutor

创建单个线程,适用需要保证顺序执行任务,不会有多个线程活动。这个流程图就比较简单啦,流程图省略。

5.4 CachedThreadPoolExecutor

会根据需要来创建新线程的,适用于执行很多短周期的任务,短时间可能创建线程量很大,消耗大量资源。使用的是无界队列SynchronousQueue

5.5 WorkStealingPool(JDK7)

基于ForkJoin实现

5.6 ScheduledThreadPoolExecutor

需要定期执行周期任务。都是使用无界队列DelayQueue
newScheduledThreadPoolExecutor:可以包含多个线程,线程执行周期任务,适度控制后台线程数量。
newSingleScheduledThreadPoolExecutor:只包含单个线程,值需要到单个线程执行周期任务,保证顺序的执行各个任务。

看一下ScheduledThreadPoolExecutor执行流程图:

主要执行方法

  • schedule():只执行一次,任务可以延时执行。
  • scheduleAtFixedRate:提交固定时间间隔(上一个任务和下一个任务开始的头之间的时间间隔)
  • scheduleAtFixedDelay:提交固定时间延迟间隔执行(上一个任务执行结束到下个任务执行开始之间的时间间隔)
    注意:(可以自己试验一下,下面的如有错误,欢迎指正)
    1、scheduleAtFixedRate出现超时后,后一个任务会根据情况是否立即执行。
    如设定了时间间隔是60秒,如果第一个任务执行了80秒,后一个任务就不会在60秒的时候执行,而是80秒的时候执行。如果第二个任务执行了20秒,那么第三个任务的执行就是在80秒的基础上加60秒,那就是140秒的时候执行。这里只要理解间隔时间60秒是指下一个任务的开始和上一个任务的开始时间间隔是60秒就可以理解了。
    2、当任务中出现异常的时候,需要将异常捕捉,否则任务无法进行下去,会影响后面的任务执行。因为这个时候任务会被阻塞。

    6. 线程池异常处理

异步操作使用线程池,当异步操作出现异常的时候是不会抛出给主线程的,那应该怎么处理,看一下下面几种解决方案。

6.1 直接在异步处理逻辑中添加try……catch捕获

最为简单,也是最为常用的方式,由异步逻辑本身来捕获异常并加上处理逻辑。

public static void main(String[] args) {
    //创建线程池
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,
            20,
            60,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100));
    executor.execute(() -> {
        System.out.println("executor submit");
        try {
            int a = 1 / 0;
        } catch (Exception e) {
            System.out.println("异步线程自行捕获异常,并作相应的处理逻辑");
        }
    });
}

如果主线程依赖子线程的处理结果,这样就不行啦,需要使用下面这种方式。

6.2 使用submit提交异步操作,根据Future返回获取异常

这种方式有一个问题是Future的get方式是阻塞的,这样主线程会一直等待子线程执行完成,如果只是为了获取异常信息,且业务逻辑不是强要求获取子线程执行情况的不建议使用。因为这样阻塞就失去了本身多线程并发处理的初衷。

代码示例:

public static void main(String[] args) {
    //创建线程池
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,
            20,
            60,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100));
    Future<?> future = executor.submit(() -> {
        System.out.println("executor submit");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int a = 1 / 0;
    });
    try {
        future.get();//阻塞
    } catch (Exception e) {
        System.out.println("get方法拦截异常信息:" + e);
    }
}

6.3 线程设置uncaughtExceptionHandler

在ThreadFactory里面将创建的新线程时设置uncaughtExceptionHandler参数,此时就不可以使用原有默认DefaultThreadFactory,需要自定义,自定义代码可以直接使用DefaultThreadFactory内的逻辑,加上对uncaughtExceptionHandler的设值即可。

public class ThreadPoolConfig {

    public static void main(String[] args) {
        //创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                10,
                20,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                new CustomThreadFactory());

        //模拟线程内出现异常
        executor.execute(() -> {
            System.out.println("直接执行,没有延迟时间……");
            int a = 1 / 0;
        });
        //正常业务逻辑
        System.out.println("正常逻辑结束");
    }

    //自定义线程工厂:CustomThreadFactory
    private static class CustomThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        CustomThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            //线程内添加UncaughtExceptionHandler处理逻辑
            t.setUncaughtExceptionHandler((t1, e) -> {
                System.out.println(namePrefix + threadNumber + " 线程异常处理逻辑");
                System.out.println(namePrefix + threadNumber + " 出现的异常信息:" + e);
            });
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
}

6.4 重写ThreadPoolExecutor中的afterExecute方法

ThreadPoolExecutor本身自带的afterExecute方法是没有任何实现逻辑的,需要继承ThreadPoolExecutor并且重写afterExecute方法来实现异常的处理动作。

//重写ThreadPoolExecutor中的afterExecute方法
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.println("自定义ThreadPoolExecutor,处理异常信息:" + t);
    }
}
public static void main(String[] args) {
    CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10,
            20,
            60,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100));
    executor.execute(() -> {
        System.out.println("重写ThreadPoolExecutor中的afterExecute方法");
        int a = 1 / 0;
    });
}

7. 合理配置线程池

7.1 任务的性质

  • 计算密集型:加密,大数据分析等
  • IO密集型:读取文件,数据库连接,网络通讯等
  • 混合型:计算型密集型和IO密集型的混合

7.2 线程池配置

对于计算密集型:
最大推荐线程数适当小一点,推荐为机器的CPU核心数+1,这里如何获取机器的CPU核心数,可以使用下面的方法:

Runtime.getRuntime().availableProcessors();

对于IO密集型:
最大推荐线程数适当大一点,机器CPU核心数*2

对于混合型:
尽量拆分出计算密集型和IO密集型,特别是IO密集型约等于计算机密集型,拆分后效率会更高。如果是IO密集型远大于计算密集型,拆分意义不大。

8. 总结

在实际项目中,很少有用到JDK给出的几种线程池,因为他们大多都使用了无界的任务队列,这样会存在一个很大的安全隐患,当阻塞的任务过多的时候,就会导致内存溢出。
更适用的是ThreadPoolExecutor,可以直接配置对应的参数,也可以指定需要的饱和策略,是整个线程是的更利于掌控,更符合实际的需求。


文章作者: 程序猿洞晓
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 程序猿洞晓 !
评论
 上一篇
死磕Java并发:深入分析synchronized的实现原理 死磕Java并发:深入分析synchronized的实现原理
记得刚刚开始学习Java的时候,一遇到多线程情况就是synchronized,相对于当时的我们来说synchronized是这么的神奇而又强大,那个时候我们赋予它一个名字“同步”,也成为了我们解决多线程情况的百试不爽的良药。但是,随着我们学习的进行我们知道synchronized是一个重量级锁,相对于Lock,它会显得那么笨重,以……
2018-07-14
下一篇 
Spring IOC初始化bean对象创建的N种实现方式理解 Spring IOC初始化bean对象创建的N种实现方式理解
Ioc—Inversion of Control,即“控制反转”,不是什么技术,而是一种设计思想。在Java开发中,Ioc意味着将你设计好的对象交给容器控制,而不是传统的在你的对象内部直接控制。如何理解好Ioc呢?理解好Ioc的关键是要明确“谁控制谁,控制什么,为何是反转(有反转就应该有正转了),哪些方面反转了”,那我们来深入分析一下。
2018-07-10
  目录