本文共 9341 字,大约阅读时间需要 31 分钟。
概念:CountDownLatch是通过一个计数器,记录线程的数量,每个线程在完成自己的任务之后,调用countDown方法,将计数器减一,当计数器的值将为0时,原来等待的线程,即调用countDownLatch.await()方法线程将会被唤起,继续执行。这里有个主线程,调用countDownLatch.await()方法的线程,它会处于挂起状态,直到所有的线程都执行完countDownLatch.countDown方法,最终将计数器减为0,才会被唤醒继续执行。
先看下javadoc中给出的示例代码,稍微做了一些改动,在原来基础添加了可编译的代码
public class CountDownLatchTest2 { public static void main(String[] args) throws InterruptedException{ Driver driver = new Driver(); driver.main(); }}class Driver { // ... void main() throws InterruptedException { int N = 5; CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // 让工人们开始动起来 new Thread(new Worker(startSignal, doneSignal)).start(); Thread.sleep(3000); // 当前main线程先睡眠一段时间,做些其他的事情 startSignal.countDown(); // 唤醒工人们,开始干活 Thread.sleep(3000); // 司机再去做一些其他事情 doneSignal.await(); // 等待工人们结束工作后,再司机开始执行 System.out.println(Thread.currentThread().getName()+"司机开始"); }}class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { System.out.println(Thread.currentThread().getName() + "工人开始准备干活"); startSignal.await(); doWork(); doneSignal.countDown(); System.out.println(Thread.currentThread().getName() + "干完活了"); } catch (InterruptedException ex) {} // return; } void doWork() { System.out.println(Thread.currentThread().getName() + "工人正在干活。。。。"); }}
上面这个代码使用两个CountDownLatch来相互约束,首先是每个工人在准备开始干活之后,startSignal.await进行挂起,这里是每个工人线程都进行挂起,等待司机执行一系列动作之后,司机说开始执行,startSignal.countDown,工人们继续干他们的工作,此时司机的线程进行挂起,司机也需要等待工人们都干完活,才能继续执行,当所有工人执行结束之后,计数器变为0,司机继续执行。
上面这段代码我觉得一开始可能看起来比较绕,不过他也提供了一种实现场景,网上关于CountDownLatch的例子,大多数来说使用都是一个线程等待另外几个线程执行结束后唤醒,这里给出两个CountDownLatch,来实现了线程之间的互相约束,给我们提供了一个非常好的开发场景例子。而不仅仅局限于一个线程等待多个线程。
首先来看下类的内部结构,CountDownLatch 提供了一个内部类,继承自AbstractQueuedSynchronizer(后面简称AQS),关于AQS我之前在分析ReentrantLock源码时,对AQS进行了相关代码的分析,它主要提供了一种模板设计模式,实现AQS接口的类,只需要根据自己的需求,来重写这些方法
tryAcquire (排它锁获取)
tryRelease (排它锁释放)
tryAcquireShared (共享锁获取)
tryReleasedShared (共享锁释放)
当我们创建一个CountDownLatch时
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
Sync(int count) { setState(count); }
线程计数器作为AQS当中的state的值,之前我们说过state是判断是否持有锁的一个标志,在这里state表示的需要记录的线程数。
Sync类在继承AQS时,关于tryAccquireShared的实现
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
只要当前state的值不等于0,就返回 -1
tryReleaseShared
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
在释放的时候,使用循环和CAS来更新state的值,如果更新成功,并且更新后的值等于0,说明释放掉锁,返回true,否则返回false
当我们在主线程中调用await方法时,是如何处理的呢
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
接下来进入到AQS中
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }1、如果线程被中断,直接抛出异常,
2、尝试去获取锁,上面我们已经分析了,如果state不等于0,就返回 -1,
3、如果还持有锁,执行doAcquireSharedInterruptibly(1)
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
在AQS分析ReentrantLock时候,我们分析过doAcquire方法,这里是针对于共享锁的尝试加锁操作
addWaiter创建一个共享锁的链表,如果当前尾节点存在的话,就在尾节点后面追加新的节点,如果尾节点不存在,创建空的head和tail节点,并把当前新节点追加到节点后面,这个在AQS中已经分析过,不再详细赘述
1、找到当前节点在链表中的前驱节点
2、判断前驱节点是否为头节点(即当前持有锁的线程)
3、如果当前节点是头节点,判断头节点是否已经释放掉了锁,如果释放掉了锁,那么将当前节点设置成头节点
4、如果当前节点不是头节点,执行下面一段代码
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
这段代码的逻辑就是判断前一个node节点状态,如果前一个节点是SIGNAL状态,返回true,然后将挂起,并判断是否中断,如果中断的话,将interrupt状态改为true, 如果前一个节点已经取消了,那么久遍历到waitStatus小于0处,最终返回false
接下来来看countDown方法的执行
public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
从head开始遍历,找到第一个状态是Signal的线程,将其唤醒。
结合一个简单的例子进行分析:
public class CountDownLatchTest1 { public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(2); new Thread(() -> { try{ System.out.println(Thread.currentThread().getName()+"正在准备执行"); Thread.sleep(3000); latch.countDown(); System.out.println(Thread.currentThread().getName() +"执行完成"); }catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(() -> { try{ System.out.println(Thread.currentThread().getName()+"正在准备执行"); Thread.sleep(3000); latch.countDown(); System.out.println(Thread.currentThread().getName() +"执行完成"); }catch (Exception e) { e.printStackTrace(); } }).start(); try{ System.out.println("等待两个线程执行结束"); latch.await(); System.out.println("两个子线程执行结束"); System.out.println("继续执行主线程"); }catch (InterruptedException e){ e.printStackTrace(); } }}
这个例子是在main线程执行一半后挂起,thread1 和thread2 执行,执行到countDown方法将计数器减为0之后,main方法被唤起,继续执行。
我们先来分析await方法,它的调用执行顺序
CountDownLatch(await)
-->AQS(acquireSharedInterruptibly(1))
--->CountDownLatch(Sync tryAcquireShared(1)) AQS中state=2,返回 -1
--->AQS(doAcquireSharedInterruptibly)
--->AQS(addWaiter)
此时链表当中的数据是这样的 head(waitstatus=0, thread=null, next=tail,) tail(waitStatus=0,thread=main,pre=head)
--->AQS(shouldParkAfterFailedAcquire)
这个方法执行后,将head当中waitStatus改为 SIGNAL
链表结构变为head(waitstatus=-1, thread=null, next=tail,) tail(waitStatus=0,thread=main,pre=head)
---->AQS(parkAndCheckInteruput) 挂起main线程
接下来 Thread1 执行countDown方法,我们来看下执行流程
CountDownLatch(countDown)
--->AQS(releaseShared)
--->CountDownLatch(tryReleaseShared) 将AQS中state 减 1,返回false 整个流程结束
接下下来看Thread2 执行countDown方法
CountDownLatch(countDown)
--->AQS(releaseShared)
--->CountDownLatch(tryReleaseShared) AQS中的state减为0,返回true
---->AQS(doRelaseShared) 将head节点waitStatus更新为0,
---->AQS(unparkSuccessor) 唤醒 head的节点的next节点,main线程执行
唤醒main线程之后,在main线程被挂起的地方继续往下执行 parkAndCheckInterrupt()方法中
AQS(doAcquireSharedInterruptibly)
--->CountDownLatch(tryAcquireShared) 这个时候返回true
---->AQS(setHeadAndPropagate) 将包含main线程的节点设置成head节点,
---->AQS(doReleaseShared) 将包含main线程的节点状态改为PROPAGATE=-3 整个流程执行结束
如有问题欢迎指正~
转载地址:http://wlvti.baihongyu.com/