Я хотел бы иметь возможность обрабатывать чтение потока Java из источника, к которому нужно обращаться на страницах. В качестве первого подхода я применил итератор подкачки, который просто запрашивал страницы, когда на текущей странице заканчивались элементы, а затем использовала StreamSupport.stream(iterator, false)
, чтобы получить дескриптор потока над итератором.
Как я обнаружил, что мои страницы довольно дороги для извлечения, я хотел бы получить доступ к страницам через параллельный поток. В этот момент я обнаружил, что parallelism, предоставленный моим наивным подходом, отсутствует в связи с реализацией spliterator, которую java предоставляет непосредственно из итератора. Поскольку я действительно много знаю об элементах, которые я хотел бы пройти (я знаю, что суммарный результат подсчитывается после запроса первой страницы, а источник поддерживает смещение и ограничение). Я думаю, что должно быть возможно реализовать мой собственный разделитель, который достигает real concurrency (как в работе над элементами страницы, так и в запросе страницы).
Мне удалось довольно легко выполнить "работу над элементами" concurrency, но в моей первоначальной реализации запрос страницы выполняется только самым верхним разделителем и, следовательно, не приносит пользы от разделения работы, предлагаемого внедрением fork-join.
Как написать разделитель, который достигает обеих этих целей?
Для справки, я расскажу, что я сделал до сих пор (я знаю, что он не делит запросы соответственно).
public final class PagingSourceSpliterator<T> implements Spliterator<T> {
public static final long DEFAULT_PAGE_SIZE = 100;
private Page<T> result;
private Iterator<T> results;
private boolean needsReset = false;
private final PageProducer<T> generator;
private long offset = 0L;
private long limit = DEFAULT_PAGE_SIZE;
public PagingSourceSpliterator(PageProducer<T> generator) {
this.generator = generator;
}
public PagingSourceSpliterator(long pageSize, PageProducer<T> generator) {
this.generator = generator;
this.limit = pageSize;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (hasAnotherElement()) {
if (!results.hasNext()) {
loadPageAndPrepareNextPaging();
}
if (results.hasNext()) {
action.accept(results.next());
return true;
}
}
return false;
}
@Override
public Spliterator<T> trySplit() {
// if we know there another page, go ahead and hand off whatever
// remains of this spliterator as a new spliterator for other
// threads to work on, and then mark that next time something is
// requested from this spliterator it needs to be reset to the head
// of the next page
if (hasAnotherPage()) {
Spliterator<T> other = result.getPage().spliterator();
needsReset = true;
return other;
} else {
return null;
}
}
@Override
public long estimateSize() {
if(limit == 0) {
return 0;
}
ensureStateIsUpToDateEnoughToAnswerInquiries();
return result.getTotalResults();
}
@Override
public int characteristics() {
return IMMUTABLE | ORDERED | DISTINCT | NONNULL | SIZED | SUBSIZED;
}
private boolean hasAnotherElement() {
ensureStateIsUpToDateEnoughToAnswerInquiries();
return isBound() && (results.hasNext() || hasAnotherPage());
}
private boolean hasAnotherPage() {
ensureStateIsUpToDateEnoughToAnswerInquiries();
return isBound() && (result.getTotalResults() > offset);
}
private boolean isBound() {
return Objects.nonNull(results) && Objects.nonNull(result);
}
private void ensureStateIsUpToDateEnoughToAnswerInquiries() {
ensureBound();
ensureResetIfNecessary();
}
private void ensureBound() {
if (!isBound()) {
loadPageAndPrepareNextPaging();
}
}
private void ensureResetIfNecessary() {
if(needsReset) {
loadPageAndPrepareNextPaging();
needsReset = false;
}
}
private void loadPageAndPrepareNextPaging() {
// keep track of the overall result so that we can reference the original list and total size
this.result = generator.apply(offset, limit);
// make sure that the iterator we use to traverse a single page removes
// results from the underlying list as we go so that we can simply pass
// off the list spliterator for the trySplit rather than constructing a
// new kind of spliterator for what remains.
this.results = new DelegatingIterator<T>(result.getPage().listIterator()) {
@Override
public T next() {
T next = super.next();
this.remove();
return next;
}
};
// update the paging for the next request and inquiries prior to the next request
// we use the page of the actual result set instead of the limit in case the limit
// was not respected exactly.
this.offset += result.getPage().size();
}
public static class DelegatingIterator<T> implements Iterator<T> {
private final Iterator<T> iterator;
public DelegatingIterator(Iterator<T> iterator) {
this.iterator = iterator;
}
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public T next() {
return iterator.next();
}
@Override
public void remove() {
iterator.remove();
}
@Override
public void forEachRemaining(Consumer<? super T> action) {
iterator.forEachRemaining(action);
}
}
}
И источник моих страниц:
public interface PageProducer<T> extends BiFunction<Long, Long, Page<T>> {
}
И страница:
public final class Page<T> {
private long totalResults;
private final List<T> page = new ArrayList<>();
public long getTotalResults() {
return totalResults;
}
public List<T> getPage() {
return page;
}
public Page setTotalResults(long totalResults) {
this.totalResults = totalResults;
return this;
}
public Page setPage(List<T> results) {
this.page.clear();
this.page.addAll(results);
return this;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Page)) {
return false;
}
Page<?> page1 = (Page<?>) o;
return totalResults == page1.totalResults && Objects.equals(page, page1.page);
}
@Override
public int hashCode() {
return Objects.hash(totalResults, page);
}
}
И образец получения потока с "медленным" поисковым вызовом для тестирования
private <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {
PageProducer<T> producer = (offset, limit) -> {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int beginIndex = offset.intValue();
int endIndex = Math.min(offset.intValue() + limit.intValue(), things.size());
return new Page<T>().setTotalResults(things.size())
.setPage(things.subList(beginIndex, endIndex));
};
return StreamSupport.stream(new PagingSourceSpliterator<>(pageSize, producer), true);
}