DelayQueue有序存儲Delayed類型或者子類型的對象,沒當從隊列中取走元素時,需要等待延遲耗完才會返回該對象。
所謂Delayed類型,因為需要比較,所以繼承了Comparable接口:
public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit);}
其實Delayed對象的排序和延遲長短是無關的,因為Comparable的compare方法是用戶自己實現的,DelayQueue只是保證返回對象的延遲已經耗盡。
DelayQueue需要排序存儲Delayed類型的對象同時具備阻塞功能,但是阻塞的過程伴有延遲等待類型的阻塞,因此不能直接使用BlockingPRiorityQueue來實現,而是用非阻塞的版本的PriorityQueue來實現排序存儲。
private final PriorityQueue<E> q = new PriorityQueue<E>();
因此DelayQueue需要自己實現阻塞的功能(需要一個Condition):
private final Condition available = lock.newCondition();
老規矩還是先來看offer方法:
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); // 如果原來隊列為空,重置leader線程,通知available條件 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
順便提一下,因為DelayQueue不限制長度,因此添加元素的時候不會因為隊列已滿產生阻塞,因此帶有超時的offer方法的超時設置是不起作用的:
public boolean offer(E e, long timeout, TimeUnit unit) { // 和不帶timeout的offer方法一樣 return offer(e); }
因為DelayQueue需要自己實現阻塞,因此關注的重點應該是兩個帶有阻塞的方法:沒有超時的take方法和帶有超時的poll方法。
普通poll方法很簡單,如果延遲時間沒有耗盡的話,直接返回null就可以了。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } }
接下來看take和帶timeout的poll方法,在看過DelayedWorkQueue之后這部分還是比較好理解的:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 如果隊列為空,需要等待available條件被通知 E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); // 如果延遲時間已到,直接返回第一個元素 if (delay <= 0) return q.poll(); // leader線程存在表示有其他線程在等待,那么當前線程肯定需要等待 else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; // 如果沒有leader線程,設置當前線程為leader線程 // 嘗試等待直到延遲時間耗盡(可能提前返回,那么下次 // 循環會繼續處理) try { available.awaitNanos(delay); } finally { // 如果leader線程還是當前線程,重置它用于下一次循環。 // 等待available條件時,鎖可能被其他線程占用從而導致 // leader線程被改變,所以要檢查 if (leader == thisThread) leader = null; } } } } } finally { // 如果沒有其他線程在等待,并且隊列不為空,通知available條件 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
再來看帶有timeout的poll方法,和DelayedWorkQueue非常相似:
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else // 嘗試等待available條件,記錄剩余的時間 nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); if (nanos <= 0) return null; // 當leader線程不為空時(此時delay>=nanos),等待的時間 // 似乎delay更合理,但是nanos也可以,因為排在當前線程前面的 // 其他線程返回時會喚醒available條件從而返回, // 這里使用nanos和nonas<delay合并更加簡單 if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); // nanos需要更新 nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
前面理解了DelayedWorkQueue再來看DelayQueue就非常容易理解了。
新聞熱點
疑難解答