Подтвердить что ты не робот

Clojure агенты, потребляющие из очереди

Я пытаюсь найти лучший способ использовать агенты для потребления элементов из очереди сообщений (Amazon SQS). Прямо сейчас у меня есть функция (process-queue-item), которая захватывает элементы из очереди и обрабатывает ее.

Я хочу обрабатывать эти элементы одновременно, но я не могу обернуть голову тем, как управлять агентами. В основном я хочу, чтобы все агенты были заняты как можно больше, не тянувшись к множеству предметов из очереди и не создавая отставание (я буду запускать это на нескольких машинах, поэтому элементы должны быть оставлены в очереди, пока они действительно необходимы).

Может ли кто-нибудь дать мне несколько указаний на улучшение моей реализации?

(def active-agents (ref 0))

(defn process-queue-item [_]
  (dosync (alter active-agents inc))
  ;retrieve item from Message Queue (Amazon SQS) and process
  (dosync (alter active-agents dec)))

(defn -main []
  (def agents (for [x (range 20)] (agent x)))

  (loop [loop-count 0]

    (if (< @active-agents 20)
      (doseq [agent agents]
        (if (agent-errors agent)
          (clear-agent-errors agent))
        ;should skip this agent until later if it is still busy processing (not sure how)
        (send-off agent process-queue-item)))

    ;(apply await-for (* 10 1000) agents)
    (Thread/sleep  10000)
    (logging/info (str "ACTIVE AGENTS " @active-agents))
    (if (> 10 loop-count)
      (do (logging/info (str "done, let cleanup " count))
       (doseq [agent agents]
         (if (agent-errors agent)
           (clear-agent-errors agent)))
       (apply await agents)
       (shutdown-agents))
      (recur (inc count)))))
4b9b3361

Ответ 1

(let [switch (atom true) ; a switch to stop workers
      workers (doall 
                (repeatedly 20 ; 20 workers pulling and processing items from SQS
                  #(future (while @switch 
                             (retrieve item from Amazon SQS and process)))))]
  (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-)
  (reset! switch false) ; stop !
  (doseq [worker workers] @worker)) ; waiting for all workers to be done

Ответ 2

То, о чем вы просите, это способ сохранить выполнение задач, но с некоторым верхним пределом. Один простой подход к этому заключается в использовании семафора для координации ограничения. Вот как я подойду к нему:

(let [limit (.availableProcessors (Runtime/getRuntime))
      ; note: you might choose limit 20 based upon your problem description
      sem (java.util.concurrent.Semaphore. limit)]
  (defn submit-future-call
    "Takes a function of no args and yields a future object that will
    invoke the function in another thread, and will cache the result and
    return it on all subsequent calls to deref/@. If the computation has
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks
    until the completion of another future, where n is the number of
    available processors."  
    [#^Callable task]
    ; take a slot (or block until a slot is free)
    (.acquire sem)
    (try
      ; create a future that will free a slot on completion
      (future (try (task) (finally (.release sem))))
      (catch java.util.concurrent.RejectedExecutionException e
        ; no task was actually submitted
        (.release sem)
        (throw e)))))

(defmacro submit-future
  "Takes a body of expressions and yields a future object that will
  invoke the body in another thread, and will cache the result and
  return it on all subsequent calls to deref/@. If the computation has
  not yet finished, calls to deref/@ will block.
  If n futures have already been submitted, then submit-future blocks
  until the completion of another future, where n is the number of
  available processors."  
  [& body] `(submit-future-call (fn [] [email protected])))

#_(example
    user=> (submit-future (reduce + (range 100000000)))
    #<[email protected]: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    #<[email protected]: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    ;; blocks at this point for a 2 processor PC until the previous
    ;; two futures complete
    #<[email protected]: :pending>
    ;; then submits the job

С этим теперь вам просто нужно координировать работу самих задач. Похоже, у вас уже есть механизмы для этого. Loop (submit-future (process-queue-item))

Ответ 3

Возможно, вы могли бы использовать функцию seque? Цитирование (doc seque):

clojure.core/seque
([s] [n-or-q s])
  Creates a queued seq on another (presumably lazy) seq s. The queued
  seq will produce a concrete seq in the background, and can get up to
  n items ahead of the consumer. n-or-q can be an integer n buffer
  size, or an instance of java.util.concurrent BlockingQueue. Note
  that reading from a seque can block if the reader gets ahead of the
  producer.

То, что я имею в виду, - это ленивая последовательность, получающая элементы очереди по сети; вы обернули бы это в seque, положите, что в Ref и рабочие агенты потребляют предметы из этого seque. seque возвращает то, что выглядит как обычный seq с точки зрения вашего кода, при этом магия очереди происходит прозрачным способом. Обратите внимание, что если последовательность, которую вы помещаете внутри, помечена, то она все равно будет вынуждена блокировать за раз. Также обратите внимание, что первоначальный вызов seque сам по себе блокируется до тех пор, пока не будет получен первый или два элемента (или фрагмент, в зависимости от случая), я думаю, что больше связано с тем, как работают ленивые последовательности, чем seque, хотя).

Эскиз кода (действительно отрывочный, не протестированный вообще):

(defn get-queue-items-seq []
  (lazy-seq
   (cons (get-queue-item)
         (get-queue-items-seq))))

(def task-source (ref (seque (get-queue-items-seq))))

(defn do-stuff []
  (let [worker (agent nil)]
    (if-let [result
             (dosync
               (when-let [task (first @task-source)]
                (send worker (fn [_] (do-stuff-with task)))))]
      (do (await worker)
          ;; maybe do something with worker state
          (do-stuff))))) ;; continue working

(defn do-lots-of-stuff []
  (let [fs (doall (repeatedly 20 #(future (do-stuff))))]
    fs)))

На самом деле вам, вероятно, понадобится более сложный производитель позиции очереди seq, чтобы вы могли попросить его прекратить производство новых предметов (необходимость в том, чтобы все могло быть закрыто изящно, фьючерсы умрут когда источник задачи иссяк, используйте future-done?, чтобы убедиться, что они уже сделали это). И это только то, что я вижу на первый взгляд... Я уверен, что здесь есть все, что нужно сделать для полировки. Я думаю, что общий подход будет работать.

Ответ 4

Не знаю, насколько это идиоматично, поскольку я все еще новичок в этом языке, но для меня работает следующее решение:

(let [number-of-messages-per-time 2
      await-timeout 1000]
  (doseq [p-messages (partition number-of-messages-per-time messages)]
    (let [agents (map agent p-messages)]
      (doseq [a agents] (send-off a process))
      (apply await-for await-timeout agents)
      (map deref agents))))