博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
CountDownLatch使用及源码分析
阅读量:4148 次
发布时间:2019-05-25

本文共 9341 字,大约阅读时间需要 31 分钟。

概念:CountDownLatch是通过一个计数器,记录线程的数量,每个线程在完成自己的任务之后,调用countDown方法,将计数器减一,当计数器的值将为0时,原来等待的线程,即调用countDownLatch.await()方法线程将会被唤起,继续执行。这里有个主线程,调用countDownLatch.await()方法的线程,它会处于挂起状态,直到所有的线程都执行完countDownLatch.countDown方法,最终将计数器减为0,才会被唤醒继续执行。

先看下javadoc中给出的示例代码,稍微做了一些改动,在原来基础添加了可编译的代码

public class CountDownLatchTest2 {    public static void main(String[] args) throws InterruptedException{        Driver driver = new Driver();        driver.main();    }}class Driver { // ...    void main() throws InterruptedException {        int N = 5;        CountDownLatch startSignal = new CountDownLatch(1);        CountDownLatch doneSignal = new CountDownLatch(N);        for (int i = 0; i < N; ++i) // 让工人们开始动起来            new Thread(new Worker(startSignal, doneSignal)).start();        Thread.sleep(3000);           // 当前main线程先睡眠一段时间,做些其他的事情        startSignal.countDown();      // 唤醒工人们,开始干活        Thread.sleep(3000);          // 司机再去做一些其他事情        doneSignal.await();           // 等待工人们结束工作后,再司机开始执行        System.out.println(Thread.currentThread().getName()+"司机开始");    }}class Worker implements Runnable {    private final CountDownLatch startSignal;    private final CountDownLatch doneSignal;    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {        this.startSignal = startSignal;        this.doneSignal = doneSignal;    }    public void run() {        try {            System.out.println(Thread.currentThread().getName() + "工人开始准备干活");            startSignal.await();            doWork();            doneSignal.countDown();            System.out.println(Thread.currentThread().getName() + "干完活了");        } catch (InterruptedException ex) {} // return;    }    void doWork() {        System.out.println(Thread.currentThread().getName() + "工人正在干活。。。。");    }}

上面这个代码使用两个CountDownLatch来相互约束,首先是每个工人在准备开始干活之后,startSignal.await进行挂起,这里是每个工人线程都进行挂起,等待司机执行一系列动作之后,司机说开始执行,startSignal.countDown,工人们继续干他们的工作,此时司机的线程进行挂起,司机也需要等待工人们都干完活,才能继续执行,当所有工人执行结束之后,计数器变为0,司机继续执行。

上面这段代码我觉得一开始可能看起来比较绕,不过他也提供了一种实现场景,网上关于CountDownLatch的例子,大多数来说使用都是一个线程等待另外几个线程执行结束后唤醒,这里给出两个CountDownLatch,来实现了线程之间的互相约束,给我们提供了一个非常好的开发场景例子。而不仅仅局限于一个线程等待多个线程。

CountDownLatch源码分析

首先来看下类的内部结构,CountDownLatch 提供了一个内部类,继承自AbstractQueuedSynchronizer(后面简称AQS),关于AQS我之前在分析ReentrantLock源码时,对AQS进行了相关代码的分析,它主要提供了一种模板设计模式,实现AQS接口的类,只需要根据自己的需求,来重写这些方法

tryAcquire (排它锁获取)

tryRelease (排它锁释放)

tryAcquireShared (共享锁获取)

tryReleasedShared (共享锁释放)

当我们创建一个CountDownLatch时

public CountDownLatch(int count) {        if (count < 0) throw new IllegalArgumentException("count < 0");        this.sync = new Sync(count);    }
Sync(int count) {            setState(count);        }

线程计数器作为AQS当中的state的值,之前我们说过state是判断是否持有锁的一个标志,在这里state表示的需要记录的线程数。

Sync类在继承AQS时,关于tryAccquireShared的实现

protected int tryAcquireShared(int acquires) {            return (getState() == 0) ? 1 : -1;        }

只要当前state的值不等于0,就返回 -1 

tryReleaseShared

protected boolean tryReleaseShared(int releases) {            // Decrement count; signal when transition to zero            for (;;) {                int c = getState();                if (c == 0)                    return false;                int nextc = c-1;                if (compareAndSetState(c, nextc))                    return nextc == 0;            }        }

在释放的时候,使用循环和CAS来更新state的值,如果更新成功,并且更新后的值等于0,说明释放掉锁,返回true,否则返回false

当我们在主线程中调用await方法时,是如何处理的呢

public void await() throws InterruptedException {        sync.acquireSharedInterruptibly(1);    }

接下来进入到AQS中

public final void acquireSharedInterruptibly(int arg)            throws InterruptedException {        if (Thread.interrupted())            throw new InterruptedException();        if (tryAcquireShared(arg) < 0)            doAcquireSharedInterruptibly(arg);    }
1、如果线程被中断,直接抛出异常,

2、尝试去获取锁,上面我们已经分析了,如果state不等于0,就返回 -1,

3、如果还持有锁,执行doAcquireSharedInterruptibly(1)

private void doAcquireSharedInterruptibly(int arg)        throws InterruptedException {        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            for (;;) {                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        failed = false;                        return;                    }                }                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    throw new InterruptedException();            }        } finally {            if (failed)                cancelAcquire(node);        }    }

在AQS分析ReentrantLock时候,我们分析过doAcquire方法,这里是针对于共享锁的尝试加锁操作

addWaiter创建一个共享锁的链表,如果当前尾节点存在的话,就在尾节点后面追加新的节点,如果尾节点不存在,创建空的head和tail节点,并把当前新节点追加到节点后面,这个在AQS中已经分析过,不再详细赘述

1、找到当前节点在链表中的前驱节点

2、判断前驱节点是否为头节点(即当前持有锁的线程)

3、如果当前节点是头节点,判断头节点是否已经释放掉了锁,如果释放掉了锁,那么将当前节点设置成头节点

4、如果当前节点不是头节点,执行下面一段代码

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {        int ws = pred.waitStatus;        if (ws == Node.SIGNAL)            /*             * This node has already set status asking a release             * to signal it, so it can safely park.             */            return true;        if (ws > 0) {            /*             * Predecessor was cancelled. Skip over predecessors and             * indicate retry.             */            do {                node.prev = pred = pred.prev;            } while (pred.waitStatus > 0);            pred.next = node;        } else {            /*             * waitStatus must be 0 or PROPAGATE.  Indicate that we             * need a signal, but don't park yet.  Caller will need to             * retry to make sure it cannot acquire before parking.             */            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }        return false;    }

这段代码的逻辑就是判断前一个node节点状态,如果前一个节点是SIGNAL状态,返回true,然后将挂起,并判断是否中断,如果中断的话,将interrupt状态改为true, 如果前一个节点已经取消了,那么久遍历到waitStatus小于0处,最终返回false

接下来来看countDown方法的执行

public void countDown() {        sync.releaseShared(1);    }
public final boolean releaseShared(int arg) {        if (tryReleaseShared(arg)) {            doReleaseShared();            return true;        }        return false;    }
private void doReleaseShared() {          for (;;) {            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;            // loop to recheck cases                    unparkSuccessor(h);                }                else if (ws == 0 &&                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                    continue;                // loop on failed CAS            }            if (h == head)                   // loop if head changed                break;        }    }

从head开始遍历,找到第一个状态是Signal的线程,将其唤醒。

结合一个简单的例子进行分析:

public class CountDownLatchTest1 {    public static void main(String[] args) {        final CountDownLatch latch = new CountDownLatch(2);        new Thread(() -> {            try{                System.out.println(Thread.currentThread().getName()+"正在准备执行");                Thread.sleep(3000);                latch.countDown();                System.out.println(Thread.currentThread().getName() +"执行完成");            }catch (Exception e) {                e.printStackTrace();            }        }).start();        new Thread(() -> {            try{                System.out.println(Thread.currentThread().getName()+"正在准备执行");                Thread.sleep(3000);                latch.countDown();                System.out.println(Thread.currentThread().getName() +"执行完成");            }catch (Exception e) {                e.printStackTrace();            }        }).start();        try{            System.out.println("等待两个线程执行结束");            latch.await();            System.out.println("两个子线程执行结束");            System.out.println("继续执行主线程");        }catch (InterruptedException e){            e.printStackTrace();        }    }}

这个例子是在main线程执行一半后挂起,thread1 和thread2 执行,执行到countDown方法将计数器减为0之后,main方法被唤起,继续执行。

我们先来分析await方法,它的调用执行顺序

CountDownLatch(await) 

-->AQS(acquireSharedInterruptibly(1))

--->CountDownLatch(Sync tryAcquireShared(1))   AQS中state=2,返回 -1

--->AQS(doAcquireSharedInterruptibly)

--->AQS(addWaiter) 

此时链表当中的数据是这样的 head(waitstatus=0, thread=null, next=tail,) tail(waitStatus=0,thread=main,pre=head)

--->AQS(shouldParkAfterFailedAcquire) 

这个方法执行后,将head当中waitStatus改为 SIGNAL

链表结构变为head(waitstatus=-1, thread=null, next=tail,) tail(waitStatus=0,thread=main,pre=head)

---->AQS(parkAndCheckInteruput) 挂起main线程

接下来 Thread1 执行countDown方法,我们来看下执行流程

CountDownLatch(countDown)

--->AQS(releaseShared)

--->CountDownLatch(tryReleaseShared) 将AQS中state 减 1,返回false 整个流程结束

接下下来看Thread2 执行countDown方法

CountDownLatch(countDown)

--->AQS(releaseShared)

--->CountDownLatch(tryReleaseShared)  AQS中的state减为0,返回true

---->AQS(doRelaseShared)  将head节点waitStatus更新为0,

---->AQS(unparkSuccessor)  唤醒 head的节点的next节点,main线程执行

唤醒main线程之后,在main线程被挂起的地方继续往下执行 parkAndCheckInterrupt()方法中

AQS(doAcquireSharedInterruptibly)

--->CountDownLatch(tryAcquireShared) 这个时候返回true

---->AQS(setHeadAndPropagate) 将包含main线程的节点设置成head节点,

---->AQS(doReleaseShared)  将包含main线程的节点状态改为PROPAGATE=-3 整个流程执行结束

如有问题欢迎指正~

转载地址:http://wlvti.baihongyu.com/

你可能感兴趣的文章
idea添加gradle模块报错The project is already registered
查看>>
在C++中如何实现模板函数的外部调用
查看>>
在C++中,关键字explicit有什么作用
查看>>
C++中异常的处理方法以及使用了哪些关键字
查看>>
如何定义和实现一个类的成员函数为回调函数
查看>>
内存分配的形式有哪些? C++
查看>>
什么是内存泄露,如何避免内存泄露 C++
查看>>
栈和堆的空间大小 C++
查看>>
什么是缓冲区溢出 C++
查看>>
sizeof C++
查看>>
使用指针有哪些好处? C++
查看>>
引用还是指针?
查看>>
checkio-non unique elements
查看>>
checkio-medium
查看>>
checkio-house password
查看>>
checkio-moore neighbourhood
查看>>
checkio-the most wanted letter
查看>>
Redis可视化工具
查看>>
吃一堑长一智!2021年字节跳动、阿里等大厂最全Android面试题,安卓系列学习进阶视频
查看>>
大牛手把手带你!2021新一波程序员跳槽季,全套教学资料
查看>>