Exchanger可以看做雙向數(shù)據(jù)傳輸?shù)腟ynchronousQueue,即沒有生產(chǎn)者和消費(fèi)者之分,任意兩個線程都可以交換數(shù)據(jù)。
在JDK5中Exchanger被設(shè)計成一個容量為1的容器,存放一個等待線程,直到有另外線程到來就會發(fā)生數(shù)據(jù)交換,然后清空容器,等到下一個到來的線程。
從JDK6開始,Exchanger用了類似ConcurrentMap的分段思想,提供了多個slot,增加了并發(fā)執(zhí)行時的吞吐量。
Exchanger不存在公平不公平的模式,因為沒有排隊的情況發(fā)生,只要有兩個線程就可以發(fā)生數(shù)據(jù)交換。
直接看核心方法:
PRivate Object doExchange(Object item, boolean timed, long nanos) { Node me = new Node(item); // index是線程ID的hash值映射到0到max之間的一個值 // 一般情況下max為0,這樣線程交換數(shù)據(jù)只會使用第一個slot, // 即index是0,而max不為0情況請看下面的循環(huán) int index = hashIndex(); // CAS操作失敗的次數(shù) int fails = 0; for (;;) { // 當(dāng)前slot中存儲的對象,也就是Node Object y; Slot slot = arena[index]; // 延遲加載,即只有當(dāng)slot為null時才創(chuàng)建一個slot // 延遲加載后重新循環(huán)一次 if (slot == null) createSlot(index); // slot中有數(shù)據(jù),也就意味著有線程在等待交換數(shù)據(jù) // 這時可以嘗試用CAS重置slot(把slot存儲的對象設(shè)為null) // 用slot中存儲的對象和當(dāng)前線程進(jìn)行數(shù)據(jù)交換 // 如果交換成功就通知原先等待的線程 else if ((y = slot.get()) != null && slot.compareAndSet(y, null)) { Node you = (Node)y; if (you.compareAndSet(null, item)) { LockSupport.unpark(you.waiter); return you.item; } // 如果slot存儲的對象已經(jīng)被重置為null,但是數(shù)據(jù)交換失敗了 // 這時就意味著這個等待的線程的交換請求被取消了 // 在分析wait類型的方法代碼時會看到如何處理這種情況 } // 如果slot中沒有存儲對象,那么首先嘗試把當(dāng)前線程存儲到slot中 // 如果存儲失敗了,就重新循環(huán) else if (y == null && slot.compareAndSet(null, me)) { // index為0意味著僅僅有當(dāng)前線程在等待交換數(shù)據(jù),因此直接等待即可 if (index == 0) return timed ? awaitNanos(me, slot, nanos) : await(me, slot); // 所謂的spin wait:就是固定次數(shù)循環(huán),每次計數(shù)減一 // 對于單核系統(tǒng)來說,spin wait是不做的,因為單核 // 做wait時需要占用CPU,其他線程是無法使用CPU,因此這樣 // 的等待毫無意義。而多核系統(tǒng)中spin值為2000,也就是會做 // 2000次循環(huán)。 // 如果循環(huán)完成后依然沒有得到交換的數(shù)據(jù),那么會返回一個 // CANCEL對象表示請求依舊被取消,并且把Node從slot中清除 Object v = spinWait(me, slot); if (v != CANCEL) return v; // 如果取消了,就新建一個Node取消原先取消的Node用于下次循環(huán) me = new Node(item); int m = max.get(); // index除2,縮小slot的范圍 // 同時如果m過大,減小m if (m > (index >>>= 1)) max.compareAndSet(m, m - 1); } // 允許CAS失敗兩次,因為兩個else if中都有CAS,因此這里 // 允許兩個else if的CAS操作都失敗過 else if (++fails > 1) { int m = max.get(); // 失敗超過3次,增大m,并且從m處重新索引 if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) index = m + 1; // 當(dāng)index小于0,回到m,重新循環(huán) else if (--index < 0) index = m; } } }
這篇文章關(guān)于索引index這塊弄得不是很清楚,后續(xù)會繼續(xù)研究,及時更新。
新聞熱點(diǎn)
疑難解答