线程池

Author Avatar
xuanzh.cc 11月 15, 2017
  • 在其它设备中阅读本文章

为了避免频繁的创建和销毁线程带来的开销,我们可以使用线程池,让线程得以复用。JDK为我们提供了之方面的支持。类图如下:

Executor

线程池的顶级接口,虽然该接口是顶级接口,但是它里面只有一个 void execute(Runnable command); 方法,所以它可以被看做是一个命令执行器,而不是一个线程池。

ExecutorService

它是真正的线程池接口,该接口内声明方法如下图,里面已经包含了线程池的方法。

ScheduledExecutorService

它在 ExecutorService 的基础上扩展了定时任务和周期性任务 的功能,如下方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
在指定的时间后执行任务
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
/**
在指定的时间后执行一个 callable,并返回ScheduledFuture
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
/**
在指定的延时 initialDelay 后开始每隔 period 时间就执行一次 command,
如果 任务执行的时间超过了 period,则在该任务执行完毕后马上开始下一次执行,不会出现任务堆叠的情况。
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
/**
在指定的延时 initialDelay 后开始周期性的执行任务 command,每隔任务调度(上一个任务执行完毕到下一个任务开始)之间的间隔为 delay
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

ThreadPoolExecutor

线程池的默认实现

Executors

类似一个工厂类,通过该类里面的静态方法可以方便的得到有特定功能的线程池,方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
该方法返回一个固定线程数量的线程池,池中的线程数量始终不变。当有新任务提交时,如果池中有空闲的线程,则立即执行,否则就把该任务缓存到任务队列LinkedBlockingQueue中,等到后面又空闲线程时,再执行该任务。
*/
public static ExecutorService newFixedThreadPool(int nThread)
/**
该方法返回一个只有一个线程的线程池。如果有新的任务被提交时,且线程不空闲的时候,该任务会被缓存带一个任务队列LinkedBlockingQueue中,等到线程空闲时候再执行。
*/
public static ExecutorService newSingleThreadExecutor()
/**
该方法返回一个可以根据实际情况调整的线程数量的线程池,线程池里面线程的数量不确定。当有新的任务提交时,如果有空闲的线程,则用空闲的线程来执行该任务,否则就会创建一个新的线程来执行任务。
*/
public static ExecutorService newCachedThreadPool()
/**
该方法返回一个数量为 1 的ScheduledExecutorService 线程池对象。该对象可以支持定时任务。
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
/**
该方法返回一个指定线程数量的 ScheduledExecutorService 对象。
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
Executors内部的实现:
1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

参数说明:

  • corePoolSize :指定了线程池中的线程数量
  • maximumPoolSize :制定了线程池中的最大线程数量
  • keepAliveTime :当线程池的数量超过 corePoolSize 时,在超过keepAliveTime后如果这些多余的线程空闲,就会被回收
  • unit :时间单位
  • workQueue :任务队列,当心提交的任务来不及执行的时候,就会被缓存到该队列
  • threadFactory : 线程工厂,用于创建线程
  • handler :拒绝策略。当任务来不及执行时,如何拒绝任务
线程池任务队列(BlockingQueue workQueue)的种类和各自的特点:
直接提交的队列(SynchronousQueue)

SynchronousQueue是一种特殊的 BlockQueue,它没有容量,每一个插入操作都要等待一个相应的删除操作,每一个删除操作都必须等待一个相应的插入操作。当任务队列使用 SynchronousQueue 时,任务不会被缓存到该队列中,而总是提交给线程执行,如果没有空闲线程,或者线程数量已经到了最大值,就回执行拒绝策略。所以在使用 SynchronousQueue 的时候,一般要设置比较大的 maxxmumPoolSize,否则会很容易执行拒绝策略。

有界的任务队列(ArrayBlockingQueue)

使用ArrayBlockingQueue作为任务队列时候,要指定一个队列的最大值用于构造ArrayBlockingQueue,当有新的任务提交且没有空闲线程,如果线程池数量小于corePoolSize,则创建新的线程用于执行任务,否则就将任务缓存到任务队列中,如果任务队列已满,且线程池数量小于 maximumPoolSize 的时候,创建新的线程用于执行任务,否则执行拒绝策略。(只有当任务队列满的时候,才会将线程数量提升到 corePoolSize 之上)

无界的任务队列(LinkedBlockingQueue)

当有新的任务提交的时候,如果线程池内线程数量不到 corePoolSize,则创建新的线程执行任务,否则就讲任务缓存到队列中,由于队列是无界的,所以线程池数量到达 corePoolSzie 后就不会再增加了,如果有很多任务不断提交,且任务执行速度跟不上任务提交的速度的话,会造成队列里面的任务不断积累,很容易耗尽系统内存。

优先任务队列(PriorityBlockingQueue)

PriorityBlockingQueue是一个特殊的无界队列,但是它可以可以根据任务的优先级进行排序,让优先级高的任务优先执行。

线程池拒绝策略:
AbortPolicy

该策略直接抛出异常 RejectedExecutionException ,阻止系统正常工作。 实现类为 public static class AbortPolicy implements RejectedExecutionHandler

CallerRunsPolicy

只要线程池未关闭,该策略就在调用者的线程中执行该任务,这样很可能造成系统的性能急剧下降。实现类为:public static class CallerRunsPolicy implements RejectedExecutionHandler

DiscardOldestPolicy

丢弃最老的一个任务,不做任何的处理。实现类为:public static class DiscardOldestPolicy implements RejectedExecutionHandler

DiscardPolicy

直接丢弃无法处理的任务,不做任何处理。实现类为: public static class DiscardPolicy implements RejectedExecutionHandler

线程工厂(ThreadFactory)

线程工厂可以让我们自定义线程,通过为线程设置特定的名称,让我们后续的调试更方便。也可以自由的设置线程的状态和优先级等等。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
可命名线程组 线程工厂
*/
public class NamedThreadFactory implements ThreadFactory {
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
public NamedThreadFactory(ThreadGroup group, String name) {
this.group = group;
namePrefix = group.getName() + ":" + name;
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
return t;
}
}
线程池出错时候的异常堆栈处理
  • 当使用 execute() 方法提交的任务在执行的时候出现异常,只会打印出执行该任务的线程的堆栈信息,而不会打印出任务提交线程的堆栈信息。
  • 使用 submit() 方法提交的任务在执行的时候出现异常,如果不 get() 的haunted,是不会打印出任何异常信息的,但也只会得到和execute() 方法一样的异常信息。

如果想要得到完整的堆栈异常信息,则需要将ThreadPoolExecutor 进行扩展,在提交任务的时候传入将提交任务的线程的堆栈信息保存起来。。如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public TraceThreadPoolExecutor extends ThreadPollExecutor{
......
//略
......
@Override
public void execute(Runnable command) {
super.execute(wrap(command, new Exception("exception stack trace"), Thread.currentThread().getName()));
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(wrap(task, new Exception("exception stack trace"), Thread.currentThread().getName()));
}
/**
clientStack 记录提交的线程的堆栈信息
*/
private Runnable wrap(final Runnable command, final Exception clientStack, String threadName){
return new Runnable() {
@Override
public void run() {
try{
command.run();
}catch (Exception e){
clientStack.printStackTrace();
throw e;
}
}
};
}
......
//略
......
}

上述代码中,wrap 方法被execute 和 submit 方法调用, 传入了一个 exception 信息,该exception 记录了提交任务线程的堆栈信息,当任务执行出异常的时候,就打印出该堆栈信息,从而可以定位到任务是在哪个位置被提交的,很方便bug的排查和修改。