Подтвердить что ты не робот

Закрытие потоков в середине трубопроводов

Когда я выполняю этот код, который открывает много файлов во время конвейера потока:

public static void main(String[] args) throws IOException {
    Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
            100, (path, attr) -> path.toString().endsWith(".html"))
        .map(file -> runtimizeException(() -> Files.lines(file, StandardCharsets.ISO_8859_1)))
        .map(Stream::count)
        .forEachOrdered(System.out::println);
}

Я получаю исключение:

java.nio.file.FileSystemException: /long/file/name: Too many open files

Проблема заключается в том, что Stream.count не закрывает поток, когда он проходит его. Но я не понимаю, почему это не должно, учитывая, что это терминальная операция. То же самое относится к другим терминальным операциям, таким как reduce и forEach. flatMap, с другой стороны, закрывает потоки, из которых он состоит.

Документация говорит мне использовать инструкцию try-with-resouces для закрытия потоков, если это необходимо. В моем случае я мог бы заменить строку count на что-то вроде этого:

.map(s -> { long c = s.count(); s.close(); return c; } )

Но это шумно и уродливо и может быть настоящим неудобством в некоторых случаях с большими сложными конвейерами.

Итак, мои вопросы таковы:

  • Почему потоки не были разработаны так, чтобы терминальные операции закрывали потоки, над которыми они работают? Это улучшит их работу с потоками ввода-вывода.
  • Какое оптимальное решение для закрытия потоков ввода-вывода в конвейерах?

runtimizeException - это метод, который обертывает проверенное исключение в RuntimeException s.

4b9b3361

Ответ 1

Здесь есть два вопроса: обработка проверенных исключений, таких как IOException, и своевременное закрытие ресурсов.

Ни один из предопределенных функциональных интерфейсов не объявляет о каких-либо проверенных исключениях, а это означает, что они должны обрабатываться в лямбда или завернуты в исключение и исключены. Похоже, ваша функция runtimizeException делает это. Возможно, вам также пришлось объявить для себя собственный функциональный интерфейс. Как вы, вероятно, обнаружили, это боль.

При закрытии таких ресурсов, как файлы, было проведено некоторое расследование того, что потоки автоматически закрываются, когда достигнут конец потока. Это было бы удобно, но это не касается закрытия, когда генерируется исключение. В потоках нет волшебного механизма для правильной работы.

Мы остались со стандартными методами Java, связанными с закрытием ресурсов, а именно конструкцией try-with-resources, представленной в Java 7. TWR действительно хочет, чтобы ресурсы были закрыты на том же уровне в стеке вызовов, когда они были открыты. Применяется принцип "кто бы ни открывал его, чтобы закрыть его". TWR также занимается обработкой исключений, что обычно упрощает работу с обработкой исключений и закрытием ресурсов в том же месте.

В этом примере поток несколько необычен тем, что он отображает a Stream<Path> в Stream<Stream<String>>. Эти вложенные потоки - это те, которые не закрыты, что приводит к возможному исключению, когда в системе заканчиваются дескрипторы открытых файлов. Что затрудняет то, что файлы открываются одной операцией потока, а затем передаются вниз по течению; это делает невозможным использование TWR.

Альтернативный подход к структурированию этого конвейера выглядит следующим образом.

Вызов Files.lines - это тот, который открывает файл, поэтому это должен быть ресурс в инструкции TWR. Обработка этого файла происходит там, где (некоторые) IOExceptions получают бросок, поэтому мы можем сделать обработку исключений в одном и том же заявлении TWR. Это предполагает наличие простой функции, которая отображает путь к счету строк при обработке закрытия ресурсов и обертывания исключений:

long lineCount(Path path) {
    try (Stream<String> s = Files.lines(path, StandardCharsets.ISO_8859_1)) {
        return s.count();
    } catch (IOException ioe) {
        throw new UncheckedIOException(ioe);
    }
}

Когда у вас есть эта вспомогательная функция, основной конвейер выглядит следующим образом:

Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
           100, (path, attr) -> path.toString().endsWith(".html"))
     .mapToLong(this::lineCount)
     .forEachOrdered(System.out::println);

Ответ 2

Можно создать служебный метод, который надежно закрывает потоки в середине конвейера.

Это гарантирует, что каждый ресурс закрыт приложением try-with-resource, но избегает необходимости в специальном методе утилиты и гораздо менее подробен, чем запись try-statement непосредственно в лямбда.

С помощью этого метода конвейер из вопроса выглядит следующим образом:

Files.find(Paths.get("Java_8_API_docs/docs/api"), 100,
        (path, attr) -> path.toString().endsWith(".html"))
    .map(file -> applyAndClose(
        () -> Files.lines(file, StandardCharsets.ISO_8859_1),
        Stream::count))
    .forEachOrdered(System.out::println);

Реализация выглядит следующим образом:

/**
 * Applies a function to a resource and closes it afterwards.
 * @param sup Supplier of the resource that should be closed
 * @param op operation that should be performed on the resource before it is closed
 * @return The result of calling op.apply on the resource 
 */
private static <A extends AutoCloseable, B> B applyAndClose(Callable<A> sup, Function<A, B> op) {
    try (A res = sup.call()) {
        return op.apply(res);
    } catch (RuntimeException exc) {
        throw exc;
    } catch (Exception exc) {
        throw new RuntimeException("Wrapped in applyAndClose", exc);
    }
}

(Поскольку ресурсы, которые необходимо закрыть, часто также генерируют исключения, когда они назначаются исключениями, отличными от времени выполнения, завершаются в исключениях времени выполнения, избегая необходимости в отдельном методе, который это делает.)

Ответ 3

Вам нужно будет вызвать close() в этой операции потока, которая вызовет вызов всех основных обработчиков.

Еще лучше, было бы обернуть весь ваш оператор в блок try-with-resources, так как тогда он автоматически вызовет обработчик закрытия.

В вашей ситуации это может быть не так, это означает, что вам нужно будет справиться с этим самостоятельно в какой-то операции. Ваши текущие методы могут вообще не подходить для потоков.

Кажется, вам действительно нужно сделать это во второй операции map().

Ответ 4

Закрытие интерфейса AutoCloseable следует вызывать только один раз. Для получения дополнительной информации см. Документацию AutoCloseable.

Если конечные операции будут автоматически закрывать поток, закрыть может быть вызвано дважды. Взгляните на следующий пример:

try (Stream<String> lines = Files.lines(path)) {
    lines.count();
}

Как он определен прямо сейчас, метод close в строках будет вызываться ровно один раз. Независимо от того, завершится ли окончательная операция нормально или операция прервана в IOException. Если поток будет закрыт неявно в заключительной операции, метод close будет вызываться один раз, если произойдет IOException, и дважды, если операция завершится успешно.

Ответ 5

Вот альтернатива, которая использует другой метод из Files и позволит избежать утечки дескрипторов файлов:

Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
    100, (path, attr) -> path.toString().endsWith(".html"))
    .map(file -> runtimizeException(() -> Files.readAllLines(file, StandardCharsets.ISO_8859_1).size())
    .forEachOrdered(System.out::println);

В отличие от вашей версии, он вернет int вместо long для подсчета строк; но у вас нет файлов с таким количеством строк, не так ли?