Подтвердить что ты не робот

Параллельная установка очереди

Возможно, это глупый вопрос, но я не могу найти очевидного ответа.

Мне нужна параллельная очередь FIFO, содержащая только уникальные значения. Попытка добавить значение, которое уже существует в очереди, просто игнорирует это значение. Что, если бы не безопасность потока, было бы тривиально. Существует ли структура данных на Java или, возможно, код snipit на межстраничных сетях, которые проявляют это поведение?

4b9b3361

Ответ 1

Если вам нужна более качественная concurrency, чем полная синхронизация, есть один способ, которым я это знаю, используя ConcurrentHashMap в качестве карты поддержки. Ниже приведен только эскиз.

public final class ConcurrentHashSet<E> extends ForwardingSet<E>
    implements Set<E>, Queue<E> {
  private enum Dummy { VALUE }

  private final ConcurrentMap<E, Dummy> map;

  ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
    super(map.keySet());
    this.map = Preconditions.checkNotNull(map);
  }

  @Override public boolean add(E element) {
    return map.put(element, Dummy.VALUE) == null;
  }

  @Override public boolean addAll(Collection<? extends E> newElements) {
    // just the standard implementation
    boolean modified = false;
    for (E element : newElements) {
      modified |= add(element);
    }
    return modified;
  }

  @Override public boolean offer(E element) {
    return add(element);
  }

  @Override public E remove() {
    E polled = poll();
    if (polled == null) {
      throw new NoSuchElementException();
    }
    return polled;
  }

  @Override public E poll() {
    for (E element : this) {
      // Not convinced that removing via iterator is viable (check this?)
      if (map.remove(element) != null) {
        return element;
      }
    }
    return null;
  }

  @Override public E element() {
    return iterator().next();
  }

  @Override public E peek() {
    Iterator<E> iterator = iterator();
    return iterator.hasNext() ? iterator.next() : null;
  }
}

Все это не солнечный свет с таким подходом. У нас нет подходящего способа выбора элемента head, кроме использования карты поддержки entrySet().iterator().next(), в результате чего карта становится все более неуравновешенной с течением времени. Этот дисбаланс является проблемой как из-за больших столкновений ковша, так и с большей конкуренцией сегмента.

Примечание: этот код использует Guava в нескольких местах.

Ответ 2

Там нет встроенной коллекции, которая делает это. Существуют несколько параллельных реализаций Set, которые могут использоваться вместе с параллельным Queue.

Например, элемент добавляется в очередь только после того, как он был успешно добавлен в набор, и каждый элемент, удаленный из очереди, удаляется из набора. В этом случае содержимое очереди логически является тем, что находится в наборе, и очередь используется только для отслеживания заказа и обеспечения эффективных операций take() и poll(), найденных только на BlockingQueue.

Ответ 3

A java.util.concurrent.ConcurrentLinkedQueue предоставляет вам большую часть пути.

Оберните ConcurrentLinkedQueue со своим собственным классом, который проверяет уникальность добавления. Ваш код должен быть потокобезопасным.

Ответ 4

Я бы использовал синхронизированный LinkedHashSet, пока не было достаточного обоснования для рассмотрения альтернатив. Основное преимущество, которое может предложить более параллельное решение, - это разделение блокировок.

Простейшим параллельным подходом будет ConcurrentHashMap (действующий как набор) и ConcurrentLinkedQueue. Порядок операций обеспечил бы необходимое ограничение. Предложение() сначала должно выполнить CHM # putIfAbsent() и, если успешно вставить в CLQ. Опрос() будет выведен из CLQ, а затем удалить его из CHM. Это означает, что мы рассматриваем запись в нашей очереди, если она находится на карте, а CLQ обеспечивает упорядочение. Затем производительность может быть скорректирована путем увеличения сопоставления карты. Если вы терпимы к дополнительной уверенности, тогда дешевый CHM # get() может действовать как разумное предварительное условие (но он может пострадать, будучи слегка устаревшим).

Ответ 5

Что вы подразумеваете под параллельной очередью с помощью Set semantics? Если вы имеете в виду подлинно параллельную структуру (в отличие от потокобезопасной структуры), я бы утверждал, что вы просите пони.

Что происходит, например, если вы вызываете put(element) и обнаруживаете, что что-то уже есть, которое немедленно удаляется? Например, что это значит в вашем случае, если offer(element) || queue.contains(element) возвращает false?

Этим вещам часто нужно думать немного по-другому в параллельном мире, так как часто ничего не происходит, как кажется, если вы не остановите мир (заблокируйте его). В противном случае вы обычно смотрите на что-то в прошлом. Итак, что вы на самом деле пытаетесь сделать?

Ответ 6

Возможно расширение ArrayBlockingQueue. Чтобы получить доступ к блокировке доступа к пакетам, мне пришлось поместить мой подкласс в один и тот же пакет. Предостережение: я не проверял это.

package java.util.concurrent;

import java.util.Collection;
import java.util.concurrent.locks.ReentrantLock;

public class DeDupingBlockingQueue<E> extends ArrayBlockingQueue<E> {

    public DeDupingBlockingQueue(int capacity) {
        super(capacity);
    }

    public DeDupingBlockingQueue(int capacity, boolean fair) {
        super(capacity, fair);
    }

    public DeDupingBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
        super(capacity, fair, c);
    }

    @Override
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return false;
            return super.add(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return true;
            return super.offer(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void put(E e) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //Should this be lock.lock() instead?
        try {
            if (contains(e)) return;
            super.put(e); //if it blocks, it does so without holding the lock.
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return true;
            return super.offer(e, timeout, unit); //if it blocks, it does so without holding the lock.
        } finally {
            lock.unlock();
        }
    }
}

Ответ 7

Простой ответ для очереди уникальных объектов может быть следующим:

import java.util.concurrent.ConcurrentLinkedQueue;

public class FinalQueue {

    class Bin {
        private int a;
        private int b;

        public Bin(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public int hashCode() {
            return a * b;
        }

        public String toString() {
            return a + ":" + b;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            Bin other = (Bin) obj;
            if ((a != other.a) || (b != other.b))
                return false;
            return true;
        }
    }

    private ConcurrentLinkedQueue<Bin> queue;

    public FinalQueue() {
        queue = new ConcurrentLinkedQueue<Bin>();
    }

    public synchronized void enqueue(Bin ipAddress) {
        if (!queue.contains(ipAddress))
            queue.add(ipAddress);
    }

    public Bin dequeue() {
        return queue.poll();
    }

    public String toString() {
        return "" + queue;
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        FinalQueue queue = new FinalQueue();
        Bin a = queue.new Bin(2,6);

        queue.enqueue(a);
        queue.enqueue(queue.new Bin(13, 3));
        queue.enqueue(queue.new Bin(13, 3));
        queue.enqueue(queue.new Bin(14, 3));
        queue.enqueue(queue.new Bin(13, 9));
        queue.enqueue(queue.new Bin(18, 3));
        queue.enqueue(queue.new Bin(14, 7));
        Bin x= queue.dequeue();
        System.out.println(x.a);
        System.out.println(queue.toString());
        System.out.println("Dequeue..." + queue.dequeue());
        System.out.println("Dequeue..." + queue.dequeue());
        System.out.println(queue.toString());
    }
}