Java线程池的工作原理

   日期:2020-08-05     浏览:93    评论:0    
核心提示:线程池的创建下方代码块是线程池的完整构造函数。 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, Blockin_.

线程池的创建

下方代码块是线程池的完整构造函数。
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize:线程池的核心线程数,可以理解为初始化之后就存活的线程,即使没有任何任务也会在线程池中等待任务
  • maximumPoolSize:最大线程数,创建的线程不能大于此线程数
  • keepAliveTime:线程的存活时间。适用于corePoolSize的之外的线程,如果到最大存活时间还未被调用就退出此线程
  • unit:用来指定keepAliveTime的单位,比如秒:TimeUnit.SECONDS
  • workQueue:官方文档的直译是在执行任务之前用于保存任务的队列。此队列将只保存由execute方法提交的可运行任务。其实就是一个阻塞队列,若核心线程被占满时提交的任务会先放到这个队列里。
  • threadFactory:线程工厂,用来创建线程。
  • handler:官方文档直译是当执行被阻塞时要使用的处理程序,也称作拒绝策略。

线程池的执行流程

下方是ThreadPoolExecutor的execute方法
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {//步骤1
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//步骤2
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//步骤3
            reject(command);
    }

官方文档给出了三个步骤

  1. If fewer than corePoolSize threads are running, try to
    start a new thread with the given command as its first
    task. The call to addWorker atomically checks runState and
    workerCount, and so prevents false alarms that would add
    threads when it shouldn’t, by returning false.
    如果当前线程数小于核心线程数,则创建一个线程并尝试使用给定的指令作为其第一个任务启动新线程。对addWorker方法调用会自动检查 runState和workerCount,以此通过返回false来防止在不应该添加线程的情况下出现添加线程的错误。
  2. If a task can be successfully queued, then we still need to
    double-check whether we should have added a thread (because existing
    ones died since last checking) or that the pool shut down since
    entry into this method. So we recheck state and if necessary roll
    back the enqueuing if stopped, or start a new thread if there are
    none.
    如果任务可以成功进入队列,仍然需要再次检查是否应该添加一个线程(因为现有的线程在上次检查后死亡了),或者池在进入此方法后关闭。因此,我们会重新检查状态,如果停止队列,必要时回滚队列;如果没有线程,则启动一个新线程。
  3. If we cannot queue task, then we try to add a new thread. If it
    fails, we know we are shut down or saturated and so reject the task.
    如果不能将任务放入队列,则尝试添加一个新线程。如果失败了,则认为终止或者饱和,因此拒绝这个任务。

文档的解释里,第一步中的addWorker是比较难理解的,所以需要继续查看addWorker方法的源码。

addWorker上半部分源码

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 检查队列是否为空.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

		.........
	}

第一个if中表示,如果当前运行的状态是SHUTDOWN,并且传入的任务为空,并且队列不为空的情况,则()中为true,前面加了个!则在这种情况下&&整个判断为false,则不会执行下面的return false。因此这部分主要是为了判断队列是否为空。

wc是当前工作的线程数量
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
这块代码是在创建非核心线程时,即core等于false。判断当前线程数是否大于等于maximumPoolSize,如果大于等于则返回false,即上边说到的步骤3中创建线程失败的情况。

下半部分代码

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);//创建Worker对象
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;

这里的步骤主要是

  1. 创建Worker对象,同时也会实例化一个Thread对象。
  2. 启动这个线程

在这个步骤中,他再一次判断了是否需要添加新线程,如上述步骤2中所说,因为现有的线程在上次检查后死亡了,或者池在进入此方法后关闭。因此会重新检查状态,如果停止队列,必要时回滚队列;如果没有线程,则启动一个新线程。

接下来看看Woker的逻辑

        
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        
        
        public void run() {
            runWorker(this);
        }

在Woker中会调用ThreadFactory来创建一个新线程。在上方的t.start()中就会调用这个run方法。

接下来继续看runWoker的逻辑,可以发现主要就是判断任务是否为空,不为空则运行。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//任务判空
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//任务运行
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

最后再来看看任务判空中的getTask()方法

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // allowCoreThreadTimeOut 默认是false,后者是判断当前线程数是否大于核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            //当前线程数比核心线程数大,则执行poll()方法获取任务,超时时长和单位设置为以下参数
            //否则调用take()方法阻塞队列
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  1. allowCoreThreadTimeOut,这个变量默认值是false。wc>corePoolSize则是判断当前线程数是否大于corePoolSize。
  2. 如果当前线程数大于corePoolSize,则会调用workQueue的poll方法获取任务,超时时间是keepAliveTime。如果超过keepAliveTime时长,poll返回了null,上边提到的while循序就会退出,线程也就执行完了。如果当前线程数小于corePoolSize,则会调用workQueue的take方法阻塞在当前队列。

所以总结一下流程如下图所示

 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服