Подтвердить что ты не робот

Программирование потока: подписчик и издатель отслеживают счет?

Я пришел к статье относительно новых Flow связанных интерфейсов в Java9. Пример кода оттуда:

public class MySubscriber<T> implements Subscriber<T> {  
  private Subscription subscription;    
  @Override  
  public void onSubscribe(Subscription subscription) {  
    this.subscription = subscription;  
    subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded  
  }        
  @Override  
  public void onNext(T item) {  
    System.out.println("Got : " + item);  
    subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded  
   }  

Как вы можете видеть, onNext() запрашивает новый элемент один.

Теперь мне интересно:

  • если onSubscribe() запрошено, скажем, 5 элементов
  • и после того, как первый элемент будет доставлен, request(1) вызывается, как указано выше

Ожидается ли, что сервер отправит

  • пять элементов (запрошено 5 запросов -1 отправлено +1 запрошено)
  • или один элемент (поскольку предыдущий запрос "отбрасывается" этим новым запросом)

Другими словами: когда request() вызывается несколько раз, добавьте эти числа; или предыдущие запросы "отброшены"?

Подведение к заголовку вопроса - должен ли абонент отслеживать полученные элементы, чтобы избежать запроса "слишком много" элементов в какой-то момент.

4b9b3361

Ответ 1

Как указывает Sotirios , говорится в request методе Javadoc (выделение мой):

Добавляет указанное число n элементов к текущему невыполненному запросу для этой подписки. Если n меньше или равно нулю, подписчик получит сигнал onError с аргументом IllegalArgumentException. В противном случае Подписчик получит до n дополнительные вызовы onNext (или меньше, если они завершены).

Таким образом, ответ явно да, абоненту необходимо отслеживать элементы. На самом деле, вся суть этого механизма. Некоторая предыстория: метод request предназначен для того, чтобы позволить подписчику применять противодавление, информируя компоненты восходящего потока о том, что он перегружен и "нужен перерыв", Поэтому задача абонента (и только его) состоит в том, чтобы тщательно проверять, когда и сколько новых элементов нужно запросить. В этой строке он не может "пересмотреть" и уменьшить количество получаемых элементов.

Понижение числа также обеспечит связь между издателем и подписчиком "немонотонным" в том смысле, что количество полностью запрошенных предметов может внезапно понизиться (поскольку оно стоит, это может только увеличиться). Это не только раздражает в абстрактном смысле, но и создает конкретные проблемы с согласованностью: издатель может быть в процессе доставки нескольких элементов, когда абонент неожиданно уменьшает количество запрошенных предметов до 1 - что теперь?

Ответ 2

Хотя Java 9 не реализует API Reactive Streams, он предлагает почти тот же API и определяет соответствующее поведение.

И в Reactive Streams спецификация это сложение определяется следующим образом:

Для издателя в 1.1:

Общее количество сигналов onNext, отправленных издателем подписчику, ДОЛЖНО быть меньше или равно общему количеству элементов, запрошенных подпиской подписчика в любое время.

А для подписки в 3.8:

В то время как подписка не отменена, Subscription.request(long n) ДОЛЖЕН зарегистрировать заданное количество дополнительных элементов, которые будут созданы соответствующему подписчику.

Итак, Java придерживается спецификации, которая была указана в API Reactive Streams.