先來看ThreadPoolExecutor的execute方法,這個方法能體現出一個Task被加入到線程池之后都發生了什么:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* 如果運行中的worker線程數少于設定的常駐線程數,增加worker線程,把task分配給新建的worker線程 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如果任務可以被加入到任務隊列中,即等待的任務數還在允許的范圍內, // 再次檢查線程池是否被關閉,如果關閉的話,則移除任務并拒絕該任務 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果任務數超過了現有worker線程的承受范圍,嘗試新建worker線程 // 如果無法添加新的worker線程,則會拒絕該任務 else if (!addWorker(command, false)) reject(command); }
在執行任務時,需要經常檢查線程池的狀態,那么接下來說說線程池是如何進行狀態控制的。上面的代碼有個成員變量叫做ctl,它用于標記線程池狀態和worker線程的數量,是一個AutomaticInteger對象。
PRivate final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl是一個32位的整數,最高的3位表示狀態:
111為running,
000為shutdown,
001為stop,
010為tidying,
011為ternimated。
因此狀態值就是這三位加上29個0,因此running的值是個負整數(最高位為1),其他狀態都是正整數,后面判斷狀態會比較值的大小時會用到這點。
剩下的29位表示worker線程的數量(因此最大允許的線程數就是2的29方減1)。
這里是說說這幾個狀態的意義,這幾個狀態發生的順序正好就是上面列出的順序:
running表示正常運行狀態
shutdown狀態意味著發出了一個shutdown信號,類似于你點擊了windows的關機按鈕
stop表示shutdown信號收到,等于windows響應了這個信號,發出正在關機的信息
tidying發生在stop之后做出的響應,表示這個時候在清理一些資源,
ternimated發生在tidying完成之后,表示關閉完成。
接著來看添加一個worker線程時都發生了什么:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 返回false的情況: // 1. rs>shutdown,即shutdown和running以外的狀態 // 2. shutdown的狀態 // 1)firstTask不為null,即有task分配 // 2)沒有task,但是workQueue(等待任務隊列)為空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 1. 如果沒有設定線程數的限制,worker線程數不能大于最大值(2的29次方-1) // 2. 如果是固定尺寸的線程池,不能大于固定尺寸 // 3. 如果是可擴展的線程池,不能大于規定的線程數的上限 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 用CAS操作增加線程數量,如果失敗,重新循環 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; loop } } // 新建worker線程 Worker w = new Worker(firstTask); Thread t = w.thread; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); // 檢查以下任一狀態是否出現: // 1. 創建線程失敗 // 2. rs>shutdown,即shutdown和running以外的狀態 // 3. rs==shutdown,有任務分配 if (t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null))) { decrementWorkerCount(); tryTerminate(); return false; } workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } finally { mainLock.unlock(); } t.start(); // 這里考慮一種極少出現的情況,如果worker線程調用start沒有完成時, // 線程池進入Stop狀態,這個時候會調用Thread#interrupt中斷每個 // worker線程,但是 interrupt對沒有start的線程不一定起作用,這樣 // 就會漏掉了對這個thread的interrupt,因此在worker線程start之后 // 檢查以下,如果stop了,而這個線程卻沒有被interrupt,補上這個漏掉 // 的interrupt。 if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) t.interrupt(); return true; }
這篇文章主要講線程池如何處理任務,下一篇文章將會講worker線程是如何工作的。
新聞熱點
疑難解答