Подтвердить что ты не робот

Stream API и Queues: подпишитесь на StreamingQueue stream-style

Скажем, у нас есть Очередь

BlockingQueue<String> queue= new LinkedBlockingQueue<>();

и какой-то другой поток помещает в него значения, тогда мы читаем его как

while (true) {
    String next = queue.take();
    System.out.println("next message:" + next);
}

Как я могу перебирать эту очередь в стиле потока, сохраняя при этом подобную семантику выше кода.

Этот код проходит только текущее состояние очереди:

queue.stream().forEach(e -> System.out.println(e));
4b9b3361

Ответ 1

Я догадываюсь о том, чего вы ожидаете, но я думаю, что у меня есть хорошая догадка.

Поток очереди, как итерация по очереди, представляет текущее содержимое очереди. Когда итератор или поток достигает хвоста очереди, он не блокирует ожидающие добавления дополнительных элементов. Итератор или поток исчерпаны в этой точке, и вычисление завершается.

Если вам нужен поток, состоящий из всех текущих и будущих элементов очереди, вы можете сделать что-то вроде этого:

Stream.generate(() -> {
        try {
            return queue.take();
        } catch (InterruptedException ie) {
            return "Interrupted!";
        }
    })
    .filter(s -> s.endsWith("x"))
    .forEach(System.out::println);   

(К сожалению, необходимость обрабатывать InterruptedException делает это довольно грязным.)

Обратите внимание, что нет возможности закрыть очередь, и нет возможности для Stream.generate прекратить генерировать элементы, поэтому это фактически бесконечный поток. Единственный способ его прекратить - это операция короткого замыкания, такая как findFirst.

Ответ 2

Вы можете посмотреть реализацию async Queue. Если у вас есть Java 8, то cyclops-react, я разработчик в этом проекте предоставляет async.Queue, который позволит вам как заполнять, так и потребляют из очереди асинхронно (и чисто).

например.

Queue<String> queue = QueueFactories.<String>unboundedQueue().build();

Или просто (до тех пор, пока это com.aol.simple.react.async.Queue)

Queue<String> queue = new Queue<>();

Затем в отдельном потоке:

new Thread(() -> {
        while (true) {
            queue.add("New message " + System.currentTimeMillis());
        }
    }).start();

Вернитесь в основной поток, ваш исходный код должен теперь работать как ожидалось (бесконечно перебирать сообщения, добавляемые в очередь и распечатывать их)

queue.stream().forEach(e -> System.out.println(e));

Очередь и, следовательно, поток могут быть закрыты на любом этапе через -

queue.close();

Ответ 3

Другой подход заключается в создании пользовательского Spliterator. В моем случае у меня есть блокирующая очередь, и я хочу создать поток, который продолжает извлекать элементы, пока блок не истечет. Разделитель выглядит примерно так:

public class QueueSpliterator<T> implements Spliterator<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMs;

    public QueueSpliterator(final BlockingQueue<T> queue, final long timeoutMs) {
        this.queue = queue;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public int characteristics() {
        return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> action) {
        try {
            final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
            if (next == null) {
                return false;
            }
            action.accept(next);
            return true;
        } catch (final InterruptedException e) {
            throw new SupplierErrorException("interrupted", e);
        }
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

}

Исключение, созданное для обработки InterruptedException, является расширением RuntimeException. Используя этот класс, можно построить поток через:   StreamSupport.stream(новый QueueSpliterator (...)) и добавьте обычные операции потока.