Подтвердить что ты не робот

Круглый буфер в Java

Рассмотрим несколько экземпляров веб-сервера, работающих параллельно. Каждый сервер содержит ссылку на один общий "Хранитель состояния", роль которого поддерживает последние запросы N со всех серверов.

Например (N=3):

Server a: "Request id = ABCD"        Status keeper=["ABCD"]
Server b: "Request id = XYZZ"        Status keeper=["ABCD", "XYZZ"] 
Server c: "Request id = 1234"        Status keeper=["ABCD", "XYZZ", "1234"]
Server b: "Request id = FOO"         Status keeper=["XYZZ", "1234", "FOO"]
Server a: "Request id = BAR"         Status keeper=["1234", "FOO", "BAR"]

В любой момент времени из приложения мониторинга может вызываться "Хранитель состояния", который читает эти последние N запросы для отчета SLA.

Какой лучший способ реализовать этот сценарий производителя-потребителя в Java, придавая веб-серверам более высокий приоритет, чем отчет SLA?

CircularFifoBuffer кажется подходящей структурой данных для хранения запросов, но я не уверен, какой оптимальный способ реализовать эффективные concurrency.

4b9b3361

Ответ 1

Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer());

Ответ 2

Здесь выполняется блокировка кольцевого буфера. Он реализует буфер фиксированного размера - нет функциональности FIFO. Я бы предложил вам сохранить Collection запросов для каждого сервера. Таким образом, ваш отчет может фильтровать, а не фильтровать вашу структуру данных.

/**
 * Container
 * ---------
 * 
 * A lock-free container that offers a close-to O(1) add/remove performance.
 * 
 */
public class Container<T> implements Iterable<T> {

  // The capacity of the container.
  final int capacity;
  // The list.
  AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
  // TESTING {
  AtomicLong totalAdded = new AtomicLong(0);
  AtomicLong totalFreed = new AtomicLong(0);
  AtomicLong totalSkipped = new AtomicLong(0);

  private void resetStats() {
    totalAdded.set(0);
    totalFreed.set(0);
    totalSkipped.set(0);
  }
  // TESTING }

  // Constructor
  public Container(int capacity) {
    this.capacity = capacity;
    // Construct the list.
    Node<T> h = new Node<T>();
    Node<T> it = h;
    // One created, now add (capacity - 1) more
    for (int i = 0; i < capacity - 1; i++) {
      // Add it.
      it.next = new Node<T>();
      // Step on to it.
      it = it.next;
    }
    // Make it a ring.
    it.next = h;
    // Install it.
    head.set(h);
  }

  // Empty ... NOT thread safe.
  public void clear() {
    Node<T> it = head.get();
    for (int i = 0; i < capacity; i++) {
      // Trash the element
      it.element = null;
      // Mark it free.
      it.free.set(true);
      it = it.next;
    }
    // Clear stats.
    resetStats();
  }

  // Add a new one.
  public Node<T> add(T element) {
    // Get a free node and attach the element.
    totalAdded.incrementAndGet();
    return getFree().attach(element);
  }

  // Find the next free element and mark it not free.
  private Node<T> getFree() {
    Node<T> freeNode = head.get();
    int skipped = 0;
    // Stop when we hit the end of the list 
    // ... or we successfully transit a node from free to not-free.
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
      skipped += 1;
      freeNode = freeNode.next;
    }
    // Keep count of skipped.
    totalSkipped.addAndGet(skipped);
    if (skipped < capacity) {
      // Put the head as next.
      // Doesn't matter if it fails. That would just mean someone else was doing the same.
      head.set(freeNode.next);
    } else {
      // We hit the end! No more free nodes.
      throw new IllegalStateException("Capacity exhausted.");
    }
    return freeNode;
  }

  // Mark it free.
  public void remove(Node<T> it, T element) {
    totalFreed.incrementAndGet();
    // Remove the element first.
    it.detach(element);
    // Mark it as free.
    if (!it.free.compareAndSet(false, true)) {
      throw new IllegalStateException("Freeing a freed node.");
    }
  }

  // The Node class. It is static so needs the <T> repeated.
  public static class Node<T> {

    // The element in the node.
    private T element;
    // Are we free?
    private AtomicBoolean free = new AtomicBoolean(true);
    // The next reference in whatever list I am in.
    private Node<T> next;

    // Construct a node of the list
    private Node() {
      // Start empty.
      element = null;
    }

    // Attach the element.
    public Node<T> attach(T element) {
      // Sanity check.
      if (this.element == null) {
        this.element = element;
      } else {
        throw new IllegalArgumentException("There is already an element attached.");
      }
      // Useful for chaining.
      return this;
    }

    // Detach the element.
    public Node<T> detach(T element) {
      // Sanity check.
      if (this.element == element) {
        this.element = null;
      } else {
        throw new IllegalArgumentException("Removal of wrong element.");
      }
      // Useful for chaining.
      return this;
    }

    public T get () {
      return element;
    }

    @Override
    public String toString() {
      return element != null ? element.toString() : "null";
    }
  }

  // Provides an iterator across all items in the container.
  public Iterator<T> iterator() {
    return new UsedNodesIterator<T>(this);
  }

  // Iterates across used nodes.
  private static class UsedNodesIterator<T> implements Iterator<T> {
    // Where next to look for the next used node.

    Node<T> it;
    int limit = 0;
    T next = null;

    public UsedNodesIterator(Container<T> c) {
      // Snapshot the head node at this time.
      it = c.head.get();
      limit = c.capacity;
    }

    public boolean hasNext() {
      // Made into a `while` loop to fix issue reported by @Nim in code review
      while (next == null && limit > 0) {
        // Scan to the next non-free node.
        while (limit > 0 && it.free.get() == true) {
          it = it.next;
          // Step down 1.
          limit -= 1;
        }
        if (limit != 0) {
          next = it.element;
        }
      }
      return next != null;
    }

    public T next() {
      T n = null;
      if ( hasNext () ) {
        // Give it to them.
        n = next;
        next = null;
        // Step forward.
        it = it.next;
        limit -= 1;
      } else {
        // Not there!!
        throw new NoSuchElementException ();
      }
      return n;
    }

    public void remove() {
      throw new UnsupportedOperationException("Not supported.");
    }
  }

  @Override
  public String toString() {
    StringBuilder s = new StringBuilder();
    Separator comma = new Separator(",");
    // Keep counts too.
    int usedCount = 0;
    int freeCount = 0;
    // I will iterate the list myself as I want to count free nodes too.
    Node<T> it = head.get();
    int count = 0;
    s.append("[");
    // Scan to the end.
    while (count < capacity) {
      // Is it in-use?
      if (it.free.get() == false) {
        // Grab its element.
        T e = it.element;
        // Is it null?
        if (e != null) {
          // Good element.
          s.append(comma.sep()).append(e.toString());
          // Count them.
          usedCount += 1;
        } else {
          // Probably became free while I was traversing.
          // Because the element is detached before the entry is marked free.
          freeCount += 1;
        }
      } else {
        // Free one.
        freeCount += 1;
      }
      // Next
      it = it.next;
      count += 1;
    }
    // Decorate with counts "]used+free".
    s.append("]").append(usedCount).append("+").append(freeCount);
    if (usedCount + freeCount != capacity) {
      // Perhaps something was added/freed while we were iterating.
      s.append("?");
    }
    return s.toString();
  }
}

Обратите внимание, что это близко к O1 put и get. A Separator просто испускает "первый раз", а затем его параметр с этого момента.

Изменить: добавлены методы тестирования.

// ***** Following only needed for testing. *****
private static boolean Debug = false;
private final static String logName = "Container.log";
private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\");

private static synchronized void log(boolean toStdoutToo, String s) {
  if (Debug) {
    if (toStdoutToo) {
      System.out.println(s);
    }
    log(s);
  }
}

private static synchronized void log(String s) {
  if (Debug) {
    try {
      log.writeLn(logName, s);
    } catch (IOException ex) {
      ex.printStackTrace();
    }
  }
}
static volatile boolean testing = true;

// Tester object to exercise the container.
static class Tester<T> implements Runnable {
  // My name.

  T me;
  // The container I am testing.
  Container<T> c;

  public Tester(Container<T> container, T name) {
    c = container;
    me = name;
  }

  private void pause() {
    try {
      Thread.sleep(0);
    } catch (InterruptedException ex) {
      testing = false;
    }
  }

  public void run() {
    // Spin on add/remove until stopped.
    while (testing) {
      // Add it.
      Node<T> n = c.add(me);
      log("Added " + me + ": " + c.toString());
      pause();
      // Remove it.
      c.remove(n, me);
      log("Removed " + me + ": " + c.toString());
      pause();
    }
  }
}
static final String[] strings = {
  "One", "Two", "Three", "Four", "Five",
  "Six", "Seven", "Eight", "Nine", "Ten"
};
static final int TEST_THREADS = Math.min(10, strings.length);

public static void main(String[] args) throws InterruptedException {
  Debug = true;
  log.delete(logName);
  Container<String> c = new Container<String>(10);

  // Simple add/remove
  log(true, "Simple test");
  Node<String> it = c.add(strings[0]);
  log("Added " + c.toString());
  c.remove(it, strings[0]);
  log("Removed " + c.toString());

  // Capacity test.
  log(true, "Capacity test");
  ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length);
  // Fill it.
  for (int i = 0; i < strings.length; i++) {
    nodes.add(i, c.add(strings[i]));
    log("Added " + strings[i] + " " + c.toString());
  }
  // Add one more.
  try {
    c.add("Wafer thin mint!");
  } catch (IllegalStateException ise) {
    log("Full!");
  }
  c.clear();
  log("Empty: " + c.toString());

  // Iterate test.
  log(true, "Iterator test");
  for (int i = 0; i < strings.length; i++) {
    nodes.add(i, c.add(strings[i]));
  }
  StringBuilder all = new StringBuilder ();
  Separator sep = new Separator(",");
  for (String s : c) {
    all.append(sep.sep()).append(s);
  }
  log("All: "+all);
  for (int i = 0; i < strings.length; i++) {
    c.remove(nodes.get(i), strings[i]);
  }
  sep.reset();
  all.setLength(0);
  for (String s : c) {
    all.append(sep.sep()).append(s);
  }
  log("None: " + all.toString());

  // Multiple add/remove
  log(true, "Multi test");
  for (int i = 0; i < strings.length; i++) {
    nodes.add(i, c.add(strings[i]));
    log("Added " + strings[i] + " " + c.toString());
  }
  log("Filled " + c.toString());
  for (int i = 0; i < strings.length - 1; i++) {
    c.remove(nodes.get(i), strings[i]);
    log("Removed " + strings[i] + " " + c.toString());
  }
  c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]);
  log("Empty " + c.toString());

  // Multi-threaded add/remove
  log(true, "Threads test");
  c.clear();
  for (int i = 0; i < TEST_THREADS; i++) {
    Thread t = new Thread(new Tester<String>(c, strings[i]));
    t.setName("Tester " + strings[i]);
    log("Starting " + t.getName());
    t.start();
  }
  // Wait for 10 seconds.
  long stop = System.currentTimeMillis() + 10 * 1000;
  while (System.currentTimeMillis() < stop) {
    Thread.sleep(100);
  }
  // Stop the testers.
  testing = false;
  // Wait some more.
  Thread.sleep(1 * 100);
  // Get stats.
  double added = c.totalAdded.doubleValue();
  double skipped = c.totalSkipped.doubleValue();
  //double freed = c.freed.doubleValue();
  log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")");
}

Ответ 3

Я бы посмотрел на ArrayDeque или для более параллельной реализации рассмотрим библиотеку Disruptor, которая является одной из самый сложный/сложный кольцевой буфер в Java.

Альтернативой является использование неограниченной очереди, которая является более параллельной, поскольку продюсеру не нужно ждать потребителя. Java Chronicle

Если ваши потребности не оправдывают сложность, ArrayDeque может быть всем, что вам нужно.

Ответ 4

Возможно, вы хотите посмотреть Disruptor - параллельная платформа программирования.

  • Найдите документ, описывающий альтернативы, дизайн, а также сравнение производительности с java.util.concurrent.ArrayBlockingQueue здесь: pdf
  • Подумайте, прочитайте первые три статьи из BlogsAndArticles

Если библиотека слишком большая, придерживайтесь java.util.concurrent.ArrayBlockingQueue

Ответ 5

Также посмотрите java.util.concurrent.

Блокирующие очереди будут блокироваться до тех пор, пока не будет что-то потребляемое или (необязательно) пространство для создания:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html

Параллельная связанная очередь не блокируется и использует алгоритм slick, который позволяет производителю и потребителю быть активным одновременно:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html

Ответ 6

Hazelcast Queue предлагает почти все, что вы просите, но не поддерживает округлость. Но из вашего описания я не уверен, действительно ли вам это нужно.

Ответ 7

Если бы это был я, я бы использовал CircularFIFOBuffer, как вы указали, и синхронизировать вокруг буфера при записи (добавить). Когда приложение мониторинга хочет прочитать буфер, синхронизируется в буфере, а затем копирует или клонирует его для использования.

Это предположение основывается на предположении, что латентность минимальна для копирования/клонирования буфера на новый объект. Если есть большое количество элементов, а время копирования медленное, то это не очень хорошая идея.

Пример псевдокода:

public void writeRequest(String requestID) {
    synchronized(buffer) {
       buffer.add(requestID);
    }
}

public Collection<String> getRequests() {
     synchronized(buffer) {
        return buffer.clone();
     }
}