Подтвердить что ты не робот

RxJava + Дооснащение длинным опросом

Моя проблема: я не могу получить бесконечный поток с Retrofit. После того, как я получу учетные данные для первоначального запроса poll(), я выполняю первоначальный запрос poll(). Каждый запрос poll() отвечает через 25 секунд, если изменений нет, или раньше, если есть какие-либо изменения - возврат измененных_данных []. Каждый ответ содержит timestamp данные, необходимые для следующего запроса опроса - я должен выполнить новый запрос poll() после каждого опроса(). Вот мой код:

getServerApi().getLongPollServer() 
  .flatMap(longPollServer -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollServer.getTs(), "") 
   .take(1) 
   .flatMap(longPollEnvelope -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollEnvelope.getTs(), ""))) 
   .retry()
   .subscribe(longPollEnvelope1 -> {
   processUpdates(longPollEnvelope1.getUpdates());
});

Я новичок в RxJava, может быть, я ничего не понимаю, но я не могу получить бесконечный поток. Я получаю 3 вызова, затем onNext и onComplete.

P.S. Может быть, есть лучшее решение для внедрения долгого опроса на Android?

4b9b3361

Ответ 1

Хотя я не идеальный, я считаю, что вы можете использовать побочные эффекты RX для достижения желаемого результата (операции doOn).

Observable<CredentialsWithTimestamp> credentialsProvider = Observable.just(new CredentialsWithTimestamp("credentials", 1434873025320L)); // replace with your implementation

Observable<ServerResponse> o = credentialsProvider.flatMap(credentialsWithTimestamp -> {
    // side effect variable
    AtomicLong timestamp = new AtomicLong(credentialsWithTimestamp.timestamp); // computational steering (inc. initial value)
    return Observable.just(credentialsWithTimestamp.credentials) // same credentials are reused for each request - if invalid / onError, the later retry() will be called for new credentials
            .flatMap(credentials -> api.query("request", credentials, timestamp.get()))  // this will use the value from previous doOnNext
            .doOnNext(serverResponse -> timestamp.set(serverResponse.getTimestamp()))
            .repeat();
})
        .retry()
        .share();

private static class CredentialsWithTimestamp {

    public final String credentials;
    public final long timestamp; // I assume this is necessary for you from the first request

    public CredentialsWithTimestamp(String credentials, long timestamp) {
        this.credentials = credentials;
        this.timestamp = timestamp;
    }
}

При подписке на "o" внутренний наблюдаемый будет повторяться. Если возникла ошибка, тогда "o" повторит запрос и повторно запросит его из потока учетных данных.

В вашем примере управление вычислением достигается путем обновления переменной timestamp, необходимой для следующего запроса.