阻塞隊列會對當前線程產生阻塞,比如一個線程從一個空的阻塞隊列中取元素,此時線程會被阻塞直到阻塞隊列中有了元素。當隊列中有元素后,被阻塞的線程會自動被喚醒,不需要notify。這樣提供了極大的方便性。
一.幾種主要的阻塞隊列
自從java 1.5之后,在java.util.concurrent包下提供了若干個阻塞隊列,主要有以下幾個:
ArrayBlockingQueue:一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。隊列的頭部是在隊列中存在時間最長的元素。隊列的尾部 是在隊列中存在時間最短的元素。新元素插入到隊列的尾部,隊列獲取操作則是從隊列頭部開始獲得元素。 這是一個典型的“有界緩存區”,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦創建了這樣的緩存區,就不能再增加其容量。試圖向已滿隊列中放入元素會導致操作受阻塞;試圖從空隊列中提取元素將導致類似阻塞。 此類支持對等待的生產者線程和使用者線程進行排序的可選公平策略。默認情況下,不保證是這種排序。然而,通過將公平性 (fairness) 設置為 true 而構造的隊列允許按照 FIFO 順序訪問線程。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。
LinkedBlockingQueue:一個基于已鏈接節點的、范圍任意的 blocking queue。此隊列按FIFO(先進先出)排序元素。隊列的頭部 是在隊列中時間最長的元素。隊列的尾部是在隊列中時間最短的元素。新元素插入到隊列的尾部,并且隊列獲取操作會獲得位于隊列頭部的元素。鏈接隊列的吞吐量通常要高于基于數組的隊列,但是在大多數并發應用程序中,其可預知的性能要低。 可選的容量范圍構造方法參數作為防止隊列過度擴展的一種方法。如果未指定容量,則它等于 Integer.MAX_VALUE。除非插入節點會使隊列超出容量,否則每次插入后會動態地創建鏈接節點。 此類及其迭代器實現 Collection 和 Iterator 接口的所有可選 方法。
PRiorityBlockingQueue:以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,一個無界阻塞隊列,它使用與類 PriorityQueue 相同的順序規則,并且提供了阻塞獲取操作。雖然此隊列邏輯上是無界的,但是資源被耗盡時試圖執行 add操作也將失敗(導致 OutOfMemoryError)。此類不允許使用 null元素。依賴自然順序的優先級隊列也不允許插入不可比較的對象(這樣做會導致拋出 ClassCastException)。 此類及其迭代器可以實現 Collection 和 Iterator 接口的所有可選 方法。iterator() 方法中提供的迭代器并不保證以特定的順序遍歷 PriorityBlockingQueue 的元素。如果需要有序地進行遍歷,則應考慮使用 Arrays.sort(pq.toArray())。此外,可以使用方法 drainTo 按優先級順序移除全部或部分元素,并將它們放在另一個 collection 中。 在此類上進行的操作不保證具有同等優先級的元素的順序。如果需要實施某一排序,那么可以定義自定義類或者比較器,比較器可使用修改鍵斷開主優先級值之間的聯系。
DelayQueue:Delayed 元素的一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部是延遲期滿后保存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則隊列沒有頭部,并且 poll 將返回 null。當一個元素的getDelay(TimeUnit.NANOSECONDS) 方法返回一個小于等于 0 的值時,將發生到期。即使無法使用 take 或 poll 移除未到期的元素,也不會將這些元素作為正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此隊列不允許使用 null 元素。 此類及其迭代器實現了 Collection 和 Iterator 接口的所有可選方法。
二.阻塞隊列的實現原理
如果隊列是空的,消費者會一直等待,當生產者添加元素時候,消費者是如何知道當前隊列有元素的呢?JDK使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列里添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素后,會通知生產者當前隊列可用。
三.示例對比
示例一:使用阻塞隊列實現的生產者消費者模式package com.bh.block;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;/**使用阻塞隊列實現的生產者消費者模式 * @author bh * */public class Test { private int queueSize = 10; /*一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。 隊列的頭部 是在隊列中存在時間最長的元素。隊列的尾部 是在隊列中存在時間最短的元素。 */ private BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } /**消費者 * @author bh * */ class Consumer extends Thread{ @Override public void run() { consume(); } /** * 消費 */ private void consume() { while(true){ try { queue.take(); System.out.println("從隊列取走一個元素,隊列剩余"+queue.size()+"個元素"); } catch (InterruptedException e) { e.printStackTrace(); } } } } /**生產者 * @author bh * */ class Producer extends Thread{ @Override public void run() { produce(); } /** * 生產 */ private void produce() { while(true){ try { queue.put(1); System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+(queueSize-queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } }}示例二:使用非阻塞隊列實現的生產者消費者模式package com.bh.unblock;import java.util.PriorityQueue;import java.util.Queue;/**非阻塞隊列通過使用Object.wait()和Object.notify()方法實現生產者與消費者模式 * @author bh * */public class Test { private int queueSize = 10; //基于優先級堆的無界優先級隊列 private Queue<Integer> queue = new PriorityQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } /**消費者 * @author bh * */ class Consumer extends Thread{ @Override public void run() { consume(); } /**消費 */ private void consume() { while(true){ synchronized (queue) { while(queue.size() == 0){ try { System.out.println("隊列空,等待數據"); //在其他線程調用此對象的 notify() 方法或 notifyAll() 方法前,導致當前線程等待 queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.poll(); //每次移走隊首元素 queue.notify(); //喚醒在此對象監視器上等待的單個線程 System.out.println("從隊列取走一個元素,隊列剩余"+queue.size()+"個元素"); } } } } /**生產者 * @author bh * */ class Producer extends Thread{ @Override public void run() { produce(); } /**生產 */ private void produce() { while(true){ synchronized (queue) { while(queue.size() == queueSize){ try { System.out.println("隊列滿,等待有空余空間"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.offer(1); //每次插入一個元素 queue.notify(); System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+(queueSize-queue.size())); } } } }}新聞熱點
疑難解答