前面講ScheduledThreadPoolExecutor曾經重點講到了DelayedWorkQueue,這里說的PriorityBlockingQueue其實是DelayedWorkQueue的簡化版本,實現了按序排列元素的功能。也就是說PriorityBlockingQueue是維護一個按序排列的隊列,排序的方法可以通過指定Comparator來比較元素的大小,或者元素類型本身實現了Comparable接口。
因此PriorityBlockingQueue也是使用基于數組的二叉堆來實現的。
首先還是看看offer方法:
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; // 鎖定這個隊列 lock.lock(); int n, cap; Object[] array; // 如果數組已滿,則嘗試為數組擴容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) // 沒有comparator情況下,元素類型必須實現Comparable接口 // 使用compare方法進行比較,然后插入元素到堆中 siftUpComparable(n, e, array); else // 制定comparator的情況下,插入元素使用comparator // 比較元素,然后插入元素到堆中 siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
這里看一下tryGrow,在對數組進行擴容時釋放了主鎖的,因為分配空間本身是不需要主鎖的,只有更新數組時才會要主鎖。
這樣可以提高并發執行的性能,減少阻塞。
private void tryGrow(Object[] array, int oldCap) { // 擴容數組時,釋放主鎖,這樣其他取走元素的操作就可以正常 // 操作了。這里使用一個簡單的allocationSpinLock作為鎖, // 它的值為1表示鎖正在被使用,為0表示鎖為被占用。 // 在獲取該鎖時,用的CAS操作,而釋放時,因為鎖已經占用, // 直接賦值為0即可。 // 分配空間本身是用不到主鎖的,只有更新數組的時候才需要。 lock.unlock(); Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { int minCap = oldCap + 1; // 整數溢出 if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } // 如果數組被更新了,就沒有必要再分配新的空間了 if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } // 其他線程正在占用allocationSpinLock,調用yield告訴線程調度 // 如果其他線程需要CPU,可以先拿去,我過會再執行,否則我繼續執行。 if (newArray == null) // back off if another thread is allocating Thread.yield(); // 因為要返回,再次獲取主鎖,而且后面可能要更新數組也需要主鎖 lock.lock(); // 如果分配新空間成功,而且原先的隊列沒有被其他的線程更新過 // 就更新數組。這里不需要使用CAS,因為這個時候已經占用主鎖了 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); }
再看取走元素的核心方法extract(poll方法也是使用這個方法從堆中拿走元素)
private E extract() { E result; int n = size - 1; if (n < 0) result = null; else { // 取走第一個元素 Object[] array = queue; result = (E) array[0]; E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; } return result; }
這里提一下PriorityBlockingQueue的序列化。PriorityBlockingQueue內置了一個PriorityQueue對象,序列化會把元素轉存到這個PriorityQueue中,然后再進行序列化。
反序列化時也是用PriorityQueue讀取,然后再把元素轉存回PriorityBlockingQueue自己的隊列。
private PriorityQueue q;
下一篇會講具有DelayedWorkQueue的另外一個功能延遲執行的DelayQueue。
新聞熱點
疑難解答