AnthonyZero's Bolg

JAVA中的线程池

Alt text

前言

在开发过程中,合理的使用线程池能够带来3个好处
第一:降低资源消耗,通过重复地利用已创建的线程降低线程创建和销毁带来的消耗
第二:提高响应速度,当任务到达时任务可以不需要等待线程创建就能立即执行
第三:提高线程的可管理性,线程是稀缺资源,应当将其放入一个池子中进行统一分配、调优和监控

实现原理

如下:ThreadPoolExecutor执行execute()方法时,线程的处理流程图

  1. 判断当前线程池线程数量是否小于核心线程池大小,是则创建线程并启动,否则到第2步
  2. 判断任务队列是否已满,未满则将任务加入阻塞队列,已满则到第3步
  3. 判断当前线程池线程数量是否小于最大线程池大小,是则创建线程并启动,否则执行饱和策略 Alt text

当前运行的线程少于corePoolSize,直接创建新线程来执行任务(这是一个预热阶段,而且需要获取全局锁),后面当前运行的线程大于等于corePoolSize的时候,几乎所有的execute()方法调用都是执行步骤2,这步不需要获取全局锁

通过ThreadPoolExecutor的execute方法源码了解线程池的工作原理

public void execute(Runnable command) {
    //任务为空,抛出空指针异常
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //判断当前线程数量是否小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        //是则添加一个核心线程到线程池,并且启动线程执行任务(addWorker方法里会启动)
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //线程池处于运行状态则向阻塞队列添加该任务
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //判断线程池是否处于运行状态,不是就移除刚才添加的任务
        if (! isRunning(recheck) && remove(command))
            //移除成功就执行饱和策略,这样整个方法就结束了
            reject(command);
        //否则若处于运行状态或移除失败,这时无论处于哪种情况任务都在阻塞队列里,判断当前线程数量是否为0
        else if (workerCountOf(recheck) == 0)
            //若是则添加一个线程并启动
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

线程池创建线程时,会将线程封装为工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。

线程池的创建

我们可以通过ThreadPoolExecutor来创建一个线程池

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

corePoolSize:线程池中的核心线程数,默认情况下,核心线程一直存活在线程池中,即便他们在线程池中处于闲置状态。除非我们将ThreadPoolExecutor的allowCoreThreadTimeOut属性设为true的时候,这时候处于闲置的核心线程在等待新任务到来时会有超时策略,这个超时时间由keepAliveTime来指定。一旦超过所设置的超时时间,闲置的核心线程就会被终止。

maximumPoolSize:线程池中所容纳的最大线程数,如果队列满了并且已经创建的线程小于这个值,则线程会在创建新的线程执行任务。如果使用了无界队列这个参数就没什么效果。

keepAliveTime:线程池的工作线程空闲后,保持存活的时间。所以如果任务很多并且每个任务的执行时间比较短,可以调大时间提高线程的利用率

unit:用于指定keepAliveTime参数的时间单位。可以使用的单位有天(TimeUnit.DAYS),小时(TimeUnit.HOURS),分钟(TimeUnit.MINUTES),毫秒(TimeUnit.MILLISECONDS)等

workQueue:线程池中保存等待执行的任务的阻塞队列,可选择ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue等

threadFactory: 线程工厂,为线程池提供新线程的创建。ThreadFactory是一个接口,里面只有一个newThread方法。 默认为DefaultThreadFactory类

handler: 饱和策略,是RejectedExecutionHandler对象,而RejectedExecutionHandler是一个接口,里面只有一个rejectedExecution方法。当任务队列已满并且线程池中的活动线程已经达到所限定的最大值或者是无法成功执行任务,这时候ThreadPoolExecutor会调用RejectedExecutionHandler中的rejectedExecution方法。在ThreadPoolExecutor中有四个内部类实现了RejectedExecutionHandler接口。在线程池中它默认是AbortPolicy,在无法处理新任务时抛出RejectedExecutionException异常。

提交任务

execute: 当我们使用execute来提交任务时,由于execute方法没有返回值,所以说我们也就无法判定任务是否被线程池执行成功。

service.execute(new  Runnable() {
    public void run() {
        System.out.println("execute方式");
    }
});

submit: 当我们使用submit来提交任务时,它会返回一个future,我们就可以通过这个future来判断任务是否执行成功,还可以通过future的get方法来获取返回值.如果子线程任务没有完成,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时候有可能任务并没有执行完。

Future<Integer> future = service.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("submit方式");
                    return 666;
                }
            });
try {
    Integer number = future.get();
    System.out.println("返回值为:" + number);
} catch (ExecutionException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

关闭线程池

调用线程池的shutdown()或shutdownNow()方法来关闭线程池
shutdown原理:将线程池状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程,(已提交的任务会被继续执行)。
shutdownNow原理:将线程池的状态设置成STOP状态,然后中断所有任务(包括正在执行的)的线程,并返回等待执行任务的列表。

中断采用interrupt方法,所以无法响应中断的任务可能永远无法终止 但调用上述的两个关闭之一,isShutdown()方法都会返为true,当所有任务都已关闭后,才表示线程池关闭完成,这时候isTerminated()方法返为true。
通常调用shutdown方法来关闭线程池,当需要立刻中断所有的线程,如果任务不一定要执行完的话,可直接调用shutdownNow()方法

线程池监控

ThreadPoolExecutor提供的线程池监控相关方法
getPoolSize():获取当前线程池大小
getQueue():获取工作队列实例
getLargestPoolSize():获取工作者线程数曾达到的最大数
getAliveCount():获取线程池中正在执行任务的工作者线程数
getTaskCount():获取线程池接收到的任务数
getCompletedTaskCount():获取已处理完毕的任务数

合理配置线程池

通常我们是需要根据分析这批任务执行的性质来确定:

  • IO 密集型任务:由于线程并不是一直在运行,所以可以尽可能的多配置线程,比如 CPU 个数 * 2
  • CPU 密集型任务(大量复杂的运算)应当分配较少的线程,比如 CPU 个数 + 1相当的大小
  • 选择合适的阻塞队列(建议使用有界队列):有界队列能增加系统的稳定性和预警能力。
  • 依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲的时间就越长,那么线程数应该设置得越大,这样才能更好的利用CPU.