Подтвердить что ты не робот

Тема и очередь

Мне интересно знать, что было бы лучшим способом реализации очереди на основе потоков.

Например:

У меня есть 10 действий, которые я хочу выполнить только с 4 потоками. Я хотел бы создать очередь со всеми 10 действиями, расположенными линейно, и запустить первые 4 действия с 4 потоками, как только один из потоков будет выполнен, следующий запустится и т.д. - Таким образом, количество потоков равно либо 4, либо менее 4.

4b9b3361

Ответ 1

В стандартной библиотеке есть класс Queue в thread. Используя это, вы можете сделать что-то вроде этого:

require 'thread'

queue = Queue.new
threads = []

# add work to the queue
queue << work_unit

4.times do
  threads << Thread.new do
    # loop until there are no more things to do
    until queue.empty?
      # pop with the non-blocking flag set, this raises
      # an exception if the queue is empty, in which case
      # work_unit will be set to nil
      work_unit = queue.pop(true) rescue nil
      if work_unit
        # do work
      end
    end
    # when there is no more work, the thread will stop
  end
end

# wait until all threads have completed processing
threads.each { |t| t.join }

Причина, по которой я использую неблокирующий флаг, заключается в том, что между until queue.empty? и pop другим потоком может быть выбрана очередь, поэтому, если не установлен флаг блокировки, мы могли бы застрять на этой линии навсегда.

Если вы используете MRI, интерпретатор Ruby по умолчанию, помните, что потоки не будут абсолютно параллельными. Если ваша работа связана с процессором, вы можете просто работать с одним потоком. Если у вас есть какая-то операция, которая блокируется в IO, вы можете получить несколько parallelism, но YMMV. Кроме того, вы можете использовать интерпретатор, который позволяет использовать полный concurrency, например, jRuby или Rubinius.

Ответ 2

Там есть несколько драгоценных камней, которые реализуют этот шаблон для вас; параллельный, персиковый и шахтный называется threach (или jruby_threach под jruby). Это замена для #each, но позволяет указать, сколько потоков будет выполняться, используя SizedQueue внизу, чтобы не допустить, чтобы вещи не выходили из-под контроля.

Итак...

(1..10).threach(4) {|i| do_my_work(i) }

Не толкать мои собственные вещи; есть много хороших реализаций, чтобы сделать вещи проще.

Если вы используете JRuby, jruby_threach - это намного лучшая реализация - Java просто предлагает гораздо более богатый набор примитивов и структур данных для использования.

Ответ 3

Исполняемый описательный пример:

require 'thread'

p tasks = [
    {:file => 'task1'},
    {:file => 'task2'},
    {:file => 'task3'},
    {:file => 'task4'},
    {:file => 'task5'}
]

tasks_queue = Queue.new
tasks.each {|task| tasks_queue << task}

# run workers
workers_count = 3
workers = []
workers_count.times do |n|
    workers << Thread.new(n+1) do |my_n|
        while (task = tasks_queue.shift(true) rescue nil) do
            delay = rand(0)
            sleep delay
            task[:result] = "done by worker ##{my_n} (in #{delay})"
            p task
        end
    end
end

# wait for all threads
workers.each(&:join)

# output results
puts "all done"
p tasks

Ответ 6

Я использую драгоценный камень, называемый work_queue. Это действительно практично.

Пример:

require 'work_queue'
wq = WorkQueue.new 4, 10
(1..10).each do |number|
    wq.enqueue_b("Thread#{number}") do |thread_name|  
        puts "Hello from the #{thread_name}"
    end
end
wq.join