一: 线程池定义:
1. 为什么会出现线程池?
频繁创建线程是一个比耗时且有一定的资源开销,故引入了线程池,将线程通过池子的管理,进行重复利用。
2. 如果创建线程:
a ) 通过Executors辅助类创建线程池的实例:
它默认提供了4种线程池策略:
newFixedThreadPool(int nThreads) 创建一个固定数量的线程池,它的缓存任务队列的大小是Int 的最大值,同时拒绝策略采用抛出异常 RejectedExecutionException
Executors 返回的线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadPool : 允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。
CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。
b ) 直接通过调用ThreadPoolExecutor 的构造方法进行创建:
/**
corePoolSize : 线程池线程初始数
maxinumPoolSize : 线程池最大线程数
keepAliveTime : (maxinumPoolSize - corePoolSize) 的线程存活时长
unit : 存活时长的时间单位
workQueue : 任务阻塞队列
ArrayBlockingQueue:构造函数一定要传大小
LinkedBlockingQueue:构造函数不传大小会默认为(Integer.MAX_VALUE ),当大量请求任务时,容易造成 内存耗尽。
SynchronousQueue:同步队列,一个没有存储空间的阻塞队列 ,将任务同步交付给工作线程。
PriorityBlockingQueue : 优先队列
threadFactory : 线程工厂
handler : 拒绝执行策略处理器
拒绝策略有4种:
AbortPolicy: 当任务数超出线程池处理数,就会抛出该异常
DiscardPolicy: 当任务数超出线程池处理数,它会将后续提交的任务进行丢弃
DiscardOldestPolicy: 当任务数超出线程池处理数,它会将阻塞队列中第一个位置的元素弹出
CallerUnsPolicy: 当任务数超出线程池处理数,它会将任务抛出调用者线程进行执行
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
二:线程池状态及生命周期:
RUNNING : 表示线程池可以接收新的任务提交,并且还可以正常处理阻塞队列中的任务。
SHUTDOWN : 不再接收新的任务提交,但还会继续处理阻塞队列中的任务。
STOP : 不再接受新的任务提交,同时还会丢弃阻塞队列中的未处理的任务,并且会立即中止当前处理中的任务
TIDYING : 表示所有任务都执行完毕后(包括阻塞队列),当前线程池中的活动线程数量降为0,将会调用terminated方法
TERMIMATED : 线程池终止状态,当terminated() 方法执行完毕后,线程池将会处理该状态之下。
三:线程池的结构:
3.1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
该属性是原子的Integer 类型,可以实现原子操作,Doug Lea 将这个ctl 变量,拆分成二部分,前29位用于记录工作线程数, 后3位表示线程池的状态
3.2 线程池中大量的提供了对 AtomicInteger ctl 的状态原子操作变更 和 工作线程数量的原子变更
/**
* Attempts to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
3.3 private final BlockingQueue<Runnable> workQueue;
该队列用于存放 工作任务 (Runable,Callable)
3.4 private final HashSet<Worker> workers = new HashSet<Worker>();
该HashSet 集合,专门用于存在线程池中正在工作的线程
3.5 private volatile ThreadFactory threadFactory;
用于创建线程池线程的工厂方法,可以自己扩展,方便将线程命名之类
3.6 private volatile RejectedExecutionHandler handler;
用于执行任务过多超载时,执行的拒绝策略
3.7 private volatile int corePoolSize;
线程池核心线程数
3.8 private volatile int maximumPoolSize;
线程池最大的线程数
3.9 private volatile long keepAliveTime;
线程池空余线程(非核心线程)的存活时间
3.10 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {}
它是线程池中的内部类,主要就是封装了线程和任务.
四:线程池中一些重要的实现:
1. 当任务提交,会进行判断
如果线程数小于基本线程数(coreSize), 则直接创建线程,并将线程和任务包装成worker 对象放入 工作线程集合中
如果线程数大于基本线程数,则放入到任务队列中
如果任务队列已满,则又会创建线程到线程池最大值
如果线程数已经达到最大值,任务队列已满,则执行拒绝策略。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException(); int c = ctl.get();
//工作线程数量与线程池核心线程数比较,小于核心数线程数量,则创建线程,并且将创建的工作线程及其绑定的任务的worker 对象放入工作线程集合中,而且同时会调用线程的start方法。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//判断线程池处于运行状态,然后将任务放入到工作队列中去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检查线程池是否处于运行状态,如果不是运行状态,则将任务从队列中移除,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果工作线程数为0,表示
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//Work 线程中的run 方法,调用了线程池的runWorker 方法:
/** Delegates main run loop to outer runWorker */
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
public void run() {
runWorker(this);
}
}
// 在线程调用start 方法后,里面有一个while 循环,其判断条件是:在线程第一次创建时,会直接执行其任务,如果没有任务,则从队列中获取任务,如果没有获取到任务则一直阻塞,直到任务获取到为止。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 要么执行提交过来的任务,要么从工作队列中获取任务 getTask(),没有获取到任务则阻塞,直到获取到任务为止
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 移除空闲多余线程
processWorkerExit(w, completedAbruptly);
}
}
// 在getTask 中的阻塞是通过 workQueue.take() 方法实现,同时在for 循环中,会判断线程是否超时,如果超时,则会将线程打断
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 如果工作线程数大于核心线程数,且线程存活时间超过设定时间时,返回true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 工作线程数大于最大线程数或者线程超时 并且 工作线程大于1 或者 工作队列任务数为空时,那么take 方法就会返回null , 从而把流程交给了 runWorker 方法中的 finally 代码块,
// 而finally 代码块,会将空闲多余的线程从工作线程中移除.
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
五:其它相关的信息:
PS: 在ThreadPoolExecutor 中的submit 方法会吞掉异常信息,而execute 方法只能出现部分信息,最好是将异常进行包装,如下:
public Runnable wrap(final Runable task, final Exception ex , String threadName) {
return new Runable(){
try {
task.run();
} catch (Exception e) {
ex.printStackTrace(e);
throw e;
}
}
}