Подтвердить что ты не робот

Python: как я могу запускать функции python параллельно?

Я исследовал сначала и не смог найти ответ на мой вопрос. Я пытаюсь запустить несколько функций параллельно в Python.

У меня есть что-то вроде этого:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

Я хочу вызвать func1 и func2 и запустить их одновременно. Функции не взаимодействуют друг с другом или с одним и тем же объектом. Прямо сейчас мне нужно дождаться, когда func1 завершится до начала func2. Как мне сделать что-то вроде ниже:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

Я хочу, чтобы иметь возможность создавать оба каталога довольно близко к одному времени, потому что каждый мин я подсчитываю, сколько файлов создается. Если каталог не существует, он отключит мое время.

4b9b3361

Ответ 1

Вы можете использовать threading или multiprocessing.

Из-за особенностей CPython, threading вряд ли достигнет истины parallelism. По этой причине multiprocessing обычно лучше.

Вот полный пример:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()
Механизм запуска/объединения дочерних процессов может быть легко инкапсулирован в функцию по строкам вашего runBothFunc:
def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)

Ответ 2

Это может быть сделано элегантно с Ray, системой, которая позволяет вам легко распараллеливать и распространять ваш код Python.

Чтобы распараллелить ваш пример, вам нужно определить свои функции с @ray.remote декоратора @ray.remote, а затем вызвать их с помощью .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

Если вы передаете один и тот же аргумент обеим функциям, а аргумент велик, более эффективный способ сделать это - использовать ray.put(). Это позволяет избежать сериализации большого аргумента и создания двух его копий в памяти:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

Если func1() и func2() возвращают результаты, вам нужно переписать код следующим образом:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func1.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

Существует несколько преимуществ использования Ray по сравнению с многопроцессорным модулем. В частности, один и тот же код будет работать как на одной машине, так и на кластере машин. Для получения дополнительных преимуществ Рэй см. Этот пост.

Ответ 3

Нельзя гарантировать, что две функции будут выполняться синхронно друг с другом, что похоже на то, что вы хотите сделать.

Самое лучшее, что вы можете сделать, - разделить функцию на несколько шагов, а затем дождаться, когда оба будут завершены в критических точках синхронизации, используя Process.join, как упоминания @aix answer.

Это лучше, чем time.sleep(10), потому что вы не можете гарантировать точное время. При явном ожидании вы говорите, что функции должны выполняться, выполняя этот шаг, прежде чем перейти к следующему, вместо того, чтобы предполагать, что это будет сделано в течение 10 мс, что не гарантируется на основе того, что еще происходит на машине.

Ответ 4

Если вы являетесь пользователем Windows и используете python 3, этот пост поможет вам выполнить параллельное программирование в python. Когда вы запускаете обычное программирование пула библиотеки многопроцессорности, вы получите сообщение об ошибке в отношении основной функции вашей программы. Это связано с тем, что в Windows нет функции fork(). Следующая статья дает решение указанной проблемы.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Поскольку я использовал python 3, я немного изменил программу:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

После этой функции вышеупомянутый код проблемы также немного изменился:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

И я получил вывод как:

[1, 8, 27, 64, 125, 216]

Я думаю, что этот пост может быть полезен для некоторых пользователей Windows.

Ответ 6

Если ваши функции в основном выполняют работу по вводу/выводу (и меньше работы с ЦП), и у вас есть Python 3. 2+, вы можете использовать ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

Если ваши функции в основном выполняют работу ЦП (и меньше операций ввода-вывода) и у вас есть Python 2. 6+, вы можете использовать модуль многопроцессорной обработки:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])