ReentrantLock實現了標準的互斥操作,也就是說在某一時刻只有有一個線程持有鎖。ReentrantLock采用這種獨占的保守鎖直接,在一定程度上減低了吞吐量。在這種情況下任何的“讀/讀”、“讀/寫”、“寫/寫”操作都不能同時發生。然而在實際的場景中我們就會遇到這種情況:有些資源并發的訪問中,它大部分時間都是執行讀操作,寫操作比較少,但是讀操作并不影響數據的一致性,如果在進行讀操作時采用獨占的鎖機制,這樣勢必會大大降低吞吐量。所以如果能夠做到讀寫分離,那就非常完美了。
ReadWriteLock, 維護了一對相關的鎖,一個用于只讀操作,另一個用于寫入操作。只要沒有 writer,讀取鎖可以由多個 reader 線程同時保持。寫入鎖是獨占的。對于ReadWriteLock而言,一個資源能夠被多個讀線程訪問,或者被一個寫線程訪問,但是不能同時存在讀寫線程。也就是說讀寫鎖使用的場合是一個共享資源被大量讀取操作,而只有少量的寫操作(修改數據)。如下:
public interface ReadWriteLock { Lock readLock(); Lock writeLock();}
ReadWriteLock為一個接口,他定義了兩個方法readLock、writeLock,從方法名我們就可以看出這兩個方法是干嘛用的。ReentrantReadWriteLock作為ReadWriteLock的實現類,在API文檔中詳細介紹了它的特性。
(一) 公平性
1)、 非公平鎖(默認) 這個和獨占鎖的非公平性一樣,由于讀線程之間沒有鎖競爭,所以讀操作沒有公平性和非公平性,寫操作時,由于寫操作可能立即獲取到鎖,所以會推遲一個或多個讀操作或者寫操作。因此非公平鎖的吞吐量要高于公平鎖。
2)、 公平鎖 利用AQS的CLH隊列,釋放當前保持的鎖(讀鎖或者寫鎖)時,優先為等待時間最長的那個寫線程分配寫入鎖,當前前提是寫線程的等待時間要比所有讀線程的等待時間要長。同樣一個線程持有寫入鎖或者有一個寫線程已經在等待了,那么試圖獲取公平鎖的(非重入)所有線程(包括讀寫線程)都將被阻塞,直到最先的寫線程釋放鎖。如果讀線程的等待時間比寫線程的等待時間還有長,那么一旦上一個寫線程釋放鎖,這一組讀線程將獲取鎖。
(二) 重入性
1)、 讀寫鎖允許讀線程和寫線程按照請求鎖的順序重新獲取讀取鎖或者寫入鎖。當然了只有寫線程釋放了鎖,讀線程才能獲取重入鎖。
2)、 寫線程獲取寫入鎖后可以再次獲取讀取鎖,但是讀線程獲取讀取鎖后卻不能獲取寫入鎖。
3)、 另外讀寫鎖最多支持65535個遞歸寫入鎖和65535個遞歸讀取鎖。(為何是65535后面介紹)
(三) 鎖降級
1)、 寫線程獲取寫入鎖后可以獲取讀取鎖,然后釋放寫入鎖,這樣就從寫入鎖變成了讀取鎖,從而實現鎖降級的特性。
(四) 鎖升級
1)、 讀取鎖是不能直接升級為寫入鎖的。因為獲取一個寫入鎖需要釋放所有讀取鎖,所以如果有兩個讀取鎖視圖獲取寫入鎖而都不釋放讀取鎖時就會發生死鎖。
(五) 鎖獲取中斷
1)、 讀取鎖和寫入鎖都支持獲取鎖期間被中斷。這個和獨占鎖一致。
(六) 條件變量
1)、 寫入鎖提供了條件變量(Condition)的支持,這個和獨占鎖一致,但是讀取鎖卻不允許獲取條件變量,將得到一個UnsupportedOperationException異常。
(七) 重入數
1)、 讀取鎖和寫入鎖的數量最大分別只能是65535(包括重入數)。
(八) 監測
1)、 此類支持一些確定是保持鎖還是爭用鎖的方法。這些方法設計用于監視系統狀態,而不是同步控制。
ReentrantReadWriteLock與ReentrantLock一樣,其鎖主體依然是Sync,它的讀鎖、寫鎖都是依靠Sync來實現的。所以ReentrantReadWriteLock實際上只有一個鎖,只是在獲取讀取鎖和寫入鎖的方式上不一 樣而已,它的讀寫鎖其實就是兩個類:ReadLock、writeLock,這兩個類都是lock實現。
/** 讀鎖 */ PRivate final ReentrantReadWriteLock.ReadLock readerLock; /** 寫鎖 */ private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; /** 使用默認(非公平)的排序屬性創建一個新的 ReentrantReadWriteLock */ public ReentrantReadWriteLock() { this(false); } /** 使用給定的公平策略創建一個新的 ReentrantReadWriteLock */ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } /** 返回用于寫入操作的鎖 */ public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } /** 返回用于讀取操作的鎖 */ public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } public static class WriteLock implements Lock, java.io.Serializable{ public void lock() { //獨占鎖 sync.acquire(1); } /** * 省略其余源代碼 */ } public static class ReadLock implements Lock, java.io.Serializable { public void lock() { //共享鎖 sync.acquireShared(1); } /** * 省略其余源代碼 */ }
從上面的源代碼我們可以看到WriteLock就是一個獨占鎖,readLock是一個共享鎖,他們內部都是使用AQS的acquire、release來進行操作的。但是還是存在一些區別的。關于獨占鎖、共享鎖,請關注前面的博客:
【Java并發編程實戰】—–“J.U.C”:ReentrantLock之二lock方法分析
【Java并發編程實戰】—–“J.U.C”:ReentrantLock之三unlock方法分析
【Java并發編程實戰】-----“J.U.C”:Semaphore
下面LZ就ReadLock、WriteLock的獲取鎖(lock)、釋放鎖(release)進行分析。
public void lock() { sync.acquire(1); }
與ReentrantLock一樣,調用AQS的acquire():
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
第一步,寫鎖調用tryAcquire方法,該方法與ReentrantLock中的tryAcquire方法略有不同:
protected final boolean tryAcquire(int acquires) { //當前線程 Thread current = Thread.currentThread(); //當前鎖個數 int c = getState(); //寫鎖個數 int w = exclusiveCount(c); //當前鎖個數 != 0(是否已經有線程持有鎖),線程重入 if (c != 0) { //w == 0,表示寫線程數為0 //或者獨占鎖不是當前線程,返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //超出最大范圍(65535) if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //設置鎖的線程數量 setState(c + acquires); return true; } //是否阻塞 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //設置鎖為當前線程所有 setExclusiveOwnerThread(current); return true; }
在tryAcquire()中有一個段代碼
int w = exclusiveCount(c);
該段代碼主要是獲取線程的數量的,在前面的特性里面有講過讀取鎖和寫入鎖的數量最大分別只能是65535(包括重入數)。為何是65535呢?在前面LZ也提到過獨占鎖ReentrantLock中有一個state,共享鎖中也有一個state,其中獨占鎖中的state為0或者1如果有重入,則表示重入的次數,共享鎖中表示的持有鎖的數量。而在ReadWriteLock中則不同,由于ReadWriteLock中存在兩個鎖,他們之間有聯系但是也有差異,所以需要有兩個state來分別表示他們。于是ReentrantReadWriteLock就將state一分二位,高16位表示共享鎖的數量,低16位表示獨占鎖的數量。2^16 – 1 = 65535。這就是前面提過的為什么讀取鎖和寫入鎖的數量最大分別只能是65535。
· static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** 返回共享鎖持有線程的數量 **/ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** 返回獨占鎖持有線程的數量 **/ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
這段代碼可以清晰表達計算寫鎖、讀書持有線程的數量。
在上段tryAcquire方法的源代碼中,主要流程如下:
1、首先獲取c、w。然后判斷是否已經有線程持有寫鎖(c != 0),如果持有,則線程進行重入。若w == 0(寫入鎖==0)或者 current != getExclusiveOwnerThread()(鎖的持有者不是當前線程),則返回false。如果寫入鎖的數量超出最大范圍(65535),則拋出error。
2、如果當且寫線程數為0(那么讀線程也應該為0,因為上面已經處理c!=0的情況),并且當前線程需要阻塞那么就返回失敗;如果通過CAS增加寫線程數失敗也返回失敗。
3、當c ==0或者c>0,w >0,則設置鎖的持有則為當前線程。
public void unlock() { sync.release(1); }
unlock()調用Sync的release():
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
在release()中首先調用tryRelease方法進行嘗試釋放鎖:
protected final boolean tryRelease(int releases) { //若鎖的持有者不是當前線程,拋出異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //寫鎖的新線程數 int nextc = getState() - releases; //若寫鎖的新線程數為0,則將鎖的持有者設置為null boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); //設置寫鎖的新線程數 setState(nextc); return free; }
寫鎖的釋放過程還是相對而言比較簡單的:首先查看當前線程是否為寫鎖的持有者,如果不是拋出異常。然后檢查釋放后寫鎖的線程數是否為0,如果為0則表示寫鎖空閑了,釋放鎖資源將鎖的持有線程設置為null,否則釋放僅僅只是一次重入鎖而已,并不能將寫鎖的線程清空。
由于寫鎖與獨占鎖存在很大的相似之處,所以相同的地方,LZ不再闡述,更多請查閱:
【Java并發編程實戰】—–“J.U.C”:ReentrantLock之二lock方法分析
【Java并發編程實戰】—–“J.U.C”:ReentrantLock之三unlock方法分析
讀鎖的內在機制就是共享鎖:
public void lock() { sync.acquireShared(1); }
lock方法內部調用Sync的acquireShared():
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
對于tryAquireShared():
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); //鎖的持有線程數 int c = getState(); /* * 如果寫鎖線程數 != 0 ,且獨占鎖不是當前線程則返回失敗,因為存在鎖降級 */ if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //讀鎖線程數 int r = sharedCount(c); /* * readerShouldBlock():讀鎖是否需要等待(公平鎖原則) * r < MAX_COUNT:持有線程小于最大數(65535) * compareAndSetState(c, c + SHARED_UNIT):設置讀取鎖狀態 */ if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { /* * holdCount部分后面講解 */ if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; // } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
讀鎖獲取鎖的過程比寫鎖稍微復雜些:
1、寫鎖的線程數C!=0且寫鎖的線程持有者不是當前線程,返回-1。因為存在鎖降級,寫線程獲取寫入鎖后可以獲取讀取鎖。
2、依據公平性原則,判斷讀鎖是否需要阻塞,讀鎖持有線程數小于最大值(65535),且設置鎖狀態成功,執行以下代碼(對于HoldCounter下面再闡述),并返回1。如果不滿足改條件,執行fullTryAcquireShared():(HoldCounter部分后面講解)
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { //鎖的線程持有數 int c = getState(); //如果寫鎖的線程持有數 != 0 且鎖的持有者不是當前線程,返回-1 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } //若讀鎖需要阻塞 else if (readerShouldBlock()) { //若隊列的頭部是當前線程 if (firstReader == current) { } else { //下面講解 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } //讀鎖的線程數到達最大值:65536,拋出異常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //設置鎖的狀態成功 if (compareAndSetState(c, c + SHARED_UNIT)) { // if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; }// else if (firstReader == current) { firstReaderHoldCount++; }//下面講解 else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } return 1; } } }
public void unlock() { sync.releaseShared(1); }
unlock調用releaseShared()方法,releaseShared()是AQS中的方法,如下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared是ReentrantReadWriteLock中的方法:
protected final boolean tryReleaseShared(int unused) { //當前線程 Thread current = Thread.currentThread(); /* * HoldCounter部分后面闡述 */ if (firstReader == current) { if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //不斷循環,不斷嘗試CAS操作 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
在這里同樣忽略HoldCounter,其實在該方法中最關鍵的部分在于for(;;)部分,該部分其實就是一個不斷嘗試的CAS過程,直到修狀態成功。
在讀鎖的獲取、釋放過程中,總是會有一個對象存在著,同時該對象在獲取線程獲取讀鎖是+1,釋放讀鎖時-1,該對象就是HoldCounter。
要明白HoldCounter就要先明白讀鎖。前面提過讀鎖的內在實現機制就是共享鎖,對于共享鎖其實我們可以稍微的認為它不是一個鎖的概念,它更加像一個計數器的概念。一次共享鎖操作就相當于一次計數器的操作,獲取共享鎖計數器+1,釋放共享鎖計數器-1。只有當線程獲取共享鎖后才能對共享鎖進行釋放、重入操作。所以HoldCounter的作用就是當前線程持有共享鎖的數量,這個數量必須要與線程綁定在一起,否則操作其他線程鎖就會拋出異常。
先看讀鎖獲取鎖的部分:
if (r == 0) { //r == 0,表示第一個讀鎖線程,第一個讀鎖firstRead是不會加入到readHolds中 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //第一個讀鎖線程重入 firstReaderHoldCount++; } else { //非firstReader計數 HoldCounter rh = cachedHoldCounter; //readHoldCounter緩存 //rh == null 或者 rh.tid != current.getId(),需要獲取rh if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); //加入到readHolds中 rh.count++; //計數+1 }
這里為什么要搞一個firstRead、firstReaderHoldCount呢?而不是直接使用else那段代碼?這是為了一個效率問題,firstReader是不會放入到readHolds中的,如果讀鎖僅有一個的情況下就會避免查找readHolds。可能就看這個代碼還不是很理解HoldCounter。我們先看firstReader、firstReaderHoldCount的定義:
private transient Thread firstReader = null;private transient int firstReaderHoldCount;
這兩個變量比較簡單,一個表示線程,當然該線程是一個特殊的線程,一個是firstReader的重入計數。
HoldCounter的定義:
static final class HoldCounter { int count = 0; final long tid = Thread.currentThread().getId(); }
在HoldCounter中僅有count和tid兩個變量,其中count代表著計數器,tid是線程的id。但是如果要將一個對象和線程綁定起來僅記錄tid肯定不夠的,而且HoldCounter根本不能起到綁定對象的作用,只是記錄線程tid而已。
誠然,在java中,我們知道如果要將一個線程和對象綁定在一起只有ThreadLocal才能實現。所以如下:
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
ThreadLocalHoldCounter繼承ThreadLocal,并且重寫了initialValue方法。
故而,HoldCounter應該就是綁定線程上的一個計數器,而ThradLocalHoldCounter則是線程綁定的ThreadLocal。從上面我們可以看到ThreadLocal將HoldCounter綁定到當前線程上,同時HoldCounter也持有線程Id,這樣在釋放鎖的時候才能知道ReadWriteLock里面緩存的上一個讀取線程(cachedHoldCounter)是否是當前線程。這樣做的好處是可以減少ThreadLocal.get()的次數,因為這也是一個耗時操作。需要說明的是這樣HoldCounter綁定線程id而不綁定線程對象的原因是避免HoldCounter和ThreadLocal互相綁定而GC難以釋放它們(盡管GC能夠智能的發現這種引用而回收它們,但是這需要一定的代價),所以其實這樣做只是為了幫助GC快速回收對象而已(引自[1])。
public class Reader implements Runnable{ private PricesInfo pricesInfo; public Reader(PricesInfo pricesInfo){ this.pricesInfo = pricesInfo; } @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + "--Price 1:" + pricesInfo.getPrice1()); System.out.println(Thread.currentThread().getName() + "--Price 1:" + pricesInfo.getPrice2()); } }}
Writer
public class Writer implements Runnable{ private PricesInfo pricesInfo; public Writer(PricesInfo pricesInfo){ this.pricesInfo = pricesInfo; } @Override public void run() { for (int i=0; i<3; i++) { System.out.printf("Writer: Attempt to modify the prices./n"); pricesInfo.setPrices(Math.random()*10, Math.random()*8); System.out.printf("Writer: Prices have been modified./n"); try { Thread.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }}
PriceInfo
public class PricesInfo { private double price1; private double price2; private ReadWriteLock lock; public PricesInfo(){ price1 = 1.0; price2 = 2.0; lock = new ReentrantReadWriteLock(); } public double getPrice1(){ lock.readLock().lock(); double value = price1; lock.readLock().unlock(); return value; } public double getPrice2(){ lock.readLock().lock(); double value = price2; lock.readLock().unlock(); return value; } public void setPrices(double price1, double price2){ lock.writeLock().lock(); this.price1 = price1; this.price2 = price2; lock.writeLock().unlock(); }}
Test:
public class Test { public static void main(String[] args) { PricesInfo pricesInfo = new PricesInfo(); Reader[] readers = new Reader[5]; Thread[] readerThread = new Thread[5]; for (int i=0; i<5; i++){ readers[i]=new Reader(pricesInfo); readerThread[i]=new Thread(readers[i]); } Writer writer=new Writer(pricesInfo); Thread threadWriter=new Thread(writer); for (int i=0; i<5; i++){ readerThread[i].start(); } threadWriter.start(); }}
參考資料:
1、Java多線程(十)之ReentrantReadWriteLock深入分析
2、JUC 可重入 讀寫鎖 ReentrantReadWriteLock
新聞熱點
疑難解答