這篇文章主要說說DelayedWorkQueue。
在ScheduledThreadPoolExecutor使用DelayedWorkQueue來存放要執(zhí)行的任務(wù),因為這些任務(wù)是帶有延遲的,而每次執(zhí)行都是取第一個任務(wù)執(zhí)行,因此在DelayedWorkQueue中任務(wù)必然按照延遲時間從短到長來進行排序的。
DelayedWorkQueue使用堆來實現(xiàn)的。
和以前分析BlockingQueue的實現(xiàn)類一樣,首先來看offer方法,基本就是一個添加元素到堆的邏輯。
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture e = (RunnableScheduledFuture)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; // 因為元素時存儲在一個數(shù)組中,隨著堆變大,當(dāng)數(shù)組存儲不夠時,需要對數(shù)組擴容 if (i >= queue.length) grow(); size = i + 1; // 如果原來隊列為空 if (i == 0) { queue[0] = e; // 這個i就是RunnableScheduledFuture用到的heapIndex setIndex(e, 0); } else { // 添加元素到堆中 siftUp(i, e); } // 如果隊列原先為空,那么可能有線程在等待元素,這時候既然添加了元 // 素,就需要通過Condition通知這些線程 if (queue[0] == e) { // 因為有元素新添加了,第一個等待的線程可以結(jié)束等待了,因此這里 // 刪除第一個等待線程 leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
這里順帶看一下siftUp,熟悉堆的實現(xiàn)的朋友應(yīng)該很容易看懂這是一個把元素添加已有堆中的算法。
PRivate void siftUp(int k, RunnableScheduledFuture key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
那么接著就看看poll:
public RunnableScheduledFuture poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 因為即使拿到任務(wù),線程還是需要等待,而這個等待過程是由隊列幫助完成的 // 因此poll方法只能返回已經(jīng)到執(zhí)行時間點的任務(wù) RunnableScheduledFuture first = queue[0]; if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else return finishPoll(first); } finally { lock.unlock(); } }
因為poll方法只能返回已經(jīng)到了執(zhí)行時間點的任務(wù),所以對于我們理解隊列如何實現(xiàn)延遲執(zhí)行沒有意義,因此重點看看take方法:
public RunnableScheduledFuture take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 嘗試獲取第一個元素,如果隊列為空就進入等待 RunnableScheduledFuture first = queue[0]; if (first == null) available.await(); else { // 獲取任務(wù)執(zhí)行的延遲時間 long delay = first.getDelay(TimeUnit.NANOSECONDS); // 如果任務(wù)不用等待,立刻返回該任務(wù)給線程 if (delay <= 0) // 從堆中拿走任務(wù) return finishPoll(first); // 如果任務(wù)需要等待,而且前面有個線程已經(jīng)等待執(zhí)行任務(wù)(leader線程 // 已經(jīng)拿到任務(wù)了,但是執(zhí)行時間沒有到,延遲時間肯定是最短的), // 那么執(zhí)行take的線程肯定繼續(xù)等待, else if (leader != null) available.await(); // 當(dāng)前線程的延遲時間是最短的情況,那么更新leader線程 // 用Condition等待直到時間到點,被喚醒或者被中斷 else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { // 重置leader線程以便進行下一次循環(huán) if (leader == thisThread) leader = null; } } } } } finally { // 隊列不為空發(fā)出signal很好理解,這里附帶了沒有l(wèi)eader線程 // 的條件是因為leader線程存在時表示leader線程正在等待執(zhí)行時間點的 // 到來,如果此時發(fā)出signal會觸發(fā)awaitNanos提前返回 if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
take方法的重點就是leader線程,因為存在延遲時間,即使拿到任務(wù),線程還是需要等待的,leader線程就那個最先執(zhí)行任務(wù)的線程。
因為線程拿到任務(wù)之后還是需要等待一段延遲執(zhí)行的時間,所以對于超時等待的poll方法來說就有點意思了:
public RunnableScheduledFuture poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture first = queue[0]; // 任務(wù)隊列為空的情況 if (first == null) { // nanos小于等于0有兩種可能: // 1. 參數(shù)值設(shè)定 // 2. 等待已經(jīng)超時 if (nanos <= 0) return null; else // 等待一段時間,返回剩余的等待時間 nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return finishPoll(first); if (nanos <= 0) return null; // leader線程存在并且nanos大于delay的情況下, // 依然等待nanos這么長時間,不用擔(dān)心會超過delay設(shè)定 // 的時間點,因為leader線程到時間之后會發(fā)出signal // 喚醒線程,而那個時候顯然還沒有到delay設(shè)定的時間點 if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); // 剩余的超時時間 nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
通過分析以上代碼基本上已經(jīng)理清楚了DelayedWorkQueue實現(xiàn)延遲執(zhí)行的原理:
1. 按照執(zhí)行延遲從短到長的順序把任務(wù)存儲到堆;
2. 通過leader線程讓拿到任務(wù)的線程等到規(guī)定的時間點再執(zhí)行任務(wù);
新聞熱點
疑難解答