Подтвердить что ты не робот

PipedInputStream - Как избежать "java.io.IOException: Pipe broken"

У меня есть два потока. Один из них пишет в PipedOutputStream, другой - из соответствующего PipedInputStream. Предпосылкой является то, что один поток загружает некоторые данные с удаленного сервера и мультиплексирует его на несколько других потоков через потоки с каналами.

Проблема в том, что иногда (особенно при загрузке больших файлов ( > 50 МБ) я получаю java.io.IOException: Pipe broken при попытке прочитать из PipedInputStream.
Джавадок говорит, что A pipe is said to be broken if a thread that was providing data bytes to the connected piped output stream is no longer alive.
Это правда, мой поток писем действительно умирает после написания всех его данных в PipedOutputStream.

Любые решения? Как я могу запретить PipedInputStream отбрасывать это исключение? Я хочу иметь возможность читать все данные, которые были написаны в PipedOutputStream, даже если запись потока завершила его работу. (Если кто-нибудь знает, как продолжать писать нить, пока все данные не будут прочитаны, это решение также приемлемо).

4b9b3361

Ответ 1

Используйте java.util.concurrent.CountDownLatch и не завершайте первый поток до того, как второй сообщит, что он закончил чтение из канала.

Обновление: быстрый и грязный код, чтобы проиллюстрировать мой комментарий ниже

    final PipedInputStream pin = getInputStream();
    final PipedOutputStream pout = getOutputStream();

    final CountDownLatch latch = new CountDownLatch(1);

    InputStream in = new InputStream() {

        @Override
        public int read() throws IOException {
            return pin.read();
        }

        @Override
        public void close() throws IOException {
            super.close();
            latch.countDown();
        }
    };


    OutputStream out = new OutputStream(){

        @Override
        public void write(int b) throws IOException {
            pout.write(b);
        }

        @Override
        public void close() throws IOException {
            while(latch.getCount()!=0) {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    //too bad
                }
            }
            super.close();
        }
    };

    //give the streams to your threads, they don't know a latch ever existed
    threadOne.feed(in);
    threadTwo.feed(out);

Ответ 2

Закрываете ли вы PipedOutputStream, когда поток, который его использует? Вам нужно сделать так, чтобы байты в нем попали в соответствующий PipedInputStream.

Ответ 3

PipedInputStream и PipedOutputStream нарушаются (в отношении потоков). Они предполагают, что каждый экземпляр привязан к определенному потоку. Это странно. Я предлагаю использовать вашу собственную (или, по крайней мере, другую) реализацию.