Подтвердить что ты не робот

Неблокирующая параллельная очередь с предложением и потоком

Неблокирующая параллельная очередь с предложением и сбросом

Мне нужна неограниченная неблокирующая параллельная очередь, в основном, всего 2 операции:

  • предложение: атомарно вставляет указанный элемент в хвост этой очереди;
  • flush: берет все элементы, присутствующие в этот момент в очереди, и начинает обрабатывать их один за другим в соответствии с порядком вставки. Более конкретно то, что должно быть атомарным, - это только эта операция "takeAll", которая станет самой первой операцией флеша. Все предметы, предлагаемые в очередь после takeAll, будут вставлены, а затем обработаны только другим последующим сбросом.

Цель состоит в том, что у потребителя есть одна операция CAS на takeAll и затем может перебирать элементы в списке без прохождения операции CAS для каждого чтения. Далее мы уже владеем Node (Entry), поскольку это необходимо для хранения некоторого другого неизменяемого состояния. Новый Node может принимать HEAD как аргумент конструктора, создавая связанный список с одним направлением.

Существует ли в литературе очередь с этими характеристиками?

4b9b3361

Ответ 1

Здесь вы идете:

public class FunkyQueue<T> {
    private final AtomicReference<Node<T>> _tail = new AtomicReference<Node<T>>();

    public void offer(T t) {
        while(true) {
            Node<T> tail = _tail.get();
            Node<T> newTail = new Node<T>(t, tail);
            if(_tail.compareAndSet(tail, newTail)) {
                break;
            }
        }
    }

    public List<T> takeAll() {
        Node<T> tail = _tail.getAndSet(null);

        LinkedList<T> list = new LinkedList<T>();
        while(tail != null) {
            list.addFirst(tail.get());
            tail = tail.getPrevious();
        }

        return list;
    }

    private static final class Node<T>
    {
        private final T _obj;
        private Node<T> _prev;

        private Node(T obj, Node<T> prev) {
            _obj = obj;
            _prev = prev;            
        }

        public T get() {
            return _obj;
        }

        public Node<T> getPrevious() {
            return _prev;
        }
    }
}

Ответ 2

Предоставляется: хорошая реализация, для которой требуется один CAS для offer() и takeAll().

Проблема: длинное takeAll() выполнение, так как для этого требуется полный обход односвязного списка в противоположном направлении.

Решение: создать дополнительные уровни пропуска на узлах. Для указанных цифр (N ~ 100K) достаточно двух уровней, что уменьшит количество шагов в диапазоне от takeAll() до ~ 150.

Основываясь на указанной реализации, Node class:

public static final class Node<T> {

    private final T value;
    private Node<T> prev, prevL1, prevL2;
    private Node<T> next, nextL1, nextL2;

    private Node(T obj, Node<T> prev, long c) {
        value = obj;
        this.prev = prev;  
        // level 1 to skip 64 nodes, level 2 to skip 64^2 nodes
        // c is a value from some global addition counter, that
        // is not required to be atomic with `offer()`
        prevL1 = (c & (64 - 1) == 0) ? prev : prev.prevL1;
        prevL2 = (c & (64 * 64 - 1) == 0) ? prev : prev.prevL2;
    }

    public T get() {
        return value;
    }

    public Node<T> findHead() {
        // see below
    }

    public Node<T> next() {
        // see below
    }
}

FunkyQueue#offer() метод:

public void offer(T t) {
    long c = counter.incrementAndGet();  
    for(;;) {
        Node<T> oldTail = tail.get();
        Node<T> newTail = new Node<T>(t, oldTail, c);
        if (tail.compareAndSet(oldTail, newTail)) 
            break;
    }
}

FunkyQueue#getAll() теперь вернет голову списка:

public List<T> takeAll() {
    return tail.getAndSet(null).findHead();
}

Он вызывает Node#findHead(), который теперь может использовать уровни пропуска для ускорения обратного обхода:

private Node<T> findHead() {

     Node<T> n = this;
     while (n.prevL2 != null) {  // <- traverse back on L2, assigning `next` nodes
         n.prevL2.nextL2 = n;
         n = n.prevL2; 
     }
     while (n.prevL1 != null) {  // <- the same for L1
         n.prevL1.nextL1 = n;
         n = n.prev1;
     }
     while (n.prev != null) {    // <- the same for L0
         n.prev.next = n;
         n = n.prev;
     }
     return n;
}

И, наконец, Node#next():

public Node<T> next() {

    if (this.next == null && this.nextL1 == null && this.nextL2 == null)       
        throw new IllegalStateException("No such element");

    Node<T> n;
    if (this.next == null) {         // L0 is not traversed yet
        if (this.nextL1 == null) {   // the same for L1
            n = this.nextL2;         // step forward on L2
            while (n != this) {      // traverse on L1
                n.prevL1.nextL1 = n;
                n = n.prevL1;
            }
        }  
        n = this.nextL1;             // step forward on L1
        while (n != this) {          // traverse on L0
            n.prev.next = n;
            n = n.prev;
        }
    }
    return this.next;
}

Я полагаю, что основная идея понятна. Применяя некоторый рефакторинг, можно сделать Node#findHead() и, таким образом, FunkyQueue#getAll() работать в O (log N) и Node#next() в O (1) в среднем.


P.S. если вы заметите какую-либо ошибку или плохую грамматику, пожалуйста, отредактируйте.

Ответ 3

ConcurrentLinkedQueue использует алгоритм Майкл и Скотт и может быть адаптирован для обеспечения этого метода. Возвращенная коллекция будет немодифицированным видом удаленных узлов для обхода. Это будет выглядеть примерно так:

public Collection<E> drain() {
  for (;;) {
    Node<E> h = head;
    Node<E> t = tail;

    if (h == t) {
      return Collections.emptyList();
    } else if (casHead(h, t)) {
      return new CollectionView<E>(h, t);
    }
  }
}

Формирование коллекции не очень забавно, поэтому вместо <Мозес и алгоритм Шавита для оптимистичной очереди (более быстрая альтернатива). Это улучшено с помощью платформы backoff, чтобы комбинировать одновременные дополнения, чтобы уменьшить конкуренцию, вызванную несколькими производителями.