Сбрасываемый CountdownLatch

Мне нужно что-то, что прямо эквивалентно CountDownLatch, но сбрасывается (остается в потоковом режиме!). Я не могу использовать классические конструкции синхронизации, поскольку они просто не работают в этой ситуации (сложные проблемы с блокировкой). На данный момент я создаю много объектов CountDownLatch, каждый из которых заменяет предыдущий. Я считаю, что это происходит в молодом поколении в GC (из-за большого количества объектов). Вы можете увидеть код, который использует защелки ниже (это часть макета java.net для интерфейса сетевого симулятора ns-3).

Некоторые идеи могут состоять в том, чтобы попробовать CyclicBarrier (JDK5 +) или Phaser (JDK7)

Я могу проверить код и вернуться к любому, кто найдет решение этой проблемы, поскольку я единственный, кто может вставить его в запущенную систему, чтобы узнать, что происходит:)

package kokunet;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kokuks.IConnectionSocket;
import kokuks.KKSAddress;
import kokuks.KKSSocket;
import kokuks.KKSSocketListener;

 * KSelector
 * @version 1.0
 * @author Chris Dennett
public class KSelector extends SelectorImpl {
    // True if this Selector has been closed
    private volatile boolean closed = false;

    // Lock for close and cleanup
    final class CloseLock {}
    private final Object closeLock = new CloseLock();

    private volatile boolean selecting = false;
    private volatile boolean wakeup = false;

    class SocketListener implements KKSSocketListener {
        protected volatile CountDownLatch latch = null;

        public SocketListener() {

        protected synchronized CountDownLatch newLatch() {
            return this.latch = new CountDownLatch(1);

        protected synchronized void refreshReady(KKSSocket socket) {
            if (!selecting) return;

            synchronized (socketToChannel) {
                SelChImpl ch = socketToChannel.get(socket);
                if (ch == null) {
                    System.out.println("ks sendCB: channel not found for socket: " + socket);
                synchronized (channelToKey) {
                    SelectionKeyImpl sk = channelToKey.get(ch);
                    if (sk != null) {
                        if (handleSelect(sk)) {
        public void connectionSucceeded(KKSSocket socket) {
        public void connectionFailed(KKSSocket socket) {
        public void dataSent(KKSSocket socket, long bytesSent) {
        public void sendCB(KKSSocket socket, long bytesAvailable) {
        public void onRecv(KKSSocket socket) {
        public void newConnectionCreated(KKSSocket socket, KKSSocket newSocket, KKSAddress remoteaddress) {
        public void normalClose(KKSSocket socket) {
        public void errorClose(KKSSocket socket) {

    protected final Map<KKSSocket, SelChImpl>        socketToChannel = new HashMap<KKSSocket, SelChImpl>();
    protected final Map<SelChImpl, SelectionKeyImpl> channelToKey    = new HashMap<SelChImpl, SelectionKeyImpl>();
    protected final SocketListener currListener = new SocketListener();
    protected Thread selectingThread = null;

    SelChImpl getChannelForSocket(KKSSocket s) {
        synchronized (socketToChannel) {
            return socketToChannel.get(s);

    SelectionKeyImpl getSelKeyForChannel(KKSSocket s) {
        synchronized (channelToKey) {
            return channelToKey.get(s);

    protected boolean markRead(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_READ);
            return selectedKeys.add(impl);

    protected boolean markWrite(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_WRITE);
            return selectedKeys.add(impl);

    protected boolean markAccept(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_ACCEPT);
            return selectedKeys.add(impl);

    protected boolean markConnect(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_CONNECT);
            return selectedKeys.add(impl);

     * @param provider
    protected KSelector(SelectorProvider provider) {

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implClose()
    protected void implClose() throws IOException {
        provider().getApp().printMessage("implClose: closed: " + closed);
        synchronized (closeLock) {
            if (closed) return;
            closed = true;
            for (SelectionKey sk : keys) {
                SelectableChannel selch = sk.channel();
                if (!selch.isOpen() && !selch.isRegistered())

    protected void implCloseInterrupt() {

    private boolean handleSelect(SelectionKey k) {
        synchronized (k) {
            boolean notify = false;

            if (!k.isValid()) {
                return false;

            SelectionKeyImpl ski = (SelectionKeyImpl)k;

            if ((ski.interestOps() & SelectionKeyImpl.OP_READ) != 0) {
                if (ski.channel.socket().getRxAvailable() > 0) {
                    notify |= markRead(ski);

            if ((ski.interestOps() & SelectionKeyImpl.OP_WRITE) != 0) {
                if (ski.channel.socket().getTxAvailable() > 0) {
                    notify |= markWrite(ski);

            if ((ski.interestOps() & SelectionKeyImpl.OP_CONNECT) != 0) {
                if (!ski.channel.socket().isConnectionless()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (!ski.channel.socket().isAccepting() && !cs.isConnecting() && !cs.isConnected()) {
                        notify |= markConnect(ski);

            if ((ski.interestOps() & SelectionKeyImpl.OP_ACCEPT) != 0) {
                //provider().getApp().printMessage("accept check: ski: " + ski + ", connectionless: " + ski.channel.socket().isConnectionless() + ", listening: " + ski.channel.socket().isListening() + ", hasPendingConn: " + (ski.channel.socket().isConnectionless() ? "nope!" : ((IConnectionSocket)ski.channel.socket()).hasPendingConnections()));
                if (!ski.channel.socket().isConnectionless() && ski.channel.socket().isListening()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (cs.hasPendingConnections()) {
                        notify |= markAccept(ski);
            return notify;

    private boolean handleSelect() {
        boolean notify = false;

        // get initial status
        for (SelectionKey k : keys) {
            notify |= handleSelect(k);

        return notify;

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#doSelect(long)
    protected int doSelect(long timeout) throws IOException {

        long timestartedms = System.currentTimeMillis();

        synchronized (selectedKeys) {
            synchronized (currListener) {
                wakeup = false;
                selectingThread = Thread.currentThread();
                selecting = true;
            try {

                if (!selectedKeys.isEmpty() || timeout == 0) {
                    return selectedKeys.size();

                //TODO: useless op if we have keys available
                for (SelectionKey key : keys) {
                try {
                    while (!wakeup && isOpen() && selectedKeys.isEmpty()) {
                        CountDownLatch latch = null;
                        synchronized (currListener) {
                            if (wakeup || !isOpen() || !selectedKeys.isEmpty()) {
                            latch = currListener.newLatch();
                        try {
                            if (timeout > 0) {
                                long currtimems = System.currentTimeMillis();
                                long remainingMS = (timestartedms + timeout) - currtimems;

                                if (remainingMS > 0) {
                                    latch.await(remainingMS, TimeUnit.MILLISECONDS);
                                } else {
                            } else {
                        } catch (InterruptedException e) {

                    return selectedKeys.size();
                } finally {
                    for (SelectionKey key : keys) {
            } finally {
                synchronized (currListener) {
                    selecting = false;
                    selectingThread = null;
                    wakeup = false;

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implRegister(kokunet.SelectionKeyImpl)
    protected void implRegister(SelectionKeyImpl ski) {
        synchronized (closeLock) {
            if (closed) throw new ClosedSelectorException();
            synchronized (channelToKey) {
                synchronized (socketToChannel) {
                    socketToChannel.put(ski.channel.socket(), ski.channel);
                    channelToKey.put(ski.channel, ski);


    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implDereg(kokunet.SelectionKeyImpl)
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        synchronized (channelToKey) {
            synchronized (socketToChannel) {

                SelectableChannel selch = ski.channel();

                if (!selch.isOpen() && !selch.isRegistered())

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#wakeup()
    public Selector wakeup() {
        synchronized (currListener) {
            if (selecting) {
                wakeup = true;
                selecting = false;
                selectingThread = null;
        return this;



Я скопировал CountDownLatch и реализовал метод reset(), который сбрасывает внутренний класс Sync в исходное состояние (начальный счет):) Появляется, чтобы работать нормально. Нет необходимости создавать ненужные объекты\o/Подкласс не был доступен, потому что Sync был закрытым. Boo.

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class ResettableCountDownLatch {
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        public final int startCount;

        Sync(int count) {
            this.startCount = count;

        int getCount() {
            return getState();

        public int tryAcquireShared(int acquires) {
            return getState() == 0? 1 : -1;

        public boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;

        public void reset() {

    private final Sync sync;

    public void reset() {

Основываясь на ответе @Fidel, я сделал замену для ResettableCountDownLatch. Изменения, которые я сделал

  • mLatch private volatile
  • mInitialCount private final
  • возвращаемый тип простого await() изменился на void.

В противном случае исходный код тоже классный. Итак, это полный расширенный код:

public class ResettableCountDownLatch {

    private final int initialCount;
    private volatile CountDownLatch latch;

    public ResettableCountDownLatch(int  count) {
        initialCount = count;
        latch = new CountDownLatch(count);

    public void reset() {
        latch = new CountDownLatch(initialCount);

    public void countDown() {

    public void await() throws InterruptedException {

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);


На основе комментария @Systemplanet - это более безопасная версия reset():

    // An atomic reference is required because reset() is not that atomic anymore, not even with `volatile`.
    private final AtomicReference<CountDownLatch> latchHolder = new AtomicReference<>();

    public void reset() {
        // obtaining a local reference for modifying the required latch
        final CountDownLatch oldLatch = latchHolder.getAndSet(null);
        if (oldLatch != null) {
            // checking the count each time to prevent unnecessary countdowns due to parallel countdowns
            while (0L < oldLatch.getCount()) {

В принципе, это выбор между простотой и безопасностью. То есть если вы готовы перенести ответственность на клиента своего кода, то достаточно установить ссылку null в reset().

С другой стороны, если вы хотите, чтобы это стало проще для пользователей этого кода, вам нужно использовать немного больше трюков.

Phaser имеет больше опций, мы можем реализовать сброс countdownLatch, используя это.

Прочтите ниже основные понятия со следующих сайтов



import java.util.concurrent.Phaser;
 * Resettable countdownLatch using phaser
public class PhaserExample {
    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(3); // you can use constructor hint or
                                        // register() or mixture of both
        // register self... so parties are incremented to 4 (3+1) now
        //register is one time call for all the phases.
        //means no need to register for every phase             

        int phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);
        new PhaserExample().testPhaser(phaser, 2000);
        new PhaserExample().testPhaser(phaser, 4000);
        new PhaserExample().testPhaser(phaser, 6000);

        // similar to await() in countDownLatch/CyclicBarrier
        // parties are decremented to 3 (4+1) now
        // once all the thread arrived at same level, barrier opens
        System.out.println("Barrier has broken.");
        phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);

        //second phase
        new PhaserExample().testPhaser(phaser, 2000);
        new PhaserExample().testPhaser(phaser, 4000);
        new PhaserExample().testPhaser(phaser, 6000);
        // once all the thread arrived at same level, barrier opens
        System.out.println("Barrier has broken.");
        phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);


    private void testPhaser(final Phaser phaser, final int sleepTime) {
        // phaser.register(); //Already constructor hint is given so not
        // required
        new Thread() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " arrived");
                    // phaser.arrive(); //similar to CountDownLatch#countDown()
                    phaser.arriveAndAwaitAdvance();// thread will wait till Barrier opens
                    // arriveAndAwaitAdvance is similar to CyclicBarrier#await()
                catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " after passing barrier");

Другая замена замены

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ResettableCountDownLatch {
    int mInitialCount;
    CountDownLatch mLatch;

    public ResettableCountDownLatch(int  count) {
        mInitialCount = count;
        mLatch = new CountDownLatch(count);

    public void reset() {
        mLatch = new CountDownLatch(mInitialCount);

    public void countDown() {

    public boolean await() throws InterruptedException {
        boolean result = mLatch.await();
        return result;

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        boolean result = mLatch.await(timeout, unit);
        return result;