当前位置: 首页 > news >正文

乐清网站制作/外链发布工具下载

乐清网站制作,外链发布工具下载,蒙文网站建设,昆明做网站比较牛的java中的所说的线程池,一般都是围绕着 ThreadPoolExecutor 来展开的。其他的实现基本都是基于它,或者模仿它的。所以只要理解 ThreadPoolExecutor, 就相当于完全理解了线程池的精髓。 其实要理解一个东西,一般地,我们最好是要抱着…

  java中的所说的线程池,一般都是围绕着 ThreadPoolExecutor 来展开的。其他的实现基本都是基于它,或者模仿它的。所以只要理解 ThreadPoolExecutor, 就相当于完全理解了线程池的精髓。

  其实要理解一个东西,一般地,我们最好是要抱着自己的疑问或者理解去的。否则,往往收获甚微。

 

  理解 ThreadPoolExecutor, 我们可以先理解一个线程池的意义: 本质上是提供预先定义好的n个线程,供调用方直接运行任务的一个工具。

 

线程池解决的问题:

热门小说网 v1122.com

  1. 提高任务执行的响应速度,降低资源消耗。任务执行时,直接立即使用线程池提供的线程运行,避免了临时创建线程的CPU/内存开销,达到快速响应的效果。

  2. 提高线程的可管理性。线程总数可预知,避免用户主动创建无限多线程导致死机风险,还可以进行线程统一的分配、调优和监控。

  3. 避免对资源的过度使用。在超出预期的请求任务情况,响应策略可控。

 

线程池提供的核心接口:

  要想使用线程池,自然是要理解其接口的。一般我们使用 ExecotorService 进行线程池的调用。然而,我们并不针对初学者。

  整体的接口如下:

 

   我们就挑几个常用接口探讨下:

    submit(Runnable task): 提交一个无需返回结果的任务。
    submit(Callable<T> task): 提交一个有返回结果的任务。
    invokeAll(Collection<? extends Callable<T>> tasks, long, TimeUnit): 同时执行n个任务并返回结果列表。
    shutdown(): 关闭线程程池。
    awaitTermination(long timeout, TimeUnit unit): 等待关闭结果,最长不超过timeout时间。

 

以上是ThreadPoolExector 提供的特性,针对以上特性。

我们应该要有自己的几个实现思路或疑问:

  1. 线程池如何接受任务?

  2. 线程如何运行任务?

  3. 线程池如何关闭?

 

接下来,就让我们带着疑问去看实现吧。

ThreadPoolExecutor 核心实现原理

1. 线程池的处理流程

  我们首先重点要看的是,如何执行提交的任务。我可以通过下图来看看。

 

   总结描述下就是:

    1. 判断核心线程池是否已满,如果不是,则创建线程执行任务
    2. 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中
    3. 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务
    4. 如果线程池也满了,则按照拒绝策略对任务进行处理

 

  另外,我们来看一下 ThreadPoolExecutor 的构造方法,因为这里会体现出每个属性的含义。

    /*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even*        if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the*        pool* @param keepAliveTime when the number of threads is greater than*        the core, this is the maximum time that excess idle threads*        will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are*        executed.  This queue will hold only the {@code Runnable}*        tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor*        creates a new thread* @param handler the handler to use when execution is blocked*        because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>*         {@code corePoolSize < 0}<br>*         {@code keepAliveTime < 0}<br>*         {@code maximumPoolSize <= 0}<br>*         {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}*         or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

  从构造方法可以看出 ThreadPoolExecutor 的主要参数 7 个,在其注释上也有说明功能,咱们翻译下每个参数的功能:

    corePoolSize: 线程池核心线程数(平时保留的线程数),使用时机: 在初始时刻,每次请求进来都会创建一个线程直到达到该sizemaximumPoolSize: 线程池最大线程数,使用时机: 当workQueue都放不下时,启动新线程,直到最大线程数,此时到达线程池的极限keepAliveTime/unit: 超出corePoolSize数量的线程的保留时间,unit为时间单位workQueue: 阻塞队列,当核心线程数达到或者超出后,会先尝试将任务放入该队列由各线程自行消费;  ArrayBlockingQueue: 构造函数一定要传大小LinkedBlockingQueue: 构造函数不传大小会默认为65536(Integer.MAX_VALUE ),当大量请求任务时,容易造成 内存耗尽。SynchronousQueue: 同步队列,一个没有存储空间的阻塞队列 ,将任务同步交付给工作线程。PriorityBlockingQueue: 优先队列threadFactory:线程工厂,用于线程需要创建时,调用其newThread()生产新线程使用handler: 饱和策略,当队列已放不下任务,且创建的线程已达到 maximum 后,则不能再处理任务,直接将任务交给饱和策略AbortPolicy: 直接抛弃(默认)CallerRunsPolicy: 用调用者的线程执行任务DiscardOldestPolicy: 抛弃队列中最久的任务DiscardPolicy: 抛弃当前任务

 

2. submit 流程详解

  当调用 submit 方法,就是向线程池中提交一个任务,处理流程如步骤1所示。但是我们需要更深入理解。

  submit 方法是定义在 AbstractExecutorService 中,最终调用 ThreadPoolExecutor 的 execute 方法,即是模板方法模式的应用。

    // java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, T)/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException       {@inheritDoc}*/public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();// 封装任务和返回结果为 RunnableFuture, 统一交由具体的子类执行RunnableFuture<T> ftask = newTaskFor(task, result);// execute 将会调用 ThreadPoolExecutor 的实现,是我们讨论的重要核心
        execute(ftask);return ftask;}// FutureTask 是个重要的线程池组件,它承载了具体的任务执行流/*** Returns a {@code RunnableFuture} for the given runnable and default* value.** @param runnable the runnable task being wrapped* @param value the default value for the returned future* @param <T> the type of the given value* @return a {@code RunnableFuture} which, when run, will run the* underlying runnable and which, as a {@code Future}, will yield* the given value as its result and provide for cancellation of* the underlying task* @since 1.6*/protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}// ThreadPoolExecutor 的任务提交过程// java.util.concurrent.ThreadPoolExecutor#execute/*** Executes the given task sometime in the future.  The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of*         {@code RejectedExecutionHandler}, if the task*         cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/// ctl 是一个重要的控制全局状态的数据结构,定义为一个线程安全的 AtomicInteger// ctl = new AtomicInteger(ctlOf(RUNNING, 0));int c = ctl.get();// 当还没有达到核心线程池的数量时,直接添加1个新线程,然后让其执行任务即可if (workerCountOf(c) < corePoolSize) {// 2.1. 添加新线程,且执行command任务// 添加成功,即不需要后续操作了,添加失败,则说明外部环境变化了if (addWorker(command, true))return;c = ctl.get();}// 当核心线程达到后,则尝试添加到阻塞队列中,具体添加方法由阻塞队列实现// isRunning => c < SHUTDOWN;if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 2.2. 添加队列成功后,还要再次检测线程池的运行状态,决定启动线程或者状态过期// 2.2.1. 当线程池已关闭,则将刚刚添加的任务移除,走reject策略if (! isRunning(recheck) && remove(command))reject(command);// 2.2.2. 当一个worker都没有时,则添加workerelse if (workerCountOf(recheck) == 0)addWorker(null, false);}// 当队列满后,则直接再创建新的线程运行,如果不能再创建线程了,则 rejectelse if (!addWorker(command, false))// 2.3. 拒绝策略处理
            reject(command);}

  通过上面这一小段代码,我们就已经完整地看到了。通过一个 ctl 变量进行全局状态控制,从而保证了线程安全性。整个框架并没有使用锁,但是却是线程安全的。

  整段代码刚好完整描述了线程池的执行流程:

    1. 判断核心线程池是否已满,如果不是,则创建线程执行任务;
    2. 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中;
    3. 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务;
    4. 如果线程池也满了,则按照拒绝策略对任务进行处理;

 

2.1. 添加新的worker

  一个worker,即是一个工作线程。

    /*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started, running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked.  If the thread* creation fails, either due to the thread factory returning* null, or due to an exception (typically OutOfMemoryError in* Thread.start()), we roll back cleanly.** @param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** @param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* @return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {// 为确保线程安全,进行CAS反复重试
        retry:for (;;) {int c = ctl.get();// 获取runState , c 的高位存储// c & ~CAPACITY;int rs = runStateOf(c);// Check if queue empty only if necessary.// 已经shutdown, firstTask 为空的添加并不会成功if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);// 如果超出最大允许创建的线程数,则直接失败if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// CAS 更新worker+1数,成功则说明占位成功退出retry,后续的添加操作将是安全的,失败则说明已有其他线程变更该值if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctl// runState 变更,则退出到 retry 重新循环 if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop
            }}// 以下为添加 worker 过程boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 使用 Worker 封闭 firstTask 任务,后续运行将由 Worker 接管w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;// 添加 worker 的过程,需要保证线程安全
                mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());// SHUTDOWN 情况下还是会创建 Worker, 但是后续检测将会失败if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 既然是新添加的线程,就不应该是 alive 状态if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// workers 只是一个工作线程的容器,使用 HashSet 承载// private final HashSet<Worker> workers = new HashSet<Worker>();
                        workers.add(w);int s = workers.size();// 维护一个全局达到过的最大线程数计数器if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}// worker 添加成功后,进行将worker启起来,里面应该是有一个 死循环,一直在获取任务// 不然怎么运行添加到队列里的任务呢?if (workerAdded) {t.start();workerStarted = true;}}} finally {// 如果任务启动失败,则必须进行清理,返回失败if (! workerStarted)addWorkerFailed(w);}return workerStarted;}// 大概添加 worker 的框架明白了,重点对象是 Worker, 我们稍后再讲// 现在先来看看,添加失败的情况,如何进行/*** Rolls back the worker thread creation.* - removes worker from workers, if present* - decrements worker count* - rechecks for termination, in case the existence of this*   worker was holding up termination*/private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)workers.remove(w);// ctl 中的 workerCount - 1 , CAS 实现
            decrementWorkerCount();// 尝试处理空闲线程
            tryTerminate();} finally {mainLock.unlock();}}/*** Decrements the workerCount field of ctl. This is called only on* abrupt termination of a thread (see processWorkerExit). Other* decrements are performed within getTask.*/private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));}// 停止可能启动的 worker/*** Transitions to TERMINATED state if either (SHUTDOWN and pool* and queue empty) or (STOP and pool empty).  If otherwise* eligible to terminate but workerCount is nonzero, interrupts an* idle worker to ensure that shutdown signals propagate. This* method must be called following any action that might make* termination possible -- reducing worker count or removing tasks* from the queue during shutdown. The method is non-private to* allow access from ScheduledThreadPoolExecutor.*/final void tryTerminate() {for (;;) {int c = ctl.get();// 线程池正在运行、正在清理、已关闭但队列还未处理完,都不会进行 terminate 操作if (isRunning(c) ||// c >= TIDYINGrunStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminate// 停止线程的两个方式之一,只中断一个 worker
                interruptIdleWorkers(ONLY_ONE);return;}// 以下为整个线程池的后置操作final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 设置正在清理标识if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 线程池已终止的钩子方法,默认实现为空
                        terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));// 此处 termination 为唤醒等待关闭的线程
                        termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS
        }}/*** Interrupts threads that might be waiting for tasks (as* indicated by not being locked) so they can check for* termination or configuration changes. Ignores* SecurityExceptions (in which case some threads may remain* uninterrupted).** @param onlyOne If true, interrupt at most one worker. This is* called only from tryTerminate when termination is otherwise* enabled but there are still other workers.  In this case, at* most one waiting worker is interrupted to propagate shutdown* signals in case all threads are currently waiting.* Interrupting any arbitrary thread ensures that newly arriving* workers since shutdown began will also eventually exit.* To guarantee eventual termination, it suffices to always* interrupt only one idle worker, but shutdown() interrupts all* idle workers so that redundant workers exit promptly, not* waiting for a straggler task to finish.*/private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 迭代所有 workerfor (Worker w : workers) {Thread t = w.thread;// 获取到 worker 的锁之后,再进行 interruptif (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}// 只中断一个 worker, 立即返回, 不保证 interrupt 成功if (onlyOne)break;}} finally {mainLock.unlock();}}

 

2.2. 当添加队列成功后,发现线程池状态变更,需要进行移除队列操作

    /*** Removes this task from the executor's internal queue if it is* present, thus causing it not to be run if it has not already* started.** <p>This method may be useful as one part of a cancellation* scheme.  It may fail to remove tasks that have been converted* into other forms before being placed on the internal queue. For* example, a task entered using {@code submit} might be* converted into a form that maintains {@code Future} status.* However, in such cases, method {@link #purge} may be used to* remove those Futures that have been cancelled.** @param task the task to remove* @return {@code true} if the task was removed*/public boolean remove(Runnable task) {// 此移除不一定能成功boolean removed = workQueue.remove(task);// 上面已经看过,它会尝试停止一个 worker 线程tryTerminate(); // In case SHUTDOWN and now emptyreturn removed;}

 

3. 添加失败进行执行拒绝策略

    /*** Invokes the rejected execution handler for the given command.* Package-protected for use by ScheduledThreadPoolExecutor.*/final void reject(Runnable command) {// 拒绝策略是在构造方法时传入的,默认为 RejectedExecutionHandler// 即用户只需实现 rejectedExecution 方法,即可以自定义拒绝策略了handler.rejectedExecution(command, this);}

 

4. Worker 的工作机制

  从上面的实现中,我们可以看到,主要是对 Worker 的添加和 workQueue 的添加,所以具体的工作是由谁完成呢?自然就是 Worker 了。

        // Worker 的构造方法,主要是接受一个 task, 可以为 null, 如果非null, 将在不久的将来被执行// private final class Worker extends AbstractQueuedSynchronizer implements Runnable/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// 将 Worker 自身当作一个 任务,绑定到 worker.thread 中// thread 启动时,worker 就启动了this.thread = getThreadFactory().newThread(this);}// Worker 的主要工作实现,通过一个循环扫描实现        /** Delegates main run loop to outer runWorker  */public void run() {// 调用 ThreadPoolExecutor 外部实现的 runWorker 方法runWorker(this);}/*** Main worker run loop.  Repeatedly gets tasks from queue and* executes them, while coping with a number of issues:** 1. We may start out with an initial task, in which case we* don't need to get the first one. Otherwise, as long as pool is* running, we get tasks from getTask. If it returns null then the* worker exits due to changed pool state or configuration* parameters.  Other exits result from exception throws in* external code, in which case completedAbruptly holds, which* usually leads processWorkerExit to replace this thread.** 2. Before running any task, the lock is acquired to prevent* other pool interrupts while the task is executing, and then we* ensure that unless pool is stopping, this thread does not have* its interrupt set.** 3. Each task run is preceded by a call to beforeExecute, which* might throw an exception, in which case we cause thread to die* (breaking loop with completedAbruptly true) without processing* the task.** 4. Assuming beforeExecute completes normally, we run the task,* gathering any of its thrown exceptions to send to afterExecute.* We separately handle RuntimeException, Error (both of which the* specs guarantee that we trap) and arbitrary Throwables.* Because we cannot rethrow Throwables within Runnable.run, we* wrap them within Errors on the way out (to the thread's* UncaughtExceptionHandler).  Any thrown exception also* conservatively causes thread to die.** 5. After task.run completes, we call afterExecute, which may* also throw an exception, which will also cause thread to* die. According to JLS Sec 14.20, this exception is the one that* will be in effect even if task.run throws.** The net effect of the exception mechanics is that afterExecute* and the thread's UncaughtExceptionHandler have as accurate* information as we can provide about any problems encountered by* user code.** @param w the worker*/final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 不停地从 workQueue 中获取任务,然后执行,就是这么个逻辑// getTask() 会阻塞式获取,所以 Worker 往往不会立即退出 while (task != null || (task = getTask()) != null) {// 执行过程中是不允许并发的,即同时只能一个 task 在运行,此时也不允许进行 interrupt
                w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt// 检测是否已被线程池是否停止 或者当前 worker 被中断// STOP = 1 << COUNT_BITS;if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())// 中断信息传递
                    wt.interrupt();try {// 任务开始前 切点,默认为空执行
                    beforeExecute(wt, task);Throwable thrown = null;try {// 直接调用任务的run方法, 具体的返回结果,会被 FutureTask 封装到 某个变量中// 可以参考以前的文章 (FutureTask是怎样获取到异步执行结果的? https://www.cnblogs.com/yougewe/p/11666284.html
                        task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 任务开始后 切点,默认为空执行
                        afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}// 正常退出,有必要的话,可能重新将 Worker 添加进来completedAbruptly = false;} finally {// 处理退出后下一步操作,可能重新添加 Worker
            processWorkerExit(w, completedAbruptly);}}/*** Performs cleanup and bookkeeping for a dying worker. Called* only from worker threads. Unless completedAbruptly is set,* assumes that workerCount has already been adjusted to account* for exit.  This method removes thread from worker set, and* possibly terminates the pool or replaces the worker if either* it exited due to user task exception or if fewer than* corePoolSize workers are running or queue is non-empty but* there are no workers.** @param w the worker* @param completedAbruptly if the worker died due to user exception*/private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {// 在 Worker 正常退出的情况下,检查是否超时导致,维持最小线程数if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;// 如果满足最小线程要求,则直接返回if (workerCountOf(c) >= min)return; // replacement not needed
            }// 否则再添加一个Worker到线程池中备用// 非正常退出,会直接再添加一个WorkeraddWorker(null, false);}}/*** Performs blocking or timed wait for a task, depending on* current configuration settings, or returns null if this worker* must exit because of any of:* 1. There are more than maximumPoolSize workers (due to*    a call to setMaximumPoolSize).* 2. The pool is stopped.* 3. The pool is shutdown and the queue is empty.* 4. This worker timed out waiting for a task, and timed-out*    workers are subject to termination (that is,*    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})*    both before and after the timed wait, and if the queue is*    non-empty, this worker is not the last thread in the pool.** @return task, or null if the worker must exit, in which case*         workerCount is decremented*/private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.// 如果进行了 shutdown, 且队列为空, 则需要将 worker 退出if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// do {} while (! compareAndDecrementWorkerCount(ctl.get()));
                decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 线程数据大于最大允许线程,需要删除多余的 Workerif ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 如果开户了超时删除功能,则使用 poll, 否则使用 take() 进行阻塞获取Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();// 获取到任务,则可以进行执行了if (r != null)return r;// 如果有超时设置,则会在下一循环时退出timedOut = true;}// 忽略中断异常// 在这种情况下,Worker如何响应外部的中断请求呢??? 思考catch (InterruptedException retry) {timedOut = false;}}}

  所以,Worker的作用就体现出来了,一个循环取任务执行任务过程:

    1. 有一个主循环一直进行任务的获取;
    2. 针对有超时的设置,会使用poll进行获取任务,如果超时,则 Worker 将会退出循环结束线程;
    3. 无超时的设置,则会使用 take 进行阻塞式获取,直到有值;
    4. 获取任务执行前置+业务+后置任务;
    5. 当获取到null的任务之后,当前Worker将会结束;
    6. 当前Worker结束后,将会判断是否有必要维护最低Worker数,从而决定是否再添加Worker进来。

  还是借用一个网上同学比较通用的一个图来表述下 Worker/ThreadPoolExecutor 的工作流程吧(已经很完美,不需要再造这轮子了)

 

5. shutdown 操作实现

  ThreadPoolExecutor 是通过 ctl 这个变量进行全局状态维护的,shutdown 在线程池中也是表现为一个状态,所以应该是比较简单的。

    /*** Initiates an orderly shutdown in which previously submitted* tasks are executed, but no new tasks will be accepted.* Invocation has no additional effect if already shut down.** <p>This method does not wait for previously submitted tasks to* complete execution.  Use {@link #awaitTermination awaitTermination}* to do that.** @throws SecurityException {@inheritDoc}*/public void shutdown() {// 为保证线程安全,使用 mainLockfinal ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// SecurityManager 检查
            checkShutdownAccess();// 设置状态为 SHUTDOWN
            advanceRunState(SHUTDOWN);// 中断空闲的 Worker, 即相当于依次关闭每个空闲线程
            interruptIdleWorkers();// 关闭钩子,默认实现为空操作,为方便子类实现自定义清理功能onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}//
        tryTerminate();}/*** Transitions runState to given target, or leaves it alone if* already at least the given target.** @param targetState the desired state, either SHUTDOWN or STOP*        (but not TIDYING or TERMINATED -- use tryTerminate for that)*/private void advanceRunState(int targetState) {for (;;) {int c = ctl.get();// 自身CAS更新成功或者被其他线程更新成功if (runStateAtLeast(c, targetState) ||ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;}}// 关闭空闲线程(非 running 状态)/*** Common form of interruptIdleWorkers, to avoid having to* remember what the boolean argument means.*/private void interruptIdleWorkers() {// 上文已介绍, 此处 ONLY_ONE 为 false, 即是最大可能地中断所有 WorkerinterruptIdleWorkers(false);}与 shutdown 对应的,有一个 shutdownNow, 其语义是 立即停止所有任务。/*** Attempts to stop all actively executing tasks, halts the* processing of waiting tasks, and returns a list of the tasks* that were awaiting execution. These tasks are drained (removed)* from the task queue upon return from this method.** <p>This method does not wait for actively executing tasks to* terminate.  Use {@link #awaitTermination awaitTermination} to* do that.** <p>There are no guarantees beyond best-effort attempts to stop* processing actively executing tasks.  This implementation* cancels tasks via {@link Thread#interrupt}, so any task that* fails to respond to interrupts may never terminate.** @throws SecurityException {@inheritDoc}*/public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 与 shutdown 的差别,设置的状态不一样
            advanceRunState(STOP);// 强行中断线程
            interruptWorkers();// 将未完成的任务返回tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;}/*** Interrupts all threads, even if active. Ignores SecurityExceptions* (in which case some threads may remain uninterrupted).*/private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)// 调用 worker 的提供的中断方法
                w.interruptIfStarted();} finally {mainLock.unlock();}}// ThreadPoolExecutor.Worker#interruptIfStartedvoid interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {// 直接调用任务的 interrupt
                    t.interrupt();} catch (SecurityException ignore) {}}}

 

6. invokeAll 的实现方式

  invokeAll, 望文生义,即是调用所有给定的任务。想来应该是一个个地添加任务到线程池队列吧。

    // invokeAll 的方法直接在抽象方便中就实现了,它的语义是同时执行n个任务,并同步等待结果返回// java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection<? extends java.util.concurrent.Callable<T>>)public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {if (tasks == null)throw new NullPointerException();ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());boolean done = false;try {for (Callable<T> t : tasks) {RunnableFuture<T> f = newTaskFor(t);futures.add(f);// 依次调用各子类的实现,添加任务
                execute(f);}for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i);if (!f.isDone()) {try {// 依次等待执行结果
                        f.get();} catch (CancellationException ignore) {} catch (ExecutionException ignore) {}}}done = true;return futures;} finally {if (!done)for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);}}

  实现很简单,都是些外围调用。

 

7. ThreadPoolExecutor 的状态值的设计

  通过上面的过程,可以看到,整个ThreadPoolExecutor 非状态的依赖是非常强的。所以一个好的状态值的设计就显得很重要了,runState 代表线程池或者 Worker 的运行状态。如下:

    // runState is stored in the high-order bits// 整个状态使值使用 ctl 的高三位值进行控制, COUNT_BITS=29// 1110 0000 0000 0000private static final int RUNNING    = -1 << COUNT_BITS;// 0000 0000 0000 0000private static final int SHUTDOWN   =  0 << COUNT_BITS;// 0010 0000 0000 0000private static final int STOP       =  1 << COUNT_BITS;// 0100 0000 0000 0000private static final int TIDYING    =  2 << COUNT_BITS;// 0110 0000 0000 0000private static final int TERMINATED =  3 << COUNT_BITS;// 整个状态值的大小顺序主: RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED// 而低 29位,则用来保存 worker 的数量,当worker增加时,只要将整个 ctl 增加即可。// 0001 1111 1111 1111, 即是最大的 worker 数量private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 整个 ctl 描述为一个 AtomicInteger, 功能如下:/*** The main pool control state, ctl, is an atomic integer packing* two conceptual fields*   workerCount, indicating the effective number of threads*   runState,    indicating whether running, shutting down etc** In order to pack them into one int, we limit workerCount to* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2* billion) otherwise representable. If this is ever an issue in* the future, the variable can be changed to be an AtomicLong,* and the shift/mask constants below adjusted. But until the need* arises, this code is a bit faster and simpler using an int.** The workerCount is the number of workers that have been* permitted to start and not permitted to stop.  The value may be* transiently different from the actual number of live threads,* for example when a ThreadFactory fails to create a thread when* asked, and when exiting threads are still performing* bookkeeping before terminating. The user-visible pool size is* reported as the current size of the workers set.** The runState provides the main lifecycle control, taking on values:**   RUNNING:  Accept new tasks and process queued tasks*   SHUTDOWN: Don't accept new tasks, but process queued tasks*   STOP:     Don't accept new tasks, don't process queued tasks,*             and interrupt in-progress tasks*   TIDYING:  All tasks have terminated, workerCount is zero,*             the thread transitioning to state TIDYING*             will run the terminated() hook method*   TERMINATED: terminated() has completed** The numerical order among these values matters, to allow* ordered comparisons. The runState monotonically increases over* time, but need not hit each state. The transitions are:** RUNNING -> SHUTDOWN*    On invocation of shutdown(), perhaps implicitly in finalize()* (RUNNING or SHUTDOWN) -> STOP*    On invocation of shutdownNow()* SHUTDOWN -> TIDYING*    When both queue and pool are empty* STOP -> TIDYING*    When pool is empty* TIDYING -> TERMINATED*    When the terminated() hook method has completed** Threads waiting in awaitTermination() will return when the* state reaches TERMINATED.** Detecting the transition from SHUTDOWN to TIDYING is less* straightforward than you'd like because the queue may become* empty after non-empty and vice versa during SHUTDOWN state, but* we can only terminate if, after seeing that it is empty, we see* that workerCount is 0 (which sometimes entails a recheck -- see* below).*/private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

 

8. awaitTermination 等待关闭完成

  从上面的 shutdown, 可以看到,只是写了 SHUTDOWN 标识后,尝试尽可能地中断停止Worker线程,但并不保证中断成功。要想保证停止完成,需要有另外的机制来保证。从 awaitTermination 的语义来说,它是能保证任务停止完成的,那么它是如何保证的呢?

    // ThreadPoolExecutor.awaitTermination()public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (;;) {// 只是循环 ctl 状态, 只要 状态为 TERMINATED 状态,则说明已经关闭成功// 此处 termination 的状态触发是在 tryTerminate 中触发的if (runStateAtLeast(ctl.get(), TERMINATED))return true;if (nanos <= 0)return false;nanos = termination.awaitNanos(nanos);}} finally {mainLock.unlock();}}

  看起来, awaitTermination 并没有什么特殊操作,而是一直在等待。所以 TERMINATED 是 Worker 自行发生的动作。

  那是在哪里做的操作呢?其实是在获取任务的时候,会检测当前状态是否是 SHUTDOWN, 如果是SHUTDOWN且 队列为空,则会触发获取任务的返回null.从而结束当前 Worker. 

  Worker 在结束前会调用 processWorkerExit() 方法,里面会再次调用 tryTerminate(), 当所有 Worker 都运行到这个点后, awaitTermination() 就会收到通知了。(注意: processWorkerExit() 会在每次运行后进行 addWorker() 尝试,但是在 SHUTDOWN 状态的添加操作总是失败的,所以不用考虑)

 

  到此,你是否可以解答前面的几个问题了呢?

 

  

http://www.jmfq.cn/news/5204899.html

相关文章:

  • 网站结构怎么分析/电子商务说白了就是干什么的
  • 在凡科做的网站怎么推广/企业网站搜索优化网络推广
  • 如何搭建公司内部网站/搜索引擎优化工具有哪些
  • 深圳专业做网站电话/百度打广告多少钱
  • 学校门户网站建设方案/湖南长沙疫情最新情况
  • 做APP必须要有网站么/seo海外推广
  • 哈尔滨专业网站制作设计/shopify seo
  • 网站备案可以强制撤销吗/百度投放广告流程
  • 网站设计培训成都/亚马逊alexa
  • 外贸做的社交网站/关键词排名点击
  • 自己做的网站如何联网/百度竞价推广课程
  • 上的网站app/百度推广工具
  • 企业管理软件销售好做吗/网站是怎么优化推广的
  • 网站域名在哪里注册/百度关键词搜索趋势
  • xsl做书店网站/永久开源的免费建站系统
  • 网站建设论文伯乐在线/代写文案的软件
  • 张家港市建设局网站/郑州网站推广报价
  • 大学做网站是什么专业/百度公司怎么样
  • 微网站开发公司/中国营销网
  • 网站制作的动画怎么做的/网站seo链接购买
  • 做虚假网站犯法吗/产品软文是什么意思
  • 网站设计制作一条龙免费/百度地图优化排名方法
  • 青岛网站建设费用/淘宝seo关键词的获取方法有哪些
  • 广东省住房及建设厅官方网站/应用商店优化
  • 网站建设柚子网络科技联系方式/百度投诉电话客服24小时
  • 做的网站手机打不开怎么回事啊/济南seo排名优化推广
  • erp软件是干嘛的/5g网络优化
  • 营销创意网站/目前病毒的最新情况
  • 社区网站建设公司/100个商业经典案例
  • 石家庄seo网络推广/优化大师客服电话