Подтвердить что ты не робот

Получить уникальный идентификатор для рабочего в многопроцессорном пуле python

Есть ли способ назначить каждому работнику в пуле многопроцессорности python уникальный идентификатор таким образом, чтобы задание, выполняемое конкретным работником в пуле, могло знать, какой рабочий его выполняет? Согласно документам, Process имеет name, но

Имя - это строка, используемая только для целей идентификации. Он не имеет семантика. Несколько процессов могут иметь одно и то же имя.

Для моего конкретного случая использования я хочу запустить кучу заданий на группе из четырех графических процессоров и установить номер устройства для GPU, над которым должно работать задание. Поскольку задания имеют неравномерную длину, я хочу быть уверенным, что у меня нет столкновения на графическом процессоре задания, пытающегося запустить его до того, как предыдущий завершится (так что это исключает предварительную привязку идентификатора к единица работы раньше времени).

4b9b3361

Ответ 1

Кажется, что вы хотите просто: multiprocessing.current_process(). Например:

import multiprocessing

def f(x):
    print multiprocessing.current_process()
    return x * x

p = multiprocessing.Pool()
print p.map(f, range(6))

Вывод:

$ python foo.py 
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-3, started daemon)>
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-4, started daemon)>
[0, 1, 4, 9, 16, 25]

Это возвращает сам объект процесса, поэтому процесс может быть его собственной идентичностью. Вы также можете вызвать id для него для уникального числового id - в cpython, это адрес памяти объекта процесса, поэтому я не думаю, что есть какая-либо возможность перекрытия. Наконец, вы можете использовать свойство ident или pid для процесса - но это значение устанавливается только после запуска процесса.

Кроме того, глядя на источник, мне кажется очень вероятным, что автогенерируемые имена (как показано в первом значении в строках Process выше) уникальны. multiprocessing поддерживает объект itertools.counter для каждого процесса, который используется для генерации кортежа _identity для любых дочерних процессов, которые он порождает. Таким образом, процесс верхнего уровня создает дочерний процесс с идентификаторами с одним значением, и они порождают процесс с двузначными идентификаторами и так далее. Затем, если имя не передано конструктору Process, оно просто автогенерирует имя на основе _identity, используя ':'.join(...). Затем Pool изменяет имя процесса с помощью replace, оставив автогенерированный идентификатор одинаковым.

Результатом всего этого является то, что хотя два Process es могут иметь одно и то же имя, потому что вы можете назначить им одно и то же имя при их создании, они уникальны, если вы не коснетесь параметра имени. Кроме того, теоретически можно использовать _identity как уникальный идентификатор; но я понимаю, что они сделали эту переменную конфиденциальной по какой-то причине!

Пример приведенного выше:

import multiprocessing

def f(x):
    created = multiprocessing.Process()
    current = multiprocessing.current_process()
    print 'running:', current.name, current._identity
    print 'created:', created.name, created._identity
    return x * x

p = multiprocessing.Pool()
print p.map(f, range(6))

Вывод:

$ python foo.py 
running: PoolWorker-1 (1,)
created: Process-1:1 (1, 1)
running: PoolWorker-2 (2,)
created: Process-2:1 (2, 1)
running: PoolWorker-3 (3,)
created: Process-3:1 (3, 1)
running: PoolWorker-1 (1,)
created: Process-1:2 (1, 2)
running: PoolWorker-2 (2,)
created: Process-2:2 (2, 2)
running: PoolWorker-4 (4,)
created: Process-4:1 (4, 1)
[0, 1, 4, 9, 16, 25]

Ответ 2

Вы можете использовать multiprocessing.Queue для хранения идентификаторов, а затем получить идентификатор при инициализации процесса пула.

Преимущества:

  • Вам не нужно полагаться на внутренние компоненты.
  • Если ваш вариант использования предназначен для управления ресурсами/устройствами, вы можете напрямую ввести номер устройства. Это также гарантирует, что ни одно устройство не будет использоваться дважды: если в вашем пуле больше процессов, чем у устройств, дополнительные процессы будут блокироваться на queue.get() и не будут выполнять какую-либо работу (это не будет блокировать вашу программу или, по крайней мере, не было, когда я тестировал).

Недостатки:

  • У вас есть дополнительные накладные расходы на связь и нерестование пула процесс занимает немного больше времени: без sleep(1) в Например, вся работа может быть выполнена первым процессом, так как другие еще не инициализируются.
  • Вам нужен глобальный (или, по крайней мере, я не знаю пути вокруг него)

Пример:

import multiprocessing
from time import sleep

def init(queue):
    global idx
    idx = queue.get()

def f(x):
    global idx
    process = multiprocessing.current_process()
    sleep(1)
    return (idx, process.pid, x * x)

ids = [0, 1, 2, 3]
manager = multiprocessing.Manager()
idQueue = manager.Queue()

for i in ids:
    idQueue.put(i)

p = multiprocessing.Pool(8, init, (idQueue,))
print(p.map(f, range(8)))

Вывод:

[(0, 8289, 0), (1, 8290, 1), (2, 8294, 4), (3, 8291, 9), (0, 8289, 16), (1, 8290, 25), (2, 8294, 36), (3, 8291, 49)]

Обратите внимание, что существует только 4 разных pid, хотя пул содержит 8 процессов, а один idx используется только одним процессом.

Ответ 3

Я сделал это с потоковой обработкой и закончил использование очереди для управления работой. Вот базовая линия. В моей полной версии есть куча try-catches (особенно у рабочего, чтобы убедиться, что q.task_done() вызывается даже при ошибке).

from threading import Thread
from queue import Queue
import time
import random


def run(idx, *args):
    time.sleep(random.random() * 1)
    print idx, ':', args


def run_jobs(jobs, workers=1):
    q = Queue()
    def worker(idx):
        while True:
            args = q.get()
            run(idx, *args)
            q.task_done()

    for job in jobs:
        q.put(job)

    for i in range(0, workers):
        t = Thread(target=worker, args=[i])
        t.daemon = True
        t.start()

    q.join()


if __name__ == "__main__":
    run_jobs([('job', i) for i in range(0,10)], workers=5)

Мне не нужно было использовать многопроцессорность (мои работники только для вызова внешнего процесса), но это может быть расширено. API для многопроцессорности меняет его на ощупь, вот как вы могли бы адаптироваться:

from multiprocessing import Process, Queue
from Queue import Empty
import time
import random

def run(idx, *args):
    time.sleep(random.random() * i)
    print idx, ':', args


def run_jobs(jobs, workers=1):
    q = Queue()
    def worker(idx):
        try:
            while True:
                args = q.get(timeout=1)
                run(idx, *args)
        except Empty:
            return

    for job in jobs:
        q.put(job)

    processes = []
    for i in range(0, workers):
        p = Process(target=worker, args=[i])
        p.daemon = True
        p.start()
        processes.append(p)

    for p in processes: 
        p.join()


if __name__ == "__main__":
    run_jobs([('job', i) for i in range(0,10)], workers=5)

Обе версии выдадут что-то вроде:

0 : ('job', 0)
1 : ('job', 2)
1 : ('job', 6)
3 : ('job', 3)
0 : ('job', 5)
1 : ('job', 7)
2 : ('job', 1)
4 : ('job', 4)
3 : ('job', 8)
0 : ('job', 9)