麻豆小视频在线观看_中文黄色一级片_久久久成人精品_成片免费观看视频大全_午夜精品久久久久久久99热浪潮_成人一区二区三区四区

首頁 > 學(xué)院 > 開發(fā)設(shè)計 > 正文

《java.util.concurrent 包源碼閱讀》15 線程池系列之ScheduledThreadPoolExecutor 第二部分

2019-11-14 20:58:41
字體:
供稿:網(wǎng)友
java.util.concurrent 包源碼閱讀》15 線程池系列之ScheduledThreadPoolExecutor 第二部分

這篇文章主要說說DelayedWorkQueue。

在ScheduledThreadPoolExecutor使用DelayedWorkQueue來存放要執(zhí)行的任務(wù),因為這些任務(wù)是帶有延遲的,而每次執(zhí)行都是取第一個任務(wù)執(zhí)行,因此在DelayedWorkQueue中任務(wù)必然按照延遲時間從短到長來進行排序的。

DelayedWorkQueue使用堆來實現(xiàn)的。

和以前分析BlockingQueue的實現(xiàn)類一樣,首先來看offer方法,基本就是一個添加元素到堆的邏輯。

        public boolean offer(Runnable x) {            if (x == null)                throw new NullPointerException();            RunnableScheduledFuture e = (RunnableScheduledFuture)x;            final ReentrantLock lock = this.lock;            lock.lock();            try {                int i = size;                // 因為元素時存儲在一個數(shù)組中,隨著堆變大,當(dāng)數(shù)組存儲不夠時,需要對數(shù)組擴容                if (i >= queue.length)                    grow();                size = i + 1;                // 如果原來隊列為空                if (i == 0) {                    queue[0] = e;                    // 這個i就是RunnableScheduledFuture用到的heapIndex                    setIndex(e, 0);                } else {                    // 添加元素到堆中                    siftUp(i, e);                }                // 如果隊列原先為空,那么可能有線程在等待元素,這時候既然添加了元                // 素,就需要通過Condition通知這些線程                if (queue[0] == e) {                    // 因為有元素新添加了,第一個等待的線程可以結(jié)束等待了,因此這里                    // 刪除第一個等待線程                    leader = null;                    available.signal();                }            } finally {                lock.unlock();            }            return true;        }

這里順帶看一下siftUp,熟悉堆的實現(xiàn)的朋友應(yīng)該很容易看懂這是一個把元素添加已有堆中的算法。

        PRivate void siftUp(int k, RunnableScheduledFuture key) {            while (k > 0) {                int parent = (k - 1) >>> 1;                RunnableScheduledFuture e = queue[parent];                if (key.compareTo(e) >= 0)                    break;                queue[k] = e;                setIndex(e, k);                k = parent;            }            queue[k] = key;            setIndex(key, k);        }

那么接著就看看poll:

        public RunnableScheduledFuture poll() {            final ReentrantLock lock = this.lock;            lock.lock();            try {                // 因為即使拿到任務(wù),線程還是需要等待,而這個等待過程是由隊列幫助完成的                // 因此poll方法只能返回已經(jīng)到執(zhí)行時間點的任務(wù)                RunnableScheduledFuture first = queue[0];                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)                    return null;                else                    return finishPoll(first);            } finally {                lock.unlock();            }        }

因為poll方法只能返回已經(jīng)到了執(zhí)行時間點的任務(wù),所以對于我們理解隊列如何實現(xiàn)延遲執(zhí)行沒有意義,因此重點看看take方法:

        public RunnableScheduledFuture take() throws InterruptedException {            final ReentrantLock lock = this.lock;            lock.lockInterruptibly();            try {                for (;;) {                    // 嘗試獲取第一個元素,如果隊列為空就進入等待                    RunnableScheduledFuture first = queue[0];                    if (first == null)                        available.await();                    else {                        // 獲取任務(wù)執(zhí)行的延遲時間                        long delay = first.getDelay(TimeUnit.NANOSECONDS);                        // 如果任務(wù)不用等待,立刻返回該任務(wù)給線程                        if (delay <= 0)                            // 從堆中拿走任務(wù)                            return finishPoll(first);                        // 如果任務(wù)需要等待,而且前面有個線程已經(jīng)等待執(zhí)行任務(wù)(leader線程                        // 已經(jīng)拿到任務(wù)了,但是執(zhí)行時間沒有到,延遲時間肯定是最短的),                        // 那么執(zhí)行take的線程肯定繼續(xù)等待,                        else if (leader != null)                            available.await();                        // 當(dāng)前線程的延遲時間是最短的情況,那么更新leader線程                        // 用Condition等待直到時間到點,被喚醒或者被中斷                        else {                            Thread thisThread = Thread.currentThread();                            leader = thisThread;                            try {                                available.awaitNanos(delay);                            } finally {                                // 重置leader線程以便進行下一次循環(huán)                                if (leader == thisThread)                                    leader = null;                            }                        }                    }                }            } finally {                // 隊列不為空發(fā)出signal很好理解,這里附帶了沒有l(wèi)eader線程                // 的條件是因為leader線程存在時表示leader線程正在等待執(zhí)行時間點的                // 到來,如果此時發(fā)出signal會觸發(fā)awaitNanos提前返回                if (leader == null && queue[0] != null)                    available.signal();                lock.unlock();            }        }

take方法的重點就是leader線程,因為存在延遲時間,即使拿到任務(wù),線程還是需要等待的,leader線程就那個最先執(zhí)行任務(wù)的線程。

因為線程拿到任務(wù)之后還是需要等待一段延遲執(zhí)行的時間,所以對于超時等待的poll方法來說就有點意思了:

        public RunnableScheduledFuture poll(long timeout, TimeUnit unit)            throws InterruptedException {            long nanos = unit.toNanos(timeout);            final ReentrantLock lock = this.lock;            lock.lockInterruptibly();            try {                for (;;) {                    RunnableScheduledFuture first = queue[0];                    // 任務(wù)隊列為空的情況                    if (first == null) {                        // nanos小于等于0有兩種可能:                        // 1. 參數(shù)值設(shè)定                        // 2. 等待已經(jīng)超時                        if (nanos <= 0)                            return null;                        else                            // 等待一段時間,返回剩余的等待時間                            nanos = available.awaitNanos(nanos);                    } else {                        long delay = first.getDelay(TimeUnit.NANOSECONDS);                        if (delay <= 0)                            return finishPoll(first);                        if (nanos <= 0)                            return null;                        // leader線程存在并且nanos大于delay的情況下,                        // 依然等待nanos這么長時間,不用擔(dān)心會超過delay設(shè)定                        // 的時間點,因為leader線程到時間之后會發(fā)出signal                        // 喚醒線程,而那個時候顯然還沒有到delay設(shè)定的時間點                        if (nanos < delay || leader != null)                            nanos = available.awaitNanos(nanos);                        else {                            Thread thisThread = Thread.currentThread();                            leader = thisThread;                            try {                                long timeLeft = available.awaitNanos(delay);                                // 剩余的超時時間                                nanos -= delay - timeLeft;                            } finally {                                if (leader == thisThread)                                    leader = null;                            }                        }                    }                }            } finally {                if (leader == null && queue[0] != null)                    available.signal();                lock.unlock();            }        }

通過分析以上代碼基本上已經(jīng)理清楚了DelayedWorkQueue實現(xiàn)延遲執(zhí)行的原理:

1. 按照執(zhí)行延遲從短到長的順序把任務(wù)存儲到堆;

2. 通過leader線程讓拿到任務(wù)的線程等到規(guī)定的時間點再執(zhí)行任務(wù);


發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 国产亚洲精品影达达兔 | 黄网站免费在线看 | 午夜激情视频网站 | 久久久久久久久久久久久久国产 | 妇女毛片 | 亚洲特黄妇女高潮 | 成人在线视频一区 | 看国产一级毛片 | 国产免费一区二区三区 | 黄色片网站在线看 | 一区二区久久精品66国产精品 | 欧美人禽| 国产精品久久国产精麻豆96堂 | 免费观看一级黄色片 | 13一14毛片免费看 | 国产一国产精品一级毛片 | 国产又白又嫩又紧又爽18p | 欧美成人性生活片 | 国产亚洲精品美女久久久 | 午夜精品小视频 | 国产日韩免费观看 | 午夜神马电影网 | 毛片电影网址 | 在线看小早川怜子av | 暖暖免费观看高清完整版电影 | 国产精品久久久久久影院8一贰佰 | 热99在线视频 | 国产一级毛片国语版 | 成年人在线免费播放视频 | 在线观看视频日本 | 黄色免费高清网站 | 亚洲第一黄色网 | h视频免费看 | 国产精品久久久久久久久久久久久久久久 | 久久久成人一区二区免费影院 | 精品视频一区二区三区四区 | 久草热久草视频 | 美女毛片在线观看 | 久久草在线观看视频 | 一区二区久久 | 嫩草影院在线观看网站成人 |