1. 线程池的原理
-
线程池优点:
- 降低资源消耗
- 提高响应速度
- 提高线程可管理性
-
线程池流程
- 判断核心线程池中线程是否都在执行任务,否则创建新工作线程,是则进入下一步
- 判断工作队列是否已满,否则提交新的任务存储在这个工作队列里,是则进入下一步
- 判断线程池中的线程是否都处于工作状态,否则创建一个新的工作线程来执行任务,是则交给饱和策略来处理这个任务
-
ThreadPoolExecutor执行execute方法有4中情况:
- 当前运行线程少于corePoolSize,创建新线程执行任务(需要全局锁)
- 运行线程多余corePoolSize,将任务加入BlockingQueue
- BlockingQueue已满,创建新线程来处理任务(需要全局锁)
- 创建新线程超过maximumPoolSize,拒绝任务,调用RejectedExecutionHandler.rejectedExecution()方法
-
executor()源码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- 工作线程:线程池创建线程时会将线程封装成工作线程Worker,Worker在完成任务后会循环获取工作队列里的任务来执行:
- 线程池中的线程执行任务有两种情况:
- execute()方法创建一个线程时,会让线程执行当前任务
- 线程完成当前任务后,会反复从BlockingQueue中获取任务来执行
2. 线程池的使用
2.1 线程池的创建
大致有以下几个参数:
-
corePoolSize:线程池基本大小;提交任务时,创建线程,直到达到这个大小就不新建了。调用prestartAllCoreThreads方法会提前创建好。
-
maximumPoolSize:线程池最大数量,队列满了,而且创建的线程数小于最大线程数,会创建新线程执行任务,注意使用了无界队列作为任务队列这个参数就没效果了。
-
keepAliveTime:线程活动保持时间,线程池工作线程空闲后保持存活的时间
-
TimeUnit:keepAliveTime 的单位
-
workQueue:任务队列,用于保存等待执行的任务的队列,可以是阻塞队列(BlockingQueue)的各种实现类。
-
threadFactory:创建线程的工厂,可以给线程起标志性的名称等。
-
RejectedExecutionHandler:饱和策略:
- AbortPolicy:直接抛出异常
- CallerRunsPolicy:调用者所在线程来运行任务
- DiscardOldestPolicy:丢弃队列中最近的一个任务,并执行当前任务
- DiscardPolicy:不处理,丢弃
-
构造器源码:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
2.2 线程池提交任务
- execute():用于提交不需要返回值的任务
- submit():用于提交需要返回值的任务:线程池会返回一个future类型对象,通过该对象可以判断任务是否执行成功,通过future.get()方法获取返回值,该方法会阻塞当前线程知道任务完成,也有get(long timeout, TimeUnit unit)作为超时等待。
2.3 关闭线程池
- shutdown()和shutdownNow()方法可以关闭线程池:遍历线程池中的工作线程,逐个调用interrupt方法来中止。
- shutdownNow首先将线程池置为STOP,然后尝试停止所有正在执行或者暂停任务的线程,并返回等待执行任务的列表
- shutdown只是将线程置为SHUTDOWN状态,然后中断所有没有正在执行的任务的线程
2.4 合理分配线程池
- 任务性质:CPU密集,IO密集,混合
- 任务优先级:高,中,低
- 任务执行时长:长,中,短
- 任务依赖性:是否依赖其他系统资源
2.5 线程监控
常用属性:
- taskCount:需要执行的任务数量
- completedTaskCount:线程池在运行过程中已经完成的数量
- largestPoolSize:线程池曾经创建过的最大线程数量
- getPoolSize:线程池线程数量
- getActiveCount:活动线程数
网友评论