VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > Java教程 >
  • CountDownLatch能不能在多个线程上添加await?

CountDownLatch类的使用过程中,发现了一个很奇怪的现象:

	CountDownLatch countDownLatch = new CountDownLatch(2);
		
		Runnable taskMain = () -> {
			try { 
				countDownLatch.await();   // 等待唤醒
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("继续执行任务");
		};
		Runnable taskMain1 = () -> {
			try { 
				countDownLatch.await();  // 等待唤醒
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("继续执行任务2");
		};
		
		Runnable task1 = () -> {
			countDownLatch.countDown();   // 计数器 -1 
			System.out.println("前置任务1完成");
		};
		
		Runnable task2 = () -> {
			countDownLatch.countDown();  // 计数器 -1 
			System.out.println("前置任务2完成");
		};
		
		new Thread(taskMain).start();
		new Thread(taskMain1).start();
		new Thread(task1).start();
		new Thread(task2).start();

在这个地方使用了两个await,希望在两个前置线程执行完成之后再执行剩下的两个线程。但是结果有点特别:

继续执行任务
前置任务1完成
前置任务2完成
继续执行任务2

我发现taskMain的任务首先被执行了。

按照逻辑来说,在第一个await执行中,由于此时AQSstate值等于计数器设置的count值2,state必须等于0时他才能拿到锁,所以此时他被挂起。第二个也是如此,那么为什么会出现第一个await被执行了呢?

我从头捋一遍代码,当第一个await被调用后:

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted() ||
            (tryAcquireShared(arg) < 0 &&
             acquire(null, arg, true, true, false, 0L) < 0))
            throw new InterruptedException();
    }
    
    // 这是在CountDownLatch中复写的方法,忘了方便观看放到一起了。
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

因为此时state值为2,所以tryAcquireShared(arg) < 0成立,此时会调用acquire尝试去获取锁。

 /**
     * Main acquire method, invoked by all exported acquire methods.
     *
     * @param node null unless a reacquiring Condition
     * @param arg the acquire argument
     * @param shared true if shared mode else exclusive
     * @param interruptible if abort and return negative on interrupt
     * @param timed if true use timed waits
     * @param time if timed, the System.nanoTime value to timeout
     * @return positive if acquired, 0 if timed out, negative if interrupted
     */
    final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
        Thread current = Thread.currentThread();
        byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
        boolean interrupted = false, first = false;
        Node pred = null;                // predecessor of node when enqueued

        /*
         * Repeatedly:
         *  Check if node now first
         *    if so, ensure head stable, else ensure valid predecessor
         *  if node is first or not yet enqueued, try acquiring
         *  else if node not yet created, create it
         *  else if not yet enqueued, try once to enqueue
         *  else if woken from park, retry (up to postSpins times)
         *  else if WAITING status not set, set and retry
         *  else park and clear WAITING status, and check cancellation
         */

        for (;;) {
            if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            if (first || pred == null) {
                boolean acquired;
                try {
                    if (shared)
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                        acquired = tryAcquire(arg);
                } catch (Throwable ex) {
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                if (acquired) {
                    if (first) {
                        node.prev = null;
                        head = node;
                        pred.next = null;
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node);
                        if (interrupted)
                            current.interrupt();
                    }
                    return 1;
                }
            }
            if (node == null) {                 // allocate; retry before enqueue
                if (shared)
                    node = new SharedNode();
                else
                    node = new ExclusiveNode();
            } else if (pred == null) {          // try to enqueue
                node.waiter = current;
                Node t = tail;
                node.setPrevRelaxed(t);         // avoid unnecessary fence
                if (t == null)
                    tryInitializeHead();
                else if (!casTail(t, node))
                    node.setPrevRelaxed(null);  // back out
                else
                    t.next = node;
            } else if (first && spins != 0) {
                --spins;                        // reduce unfairness on rewaits
                Thread.onSpinWait();
            } else if (node.status == 0) {
                node.status = WAITING;          // enable signal and recheck
            } else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                    LockSupport.park(this);
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }
        }
        return cancelAcquire(node, interrupted, interruptible);
    }

由此可以看出第一次运行之后,创建节点加入到CLH队列中,然后被LockSupport.park(this)挂起。这部分跟我预想的一样。第二次也是这样,按理说没什么区别啊。

我不信邪,又重新试了一遍。这次我添加了计数器的个数(15),并且每次都输出当前的state值。

前置任务b完成:13
前置任务a完成:13
前置任务a完成:12
前置任务b完成:11
前置任务a完成:10
前置任务b完成:9
前置任务a完成:8
前置任务b完成:7
前置任务a完成:6
前置任务b完成:5
前置任务a完成:4
前置任务b完成:3
前置任务a完成:2
前置任务b完成:1
继续执行任务c:0
继续执行任务c:0
继续执行任务c:0
继续执行任务d:0
继续执行任务c:0
继续执行任务d:0
继续执行任务d:0
继续执行任务d:0
继续执行任务c:0
继续执行任务d:0
继续执行任务d:0

误会,嘿嘿。CountDownLatch果然可以在多个线程上添加await。

出处:https://www.cnblogs.com/ljunn/p/15185792.html#/c/subject/p/15185792.html
 


相关教程