最近在學習java中自帶的JDK并發(fā)包,java.util.concurrent,發(fā)現(xiàn)功能很強大,其中之一就是工作中多次用到的線程工具類BlockingQueue。在實際開發(fā)工作和面試過程中,經(jīng)常會考察對于該工具類的使用和理解。
BlockingQueue最終會有四種狀況,拋出異常、返回特殊值、阻塞、超時,下表總結(jié)了這些方法:
拋出異常 | 特殊值 | 阻塞 | 超時 | |
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
檢查 | element() | peek() | 不可用 | 不可用 |
BlockingQueue是個接口,有如下實現(xiàn)類:
1. ArrayBlockQueue:一個由數(shù)組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。創(chuàng)建其對象必須明確大小,像數(shù)組一樣。
2. LinkedBlockQueue:一個可改變大小的阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。創(chuàng)建其對象如果沒有明確大小,默認值是Integer.MAX_VALUE。鏈接隊列的吞吐量通常要高于基于數(shù)組的隊列,但是在大多數(shù)并發(fā)應用程序中,其可預知的性能要低。
3. PRiorityBlockingQueue:類似于LinkedBlockingQueue,但其所含對象的排序不是FIFO,而是依據(jù)對象的自然排序順序或者是構(gòu)造函數(shù)所帶的Comparator決定的順序。
4. SynchronousQueue:同步隊列。同步隊列沒有任何容量,每個插入必須等待另一個線程移除,反之亦然。
由于LinkedBlockingQueue實現(xiàn)是線程安全的,實現(xiàn)了先進先出等特性,是作為生產(chǎn)者消費者的首選,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,默認最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。
package cn.thread;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;/** * 多線程模擬實現(xiàn)生產(chǎn)者/消費者模型 * */public class BlockingQueueTest2 { /** * * 定義裝蘋果的籃子 * */ public class Basket { // 籃子,能夠容納3個蘋果 BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3); // 生產(chǎn)蘋果,放入籃子 public void produce() throws InterruptedException { // put方法放入一個蘋果,若basket滿了,等到basket有位置 basket.put("An apple"); } // 消費蘋果,從籃子中取走 public String consume() throws InterruptedException { // take方法取出一個蘋果,若basket為空,等到basket有蘋果為止(獲取并移除此隊列的頭部) return basket.take(); } } // 定義蘋果生產(chǎn)者 class Producer implements Runnable { private String instance; private Basket basket; public Producer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { while (true) { // 生產(chǎn)蘋果 System.out.println("生產(chǎn)者準備生產(chǎn)蘋果:" + instance); basket.produce(); System.out.println("!生產(chǎn)者生產(chǎn)蘋果完畢:" + instance); // 休眠300ms Thread.sleep(300); } } catch (InterruptedException ex) { System.out.println("Producer Interrupted"); } } } // 定義蘋果消費者 class Consumer implements Runnable { private String instance; private Basket basket; public Consumer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { while (true) { // 消費蘋果 System.out.println("消費者準備消費蘋果:" + instance); System.out.println(basket.consume()); System.out.println("!消費者消費蘋果完畢:" + instance); // 休眠1000ms Thread.sleep(1000); } } catch (InterruptedException ex) { System.out.println("Consumer Interrupted"); } } } public static void main(String[] args) { BlockingQueueTest2 test = new BlockingQueueTest2(); // 建立一個裝蘋果的籃子 Basket basket = test.new Basket(); ExecutorService service = Executors.newCachedThreadPool(); Producer producer = test.new Producer("生產(chǎn)者001", basket); Producer producer2 = test.new Producer("生產(chǎn)者002", basket); Consumer consumer = test.new Consumer("消費者001", basket); service.submit(producer); service.submit(producer2); service.submit(consumer); // 程序運行5s后,所有任務停止// try {// Thread.sleep(1000 * 5);// } catch (InterruptedException e) {// e.printStackTrace();// }// service.shutdownNow(); }}
新聞熱點
疑難解答