美文网首页
ThreadPoolExecutor实现

ThreadPoolExecutor实现

作者: kindol | 来源:发表于2018-08-02 23:24 被阅读0次

构造器

ThreadPoolExecutor提供了四种构造器,但都是统一调用了同一个构造器,只是传入的参数设置不同

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    //参数要求判断
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

参数的意义看线程池基础部分

从继承角度看

ThreadPoolExecutor是继承了AbstractExecutorService的,而AbstractExecutorService实现了ExecutorService接口,ExecutorService接口又继承了Executor接口,这是最底层了,Executor接口仅有一个方法,

void execute(Runnable command);

那么,往上面一层看看,也就是ExecutorService接口,它声明了一些方法:submit、invokeAll、invokeAny以及shutDown、shutDownNow等

而AbstractExecutorService,主要是实现了以下几个方法

//将runnable构造为futureTask对象
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable[, T value]) {
    return new FutureTask<T>(runnable, value(或者为null));
}

//submit方法
//无论传入runnable或者callable对象,都转换为futureTask对象,再调用excute()执行,看看其中一个实现即可
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

//invokeAny方法
//invokeAll方法
这两个方法先占坑,后面补上

下面就是重头戏了,ThreadPoolExecutor

线程池的状态

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;    //调用了shutdown(),此时线程池不能够接受新的任务,它会等待所有任务执行完毕
static final int STOP       = 2;    //调用了shutdownNow()方法,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务
static final int TERMINATED = 3;    //当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为此状态

可以看到,runState是一个volatile变量,以此标志线程池的状态

任务的执行

任务相关的一些重要变量

private final BlockingQueue<Runnable> workQueue;
//任务缓存队列,存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock();
//线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>();  
//存放工作集

private volatile long keepAliveTime;    
//线程存活时间   
private volatile boolean allowCoreThreadTimeOut;
//是否允许为核心线程设置存活时间
private volatile int corePoolSize;
//核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int maximumPoolSize;   
//线程池最大能容忍的线程数
private volatile int poolSize;       
//线程池中当前的线程数

private volatile RejectedExecutionHandler handler; 
//任务拒绝策略
private volatile ThreadFactory threadFactory;   
//线程工厂,用来创建线程
private int largestPoolSize;   
//记录线程池中曾经出现过的最大线程数
private long completedTaskCount;   
//记录已经执行完毕的任务个数

在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,尽管也可以通过submit执行,但submit也是调用的execute,看看execute定义

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}
  1. 首先传入任务,如果传入的任务为空,抛出异常
  2. 判断当前线程池线程数目小于核心线程数量的情况,如果小于则调用addIfUnderCorePoolSize往线程池中添加;添加失败(失败的情况见下面关于此函数的分析)或者线程池的数目大于核心池就进入判断
  3. 如果当前线程池在运行并且往workQueue(阻塞队列)中成功添加任务,则还要再次进入判断避免添加完成后其他线程突然调用shutdown或者shutdownNow方法关闭了线程池,此时需要一种应急措施使得缓存队列的任务得到处理,调用ensureQueuedTaskHandled()
  4. 如果线程池处于非运行状态或者队列满了,那么就调用addIfUnderMaximumPoolSize直接创建线程添加到线程池中,如果还失败,那么就采用拒绝策略

其实以上就是线程池添加一个任务的过程,不过上面留了几个函数没解释,现在就来看看实现

addIfUnderCorePoolSize方法

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);        //创建线程去执行firstTask任务   
        } finally {
        mainLock.unlock();      //好的习惯
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

这个方法只有在线程池的数目低于核心线程数的时候才会执行,往线程池中添加线程首先得获取本线程池对象的锁,然后,还要再次确认当前线程数是否小于核心池的数目并且线程池要在运行,因为也有可能有其他线程同时进入本方法,而且可能发生在当前线程进入本方法的同时,另外一个线程调用了shutdown()等的情况,结合以上execute(),不难发现这其实就是经典的DCL(双重检查机制)

好了,假设检查通过,那就到了addThread()(很重要的方法!),传进去提交的任务,返回一个Thread类型的变量,接着判断此变量是否为空,为空说明创建线程失败,否则启动线程,下面看看这个addThread方法

private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);  //创建一个线程,执行任务   
    if (t != null) {
        w.thread = t;            //将创建的线程的引用赋值为w的成员变量       
        workers.add(w);
        int nt = ++poolSize;     //当前线程数加1       
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}

一上来就创建了一个Worker对象,Worker是线程池中真正工作的,其实本质上是一个线程,只是线程池对他进行了包装,然后调用线程工厂threadFactory创建了一个新的线程t,然后将线程t的引用赋值给了Worker对象的成员变量thread,接着通过workers.add(w)将Worker对象添加到工作集当中,看看Worker类的实现

private final class Worker implements Runnable {
    private final ReentrantLock runLock = new ReentrantLock();
    private Runnable firstTask;
    volatile long completedTasks;
    Thread thread;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }
    boolean isActive() {
        return runLock.isLocked();
    }
    void interruptIfIdle() {
        final ReentrantLock runLock = this.runLock;
        if (runLock.tryLock()) {
            try {
        if (thread != Thread.currentThread())
        thread.interrupt();
            } finally {
                runLock.unlock();
            }
        }
    }
    void interruptNow() {
        thread.interrupt();
    }
 
    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
            boolean ran = false;
            beforeExecute(thread, task);
            //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据,自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等           
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }
 
    public void run() {
        try {
            Runnable task = firstTask;  //第一次创建worker的时候会通过构造器传入一个任务
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);   //当任务队列中没有任务时,进行清理工作       
        }
    }
}

看到开头实现了Runnable接口,立马就知道是任务载体,那么最自然的,线程执行时调用的就是run方法,由于初始化worker的时候会通过构造器传进来任务firstTask,然后firstTask=null,后面就需要使用getTask()从阻塞队列中获得任务,获得任务成功就进入执行runTask(),失败就跳出for循环,说明线程池没有任务了,就调用workerDone(),先来看看getTask()

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)      
            //非阻塞取出任务,如果没任务直接返回null,线程池清理完成
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)     //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,则通过poll取任务,若等待一定的时间取不到任务,则返回null
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();   //不满足以上条件时,take会一直阻塞
            if (r != null)
                return r;
            if (workerCanExit()) {    //如果没取到任务,即r为null,则判断当前的worker是否可以退出
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();   //中断处于空闲状态的worker
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

getTask方法中,先判断线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null,因为这时候不用做任务了(但如果为SHUTDOWN还是要做任务的);如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。

没有取到任务,也就是r为null,这时候调用workerCanExit()判断是否允许当前worker退出,看看workerCanExit的实现

private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    //如果runState大于等于STOP,或者任务缓存队列为空了
    //或者 允许为核心池线程设置空闲存活时间并且线程池中的线程数目大于1
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}

OK,返回到上面getTask中,判断如果允许worker退出,那么调用interruptIdleWorkers()中断处于空闲状态的worker,这个函数就是遍历每一个worker,然后调用worker里的interruptIfIdle()

void interruptIfIdle() {
    final ReentrantLock runLock = this.runLock;
    if (runLock.tryLock()) {    //注意这里,是调用tryLock()来获取锁的,因为如果当前worker正在执行任务,锁已经被获取了,是无法获取到锁的,如果成功获取了锁,说明当前worker处于空闲状态
        try {
            if (thread != Thread.currentThread())  
            thread.interrupt();
        } finally {
            runLock.unlock();
        }
    }
}

好了,终于可以回到execute(),继续,也就是当缓冲区满或者线程池不在运行的情况,此时尝试直接创建线程,这时候会发现addIfUnderMaximumPoolSize()和addIfUnderCorePoolSize()基本一样,就是poolSize < maximumPoolSize这里不同而已

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

线程池中的线程初始化

  • prestartCoreThread():初始化一个核心线程;
  • prestartAllCoreThreads():初始化所有核心线程
public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //注意传进去的参数是null
}
 
public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
        ++n;
    return n;
}

上面传进去的参数是null,因而在最后执行线程的时候会阻塞在getTask()方法中的r = workQueue.take();

参考:
http://www.importnew.com/19011.html

相关文章

网友评论

      本文标题:ThreadPoolExecutor实现

      本文链接:https://www.haomeiwen.com/subject/ipeevftx.html