CyclicBarrier是一個用于線程同步的輔助類,它允許一組線程等待彼此,直到所有線程都到達集合點,然后執行某個設定的任務。
現實中有個很好的例子來形容:幾個人約定了某個地方集中,然后一起出發去旅行。每個參與的人就是一個線程,CyclicBarrier就是那個集合點,所有人到了之后,就一起出發。
CyclicBarrier的構造函數有兩個:
// parties是參與等待的線程的數量,barrierAction是所有線程達到集合點之后要做的動作public CyclicBarrier(int parties, Runnable barrierAction);// 達到集合點之后不執行操作的構造函數public CyclicBarrier(int parties)
需要說明的是,CyclicBarrier只是記錄線程的數目,CyclicBarrier是不創建任何線程的。線程是通過調用CyclicBarrier的await方法來等待其他線程,如果調用await方法的線程數目達到了預設值,也就是上面構造方法中的parties,CyclicBarrier就會開始執行barrierAction。
因此我們來看CyclicBarrier的核心方法dowait,也就是await方法調用的私有方法:
PRivate int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // count就是預設的parties,count減1的值表示還剩余幾個 // 線程沒有達到該集合點 int index = --count; // index為0表示所有的線程都已經達到集合點,這時 // 占用最后一個線程,執行運行設定的任務 if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 喚醒其他等待的線程, // 更新generation以便下一次運行 nextGeneration(); return 0; } finally { // 如果運行任務時發生異常,設置狀態為broken // 并且喚醒其他等待的線程 if (!ranAction) breakBarrier(); } } // 還有線程沒有調用await,進入循環等待直到其他線程 // 達到集合點或者等待超時 for (;;) { try { // 如果沒有設置超時,進行無超時的等待 if (!timed) trip.await(); // 有超時設置,進行有超時的等待 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // generation如果沒有被更新表示還是當前的運行 // (generation被更新表示集合完畢并且任務成功), // 在狀態沒有被設置為broken狀態的情況下,遇到線程 // 中斷異常表示當前線程等待失敗,需要設置為broken // 狀態,并且拋出中斷異常 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // else對應的條件為:g != generation || g.broken // 表示要么generation已經被更新意味著所有線程已經到達 // 集合點并且任務執行成功,要么就是是broken狀態意味著 // 任務執行失敗,無論哪種情況所有線程已經達到集合點,當 // 前線程要結束等待了,發生了中斷異常,需要中斷當前線程 // 表示遇到了中斷異常。 Thread.currentThread().interrupt(); } } // 如果發現當前狀態為broken,拋出異常 if (g.broken) throw new BrokenBarrierException(); // generation被更新表示所有線程都已經達到集合點 // 并且預設任務已經完成,返回該線程進入等待順序號 if (g != generation) return index; // 等待超時,設置為broken狀態并且拋出超時異常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
1. 任何一個線程等待時發生異常,CyclicBarrier都將被設置為broken狀態,運行都會失敗
2. 每次運行成功之后CyclicBarrier都會清理運行狀態,這樣CyclicBarrier可以重新使用
3. 對于設置了超時的等待,在發生超時的時候會引起CyclicBarrier的broken
說完了CyclicBarrier,再來說說CountDownLatch。
CountDownLatch同樣也是一個線程同步的輔助類,同樣適用上面的集合點的場景來解釋,但是運行模式完全不同。
CyclicBarrier是參與的所有的線程彼此等待,CountDownLatch則不同,CountDownLatch有一個導游線程在等待,每個線程報到一下即可無須等待,等到導游線程發現所有人都已經報到了,就結束了自己的等待。
CountDownLatch的構造方法允許指定參與的線程數量:
public CountDownLatch(int count)
參與線程使用countDown表示報到:
public void countDown() { sync.releaseShared(1); }
看到releaseShared很容易使人聯想到共享鎖,那么試著用共享鎖的運行模式來解釋就簡單得多了:
和信號量的實現類似,CountDownLatch內置一下有限的共享鎖。
每個參與線程擁有一把共享鎖,調用countDown就等于是釋放了自己的共享鎖,導游線程await等于一下子要拿回所有的共享鎖。那么基于AbstractQueuedSynchronizer類來實現就很簡單了:
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
在await時注意到數量是1,其實這個參數對于CountDownLatch實現的Sync類(AbstractQueuedSynchronizer的子類)來說是不起作用的,因為需要保證await獲取共享鎖時必須拿到所有的共享鎖,這個參數也就變得沒有意義了。看一下Sync的tryAcquireShared方法就明白了:
protected int tryAcquireShared(int acquires) { // 和信號量Semaphore的實現一樣,使用state來存儲count, // 每次釋放共享鎖就把state減1,state為0表示所有的共享 // 鎖已經被釋放。注意:這里的acquires參數不起作用 return (getState() == 0) ? 1 : -1; }
因此Sync的tryReleaseShared就是更新state(每次state減1):
protected boolean tryReleaseShared(int releases) { // 每次state減1,當state為0,返回false表示所有的共享鎖都已經釋放 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
CyclicBarrier和CountDownLatch本質上來說都是多個線程同步的輔助工具,前者可以看成分布式的,后者可以看出是主從式。
新聞熱點
疑難解答