Подтвердить что ты не робот

Ожидание бесконечно для сообщения, которое никогда не может прибыть

У меня есть Java-типизированный актер, который отвечает за логику фильтра/повтора на внешнем ресурсе, который может быть временно недоступен. Поля актера и общие методы:

public class MyActorImpl implements MyActor {
    private static final long MINWAIT = 50;
    private static final long MAXWAIT = 1000;
    private static final long DEFAULTWAIT = 0;
    private static final double BACKOFFMULTIPLIER = 1.5;

    private long updateWait(long currentWait) {
        return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT);
    }

    // mutable
    private long opWait = DEFAULTWAIT;
    private final Queue<OpInput> opBuffer = new ArrayDeque<>();

    // called from external actor
    public void operation(OpInput opInput) {
        operation(opInput, DEFAULTWAIT);
    }

    // called internally
    public void operation(OpInput opInput, long currentWait);
}

Актер имеет несколько операций, которые имеют более или менее ту же логику повтора/буфера; каждая операция имеет свои поля [op]Wait и [op]Buffer.

  • Родительский оператор вызывает void operation(OpInput opInput)
  • Предыдущий метод вызывает void operation(OpInput opInput, long currentWait) с помощью DEFAULTWAIT для второго параметра
  • Если параметр currentWait не равен opWait, тогда вход сохраняется в opBuffer, иначе вход отправляется на внешний ресурс.
  • Если внешний ресурс возвращает успех, то opWait устанавливается на DEFAULTWAIT, а содержимое opBuffer отправляется обратно через метод operation(opInput). Если внешний ресурс (или, скорее, сеть) возвращает ошибку, то я обновляю opWait = updateWait(opWait) и планирую operation(opInput, opWait) в планировщике системы действий с использованием задержки opWait ms.

т.е. Я использую планировщик системы действий для реализации экспоненциального отсрочки; Я использую параметр currentWait для идентификации сообщения, которое я пытаюсь выполнить, и буферизую другие сообщения, пока основное сообщение не будет успешно обработано внешним ресурсом.

Проблема заключается в том, что если запланированное сообщение operation(opInput, currentWait) потеряно, я навсегда буферизую сообщения, потому что защита currentWait == opWait не будет работать для всех других сообщений. Я мог бы использовать что-то вроде spring-retry для реализации экспоненциального отсрочка, но я не вижу способа объединить петли повторения операций, что означает, что я мог бы использовать один поток за цикл повтора (в то время как использование планировщика системы актеров не ставит гораздо большую нагрузку на систему).

Я ищу более отказоустойчивый способ реализовать буферизацию и экспоненциальную отсрочку на интерфейсе между актером и внешним ресурсом, не выделяя слишком много ресурсов для задачи.

4b9b3361

Ответ 1

Если я правильно понимаю вас, если единственная проблема заключается в потере запланированного сообщения, почему бы вам просто не использовать что-то вроде Надежного шаблона прокси-сервера для этого конкретное сообщение, а затем, если он терпит неудачу opWait = DEFAULTWAIT;

Есть кое-что о вашем коде, который я получаю, я не понимаю, что вы имеете в виду, когда говорите, что public void operation(OpInput opInput) вызывается извне. Вы имеете в виду, что этот метод взаимодействует с сетью, которая использует ресурсы, которые иногда недоступны?

Если я могу предложить альтернативу. Из того, что я понимаю, ваша основная проблема заключается в том, что у вас есть ресурсы, которые иногда недоступны, поэтому у вас есть какой-то тип que/buffer, который вы реализуете с помощью какой-то логики ожидания, чтобы сообщение обрабатывалось, когда оно снова доступно, что, к сожалению, включает некоторые сообщения, которые могут быть потеряны и привести к бесконечному ожиданию. Я думаю, вы могли бы достичь того, что хотите, используя фьючерсы с таймаутами. Затем повторите попытку, если будущее не будет завершено за определенное количество времени, до трех повторных попыток. Вы даже можете настроить это время на основе нагрузки на сервер и сколько времени потребуется для завершения сообщения. Надеюсь, что это поможет.