读Exchanger源码 博客分类: java多线程 java
程序员文章站
2024-03-14 12:12:22
...
//用于线程间交换数据 public V exchange(V x) throws InterruptedException { if (!Thread.interrupted()) { Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0); if (v == NULL_ITEM) return null; if (v != CANCEL) return (V)v; Thread.interrupted(); // Clear interrupt status on IE throw } throw new InterruptedException(); } private Object doExchange(Object item, boolean timed, long nanos) { Node me = new Node(item); // Create in case occupying //计算hash值 int index = hashIndex(); // Index of current slot int fails = 0; // Number of CAS failures for (;;) { Object y; // Contents of current slot Slot slot = arena[index]; if (slot == null) //创建slot createSlot(index); //如果有值 else if ((y = slot.get()) != null && // Try to fulfill slot.compareAndSet(y, null)) { Node you = (Node)y; // Transfer item if (you.compareAndSet(null, item)) { //唤醒线程 LockSupport.unpark(you.waiter); return you.item; } // Else cancelled; continue } else if (y == null && // Try to occupy slot.compareAndSet(null, me)) { if (index == 0) // Blocking wait for slot 0 return timed ? //阻塞当前线程 awaitNanos(me, slot, nanos) : await(me, slot); Object v = spinWait(me, slot); // Spin wait for non-0 if (v != CANCEL) return v; me = new Node(item); // Throw away cancelled node int m = max.get(); if (m > (index >>>= 1)) // Decrease index max.compareAndSet(m, m - 1); // Maybe shrink table } else if (++fails > 1) { // Allow 2 fails on 1st slot int m = max.get(); if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) index = m + 1; // Grow on 3rd failed slot else if (--index < 0) index = m; // Circularly traverse } } } private void createSlot(int index) { // Create slot outside of lock to narrow sync region Slot newSlot = new Slot(); Slot[] a = arena; synchronized (a) { if (a[index] == null) a[index] = newSlot; } } private static Object spinWait(Node node, Slot slot) { int spins = SPINS; for (;;) { Object v = node.get(); if (v != null) return v; else if (spins > 0) --spins; else tryCancel(node, slot); } } //尝试取消 private static boolean tryCancel(Node node, Slot slot) { if (!node.compareAndSet(null, CANCEL)) return false; if (slot.get() == node) // pre-check to minimize contention slot.compareAndSet(node, null); return true; } //自旋挂起当前线程 private static Object await(Node node, Slot slot) { Thread w = Thread.currentThread(); int spins = SPINS; for (;;) { Object v = node.get(); if (v != null) return v; else if (spins > 0) // Spin-wait phase --spins; else if (node.waiter == null) // Set up to block next node.waiter = w; else if (w.isInterrupted()) // Abort on interrupt tryCancel(node, slot); else // Block LockSupport.park(node); } } //自选等待挂起当前线程指定时间 private Object awaitNanos(Node node, Slot slot, long nanos) { int spins = TIMED_SPINS; long lastTime = 0; Thread w = null; for (;;) { Object v = node.get(); if (v != null) return v; long now = System.nanoTime(); if (w == null) w = Thread.currentThread(); else nanos -= now - lastTime; lastTime = now; if (nanos > 0) { if (spins > 0) --spins; else if (node.waiter == null) node.waiter = w; else if (w.isInterrupted()) tryCancel(node, slot); else LockSupport.parkNanos(node, nanos); } else if (tryCancel(node, slot) && !w.isInterrupted()) return scanOnTimeout(node); } } /** 总结:该类的实现,是当一个线程过来先查看对应的slot中是否已经有数据在等待交换, 没有则挂起当前线程等待交换数据。有则将数据交换然后唤醒之前等待交换的线程。 */
推荐阅读
-
读ThreadLocal源代码 博客分类: java多线程 java
-
读Collections源码 博客分类: java集合 java
-
看String源码的疑惑 博客分类: java String
-
读ThreadLocal源代码 博客分类: java多线程 java
-
读Collections源码 博客分类: java集合 java
-
读Exchanger源码 博客分类: java多线程 java
-
java.util.concurrent介绍 博客分类: java 多线程java
-
java.util.concurrent介绍 博客分类: java 多线程java
-
Java多线程-生产者于消费者 博客分类: javajust do itmore and more java多线程thread生产者和消费者经典
-
微信公众账号第三方平台全网发布源码(java) 博客分类: java微信jeewx