Защелка, которая может быть увеличена - программирование
Подтвердить что ты не робот

Защелка, которая может быть увеличена

Кто-нибудь знает, есть ли какая-либо реализация защелки, которая делает следующее:

  • имеет метод уменьшения значения защелки или ждет, если значение равно нулю
  • имеет метод ожидания нулевого значения защелки
  • имеет метод добавления числа к значению фиксации
4b9b3361

Ответ 1

Вместо того, чтобы вернуться из AQS, вы можете использовать простую реализацию, как показано ниже. Он несколько наивен (он синхронизирован по алгоритмам без блокировки AQS), но если вы не собираетесь использовать его в довольном сценарии, это может быть достаточно хорошим.

public class CountUpAndDownLatch {
    private CountDownLatch latch;
    private final Object lock = new Object();

    public CountUpAndDownLatch(int count) {
        this.latch = new CountDownLatch(count);
    }

    public void countDownOrWaitIfZero() throws InterruptedException {
        synchronized(lock) {
            while(latch.getCount() == 0) {
                lock.wait();
            }
            latch.countDown();
            lock.notifyAll();
        }
    }

    public void waitUntilZero() throws InterruptedException {
        synchronized(lock) {
            while(latch.getCount() != 0) {
                lock.wait();
            }
        }
    }

    public void countUp() { //should probably check for Integer.MAX_VALUE
        synchronized(lock) {
            latch = new CountDownLatch((int) latch.getCount() + 1);
            lock.notifyAll();
        }
    }

    public int getCount() {
        synchronized(lock) {
            return (int) latch.getCount();
        }
    }
}

Примечание. Я не тестировал его по глубине, но он, похоже, ведет себя так, как ожидалось:

public static void main(String[] args) throws InterruptedException {
    final CountUpAndDownLatch latch = new CountUpAndDownLatch(1);
    Runnable up = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("IN UP " + latch.getCount());
                latch.countUp();
                System.out.println("UP " + latch.getCount());
            } catch (InterruptedException ex) {
            }
        }
    };

    Runnable downOrWait = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("IN DOWN " + latch.getCount());
                latch.countDownOrWaitIfZero();
                System.out.println("DOWN " + latch.getCount());
            } catch (InterruptedException ex) {
            }
        }
    };

    Runnable waitFor0 = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("WAIT FOR ZERO " + latch.getCount());
                latch.waitUntilZero();
                System.out.println("ZERO " + latch.getCount());
            } catch (InterruptedException ex) {
            }
        }
    };
    new Thread(waitFor0).start();
    up.run();
    downOrWait.run();
    Thread.sleep(100);
    downOrWait.run();
    new Thread(up).start();
    downOrWait.run();
}

Вывод:

IN UP 1
UP 2
WAIT FOR ZERO 1
IN DOWN 2
DOWN 1
IN DOWN 1
ZERO 0
DOWN 0
IN DOWN 0
IN UP 0
DOWN 0
UP 0

Ответ 2

Вы также можете использовать Phaser (java.util.concurrent.Phaser)

final Phaser phaser = new Phaser(1); // register self
while (/* some condition */) {
    phaser.register(); // Equivalent to countUp
    // do some work asynchronously, invoking
    // phaser.arriveAndDeregister() (equiv to countDown) in a finally block
}
phaser.arriveAndAwaitAdvance(); // await any async tasks to complete

Надеюсь, это поможет.

Ответ 3

java.util.concurrent.Semaphore, похоже, соответствует счету.

  • приобретать() или приобретать (n)
  • также приобретает() (не уверен, что я понимаю, в чем разница здесь) (*)
  • release() или релиз (n)

(*) Хорошо, нет встроенного метода, чтобы ждать, пока семафор не станет недоступным. Я полагаю, вы напишете свою собственную оболочку для acquire, которая сначала выполняет tryAcquire, и если это не срабатывает, ваше "занятое событие" (и продолжает использовать обычный acquire). Каждому нужно будет позвонить в вашу обертку. Может быть, подкласс Семафор?

Ответ 4

Для тех, кто нуждается в решении на основе AQS, здесь, который работал у меня:

public class CountLatch {

    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }

        @Override
        protected int tryAcquireShared(int arg) {
            return count.get() == releaseValue ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            return true;
        }
    }

    private final Sync sync;
    private final AtomicLong count;
    private volatile long releaseValue;

    public CountLatch(final long initial, final long releaseValue) {
        this.releaseValue = releaseValue;
        this.count = new AtomicLong(initial);
        this.sync = new Sync();
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public long countUp() {
        final long current = count.incrementAndGet();
        if (current == releaseValue) {
            sync.releaseShared(0);
        }
        return current;
    }

    public long countDown() {
        final long current = count.decrementAndGet();
        if (current == releaseValue) {
            sync.releaseShared(0);
        }
        return current;
    }

    public long getCount() {
        return count.get();
    }
}

Вы инициализируете синхронизатор с начальным и целевым значением. После достижения целевого значения (путем подсчета и/или уменьшения) потоки ожидания будут освобождены.

Ответ 5

Мне нужен был один и построил его, используя ту же стратегию, что и CountDownLatch, которая использует AQS (неблокирующий), этот класс тоже очень похож (если не точный) на один, созданный для Apache Camel, я думаю, что он также легче JDK Phaser, это будет действовать так же, как CountDownLact от JDK, это не позволит вам отсчитывать значение ниже нуля и позволит вам отсчитывать и увеличивать:

import java.util.concurrent.TimeUnit;   import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountingLatch
{
  /**
   * Synchronization control for CountingLatch.
   * Uses AQS state to represent count.
   */
  private static final class Sync extends AbstractQueuedSynchronizer
  {
    private Sync()
    {
    }

    private Sync(final int initialState)
    {
      setState(initialState);
    }

    int getCount()
    {
      return getState();
    }

    protected int tryAcquireShared(final int acquires)
    {
      return getState()==0 ? 1 : -1;
    }

    protected boolean tryReleaseShared(final int delta)
    {
      // Decrement count; signal when transition to zero
      for(; ; ){
        final int c=getState();
        final int nextc=c+delta;
        if(nextc<0){
          return false;
        }
        if(compareAndSetState(c,nextc)){
          return nextc==0;
        }
      }
    }
  }

  private final Sync sync;

  public CountingLatch()
  {
    sync=new Sync();
  }

  public CountingLatch(final int initialCount)
  {
    sync=new Sync(initialCount);
  }

  public void increment()
  {
    sync.releaseShared(1);
  }

  public int getCount()
  {
    return sync.getCount();
  }

  public void decrement()
  {
    sync.releaseShared(-1);
  }

  public void await() throws InterruptedException
  {
    sync.acquireSharedInterruptibly(1);
  }

  public boolean await(final long timeout) throws InterruptedException
  {
    return sync.tryAcquireSharedNanos(1,TimeUnit.MILLISECONDS.toNanos(timeout));
  }
}

Ответ 6

Это вариант на CounterLatch, доступный с сайта Apache.

Их версия по наиболее известным для себя причинам блокирует поток вызывающего , а переменная (AtomicInteger) имеет заданное значение.

Но это высота простоты для настройки этого кода, чтобы вы могли выбрать либо то, что версия Apache, либо... чтобы сказать "подождите здесь до, счетчик достигнет определенного значения". Вероятно, последнее будет иметь большую применимость. В моем конкретном случае я зашуршал это, потому что я хотел проверить, что все "куски" были опубликованы в SwingWorker.process()... но с тех пор я нашел для него другие варианты.

Здесь написано на Jython, официально лучший язык в мире (TM). Я собираюсь вздремнуть версию Java со временем.

class CounterLatch():
    def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ):
        self.count = java.util.concurrent.atomic.AtomicLong( initial )
        self.signal = java.util.concurrent.atomic.AtomicLong( wait_value )

        class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ):
            def tryAcquireShared( sync_self, arg ):
                if lift_on_reached:
                    return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1
                else:
                    return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1
            def tryReleaseShared( self, args ):
                return True

        self.sync = Sync()
        self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False

    def await( self, *args ):
        if args:
            assert len( args ) == 2
            assert type( args[ 0 ] ) is int
            timeout = args[ 0 ]
            assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit
            unit = args[ 1 ]
            return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
        else:
            self.sync.acquireSharedInterruptibly( 1 )

    def count_relative( self, n ):
        previous = self.count.addAndGet( n )
        if previous == self.signal.get():
            self.sync.releaseShared( 0 )
        return previous

NB версия Apache использует ключевое слово volatile для signal и released. В Jython я не думаю, что это существует как таковое, но использование AtomicInteger и AtomicBoolean должно гарантировать, что никакие значения не устарели в любом потоке.

Пример использования:

В конструкторе SwingWorker:

self.publication_counter_latch = CounterLatch() 

В SW.publish:

# increase counter value BEFORE publishing chunks
self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )

В SW.process:

# ... do sthg [HERE] with the chunks!
# AFTER having done what you want to do with your chunks:
self.publication_counter_latch.count_relative( - len( chunks ) )

В потоке, ожидающем завершения обработки фрагмента:

worker.publication_counter_latch.await()

Ответ 7

Кажется, что CountDownLatch будет делать то, что вы пожелаете:

Функция CountDownLatch инициализируется данным счетчиком. Методы ожидания до тех пор, пока текущий счетчик не достигнет нуля из-за вызовов countDown(), после чего все ожидающие потоки освобождаются и любые последующие вызовы ждут немедленно. Это однократное явление - счет не может быть reset. Если вам нужно версии, которая сбрасывает счетчик, рассмотрите возможность использования CyclicBarrier.

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html