Подтвердить что ты не робот

Зарегистрировать привязку "завершение потока"

Используя API Java 8 Stream, я хотел бы зарегистрировать "крюк завершения" по строкам:

Stream<String> stream = Stream.of("a", "b", "c");

// additional filters / mappings that I don't control
stream.onComplete((Completion c) -> {
    // This is what I'd like to do:
    closeResources();

    // This might also be useful:
    Optional<Throwable> exception = c.exception();
    exception.ifPresent(e -> throw new ExceptionWrapper(e));
});

Причина, по которой я хочу сделать это, - это то, что я хочу, чтобы ресурс в Stream для клиентов API потреблял, и я хочу, чтобы Stream автоматически очищал ресурс после его потребления. Если это было возможно, клиент мог бы позвонить:

Collected collectedInOneGo =
Utility.something()
       .niceLookingSQLDSL()
       .moreDSLFeatures()
       .stream()
       .filter(a -> true)
       .map(c -> c)
       .collect(collector);

Вместо того, что нужно сейчас:

try (Stream<X> meh = Utility.something()
                            .niceLookingSQLDSL()
                            .moreDSLFeatures()
                            .stream()) {

    Collected collectedWithUglySyntacticDissonance =
    meh.filter(a -> true)
       .map(c -> c)
       .collect(collector);
}

В идеале я хотел бы войти в java.util.stream.ReferencePipeline различные методы, такие как:

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    try {

        // Existing loop
        do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
    }

    // These would be nice:
    catch (Throwable t) {
        completion.onFailure(t);
    }
    finally {
        completion.onSuccess();
    }
}

Есть ли простой способ сделать это с помощью существующего API JDK 8?

4b9b3361

Ответ 1

Любое решение, перехватывающее терминальные операции, кроме решения на основе flatMap (как было предложено @Holger), будет хрупким для следующего кода:

Stream<String> stream = getAutoCloseableStream();
if(stream.iterator().hasNext()) {
    // do something if stream is non-empty
}

Такое использование абсолютно законно по спецификации. Не забывайте, что iterator() и spliterator() - это операции с терминальным потоком, но после их выполнения вам по-прежнему нужен доступ к источнику потока. Также совершенно верно отказаться от Iterator или Spliterator в любом состоянии, поэтому вы просто не можете знать, будет ли оно использоваться дальше или нет.

Вы можете подумать о том, чтобы рекомендовать пользователям не использовать iterator() и spliterator(), но как насчет этого кода?

Stream<String> stream = getAutoCloseableStream();
Stream.concat(stream, Stream.of("xyz")).findFirst();

Это внутренне использует spliterator().tryAdvance() для первого потока, затем оставляет его (хотя и закрывается, если результирующий поток close() вызывается явно). Вам нужно будет попросить своих пользователей не использовать Stream.concat. И насколько я знаю внутри вашей библиотеки, вы часто используете iterator()/spliterator(), поэтому вам нужно будет пересмотреть все эти места для возможных проблем. И, конечно же, есть много других библиотек, которые также используют iterator()/spliterator() и могут после этого коротко замыкаться: все они станут несовместимыми с вашей функцией.

Почему решение на основе flatMap работает здесь? Поскольку при первом вызове hasNext() или tryAdvance() он выгружает содержимое потока целиком в промежуточный буфер и закрывает исходный источник потока. Таким образом, в зависимости от размера потока вы можете тратить много промежуточной памяти или даже OutOfMemoryError.

Вы также можете рассмотреть возможность сохранения PhantomReference объектов Stream и мониторинга ReferenceQueue. В этом случае завершение будет инициировано сборщиком мусора (что также имеет некоторые недостатки).

В заключение мой совет - остаться с try-with-resources.

Ответ 2

Самое простое решение - обернуть поток в другом потоке и плоскую карту для себя:

// example stream
Stream<String> original=Stream.of("bla").onClose(()->System.out.println("close action"));

// this is the trick
Stream<String> autoClosed=Stream.of(original).flatMap(Function.identity());

//example op
int sum=autoClosed.mapToInt(String::length).sum();
System.out.println(sum);

Причина, по которой это работает, заключается в flatMap operation:

Каждый отображаемый поток закрывается после того, как его содержимое было помещено в этот поток.

Но есть недостатки. Учитывая текущую реализацию, он не работает настолько надежно, как оператор try(…) в исключительном случае. Кроме того, текущая реализация не является ленивой, как это должно быть при использовании flatMap.


Моя рекомендация состоит в том, чтобы остаться с стандартным решением try(…) и документом, когда возвращаемый поток нужно закрыть. В конце концов, поток, который закрывает ресурс после операции терминала, не является безопасным, поскольку нет гарантии, что клиент фактически вызовет операцию терминала. Изменение его разума и отказ от потока мгновенного действия является допустимым использованием, тогда как не вызов метода close(), когда документация указывает, что это требуется, не является.

Ответ 3

В Java 8 уже есть прецедент того, как работают потоки, которые должны быть закрыты. В своем Javadoc он упоминает:

Потоки имеют метод BaseStream.close() и реализуют AutoCloseable, но почти все потоковые экземпляры фактически не нужно закрывать после использования. Как правило, только потоки, источник которых является каналом ввода-вывода (например, те, которые возвращаются файлами Files.lines(Path, Charset)), требуют закрытия. Большинство потоков поддерживаются коллекциями, массивами или генерирующими функциями, которые не требуют специального управления ресурсами. (Если поток требует закрытия, он может быть объявлен как ресурс в инструкции try-with-resources.)

Итак, рекомендация Java 8 заключается в том, чтобы открыть эти потоки в try-with-resources. И как только вы это сделаете, Stream также предоставит вам возможность добавить близкий крюк почти так же, как вы описали: onClose(Runnable), который принимает lambda, сообщив ему, что делать и возвращает Stream, который также выполнит эту операцию, когда он будет закрыт.

То, как дизайн и документация API предлагает делать то, что вы пытаетесь сделать.

Ответ 4

Решение, которое я придумал, выглядит следующим образом:

class AutoClosingStream<T> implements Stream<T> {

    AutoClosingStream(Stream<T> delegate, Consumer<Optional<Throwable>> onComplete) {}

    // Pipeline ops delegate the op to the real stream and wrap that again
    @Override
    public Stream<T> limit(long maxSize) {
        return new AutoClosingStream(delegate.limit(maxSize), onComplete);
    }

    // Terminal ops intercept the result and call the onComplete logic
    @Override
    public void forEach(Consumer<? super T> action) {
        terminalOp(() -> delegate.forEach(action));
    }

    private void terminalOp(Runnable runnable) {
        terminalOp(() -> { runnable.run(); return null; });
    }

    private <R> R terminalOp(Supplier<R> supplier) {
        R result = null;

        try {
            result = supplier.get();
            onComplete.accept(Optional.empty());
        }
        catch (Throwable e) {
            onComplete.accept(Optional.of(e));
            Utils.sneakyThrow(e);
        }

        return result;
    }
}

Это лишь упрощенная схема, иллюстрирующая идею. Реальное решение также поддерживало бы примитивные IntStream, LongStream и DoubleStream