Подтвердить что ты не робот

Как закоротить операцию reduce() на потоке?

По сути, это тот же вопрос, что и Как уменьшить короткое замыкание в потоке?. Однако, поскольку этот вопрос сфокусирован на потоке логических значений, и его ответ не может быть обобщен для других типов и операций сокращения, я хотел бы задать более общий вопрос.

Как мы можем уменьшить поток, чтобы он закорачивался, когда он сталкивался с поглощающим элементом для операции сокращения?

Типичный математический случай будет 0 для умножения. Это Stream:

int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
        .reduce(1, (a, b) -> a * b);

будет использовать два последних элемента (7 и 8) независимо от того, что после обнаружения 0 продукт известен.

4b9b3361

Ответ 1

К сожалению, Stream API имеет ограниченные возможности для создания собственных операций короткого замыкания. Не очень чистым решением было бы бросить RuntimeException и поймать его. Вот реализация для IntStream, но она может быть обобщена и для других типов потоков:

public static int reduceWithCancelEx(IntStream stream, int identity, 
                      IntBinaryOperator combiner, IntPredicate cancelCondition) {
    class CancelException extends RuntimeException {
        private final int val;

        CancelException(int val) {
            this.val = val;
        }
    }

    try {
        return stream.reduce(identity, (a, b) -> {
            int res = combiner.applyAsInt(a, b);
            if(cancelCondition.test(res))
                throw new CancelException(res);
            return res;
        });
    } catch (CancelException e) {
        return e.val;
    }
}

Пример использования:

int product = reduceWithCancelEx(
        IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println), 
        1, (a, b) -> a * b, val -> val == 0);
System.out.println("Result: "+product);

Выход:

2
3
4
5
0
Result: 0

Обратите внимание, что хотя он работает с параллельными потоками, он не гарантирует, что другие параллельные задачи будут завершены, как только одна из них выдаст исключение. Уже запущенные подзадачи, вероятно, будут выполняться до конца, поэтому вы можете обработать больше элементов, чем ожидалось.

Обновление: альтернативное решение, которое намного длиннее, но более дружественно к параллельным. Он основан на пользовательском разделителе, который возвращает не более одного элемента, который является результатом накопления всех базовых элементов). Когда вы используете его в последовательном режиме, он выполняет всю работу за один вызов tryAdvance. Когда вы разделяете его, каждая часть генерирует соответствующий частичный результат, который сокращается механизмом Stream с использованием функции объединения. Здесь общая версия, но возможна и примитивная специализация.

final static class CancellableReduceSpliterator<T, A> implements Spliterator<A>,
        Consumer<T>, Cloneable {
    private Spliterator<T> source;
    private final BiFunction<A, ? super T, A> accumulator;
    private final Predicate<A> cancelPredicate;
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private A acc;

    CancellableReduceSpliterator(Spliterator<T> source, A identity,
            BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) {
        this.source = source;
        this.acc = identity;
        this.accumulator = accumulator;
        this.cancelPredicate = cancelPredicate;
    }

    @Override
    public boolean tryAdvance(Consumer<? super A> action) {
        if (source == null || cancelled.get()) {
            source = null;
            return false;
        }
        while (!cancelled.get() && source.tryAdvance(this)) {
            if (cancelPredicate.test(acc)) {
                cancelled.set(true);
                break;
            }
        }
        source = null;
        action.accept(acc);
        return true;
    }

    @Override
    public void forEachRemaining(Consumer<? super A> action) {
        tryAdvance(action);
    }

    @Override
    public Spliterator<A> trySplit() {
        if(source == null || cancelled.get()) {
            source = null;
            return null;
        }
        Spliterator<T> prefix = source.trySplit();
        if (prefix == null)
            return null;
        try {
            @SuppressWarnings("unchecked")
            CancellableReduceSpliterator<T, A> result = 
                (CancellableReduceSpliterator<T, A>) this.clone();
            result.source = prefix;
            return result;
        } catch (CloneNotSupportedException e) {
            throw new InternalError();
        }
    }

    @Override
    public long estimateSize() {
        // let pretend we have the same number of elements
        // as the source, so the pipeline engine parallelize it in the same way
        return source == null ? 0 : source.estimateSize();
    }

    @Override
    public int characteristics() {
        return source == null ? SIZED : source.characteristics() & ORDERED;
    }

    @Override
    public void accept(T t) {
        this.acc = accumulator.apply(this.acc, t);
    }
}

Методы, аналогичные Stream.reduce(identity, accumulator, combiner) и Stream.reduce(identity, combiner), но с cancelPredicate:

public static <T, U> U reduceWithCancel(Stream<T> stream, U identity,
        BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner,
        Predicate<U> cancelPredicate) {
    return StreamSupport
            .stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity,
                    accumulator, cancelPredicate), stream.isParallel()).reduce(combiner)
            .orElse(identity);
}

public static <T> T reduceWithCancel(Stream<T> stream, T identity,
        BinaryOperator<T> combiner, Predicate<T> cancelPredicate) {
    return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate);
}

Давайте протестируем обе версии и посчитаем, сколько элементов фактически обработано. Пусть положить 0 близко к концу. Исключительная версия:

AtomicInteger count = new AtomicInteger();
int product = reduceWithCancelEx(
        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                .parallel().peek(i -> count.incrementAndGet()), 1,
        (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);

Типичный вывод:

product: 0/count: 281721
product: 0/count: 500001

Таким образом, хотя результат возвращается, когда обрабатываются только некоторые элементы, задачи продолжают работать в фоновом режиме, а счетчик продолжает увеличиваться. Вот версия сплитератора:

AtomicInteger count = new AtomicInteger();
int product = reduceWithCancel(
        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                .parallel().peek(i -> count.incrementAndGet()).boxed(), 
                1, (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);

Типичный вывод:

product: 0/count: 281353
product: 0/count: 281353

Все задачи фактически завершены, когда возвращается результат.

Ответ 2

Общий метод статического уменьшения короткого замыкания может быть реализован с использованием разделителя потока. Это даже оказалось не очень сложным! Использование spliterators, кажется, способ пойти много раз, когда человек хочет работать с парами более гибким способом.

public static <T> T reduceWithCancel(Stream<T> s, T acc, BinaryOperator<T> op, Predicate<? super T> cancelPred) {
    BoxConsumer<T> box = new BoxConsumer<T>();
    Spliterator<T> splitr = s.spliterator();

    while (!cancelPred.test(acc) && splitr.tryAdvance(box)) {
        acc = op.apply(acc, box.value);
    }

    return acc;
}

public static class BoxConsumer<T> implements Consumer<T> {
    T value = null;
    public void accept(T t) {
        value = t;
    }
}

Использование:

    int product = reduceWithCancel(
        Stream.of(1, 2, 0, 3, 4).peek(System.out::println),
        1, (acc, i) -> acc * i, i -> i == 0);

    System.out.println("Result: " + product);

Вывод:

1
2
0
Result: 0

Этот метод может быть обобщен для выполнения других видов терминальных операций.

Это основано на этом ответе о выполнении операции.

Я ничего не знаю о возможности параллелизации этого.

Ответ 3

Мой собственный подход заключается в том, чтобы не использовать reduce() как таковой, но использовать существующую заключительную операцию короткого замыкания.

noneMatch() или allMatch() могут использоваться для этого при использовании предиката с побочным эффектом. По общему признанию, это также не самое чистое решение, но оно достигает цели:

AtomicInteger product = new AtomicInteger(1);
IntStream.of(2, 3, 4, 5, 0, 7, 8)
        .peek(System.out::println)
        .noneMatch(i -> {
            if (i == 0) {
                product.set(0);
                return true;
            }
            int oldValue = product.get();
            while (oldValue != 0 && !product.compareAndSet(oldValue, i * oldValue)) {
                oldValue = product.get();
            }
            return oldValue == 0;
        });
System.out.println("Result: " + product.get());

Это короткое замыкание и может быть выполнено параллельно.