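Problem description
Recently one of our scheduled jobs stopped running. The technologies involved are Quartz, semaphores, and a thread pool.
I analyzed the semaphore first and ruled it out.
I suspected the Quartz version might be too old, but have not verified that yet.
That leaves the thread pool.
I guessed and searched Baidu as I went, and was misled by an unreliable blog post: I came to believe that an unhandled exception in a worker thread had blocked the whole job.
Forgive me: when I first wrote this code over a year ago I only knew the basic usage, and was still at the "we don't produce code, we just move it around" level.
Now that there is a problem, I will try to analyze it from the source code.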
java.util.concurrent.AbstractExecutorService
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try { f.get(); }
                catch (CancellationException ignore) {}
                catch (ExecutionException ignore) {}
            }
        }
        return futures;
    } catch (Throwable t) {
        cancelAll(futures);
        throw t;
    }
}
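Why start here? My original code used a for loop to submit future tasks and then a second loop calling future.get. I later switched to invokeAll to make sure all worker threads finish. I had never actually clicked into this method before.
At first glance there is nothing special, but look at the exception handling: any exception other than the two caught in the inner block, for example InterruptedException, reaches the outer catch, which cancels the remaining tasks (that is what cancelAll does) and rethrows.
So which exceptions do not abort the run? Pay attention here: note the ExecutionException in the inner catch block. So let's follow Future's get method.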
java.util.concurrent.FutureTask
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
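awaitDone waits for the task to finish and returns the task's final state; the task's return value or the exception it threw is stored in the outcome field. report then turns that state into a return value or an exception.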
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
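Note that an exception thrown in a worker thread is wrapped in an ExecutionException. Combined with the invokeAll code at the top, this means an exception in one task does not interrupt the other tasks, and invokeAll still returns normally.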
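To check that reading of invokeAll and report, here is a minimal sketch. The class name, the task bodies, and the pool size are made up for illustration and are not the original job's code. The second task throws, yet the other two results still come back, and the failure surfaces only as an ExecutionException when get is called on that one future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllFailureDemo {

    // Runs three hypothetical tasks through invokeAll; the second one throws.
    static List<String> runTasks() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "task-1 ok");
        tasks.add(() -> { throw new IllegalStateException("task-2 failed"); });
        tasks.add(() -> "task-3 ok");

        List<String> results = new ArrayList<>();
        try {
            // invokeAll returns only once every task is done, normally or not.
            for (Future<String> f : pool.invokeAll(tasks)) {
                try {
                    results.add(f.get());
                } catch (ExecutionException e) {
                    // The task's own exception is available as the cause.
                    results.add("caught: " + e.getCause().getMessage());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        runTasks().forEach(System.out::println);
    }
}
```

The futures come back in the same order as the tasks, so the failed task sits between the two successful ones in the result list.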
The analysis above rules out exceptions in worker threads. Could the queue have run out of room, then?
ThreadPoolExecutor
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
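If the worker count is below the core pool size, a new thread is started (addWorker); otherwise the task is handed to workQueue. Only when the queue refuses the task does the pool try to add a thread beyond the core size, and it rejects the task if that fails too.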
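The three steps are easier to see with a deliberately tiny pool. The sketch below is an illustration only: the sizes (core 1, max 2, queue capacity 1) and the class name are made up, and the real job used newFixedThreadPool with an unbounded queue. Every task blocks on a latch, so the pool state after each execute call is stable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteStepsDemo {

    // Submits four blocking tasks and records the pool state after each call.
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("rejected"); // default AbortPolicy
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The first task starts the core thread, the second lands in the queue, the third forces a second thread because the queue is full, and the fourth is rejected. With an unbounded LinkedBlockingQueue the pool never gets past step 2 in practice.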
I create the pool with newFixedThreadPool, which uses a LinkedBlockingQueue by default; that queue's maximum capacity is 2^31-1, i.e. 2147483647.
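The queue capacity can also be checked at runtime. This is a small sketch with a made-up class name; it relies on the implementation detail that newFixedThreadPool returns a plain ThreadPoolExecutor, which is why the cast works. One worker is kept busy so that the five extra tasks stay in the queue while it is inspected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FixedPoolQueueDemo {

    // Returns {queued task count, remaining queue capacity} while the single worker is busy.
    static int[] inspectQueue() {
        ExecutorService service = Executors.newFixedThreadPool(1);
        // Assumption: relies on newFixedThreadPool returning a ThreadPoolExecutor.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The first task goes straight to the new worker and blocks it.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // These five cannot start yet, so they wait in the LinkedBlockingQueue.
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> { });
            }
            return new int[] {
                pool.getQueue().size(),
                pool.getQueue().remainingCapacity()
            };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = inspectQueue();
        System.out.println("queued=" + r[0] + " remaining=" + r[1]);
    }
}
```

The remaining capacity comes out as Integer.MAX_VALUE minus the five queued tasks, which matches the 2^31-1 figure.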
So next we need to look at the method that adds a worker for the task.
ThreadPoolExecutor
// Incomplete excerpt: only part of the method is shown
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
& COUNT_MASK is a bitwise AND that caps the maximum worker count. So what is that value? 2^29-1, i.e. 536870911.
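The arithmetic can be reproduced outside the JDK. The constants below mirror how ThreadPoolExecutor defines them in the version quoted here (29 low bits of ctl for the worker count, the top 3 bits for the run state); the class itself and the sample count of 700 are only for illustration.

```java
class CountMaskDemo {

    // Mirrors the private constants in ThreadPoolExecutor.
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;  // low 29 bits set
    static final int RUNNING = -1 << COUNT_BITS;          // run state lives in the top 3 bits

    // Extracts the worker count from a packed ctl value.
    static int workerCountOf(int ctl) {
        return ctl & COUNT_MASK;
    }

    public static void main(String[] args) {
        System.out.println("COUNT_MASK = " + COUNT_MASK);
        int ctl = RUNNING | 700; // a RUNNING pool with 700 workers
        System.out.println("workerCountOf = " + workerCountOf(ctl));
    }
}
```

So corePoolSize and maximumPoolSize are masked down to at most 536870911 workers, far beyond anything a real pool would reach.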
According to the report, about 700 records were stuck before the restart. So it does not look like the queue ran out of room.
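Summary
No real result so far. Someone did offer a constructive idea, though: the database connections may have run out, causing the blocking. More on that later.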
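The connection-exhaustion idea is unverified, but the mechanism can be sketched. Everything here is hypothetical: a Semaphore stands in for a database connection pool, and each task takes a "connection" and never gives it back, as a stuck query or a leak would. A timed tryAcquire is used so the demo terminates; with a plain blocking acquire the starved tasks would simply hang, which is what a stalled job would look like.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class ConnectionStarvationDemo {

    // Returns how many of taskCount tasks failed to obtain a simulated connection.
    static int countStarvedTasks(int connections, int taskCount) {
        // Hypothetical stand-in for a DB connection pool; not a real driver or pool API.
        Semaphore fakeConnectionPool = new Semaphore(connections);
        ExecutorService workers = Executors.newFixedThreadPool(taskCount);
        List<Callable<Boolean>> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            // Each task takes a connection and never releases it.
            tasks.add(() -> fakeConnectionPool.tryAcquire(200, TimeUnit.MILLISECONDS));
        }
        int starved = 0;
        try {
            for (Future<Boolean> f : workers.invokeAll(tasks)) {
                if (!f.get()) {
                    starved++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
        return starved;
    }

    public static void main(String[] args) {
        System.out.println("starved tasks: " + countStarvedTasks(2, 5));
    }
}
```

With 2 connections and 5 tasks, 3 tasks never get a connection, even though the thread pool and its queue have plenty of room.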
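Addendum
I used to wonder why the main method does not exit once the thread pool has no tasks left (the little red square in the IDE stays lit). Here is the explanation: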
ThreadPoolExecutor
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
I include this code to point out that you can subclass ThreadPoolExecutor to add your own pre- and post-processing (beforeExecute and afterExecute).
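A minimal sketch of such a subclass follows. The class name and the two counters are made up for illustration. One thing to keep in mind: afterExecute receives a non-null Throwable only for tasks handed over with execute; for submit or invokeAll the exception is captured inside the Future instead.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    HookedPool(int threads) {
        // Same shape as a fixed pool: core == max, unbounded queue.
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet(); // runs in the worker thread, just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            failed.incrementAndGet(); // only set for tasks passed to execute()
        }
    }

    // Runs one normal and one failing task; returns {started, failed}.
    static int[] demo() {
        HookedPool pool = new HookedPool(2);
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.failed.get() };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("started=" + r[0] + " failed=" + r[1]);
    }
}
```

The failing task's stack trace still goes to stderr through the thread's uncaught exception handler, because runWorker rethrows after calling afterExecute.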
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
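Note that addWorker(null, false) adds a worker with no first task; the new worker just waits in getTask for work. Presumably this keeps the pool from ending up with no live threads; I did not look into the details.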
This figure shows that nothing is left to do and the one started thread is sitting there waiting. (This is a fixed pool of 1 thread.)
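The same observation can be made in code rather than in a screenshot. This is a small sketch with a made-up class name: after its only task completes, the worker of a 1-thread fixed pool is still alive and is not a daemon thread, which is what keeps the JVM (and the red square) going. Calling shutdown lets getTask return null, so the worker exits.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class IdleWorkerDemo {

    // Returns {alive while idle, is daemon, alive after shutdown}.
    static boolean[] observeWorker() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        AtomicReference<Thread> worker = new AtomicReference<>();
        try {
            // Capture the worker thread, then wait for the task to finish.
            pool.submit(() -> worker.set(Thread.currentThread())).get();
            Thread t = worker.get();
            boolean aliveWhileIdle = t.isAlive(); // waiting for the next task
            boolean daemon = t.isDaemon();
            pool.shutdown();                      // idle workers are told to exit
            t.join(5000);
            return new boolean[] { aliveWhileIdle, daemon, t.isAlive() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = observeWorker();
        System.out.println("aliveWhileIdle=" + r[0] + " daemon=" + r[1]
                + " aliveAfterShutdown=" + r[2]);
    }
}
```

So a main method that should end once the work is done needs to call shutdown on the pool, or use a thread factory that creates daemon threads.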
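This figure shows that when a thread hits an exception partway through, it is discarded and a new thread is created (judging by the thread names); if the original thread finishes normally, it keeps being used. (This is a fixed pool of 2 threads, and the 2nd task throws.)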
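The replacement can be reproduced with thread names alone. The sketch below differs from the screenshot setup in two ways, both chosen to keep the outcome deterministic: it uses a pool of 1 thread instead of 2, and it assumes the tasks are handed over with execute. With submit or invokeAll the FutureTask captures the exception, the worker survives, and no replacement thread appears.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class WorkerReplacementDemo {

    // Records which thread ran each of three tasks; the second task throws.
    static List<String> threadNames() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        List<String> names = Collections.synchronizedList(new ArrayList<>());
        pool.execute(() -> names.add(Thread.currentThread().getName()));
        pool.execute(() -> {
            names.add(Thread.currentThread().getName());
            // Escapes runWorker, so processWorkerExit starts a replacement worker.
            throw new IllegalStateException("boom");
        });
        pool.execute(() -> names.add(Thread.currentThread().getName()));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        threadNames().forEach(System.out::println);
    }
}
```

The first two names are the same thread; the third task runs on a new thread created by addWorker(null, false) after the original worker died.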
That's all for now, off to bed.