Executor简介
Executor 框架结构(主要由三大部分组成)
任务:执行任务需要实现的Runnable接口或Callable接口。两者区别在于:Runnable接口不会返回结果但是Callable接口可以返回结果。
任务的执行:任务执行机制的核心接口Executor ,以及继承自Executor 接口的ExecutorService接口。ThreadPoolExecutor和ScheduledThreadPoolExecutor这两个关键类实现了ExecutorService接口。
异步计算的结果:Future接口以及Future接口的实现类FutureTask类。当我们把Runnable接口或Callable接口的实现类提交(调用submit方法)给ThreadPoolExecutor或ScheduledThreadPoolExecutor时,会返回一个FutureTask对象。
Executor框架的使用示意图
主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以实现Runnable对象封装成一个Callable对象。(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object result))。
然后可以把创建完成的Runnable对象直接交给ExecutorService执行:(ExecutorService.execute(Runnable command));或者也可以把Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable task))
如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象。
最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行
执行execute()方法和submit()方法的区别是什么呢?
1)execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
2)submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
ThreadPoolExecutor浅析
ThreadPoolExecutor是线程池的真正实现,通常使用工厂类Executors来创建,它的构造方法提供了一系列参数来配置线程池。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
由于ThreadPoolExecutor的参数在上一篇JAVA中的线程池详细说明过,这里不再累赘。主要介绍3种常见的线程池,它们都直接或者间接地通过配置ThreadPoolExecutor来实现自己的功能特性,这个3种线程池分别是FixedThreadPool,CachedThreadPool,以及SingleThreadExecutor。
FixedThreadPool
FixedThreadPool会使用一个优先固定数目的线程来处理若干数目的任务。规定数目的线程处理所有任务,一旦有线程处理完了任务就会被用来处理新的任务(如果有的话)。FixedThreadPool模式下最多的线程数目是一定的。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
(1)如果当前运行线程数小于corePoolSize,则创建一个新的线程来执行任务。
(2)如果当前线程池的运行线程数等于corePoolSize,那么后面提交的任务将加入LinkedBlockingQueue。
(3)线程在执行完任务之后,会在循环中反复从LinkedBlockingQueue获取任务来执行。
这里还有点要说明的是FixedThreadPool使用的是无界队列LinkedBlockingQueue作为线程池的工作队列(队列容量为Integer.MAX_VALUE)。使用该队列作为工作队列会对线程池产生如下影响
(1)当前线程池中的线程数量达到corePoolSize后,新的任务将在无界队列中等待。
(2)由于我们使用的是无界队列,所以参数maximumPoolSize和keepAliveTime无效。(maximumPoolSize跟核心线程数一致)
(3)由于使用无界队列,运行中的FixedThreadPool不会拒绝任务(当然此时是未执行shutdown和shutdownNow方法),所以不会去调用RejectExecutionHandler的rejectExecution方法抛出异常
SingleThreadExecutor
SingleThreadExecutor模式只会创建一个线程。它和FixedThreadPool比较类似,不过线程数是一个。如果多个任务被提交给SingleThreadExecutor的话,那么这些任务会被保存在一个队列中,并且会按照任务提交的顺序,一个先执行完成再执行另外一个线程。SingleThreadExecutor模式可以保证只有一个任务会被执行。这种特点可以被用来处理共享资源的问题而不需要考虑同步的问题。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
(1)如果当前线程数少于corePoolSize即线程池中没有线程运行,则创建一个新的线程来执行任务。
(2)在线程池的线程数量等于corePoolSize时,将任务加入到LinkedBlockingQueue。
(3)线程执行完成(1)中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行。
CachedThreadPool
CachedThreadPool首先会按照需要创建足够多的线程来执行任务(Task)。随着程序执行的过程,有的线程执行完了任务,可以被重新循环使用时,才不再创建新的线程来执行任务。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool的corePoolSize被设置为0,而maximumPoolSize被设置Integer.MAX_VALUE,即maximumPoolSize是无界的,而keepAliveTime被设置为60L,单位为秒。也就是空闲线程等待时间最长为60秒,超过该时间将会被终止。而且在这里CachedThreadPool使用的是没有容量的SynchronousQueue作为线程池的工作队列,但其maximumPoolSize是无界的,也就是意味着如果主线程提交任务的速度高于线程池中线程处理任务的速度时CachedThreadPool将会不断的创建新的线程,在极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源
(1)首先执行SynchronousQueue.offer(Runnable task),添加一个任务。如果当前CachedThreadPool中有空闲线程正在执行队列poll操作,那么主线程执行offer操作与空闲线程执行poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则进入第(2)步。
(2)当CachedThreadPool初始线程数为空时,或者当前没有空闲线程,将没有线程去执行SynchronousQueue.poll()。这样的情况下,步骤(1)将会失败,此时CachedThreadPool会创建一个新的线程来执行任务,execute()方法执行完成。
(3)在步骤(2)中创建的新线程将任务执行完成后,会执行SynchronousQueue.poll(),这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒,如果60秒内主线程提交了一个新任务,那么这个空闲线程将会执行主线程提交的新任务,否则,这个空闲线程将被终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool是不会使用任何资源的。
线程池中的线程数量可增可减,但是要特别注意提交任务速度一直大于线程消费任务速度会造成OOM
ScheduledThreadPoolExecutor浅析
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后执行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但比Timer更强大,更灵活,Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
ScheduledThreadPoolExecutor的执行主要分为两大部分:
当调用ScheduledThreadPoolExecutor的 scheduleAtFixedRate() 方法或者scheduleWirhFixedDelay() 方法时,会向ScheduledThreadPoolExecutor的 DelayQueue 添加一个实现了 RunnableScheduledFutur 接口的 ScheduledFutureTask
线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。
为了实现周期性的执行任务,对ThreadPoolExecutor做了如下修改:
- 使用 DelayQueue 作为任务队列;
- 获取任务的方不同
- 执行周期任务后,增加了额外的处理
ScheduledThreadPoolExecutor执行周期任务的步骤
- 线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time大于等于当前系统的时间;
- 线程1执行这个ScheduledFutureTask;
- 线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间;
- 线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。
总结
各自的适用场景:
FixedThreadPool:适用于为了满足资源管理需求,而需要限制当前线程的数量的应用场景,它适用于负载比较重的服务器。
SingleThreadExecutor:适用于需要保证执行顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的场景。
CachedThreadPool:大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者负载较轻的服务器。
ScheduledThreadPoolExecutor:适用于需要多个后台执行周期任务,同时为了满足资源管理需求而需要限制后台线程的数量的应用场景,
SingleThreadScheduledExecutor:适用于需要单个后台线程执行周期任务,同时保证顺序地执行各个任务的应用场景。