У меня есть Java-типизированный актер, который отвечает за логику фильтра/повтора на внешнем ресурсе, который может быть временно недоступен. Поля актера и общие методы:
public class MyActorImpl implements MyActor {
private static final long MINWAIT = 50;
private static final long MAXWAIT = 1000;
private static final long DEFAULTWAIT = 0;
private static final double BACKOFFMULTIPLIER = 1.5;
private long updateWait(long currentWait) {
return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT);
}
// mutable
private long opWait = DEFAULTWAIT;
private final Queue<OpInput> opBuffer = new ArrayDeque<>();
// called from external actor
public void operation(OpInput opInput) {
operation(opInput, DEFAULTWAIT);
}
// called internally
public void operation(OpInput opInput, long currentWait);
}
Актер имеет несколько операций, которые имеют более или менее ту же логику повтора/буфера; каждая операция имеет свои поля [op]Wait
и [op]Buffer
.
- Родительский оператор вызывает
void operation(OpInput opInput)
- Предыдущий метод вызывает
void operation(OpInput opInput, long currentWait)
с помощьюDEFAULTWAIT
для второго параметра - Если параметр
currentWait
не равенopWait
, тогда вход сохраняется вopBuffer
, иначе вход отправляется на внешний ресурс. - Если внешний ресурс возвращает успех, то
opWait
устанавливается наDEFAULTWAIT
, а содержимоеopBuffer
отправляется обратно через методoperation(opInput)
. Если внешний ресурс (или, скорее, сеть) возвращает ошибку, то я обновляюopWait = updateWait(opWait)
и планируюoperation(opInput, opWait)
в планировщике системы действий с использованием задержкиopWait
ms.
т.е. Я использую планировщик системы действий для реализации экспоненциального отсрочки; Я использую параметр currentWait
для идентификации сообщения, которое я пытаюсь выполнить, и буферизую другие сообщения, пока основное сообщение не будет успешно обработано внешним ресурсом.
Проблема заключается в том, что если запланированное сообщение operation(opInput, currentWait)
потеряно, я навсегда буферизую сообщения, потому что защита currentWait == opWait
не будет работать для всех других сообщений. Я мог бы использовать что-то вроде spring-retry для реализации экспоненциального отсрочка, но я не вижу способа объединить петли повторения операций, что означает, что я мог бы использовать один поток за цикл повтора (в то время как использование планировщика системы актеров не ставит гораздо большую нагрузку на систему).
Я ищу более отказоустойчивый способ реализовать буферизацию и экспоненциальную отсрочку на интерфейсе между актером и внешним ресурсом, не выделяя слишком много ресурсов для задачи.