Я пытаюсь найти лучший способ использовать агенты для потребления элементов из очереди сообщений (Amazon SQS). Прямо сейчас у меня есть функция (process-queue-item), которая захватывает элементы из очереди и обрабатывает ее.
Я хочу обрабатывать эти элементы одновременно, но я не могу обернуть голову тем, как управлять агентами. В основном я хочу, чтобы все агенты были заняты как можно больше, не тянувшись к множеству предметов из очереди и не создавая отставание (я буду запускать это на нескольких машинах, поэтому элементы должны быть оставлены в очереди, пока они действительно необходимы).
Может ли кто-нибудь дать мне несколько указаний на улучшение моей реализации?
(def active-agents (ref 0))
(defn process-queue-item [_]
(dosync (alter active-agents inc))
;retrieve item from Message Queue (Amazon SQS) and process
(dosync (alter active-agents dec)))
(defn -main []
(def agents (for [x (range 20)] (agent x)))
(loop [loop-count 0]
(if (< @active-agents 20)
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent))
;should skip this agent until later if it is still busy processing (not sure how)
(send-off agent process-queue-item)))
;(apply await-for (* 10 1000) agents)
(Thread/sleep 10000)
(logging/info (str "ACTIVE AGENTS " @active-agents))
(if (> 10 loop-count)
(do (logging/info (str "done, let cleanup " count))
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent)))
(apply await agents)
(shutdown-agents))
(recur (inc count)))))