Подтвердить что ты не робот

Параллельные функции отображения в IPython с несколькими параметрами

Я пытаюсь использовать параллельную среду IPython, и до сих пор она выглядит великолепно, но я столкнулся с проблемой. Допустим, что у меня есть функция, определенная в библиотеке

def func(a,b):
   ...

который я использую, когда хочу оценить одно значение a и кучу значений b.

[func(myA, b) for b in myLongList]

Очевидно, что реальная функция сложнее, но суть дела в том, что она принимает несколько параметров, и я хотел бы отобразить только одну из них. Проблема в том, что map, @dview.parallel и т.д. Отображают все аргументы.

Итак, скажем, я хочу получить ответ на func (myA, myLongList). Очевидный способ сделать это - карри, либо w/functools.partial, либо как

dview.map_sync(lambda b: func(myA, b),   myLongList)

Однако это не работает правильно на удаленных компьютерах. Причина в том, что когда выражение лямбда маринуется, значение myA не включается и вместо этого используется значение myA из локальной области на удаленной машине. Когда закрытие замачивается, переменные, которые они закрывают, не делают.

Два способа, которые я могу придумать, чтобы это сделать, это на самом деле работать, чтобы вручную создавать списки для каждого аргумента и иметь работу над всеми аргументами,

dview.map_sync(func, [myA]*len(myLongList), myLongList)   

или для ужасного использования данных в качестве аргументов по умолчанию для функции, заставляющей ее замачиваться:

# Can't use a lambda here b/c lambdas don't use default arguments :(
def parallelFunc(b, myA = myA):
    return func(myA, b)

dview.map_sync(parallelFunc, myLongList)

Действительно, все это кажется ужасно искаженным, когда реальная функция принимает множество параметров и сложнее. Есть ли какой-то идиоматический способ сделать это? Что-то вроде

@parallel(mapOver='b')
def  bigLongFn(a, b):
   ...

но, насколько я знаю, ничего подобного "mapOver" не существует. Я, вероятно, имею представление о том, как его реализовать... это просто очень простая операция, в которой должна существовать поддержка, поэтому я хочу проверить, не хватает ли я чего-то.

4b9b3361

Ответ 1

Я могу немного улучшить ответ на бату (который я считаю хорошим, но, возможно, не документирую так подробно, ПОЧЕМУ вы используете эти параметры). Документация ipython также в настоящее время является крайне неадекватной в этом вопросе. Таким образом, ваша функция имеет форму:

def myfxn(a,b,c,d):
  ....
  return z

и сохраняется в файле mylib. Допустим, что b, c и d одинаковы во время вашего прогона, поэтому вы пишете лямбда-функцию, чтобы уменьшить ее до 1-параметрической функции.

import mylib
mylamfxn=lambda a:mylib.myfxn(a,b,c,d)

и вы хотите запустить:

z=dview.map_sync(mylamfxn, iterable_of_a)

В мире мечты все будет так волноваться. Однако сначала вы получите сообщение об ошибке "mylib not found", потому что процессы ipcluster не загружают mylib. Убедитесь, что процессы ipcluster имеют "mylib" в своем пути python и при необходимости находятся в правильном рабочем каталоге для myfxn. Затем вам нужно добавить код python:

dview.execute('import mylib')

который запускает команду import mylib для каждого процесса. Если вы попробуете еще раз, вы получите ошибку в строках "global variable b not defined", потому что, пока переменные находятся в вашем сеансе python, они не находятся в процессах ipcluster. Однако python предоставляет метод копирования группы переменных в подпроцессы. Продолжая приведенный выше пример:

mydict=dict(b=b, c=c, d=d)
dview.push(mydict)

Теперь все подпроцессы имеют доступ к b, c и d. Затем вы можете просто запустить:

z=dview.map_sync(mylamfxn, iterable_of_a)

и теперь он должен работать как рекламируемый. Во всяком случае, я новичок в параллельных вычислениях с помощью python и нашел этот поток полезным, поэтому я подумал, что постараюсь объяснить несколько моментов, которые немного смутили меня.

Конечным кодом будет:

import mylib

#set up parallel processes, start ipcluster from command line prior!
from IPython.parallel import Client
rc=Client()
dview=rc[:]

#...do stuff to get iterable_of_a and b,c,d....

mylamfxn=lambda a:mylib.myfxn(a,b,c,d)

dview.execute('import mylib')
mydict=dict(b=b, c=c, d=d)
dview.push(mydict)
z=dview.map_sync(mylamfxn, iterable_of_a)

Это, пожалуй, самый быстрый и простой способ сделать практически любой неловко параллельный код, выполняемый параллельно в python....

ОБНОВЛЕНИЕ. Вы также можете использовать dview, чтобы выталкивать все данные без циклов, а затем использовать lview (т.е. lview=rc.load_balanced_view(); lview.map(...) для выполнения фактического расчета в режиме балансировки нагрузки.

Ответ 2

Это мое первое сообщение для StackOverflow, поэтому, пожалуйста, будьте осторожны;) Я пытался сделать то же самое и придумал следующее. Я уверен, что это не самый эффективный способ, но, похоже, работает несколько. Одно из предостережений на данный момент заключается в том, что по какой-то причине я вижу только два двигателя, работающих на 100%, остальные сидят почти без дела...

Чтобы вызвать множественную функцию arg на карте, я сначала написал эту процедуру в своем личном модуле parallel.py:

def map(r,func, args=None, modules=None):
"""
Before you run parallel.map, start your cluster (e.g. ipcluster start -n 4)

map(r,func, args=None, modules=None):
args=dict(arg0=arg0,...)
modules='numpy, scipy'    

examples:
func= lambda x: numpy.random.rand()**2.
z=parallel.map(r_[0:1000], func, modules='numpy, numpy.random')
plot(z)

A=ones((1000,1000));
l=range(0,1000)
func=lambda x : A[x,l]**2.
z=parallel.map(r_[0:1000], func, dict(A=A, l=l))
z=array(z)

"""
from IPython.parallel import Client
mec = Client()
mec.clear()
lview=mec.load_balanced_view()
for k in mec.ids:
  mec[k].activate()
  if args is not None:
    mec[k].push(args)
  if modules is not None:
    mec[k].execute('import '+modules)
z=lview.map(func, r)
out=z.get()
return out

Как вы можете видеть, функция принимает параметр args, который является параметром параметров в рабочем пространстве головных узлов. Затем эти параметры переносятся в двигатели. В этот момент они становятся локальными объектами и могут использоваться в функции напрямую. Например, в последнем примере, приведенном выше в комментариях, матрица А срезается с использованием локальной переменной l.

Я должен сказать, что, хотя эта функция работает, на данный момент я не на 100% доволен ею. Если я смогу придумать что-то лучшее, я отправлю его здесь.

UPDATE: 2013/04/11 Я внес небольшие изменения в код: - В инструкции активации отсутствуют скобки. Причинить его не запускать. - Перемещено mec.clear() в начало функции, а не в конец. Я также заметил, что он работает лучше всего, если я запускаю его в ipython. Например, я могу получить ошибки, если я запустил script, используя указанную выше функцию как "python./myparallelrun.py", но не, если я запустил ее в ipython, используя "% run./myparallelrun.py". Не знаете, почему...

Ответ 3

Элегантный способ сделать это - частичные функции.

Если вы знаете, что первый аргумент foo должен быть myArg, вы можете создать новую функциональную панель с помощью

from functools import partial
bar = partial(foo, myARg)

bar(otherArg) затем вернет foo(myArg,otherArg)

Ответ 4

построим на этом:

dview.map_sync(func, [myA]*len(myLongList), myLongList)

возможно следующее:

from itertools import izip_longest
dview.map_sync(func, izip_longest(myLongList, [], fillvalue=myA))

Пример:

>>> # notice that a is a tuple
... concat = lambda a: '%s %s' % a
>>> mylonglist = range(10)
>>> from itertools import izip_longest
>>> map(concat, izip_longest(mylonglist, [], fillvalue='mississippi'))
['0 mississippi', '1 mississippi', '2 mississippi', '3 mississippi',
'4 mississippi', '5 mississippi', '6 mississippi', '7 mississippi',
'8 mississippi', '9 mississippi']

Ответ 5

Я отправляю комментарий Alex S. как ответ. Вероятно, это правильный подход для этой проблемы:

Просто выполните частичное приложение с помощью лямбда. Я знаю, что это выглядит странно, но используя my_f = lambda a, my = other, arguments = go, right = here: f (a, my, arguments, right) - это самый простой способ обойти это, не впадая в травление и проблемы с нажатием.