Вложенный parallelism в Python - программирование
Подтвердить что ты не робот

Вложенный parallelism в Python

Я пытаюсь выполнить многопроцессорное программирование с помощью Python. Возьмите алгоритм разделения и покоя, например, Fibonacci. Поток выполнения программы разветвляется как дерево и выполняется параллельно. Другими словами, мы имеем пример вложенный parallelism.

Из Java я использовал шаблон threadpool для управления ресурсами, так как программа могла быстро разветвляться и создавать слишком много короткоживущих потоков. Один статический (общий) поток можно создать через ExecutorService.

Я бы ожидал того же для Пул, но кажется, что Объект пула не должен быть глобально распространен. Например, совместное использование пула с помощью multiprocessing.Manager.Namespace() приведет к ошибке.

объекты пула не могут быть переданы между процессами или маринованными

У меня есть вопрос из двух частей:

  • Что мне здесь не хватает; почему не следует делиться пулом между процессами?
  • Что такое шаблон для реализации вложенных parallelism в Python?. Если возможно, поддерживая рекурсивную структуру, а не торгуя на итерации.

from concurrent.futures import ThreadPoolExecutor

def fibonacci(n):
    if n < 2:
        return n
    a = pool.submit(fibonacci, n - 1)
    b = pool.submit(fibonacci, n - 2)
    return a.result() + b.result()

def main():
    global pool

    N = int(10)
    with ThreadPoolExecutor(2**N) as pool:
        print(fibonacci(N))

main()

Java

public class FibTask implements Callable<Integer> {

    public static ExecutorService pool = Executors.newCachedThreadPool();
    int arg;

    public FibTask(int n) {
        this.arg= n;
    }

    @Override
    public Integer call() throws Exception {
        if (this.arg > 2) { 
            Future<Integer> left = pool.submit(new FibTask(arg - 1));
            Future<Integer> right = pool.submit(new FibTask(arg - 2));
            return left.get() + right.get();
        } else {
            return 1;
        }

    } 

  public static void main(String[] args) throws Exception {
      Integer n = 14;
      Callable<Integer> task = new FibTask(n);
      Future<Integer> result =FibTask.pool.submit(task); 
      System.out.println(Integer.toString(result.get()));
      FibTask.pool.shutdown();            
  }    

}

Я не уверен, имеет ли это значение здесь, но я игнорирую разницу между "процессом" и "потоком"; для меня они оба означают "виртуализированный процессор". Я понимаю, что цель пула заключается в совместном использовании "пула" или ресурсов. Запуск задач может сделать запрос в пул. По мере выполнения параллельных задач в других потоках эти потоки могут быть восстановлены и назначены для новых задач. Мне не имеет смысла запрещать совместное использование пула, чтобы каждый поток должен создавать свой собственный новый пул, поскольку это, казалось бы, превзошло цель пула потоков.

4b9b3361

Ответ 1

1) Что мне здесь не хватает; почему нельзя распределять пул между процессами?

Не все объекты/экземпляры выбираются/сериализуются, в этом случае пул использует threading.lock, который не подбирается:

>>> import threading, pickle
>>> pickle.dumps(threading.Lock())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
[...]
  File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle lock objects

или лучше:

>>> import threading, pickle
>>> from concurrent.futures import ThreadPoolExecutor
>>> pickle.dumps(ThreadPoolExecutor(1))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File 
[...]
"/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 306, in save
        rv = reduce(self.proto)
      File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
        raise TypeError, "can't pickle %s objects" % base.__name__
    TypeError: can't pickle lock objects

Если вы думаете об этом, это имеет смысл, блокировка - это примитив семафора, управляемый операционной системой (поскольку python использует собственные потоки). Возможность рассортировать и сохранять это состояние объекта внутри исполняемого файла python действительно не будет достигать ничего значимого, поскольку его истинное состояние хранится ОС.

2) Что такое шаблон для реализации вложенных parallelism в Python? Если возможно, сохранение рекурсивной структуры, а не торговля ею для итерации

Теперь, для престижа, все, о чем я упоминал выше, на самом деле не относится к вашему примеру, поскольку вы используете потоки (ThreadPoolExecutor), а не процессы (ProcessPoolExecutor), поэтому никакого обмена данными между процессами не должно произойти.

Ваш пример java просто кажется более эффективным, так как пул потоков, который вы используете (CachedThreadPool), создает новые потоки по мере необходимости, тогда как реализации исполнителей python ограничены и требуют явного максимального количества потоков (max_workers). Там немного различий синтаксиса между языками, которые также, кажется, отбрасывают вас (статические экземпляры в python - это, по сути, все, что явно не охвачено), но по сути оба примера создавали бы точно такое же количество потоков для выполнения. Например, здесь приведен пример использования довольно наивной реализации CachedThreadPoolExecutor в python:

from concurrent.futures import ThreadPoolExecutor

class CachedThreadPoolExecutor(ThreadPoolExecutor):
    def __init__(self):
        super(CachedThreadPoolExecutor, self).__init__(max_workers=1)

    def submit(self, fn, *args, **extra):
        if self._work_queue.qsize() > 0:
            print('increasing pool size from %d to %d' % (self._max_workers, self._max_workers+1))
            self._max_workers +=1

        return super(CachedThreadPoolExecutor, self).submit(fn, *args, **extra)

pool = CachedThreadPoolExecutor()

def fibonacci(n):
    print n
    if n < 2:
        return n
    a = pool.submit(fibonacci, n - 1)
    b = pool.submit(fibonacci, n - 2)
    return a.result() + b.result()

print(fibonacci(10))

Настройка производительности:

Я настоятельно рекомендую заглянуть в gevent, так как это даст вам высокий concurrency без накладных расходов на поток. Это не всегда так, но ваш код на самом деле является плакатным ребенком для использования gevent. Вот пример:

import gevent

def fibonacci(n):
    print n
    if n < 2:
        return n
    a = gevent.spawn(fibonacci, n - 1)
    b = gevent.spawn(fibonacci, n - 2)
    return a.get()  + b.get()

print(fibonacci(10))

Полностью ненаучный, но на моем компьютере приведенный выше код работает быстрее 9x, чем его эквивалент в потоке.

Надеюсь, это поможет.

Ответ 2

1. Что мне здесь не хватает; почему нельзя распределять пул между процессами?

Обычно вы не можете совместно использовать потоки ОС между процессами, независимо от языка.

Вы можете организовать доступ к диспетчеру пула с рабочими процессами, но это, вероятно, не является хорошим решением любой проблемы; см. ниже.

2. Что такое шаблон для реализации вложенных parallelism в Python? Если возможно, поддерживая рекурсивную структуру, а не торгуя ее для итерации.

Это сильно зависит от ваших данных.

В CPython общий ответ заключается в использовании структуры данных, которая реализует эффективные параллельные операции. Хорошим примером этого является NumPy оптимизированные типы массивов: здесь является примером их использования для разделения операции большого массива на несколько процессорных ядер.

Функция Fibonacci, реализованная с использованием рекурсии блокировки, является особенно пессимальной для любого подхода, основанного на рабочем пуле, хотя: fib (N) потратит большую часть своего времени, просто связывая N рабочих, которые ничего не делают, кроме ожидания других работников. Существует много других способов прямого подхода к функции Фибоначчи (например, используя CPS, чтобы устранить блокировку и заполнить постоянное число работников), но это вероятно, лучше решить вашу стратегию, основанную на реальных проблемах, которые вы будете решать, а не на примерах, подобных этому.