Подтвердить что ты не робот

Продюсер с квалификацией

Я новичок в clojure и пытаюсь понять, как правильно использовать его функции concurrency, поэтому любые критические замечания и предложения оценены. Поэтому я пытаюсь написать небольшую тестовую программу в clojure, которая работает следующим образом:

  • там 5 производителей и 2 потребителя
  • продюсер ждет случайного времени, а затем выталкивает номер в общую очередь.
  • потребитель должен вывести номер из очереди, как только очередь будет непустой, а затем засыпать на короткое время, чтобы имитировать выполнение работы.
  • потребители должны блокировать, когда очередь пуста.
  • производители должны блокировать, когда очередь имеет более 4 элементов в ней, чтобы предотвратить ее рост.

Вот мой план для каждого шага выше:

  • производители и потребители будут агентами, которые не заботятся о своем состоянии (только нулевые ценности или что-то еще); я просто использую агентов для отправки функции "потребитель" или "производитель", чтобы сделать это в какое-то время. Тогда общая очередь будет (def queue (ref [])). Может быть, это должен быть атом?

в функции агента производителя, просто (Thread/sleep (rand-int 1000)), а затем (dosync (alter queue conj (rand-int 100))), чтобы нажать на очередь. Я думаю, чтобы потребительские агенты наблюдали за очередью изменений с помощью add-watcher. Не уверен в этом, хотя... он будет пробуждать потребителей при любых изменениях, даже если изменение исходит от потребителя, который тянет что-то (возможно, делает его пустым). Возможно, проверка этого в функции наблюдателя достаточно. Другая проблема, которую я вижу, заключается в том, что, если все потребители заняты, что происходит, когда производитель добавляет что-то новое в очередь? Наблюдает ли наблюдаемое событие очередь на каком-то потребительском агенте или исчезает? см. выше Я действительно не знаю, как это сделать. Я слышал, что clojure seque может быть полезным, но я не мог найти достаточно документа о том, как его использовать, и мое первоначальное тестирование, похоже, не работает (извините, у меня больше нет кода).
4b9b3361

Ответ 1

Здесь я беру на себя это. Я решил использовать структуры данных Clojure, чтобы понять, как это будет работать. Обратите внимание, что было бы совершенно обычным и идиоматичным взять блокирующую очередь из панели инструментов Java и использовать ее здесь; Я думаю, что код будет легко адаптироваться. Обновление: я действительно адаптировал его к java.util.concurrent.LinkedBlockingQueue, см. Ниже.

clojure.lang.PersistentQueue

Вызовите (pro-con), чтобы запустить пробный запуск; затем просмотрите содержимое output, чтобы узнать, произошло ли что-либо, и queue-lengths, чтобы увидеть, остались ли они в пределах данной границы.

Обновление. Чтобы объяснить, почему я почувствовал необходимость использовать ensure ниже (об этом я спрашивал об этом в IRC), это необходимо для предотвращения перекоса записи (см. статью в Википедии о Изоляция снимков для определения). Если бы я заменил @queue на (ensure queue), для двух или более производителей можно было бы проверить длину очереди, найти, что она меньше 4, а затем поместить дополнительные элементы в очередь и, возможно, привести общую длину очередь выше 4, нарушая ограничение. Аналогично, два пользователя, делающие @queue, могут принять один и тот же элемент для обработки, а затем поместить два элемента из очереди. ensure предотвращает любой из этих сценариев.

(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
(def *max-queue-length* 4)

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn queue-length-watch [_ _ _ new-queue-state]
  (dosync (alter queue-lengths conj (count new-queue-state))))

(add-watch queue :queue-length-watch queue-length-watch)

(defn producer [tag]
  (future
   (while @go-on?
     (if (dosync (let [l (count (ensure queue))]
                   (when (< l *max-queue-length*)
                     (alter queue conj tag)
                     true)))
       (Thread/sleep (rand-int 2000))))))

(defn consumer []
  (future
   (while @go-on?
     (Thread/sleep 100)       ; don't look at the queue too often
     (when-let [item (dosync (let [item (first (ensure queue))]
                               (alter queue pop)
                               item))]
       (Thread/sleep (rand-int 500))         ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))

java.util.concurrent.LinkedBlockingQueue

Версия выше написана с использованием LinkedBlockingQueue. Обратите внимание, как общий контур кода в основном тот же, причем некоторые детали на самом деле немного чище. Я удалил queue-lengths из этой версии, так как LBQ позаботится об этом ограничении для нас.

(def go-on? (atom true))
(def *max-queue-length* 4)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
(def output (ref ()))

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn producer [tag]
  (future
   (while @go-on?
     (.put queue tag)
     (Thread/sleep (rand-int 2000)))))

(defn consumer []
  (future
   (while @go-on?
     ;; I'm using .poll on the next line so as not to block
     ;; indefinitely if we're done; note that this has the
     ;; side effect that nulls = nils on the queue will not
     ;; be handled; there a number of other ways to go about
     ;; this if this is a problem, see docs on LinkedBlockingQueue
     (when-let [item (.poll queue)]
       (Thread/sleep (rand-int 500)) ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))