多線程的難點主要就是多線程通信協(xié)作這一塊了,前面筆記二中提到了常見的同步方法,這里主要是進行實例學習了,今天總結(jié)了一下3個實例:
1、銀行存款與提款多線程實現(xiàn),使用Lock鎖和條件Condition。 附加 : 用監(jiān)視器進行線程間通信
2、生產(chǎn)者消費者實現(xiàn),使用LinkedList自寫緩沖區(qū)。
3、多線程之阻塞隊列學習,用阻塞隊列快速實現(xiàn)生產(chǎn)者消費者模型。 附加:用布爾變量關(guān)閉線程
在三種線程同步方法中,我們這里的實例用Lock鎖來實現(xiàn)變量同步,因為它比較靈活直觀。
實現(xiàn)了變量的同步,我們還要讓多個線程之間進行“通話”,就是一個線程完成了某個條件之后,告訴其他線程我完成了這個條件,你們可以行動了。下面就是java提供的條件接口Condition定義的同步方法:
很方便的是,java的Lock鎖里面提供了newConditon()方法可以,該方法返回:一個綁定了lock鎖的Condition實例,有點抽象,其實把它看作一個可以發(fā)信息的鎖就可以了,看后面的代碼,應(yīng)該就能理解了。
1、銀行存款與提款多線程實現(xiàn)。我們模擬ATM機器存款與提款,創(chuàng)建一個賬戶類Account(),該類包含同步方法:
存款方法:deposit()
提款方法:withdraw()
以及一個普通的查詢余額的方法getbalance().
我們創(chuàng)建兩個任務(wù)線程,分別調(diào)用兩個同步方法,進行模擬操作,看代碼:
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ThreadCoOperation {PRivate static Account account = new Account(); public static void main(String[] args){//創(chuàng)建線程池ExecutorService executor = Executors.newFixedThreadPool(2);executor.execute(new DepositTask());executor.execute(new WithdrawTask());}//存錢public static class DepositTask implements Runnable{@Overridepublic void run() {try {while(true){account.deposit((int)(Math.random()*1000)+1);Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}}}public static class WithdrawTask implements Runnable{@Overridepublic void run() {try{while(true){account.withdraw((int)(Math.random()*1000)+1);Thread.sleep(500);}} catch (InterruptedException e) {e.printStackTrace();}}}public static class Account{//一個鎖是一個Lock接口的實例 它定義了加鎖和釋放鎖的方法 ReentrantLock是為創(chuàng)建相互排斥的鎖的Lock的具體實現(xiàn)private static Lock lock = new ReentrantLock();//創(chuàng)建一個condition,具有發(fā)通知功能的鎖,前提是要實現(xiàn)了lock接口private static Condition newDeposit = lock.newCondition();private int balance = 0;public int getBalance(){return balance;}public void withdraw(int amount){lock.lock();try {while(balance < amount){System.out.println("/t/t錢不夠,等待存錢");newDeposit.await();}balance -= amount;System.out.println("/t/t取出"+amount+"塊錢/t剩余"+getBalance());} catch (InterruptedException e) {e.printStackTrace();}finally{lock.unlock();}}public void deposit(int amount){lock.lock();try{balance+=amount;System.out.println("存入"+amount+"塊錢");newDeposit.signalAll(); //發(fā)信息喚醒所有的線程}finally{lock.unlock();}}}}
運行截圖
分析:
1、程序中需要注意的:創(chuàng)建一個condition,具有發(fā)通知功能的鎖,前提是要實現(xiàn)了lock接口。
2、while(balance < amount)不能改用if判斷,用if會使得線程不安全,使用if會不會進行循環(huán)驗證,而while會,我們經(jīng)常看到while(true),但是不會經(jīng)常看到if(true).
3、調(diào)用了await方法后,要記得使用signalAll()或者signal()將線程喚醒,否則線程永久等待。
最后再來分析一下這個類的結(jié)構(gòu),有3個類,兩個靜態(tài)任務(wù)類實現(xiàn)了Runnable接口,是線程類,而另外一個類則是普通的任務(wù)類,包含了線程類所用到的方法。我們的主類在main方法前面就實例化一個Account類,以供線程類調(diào)用該類里面的同步方法。
這種構(gòu)造方式是多線程常用到的一種構(gòu)造方式吧。不難發(fā)現(xiàn)后面要手寫的生產(chǎn)者消費者模型也是這樣子構(gòu)造的。這相當于是一個多線程模板。也是我們學習這個例子最重要的收獲吧。
用監(jiān)視器進行線程之間的通信
還有一點,接口Lock與Condition都是在java5之后出現(xiàn)的,在這之前,線程通信是通過內(nèi)置的監(jiān)視器(monitor)實現(xiàn)的。
監(jiān)視器是一個相互排斥且具有同步能力的對象,任意對象都有可能成為一個monitor。監(jiān)視器是通過synchronized關(guān)鍵字來對自己加鎖(加鎖解鎖是解決線程同步最基本的思想),使用wait()方法時線程暫停并 等待條件發(fā)生,發(fā)通知則是通過notify()和notifyAll()方法。大體的模板是這樣子的:
不難看出await()、signal()、signally()是wait()、notify()、notifyAll()的進化形態(tài),所以不建議使用監(jiān)視器。
2、生產(chǎn)者消費者實現(xiàn),使用LinkedList自寫緩沖區(qū)
這個模型一直很經(jīng)典,學操作系統(tǒng)的時候還學過,記得linux還用PV操作去實現(xiàn)它,不過這東西是跨學科的。
考慮緩存區(qū)buffer的使用者,生產(chǎn)者和消費者,他們都能識別緩沖區(qū)是否滿的,且兩種各只能發(fā)出一種信號:
生產(chǎn)者:它能發(fā)出notEmpty()信號,即緩沖區(qū)非空信號,當它看到緩沖區(qū)滿的時候,它就調(diào)用await等待。
消費者:它能發(fā)出notFull()信號,即緩沖區(qū)未滿的信號,當它看到緩沖區(qū)空的時候,它也調(diào)用await等待。
看代碼:
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;//生產(chǎn)者消費者public class ConsumerProducer {private static Buffer buffer= new Buffer();public static void main(String[] args){ExecutorService executor = Executors.newFixedThreadPool(2);executor.execute(new ProducerTask());executor.execute(new ConsumerTask());executor.shutdown();}public static class ProducerTask implements Runnable{@Overridepublic void run() {int i=1;try {while(true){System.out.println("生產(chǎn)者寫入數(shù)據(jù)"+i);buffer.write(i++);Thread.sleep((int)(Math.random()*80));} }catch (InterruptedException e) {e.printStackTrace();}}}public static class ConsumerTask implements Runnable{public void run() {try {while(true){System.out.println("/t/t消費讀出數(shù)據(jù)"+buffer.read());Thread.sleep((int)(Math.random()*100));}} catch (InterruptedException e) {e.printStackTrace();}}}public static class Buffer{private static final int CAPACTIY = 4; //緩沖區(qū)容量private java.util.LinkedList<Integer> queue = new java.util.LinkedList<Integer>();private static Lock lock = new ReentrantLock();private static Condition notEmpty = lock.newCondition();private static Condition notFull = lock.newCondition();public void write(int value){lock.lock();try{while(queue.size()==CAPACTIY){System.out.println("緩沖區(qū)爆滿");notFull.await();}queue.offer(value);notEmpty.signalAll(); //通知所有的緩沖區(qū)未空的情況}catch(InterruptedException ex){ex.printStackTrace();}finally{lock.unlock();}}@SuppressWarnings("finally")public int read(){int value = 0;lock.lock();try{while(queue.isEmpty()){System.out.println("/t/t緩沖區(qū)是空的,等待緩沖區(qū)非空的情況");notEmpty.await();}value = queue.remove();notFull.signal();}catch(InterruptedException ex){ex.printStackTrace();}finally{lock.unlock();return value;}}}}
運行截圖
程序運行正常,不過稍微延長一下讀取時間,就會出現(xiàn)這樣的情況
程序里面設(shè)置的容量是4,可是這里卻可以存入最多5個數(shù)據(jù),而且更合理的情況應(yīng)該是初始緩沖區(qū)是空的,后面找了下這個小bug,原來是調(diào)用offer()函數(shù)應(yīng)該放在檢測語句之前,如果希望一開始就調(diào)用ConsumerTask,在main方法里面調(diào)換兩者的順序即可。
3、用阻塞隊列快速實現(xiàn)生產(chǎn)者消費者模型java的強大之處是它有著豐富的類庫,我們學習java在某種程度上就是學習這些類庫。
阻塞隊列是這樣的一種隊列:當試圖向一個滿隊列里添加元素 或者 從空隊列里刪除元素時,隊列會讓線程自動阻塞,且當隊列滿時,隊列會繼續(xù)存儲元素,供喚醒后的線程使用。這應(yīng)該說是專門為消費者生產(chǎn)者模型而設(shè)計的一種隊列吧,它實現(xiàn)了Queue接口,主要方法是put()和take()方法。
java支持三個具體的阻塞隊列ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue。都在java.util.concurrent包中。
簡單描述上面三個阻塞隊列:
ArrayBlockingQueue: 該阻塞用數(shù)組實現(xiàn),按照FIFO,即先進先出的原則對數(shù)據(jù)進行排序,和數(shù)組的使用有點相似,它事先需要指定一個容量,不過即便隊列超出這個容量,也是不會報錯滴。
LinkeddBlockingQueue:用鏈表實現(xiàn),默認隊列大小是Integer.MAX_VALUE,也是按照先進先出的方法對數(shù)據(jù)排序,性能可能比ArrayBlockingQueue,有待研究。
PriorityBlockingQueue:用優(yōu)先隊列實現(xiàn)的阻塞隊列,會對元素按照大小進行排序,也可以創(chuàng)建不受限制的隊列,put方法永不阻塞。
ok,看代碼:
import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ConsumerProducerUsingBlockQueue {private static ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<Integer>(2);public static void main(String[] args){ExecutorService executor = Executors.newFixedThreadPool(2);executor.execute(new Consumer());executor.execute(new Producer());try {Thread.sleep(100);executor.shutdownNow(); //暴力關(guān)閉,會報錯,不推薦} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}public static class Consumer implements Runnable{@Overridepublic void run() {try{int i=1;while(true){System.out.println("生成者寫入:"+i);buffer.put(i++);Thread.sleep((int)(Math.random())*1000);}}catch(InterruptedException ex){ex.printStackTrace();}}}public static class Producer implements Runnable{@Overridepublic void run() {try{while(true){System.out.println("/t/t消費者取出"+buffer.take());Thread.sleep((int)(Math.random())*10000);}}catch(InterruptedException ex){ex.printStackTrace();}}}}
運行截圖:
沒啥大的問題,就是在關(guān)閉線程的時候太過暴力了,會報錯,線程里面的每一個函數(shù)都似乎值得研究,之前想通過Interrupt暫停,不過失敗了,就直接使用線程池執(zhí)行器的shoutdownNow方法來的。后面自己又用了另外一種關(guān)閉線程的方法,見下面代碼
使用LinkedBlockingQueue實現(xiàn)消費者生產(chǎn)者且使用布爾變量控制線程關(guān)閉
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;public class A_Control_stop {private static LinkedBlockingQueue<String> buffer = new LinkedBlockingQueue<String>();public static void main(String[] args){ExecutorService executor = Executors.newFixedThreadPool(2);executor.execute(new Consumer());executor.execute(new Producer());executor.shutdown();while(!executor.isTerminated()){}System.out.println("所有的的線程都正常結(jié)束");}public static class Consumer implements Runnable{private volatile boolean exit = false;@Overridepublic void run() {try{int i=0;String[] str ={"as","d","sd","ew","sdfg","esfr"};while(!exit){System.out.println("生成者寫入:"+str[i]);buffer.put(str[i++]);Thread.sleep((int)(Math.random())*10);if(5==i){exit=true;}}}catch(InterruptedException ex){ex.printStackTrace();}}}public static class Producer implements Runnable{private volatile boolean exit = false;@Overridepublic void run() {try{int i=0;while(!exit){System.out.println("/t/t消費者取出"+buffer.take());i++;Thread.sleep((int)(Math.random())*10);if(5==i){exit=true;}}}catch(InterruptedException ex){ex.printStackTrace();}}}}
截圖
關(guān)于阻塞隊列,覺得這篇文章講的不錯,推薦大家看看 聊聊并發(fā)----Java中的阻塞隊列
用了幾天,多線程算是學了點皮毛,附注一下:這幾天文章主要是參考了《java程序語言設(shè)計進階篇第8版》,老外寫的書講的真心不錯,只不過現(xiàn)在java都已經(jīng)更新到j(luò)ava8了。在其他一些網(wǎng)站上看到自己的文章,沒有說明轉(zhuǎn)載什么的,估計是直接“被采集”過去了。
本文出自于博客園蘭幽,轉(zhuǎn)載請說明出處。
新聞熱點
疑難解答