Подтвердить что ты не робот

Как я могу создать универсальный пейджинговый разделитель?

Я хотел бы иметь возможность обрабатывать чтение потока Java из источника, к которому нужно обращаться на страницах. В качестве первого подхода я применил итератор подкачки, который просто запрашивал страницы, когда на текущей странице заканчивались элементы, а затем использовала StreamSupport.stream(iterator, false), чтобы получить дескриптор потока над итератором.

Как я обнаружил, что мои страницы довольно дороги для извлечения, я хотел бы получить доступ к страницам через параллельный поток. В этот момент я обнаружил, что parallelism, предоставленный моим наивным подходом, отсутствует в связи с реализацией spliterator, которую java предоставляет непосредственно из итератора. Поскольку я действительно много знаю об элементах, которые я хотел бы пройти (я знаю, что суммарный результат подсчитывается после запроса первой страницы, а источник поддерживает смещение и ограничение). Я думаю, что должно быть возможно реализовать мой собственный разделитель, который достигает real concurrency (как в работе над элементами страницы, так и в запросе страницы).

Мне удалось довольно легко выполнить "работу над элементами" concurrency, но в моей первоначальной реализации запрос страницы выполняется только самым верхним разделителем и, следовательно, не приносит пользы от разделения работы, предлагаемого внедрением fork-join.

Как написать разделитель, который достигает обеих этих целей?

Для справки, я расскажу, что я сделал до сих пор (я знаю, что он не делит запросы соответственно).

   public final class PagingSourceSpliterator<T> implements Spliterator<T> {

    public static final long DEFAULT_PAGE_SIZE = 100;

    private Page<T> result;
    private Iterator<T> results;
    private boolean needsReset = false;
    private final PageProducer<T> generator;
    private long offset = 0L;
    private long limit = DEFAULT_PAGE_SIZE;


    public PagingSourceSpliterator(PageProducer<T> generator) {
        this.generator = generator;
    }

    public PagingSourceSpliterator(long pageSize, PageProducer<T> generator) {
        this.generator = generator;
        this.limit = pageSize;
    }


    @Override
    public boolean tryAdvance(Consumer<? super T> action) {

        if (hasAnotherElement()) {
            if (!results.hasNext()) {
                loadPageAndPrepareNextPaging();
            }
            if (results.hasNext()) {
                action.accept(results.next());
                return true;
            }
        }

        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        // if we know there another page, go ahead and hand off whatever
        // remains of this spliterator as a new spliterator for other
        // threads to work on, and then mark that next time something is
        // requested from this spliterator it needs to be reset to the head
        // of the next page
        if (hasAnotherPage()) {
            Spliterator<T> other = result.getPage().spliterator();
            needsReset = true;
            return other;
        } else {
            return null;
        }

    }

    @Override
    public long estimateSize() {
        if(limit == 0) {
            return 0;
        }

        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return result.getTotalResults();
    }

    @Override
    public int characteristics() {
        return IMMUTABLE | ORDERED | DISTINCT | NONNULL | SIZED | SUBSIZED;
    }

    private boolean hasAnotherElement() {
        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return isBound() && (results.hasNext() || hasAnotherPage());
    }

    private boolean hasAnotherPage() {
        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return isBound() && (result.getTotalResults() > offset);
    }

    private boolean isBound() {
        return Objects.nonNull(results) && Objects.nonNull(result);
    }

    private void ensureStateIsUpToDateEnoughToAnswerInquiries() {
        ensureBound();
        ensureResetIfNecessary();
    }

    private void ensureBound() {
        if (!isBound()) {
            loadPageAndPrepareNextPaging();
        }
    }

    private void ensureResetIfNecessary() {
        if(needsReset) {
            loadPageAndPrepareNextPaging();
            needsReset = false;
        }
    }

    private void loadPageAndPrepareNextPaging() {
        // keep track of the overall result so that we can reference the original list and total size
        this.result = generator.apply(offset, limit);

        // make sure that the iterator we use to traverse a single page removes
        // results from the underlying list as we go so that we can simply pass
        // off the list spliterator for the trySplit rather than constructing a
        // new kind of spliterator for what remains.
        this.results = new DelegatingIterator<T>(result.getPage().listIterator()) {
            @Override
            public T next() {
                T next = super.next();
                this.remove();
                return next;
            }
        };

        // update the paging for the next request and inquiries prior to the next request
        // we use the page of the actual result set instead of the limit in case the limit
        // was not respected exactly.
        this.offset += result.getPage().size();
    }

    public static class DelegatingIterator<T> implements Iterator<T> {

        private final Iterator<T> iterator;

        public DelegatingIterator(Iterator<T> iterator) {
            this.iterator = iterator;
        }


        @Override
        public boolean hasNext() {
            return iterator.hasNext();
        }

        @Override
        public T next() {
            return iterator.next();
        }

        @Override
        public void remove() {
            iterator.remove();
        }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            iterator.forEachRemaining(action);
        }
    }
}

И источник моих страниц:

public interface PageProducer<T> extends BiFunction<Long, Long, Page<T>> {

}

И страница:

public final class Page<T> {

    private long totalResults;
    private final List<T> page = new ArrayList<>();

    public long getTotalResults() {
        return totalResults;
    }

    public List<T> getPage() {
        return page;
    }

    public Page setTotalResults(long totalResults) {
        this.totalResults = totalResults;
        return this;
    }

    public Page setPage(List<T> results) {
        this.page.clear();
        this.page.addAll(results);
        return this;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Page)) {
            return false;
        }
        Page<?> page1 = (Page<?>) o;
        return totalResults == page1.totalResults && Objects.equals(page, page1.page);
    }

    @Override
    public int hashCode() {
        return Objects.hash(totalResults, page);
    }

}

И образец получения потока с "медленным" поисковым вызовом для тестирования

private <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {

    PageProducer<T> producer = (offset, limit) -> {

        try {
            Thread.sleep(5000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        int beginIndex = offset.intValue();
        int endIndex = Math.min(offset.intValue() + limit.intValue(), things.size());
        return new Page<T>().setTotalResults(things.size())
                .setPage(things.subList(beginIndex, endIndex));
    };

    return StreamSupport.stream(new PagingSourceSpliterator<>(pageSize, producer), true);
}
4b9b3361

Ответ 1

Основная причина, ваш spliterator не приближает вас к вашей цели, заключается в том, что он пытается разделить страницы, а не пространство исходного элемента. Если вы знаете общее количество элементов и имеете источник, позволяющий извлекать страницу с помощью смещения и ограничения, наиболее естественной формой spliterator является инкапсуляция диапазона в пределах этих элементов, например. через смещение, ограничение или конец. Затем расщепление означает просто разделение этого диапазона, адаптируя ваши разделители смещения к позиции разделения и создавая новый разделитель, представляющий префикс, от "старого смещения" до положения разделения.

Before splitting:
      this spliterator: offset=x, end=y
After splitting:
      this spliterator: offset=z, end=y
  returned spliterator: offset=x, end=z

x <= z <= y

В лучшем случае z находится ровно посередине между x и y, чтобы создать сбалансированные расщепления, но в нашем случае мы немного приспособим его к тому, чтобы рабочие множества были кратными странице размер.

Эта логика работает без необходимости извлекать страницы, поэтому, если вы откладываете выборку страниц до момента, фреймворк хочет начать обход, т.е. после разделения, операции выборки могут выполняться параллельно. Самым большим препятствием является то, что вам нужно получить первую страницу, чтобы узнать об общем количестве элементов. Решение ниже отделяет этот первый выбор от остальных, упрощая реализацию. Конечно, он должен сдать результат этой первой выборки страницы, которая будет потребляться при первом обходе (в последовательном случае) или возвращаться в качестве первого разделительного префикса, принимая в этот момент один несбалансированный раскол, но не имея чтобы справиться с этим позже.

public class PagingSpliterator<T> implements Spliterator<T> {
    public interface PageFetcher<T> {
        List<T> fetchPage(long offset, long limit, LongConsumer totalSizeSink);
    }
    public static final long DEFAULT_PAGE_SIZE = 100;

    public static <T> Stream<T> paged(PageFetcher<T> pageAccessor) {
        return paged(pageAccessor, DEFAULT_PAGE_SIZE, false);
    }
    public static <T> Stream<T> paged(PageFetcher<T> pageAccessor,
                                      long pageSize, boolean parallel) {
        if(pageSize<=0) throw new IllegalArgumentException();
        return StreamSupport.stream(() -> {
            PagingSpliterator<T> pgSp
                = new PagingSpliterator<>(pageAccessor, 0, 0, pageSize);
            pgSp.danglingFirstPage
                =spliterator(pageAccessor.fetchPage(0, pageSize, l -> pgSp.end=l));
            return pgSp;
        }, CHARACTERISTICS, parallel);
    }
    private static final int CHARACTERISTICS = IMMUTABLE|ORDERED|SIZED|SUBSIZED;

    private final PageFetcher<T> supplier;
    long start, end, pageSize;
    Spliterator<T> currentPage, danglingFirstPage;

    PagingSpliterator(PageFetcher<T> supplier,
            long start, long end, long pageSize) {
        this.supplier = supplier;
        this.start    = start;
        this.end      = end;
        this.pageSize = pageSize;
    }

    public boolean tryAdvance(Consumer<? super T> action) {
        for(;;) {
            if(ensurePage().tryAdvance(action)) return true;
            if(start>=end) return false;
            currentPage=null;
        }
    }
    public void forEachRemaining(Consumer<? super T> action) {
        do {
            ensurePage().forEachRemaining(action);
            currentPage=null;
        } while(start<end);
    }
    public Spliterator<T> trySplit() {
        if(danglingFirstPage!=null) {
            Spliterator<T> fp=danglingFirstPage;
            danglingFirstPage=null;
            start=fp.getExactSizeIfKnown();
            return fp;
        }
        if(currentPage!=null)
            return currentPage.trySplit();
        if(end-start>pageSize) {
            long mid=(start+end)>>>1;
            mid=mid/pageSize*pageSize;
            if(mid==start) mid+=pageSize;
            return new PagingSpliterator<>(supplier, start, start=mid, pageSize);
        }
        return ensurePage().trySplit();
    }
    /**
     * Fetch data immediately before traversing or sub-page splitting.
     */
    private Spliterator<T> ensurePage() {
        if(danglingFirstPage!=null) {
            Spliterator<T> fp=danglingFirstPage;
            danglingFirstPage=null;
            currentPage=fp;
            start=fp.getExactSizeIfKnown();
            return fp;
        }
        Spliterator<T> sp = currentPage;
        if(sp==null) {
            if(start>=end) return Spliterators.emptySpliterator();
            sp = spliterator(supplier.fetchPage(
                                 start, Math.min(end-start, pageSize), l->{}));
            start += sp.getExactSizeIfKnown();
            currentPage=sp;
        }
        return sp;
    }
    /**
     * Ensure that the sub-spliterator provided by the List is compatible with
     * ours, i.e. is {@code SIZED | SUBSIZED}. For standard List implementations,
     * the spliterators are, so the costs of dumping into an intermediate array
     * in the other case is irrelevant.
     */
    private static <E> Spliterator<E> spliterator(List<E> list) {
        Spliterator<E> sp = list.spliterator();
        if((sp.characteristics()&(SIZED|SUBSIZED))!=(SIZED|SUBSIZED))
            sp=Spliterators.spliterator(
                StreamSupport.stream(sp, false).toArray(), IMMUTABLE | ORDERED);
        return sp;
    }
    public long estimateSize() {
        if(currentPage!=null) return currentPage.estimateSize();
        return end-start;
    }
    public int characteristics() {
        return CHARACTERISTICS;
    }
}

Он использует специализированный функциональный интерфейс PageFetcher, который может быть реализован путем вызова метода accept обратного вызова с итоговым итоговым размером и возвратом списка элементов. Разделитель пейджинга будет просто делегировать разделители списков для обхода, а в случае, если concurrency значительно выше, чем результирующее число страниц, может даже выиграть от разделения этих разделителей страниц, что подразумевает, что списки произвольного доступа, такие как ArrayList, являются предпочтительным типом списка.

Адаптация кода примера к

private static <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {
    return PagingSpliterator.paged( (offset, limit, totalSizeSink) -> {
        totalSizeSink.accept(things.size());
        if(offset>things.size()) return Collections.emptyList();
        int beginIndex = (int)offset;
        assert beginIndex==offset;
        int endIndex = Math.min(beginIndex+(int)limit, things.size());
        System.out.printf("Page %6d-%6d:\t%s%n",
                          beginIndex, endIndex, Thread.currentThread());
        // artificial slowdown
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        return things.subList(beginIndex, endIndex);
    }, pageSize, true);
}

вы можете проверить его как

List<Integer> samples=IntStream.range(0, 555_000).boxed().collect(Collectors.toList());
List<Integer> result =asSlowPagedSource(10_000, samples) .collect(Collectors.toList());
if(!samples.equals(result))
    throw new AssertionError();

Учитывая достаточное количество свободных ядер процессора, он будет демонстрировать, как страницы выбираются одновременно, поэтому неупорядочен, а результат будет корректно находиться в порядке поиска. Вы также можете протестировать подстраницу concurrency, которая применяется, когда есть меньше страниц:

Set<Thread> threads=ConcurrentHashMap.newKeySet();
List<Integer> samples=IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList());
List<Integer> result=asSlowPagedSource(500_000, samples)
    .peek(x -> threads.add(Thread.currentThread()))
    .collect(Collectors.toList());
if(!samples.equals(result))
    throw new AssertionError();
System.out.println("Concurrency: "+threads.size());

Ответ 2

https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html

По моему мнению, скорость раздробления происходит от неизменности. Чем более неизменный источник, тем быстрее обработка как неизменность лучше обеспечивает обработку параллелей или, скорее, расщепление.

Кажется, что идея заключается в том, чтобы как можно лучше отреагировать на изменения, если они есть, до того, как они свяжутся в целом (лучше всего) или по частям (как правило, так и, следовательно, ваши и многие другие), для splittingators.

В вашем случае это может означать, что сначала будут соблюдаться размеры страниц, а не:

//.. in case the limit was not respected exactly. this.offset += result.getPage().size();

Это также может означать, что подача потока должна быть подготовлена ​​и не использоваться в качестве прямого источника.

В конце документа есть пример "как параллельная вычислительная среда, такая как пакет java.util.stream, будет использовать Spliterator в параллельном вычислении"

Обратите внимание, как поток будет использовать spliterator, а не как spliterator использует поток в качестве источника.

В конце примера есть интересный метод "compute".

PS, если вы когда-либо получите общий эффективный класс PageSpliterator, обязательно сообщите об этом некоторым из нас.

приветствий.