Подтвердить что ты не робот

Могу ли я использовать многопроцессорную очередь в функции, называемой пулом Pool.imap?

Я использую python 2.7 и пытаюсь запустить некоторые сложные задачи процессора в своих собственных процессах. Я хотел бы иметь возможность отправлять сообщения обратно в родительский процесс, чтобы информировать его о текущем состоянии процесса. Очередь многопроцессорности кажется идеальной для этого, но я не могу понять, как заставить ее работать.

Итак, это мой основной рабочий пример минус использование очереди.

import multiprocessing as mp
import time

def f(x):
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print str(results.next())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Я попытался передать очередь несколькими способами, и они получили сообщение об ошибке "RuntimeError: объекты Queue должны делиться только между процессами через наследование". Вот один из способов, которым я попытался, основываясь на более раннем ответе, который я нашел. (Я получаю ту же проблему, пытаясь использовать Pool.map_async и Pool.imap)

import multiprocessing as mp
import time

def f(args):
    x = args[0]
    q = args[1]
    q.put(str(x))
    time.sleep(0.1)
    return x*x

def main():
    q = mp.Queue()
    pool = mp.Pool()
    results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))

    print str(q.get())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Наконец, подход 0 для фитнеса (сделать его глобальным) не генерирует никаких сообщений, он просто блокируется.

import multiprocessing as mp
import time

q = mp.Queue()

def f(x):
    q.put(str(x))
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print q.get()

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Я знаю, что он, вероятно, будет работать с multiprocessing.Process напрямую и что есть другие библиотеки, чтобы выполнить это, но я ненавижу отказываться от стандартных функций библиотеки, которые отлично подходят, пока я не уверен, что это не просто мое отсутствие знаний не позволяет мне эксплуатировать их.

Спасибо.

4b9b3361

Ответ 1

Трюк состоит в том, чтобы передать очередь в качестве аргумента инициализатору. Появляется для работы со всеми способами отправки пула.

import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))
    return x*x

def f_init(q):
    f.q = q

def main():
    jobs = range(1,6)

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, jobs)
    p.close()

    for i in range(len(jobs)):
        print q.get()
        print results.next()

if __name__ == '__main__':
    main()

Ответ 2

Здравствуйте, я пытаюсь следующий код, чтобы получить процесс инвертирования: очистка очереди внутри процессов

import multiprocessing as mp
from random import *


def f(x):
    myId=random()

    output=[]    
    while not f.q.empty():
        z=f.q.get()                 
        print (str(myId)+"#"+str(z))
        output.append(z)            

    print(str(myId)+" ok")        
    return output

def f_init(q):
    f.q = q

def main():
    jobs = []



    q = mp.Queue()    
    for x in range(10):
        q.put([x,2*x,3*x])
        jobs.append(x)

    p = mp.Pool(None, f_init, [q])       
    results = p.imap(f, jobs)
    p.close()


    for bResult in results:        
        print(bResult)


if __name__ == '__main__':
    main()

У меня нет ошибок, но пул, кажется, никогда не заканчивается. Не весь процесс закончен. Я могу это исправить?

Ответ 3

странно: если я удаленно печатаю статистику в функции f (x), она работает...

def f(x):
    myId=random()

    output=[]    
    while not f.q.empty():
        z=f.q.get()
        output.append(z)            


    return output