Подтвердить что ты не робот

Multiprocessing.Pool делает матричное умножение Numpy медленнее

Итак, я играю с multiprocessing.Pool и Numpy, но, похоже, я пропустил какой-то важный момент. Почему версия pool намного медленнее? Я посмотрел на htop, и я вижу, что несколько процессов создаются, но все они разделяют один из процессоров, добавляющих до 100%.

$ cat test_multi.py 
import numpy as np
from timeit import timeit
from multiprocessing import Pool


def mmul(matrix):
    for i in range(100):
        matrix = matrix * matrix
    return matrix

if __name__ == '__main__':
    matrices = []
    for i in range(4):
        matrices.append(np.random.random_integers(100, size=(1000, 1000)))

    pool = Pool(8)
    print timeit(lambda: map(mmul, matrices), number=20)
    print timeit(lambda: pool.map(mmul, matrices), number=20)

$ python test_multi.py 
16.0265390873
19.097837925

[обновление]

  • изменен на timeit для процессов бенчмаркинга
  • init Пул с несколькими ядрами
  • изменилось вычисление, так что есть больше вычислений и меньше передачи памяти (надеюсь)

По-прежнему нет изменений. pool версия все еще медленнее, и я вижу в htop, что используется только одно ядро, также генерируется несколько процессов.

[Update2]

На данный момент я читаю о том, как @Jan-Philip Gehrcke предлагает использовать multiprocessing.Process() и Queue. Но тем временем я хотел бы знать:

  • Почему мой пример работает на tiago? Что может быть причиной того, что он не работает на моей машине 1?
  • Является ли в моем примере кодом любое копирование между процессами? Я намеревался, чтобы мой код дал каждому потоку одну матрицу списка матриц.
  • Мой код плохой пример, потому что я использую Numpy?

Я узнал, что часто получается лучший ответ, когда другие знают мою конечную цель: у меня много файлов, которые загружаются и обрабатываются в серийном режиме. Обработка - интенсивность процессора, поэтому я предполагаю, что многое может быть достигнуто путем распараллеливания. Моя цель - вызвать функцию python, которая анализирует файл параллельно. Кроме того, эта функция является просто интерфейсом к C-коду, я полагаю, это имеет значение.

1 Ubuntu 12.04, Python 2.7.3, i7 860 @2.80 - Пожалуйста, оставьте комментарий, если вам нужна дополнительная информация.

[Update3]

Вот результаты из примера кода Стефано. По какой-то причине ускорения не происходит.:/

testing with 16 matrices
base  4.27
   1  5.07
   2  4.76
   4  4.71
   8  4.78
  16  4.79
testing with 32 matrices
base  8.82
   1 10.39
   2 10.58
   4 10.73
   8  9.46
  16  9.54
testing with 64 matrices
base 17.38
   1 19.34
   2 19.62
   4 19.59
   8 19.39
  16 19.34

[обновление 4] ответ на комментарий Джан-Филипа Герке

Извините, что я не стал более ясным. Как я писал в обновлении 2, моя главная цель - распараллелить многие последовательные вызовы функции библиотеки Python сторонних разработчиков. Эта функция является интерфейсом к некоторому C-коду. Мне было рекомендовано использовать pool, но это не сработало, поэтому я попробовал что-то более простое, показанный выше пример с Numpy. Но и там я не мог добиться улучшения производительности, даже если он ищет меня "emberassing parallelizable". Поэтому я предполагаю, что, должно быть, я пропустил что-то важное. Эта информация - это то, что я ищу с этим вопросом и щедростью.

[обновление 5]

Спасибо за ваш огромный вклад. Но чтение ваших ответов только создает для меня больше вопросов. По этой причине я прочитаю об основах и создаю новые вопросы SO, когда у меня будет более четкое понимание того, чего я не знаю.

4b9b3361

Ответ 1

Что касается того, что все ваши процессы работают на одном CPU, см. мой ответ здесь.

Во время импорта numpy изменяет близость процессора к родительскому процессу, так что, когда вы позже используете Pool, все рабочие процессы, которые он порождает, в конечном итоге будут бороться за одно и то же ядро, а не использовать все ядра, доступные на вашей машине.

Вы можете вызвать taskset после импорта numpy в reset сродства к процессору, чтобы использовать все ядра:

import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool


def mmul(matrix):
    for i in range(100):
        matrix = matrix * matrix
    return matrix

if __name__ == '__main__':

    matrices = []
    for i in range(4):
        matrices.append(np.random.random_integers(100, size=(1000, 1000)))

    print timeit(lambda: map(mmul, matrices), number=20)

    # after importing numpy, reset the CPU affinity of the parent process so
    # that it will use all cores
    os.system("taskset -p 0xff %d" % os.getpid())

    pool = Pool(8)
    print timeit(lambda: pool.map(mmul, matrices), number=20)

Вывод:

    $ python tmp.py                                     
    12.4765810966
    pid 29150 current affinity mask: 1
    pid 29150 new affinity mask: ff
    13.4136221409

Если вы просматриваете использование ЦП с использованием top, пока вы запускаете этот script, вы должны увидеть его, используя все ваши ядра, когда он выполняет "параллельную" часть. Как указывали другие, в вашем первоначальном примере накладные расходы, связанные с травлением данных, создание процессов и т.д., Вероятно, перевешивают любую возможную выгоду от параллелизации.

Изменить: Я подозреваю, что часть причины, по которой единый процесс, по-видимому, выполняется быстрее, заключается в том, что numpy может иметь некоторые приемы для ускорения этого умножения по матрице, которое он не может использовать когда задания распределяются по нескольким ядрам.

Например, если я просто использую обычные списки Python для вычисления последовательности Фибоначчи, я могу получить огромное ускорение от параллелизации. Точно так же, если я делаю элементарное умножение таким образом, чтобы не использовать преимущества векторизации, я получаю аналогичное ускорение для параллельной версии:

import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool

def fib(dummy):
    n = [1,1]
    for ii in xrange(100000):
        n.append(n[-1]+n[-2])

def silly_mult(matrix):
    for row in matrix:
        for val in row:
            val * val

if __name__ == '__main__':

    dt = timeit(lambda: map(fib, xrange(10)), number=10)
    print "Fibonacci, non-parallel: %.3f" %dt

    matrices = [np.random.randn(1000,1000) for ii in xrange(10)]
    dt = timeit(lambda: map(silly_mult, matrices), number=10)
    print "Silly matrix multiplication, non-parallel: %.3f" %dt

    # after importing numpy, reset the CPU affinity of the parent process so
    # that it will use all CPUS
    os.system("taskset -p 0xff %d" % os.getpid())

    pool = Pool(8)

    dt = timeit(lambda: pool.map(fib,xrange(10)), number=10)
    print "Fibonacci, parallel: %.3f" %dt

    dt = timeit(lambda: pool.map(silly_mult, matrices), number=10)
    print "Silly matrix multiplication, parallel: %.3f" %dt

Вывод:

$ python tmp.py
Fibonacci, non-parallel: 32.449
Silly matrix multiplication, non-parallel: 40.084
pid 29528 current affinity mask: 1
pid 29528 new affinity mask: ff
Fibonacci, parallel: 9.462
Silly matrix multiplication, parallel: 12.163

Ответ 2

Непредсказуемая конкуренция между накладными расходами на связь и ускорением вычислений определенно является проблемой здесь. То, что вы наблюдаете, прекрасно. Независимо от того, получаете ли вы чистое ускорение, зависит от многих факторов и что-то, что должно быть определено количественно (как и вы).

Итак, почему multiprocessing так "неожиданно медленно" в вашем случае? multiprocessing map и map_async функции фактически распиливают объекты Python взад и вперед по каналам, которые соединяют родителя с ребенок обрабатывает. Это может занять значительное время. В течение этого времени дочерние процессы почти ничего не делают, что можно увидеть в htop. Между разными системами может быть значительная разница в производительности транспортировки труб, что также объясняет, почему для некоторых людей ваш код пула быстрее, чем ваш единственный код ЦП, хотя для вас это не так (здесь могут возникнуть другие факторы, это просто пример для объяснения эффекта).

Что вы можете сделать, чтобы сделать это быстрее?

  • Не рассортируйте входные данные в POSIX-совместимых системах.

    Если вы работаете в Unix, вы можете обойти потоки связи с родительским → потомком, используя преимущества поведения fork для процесса POSIX (копирование памяти при записи):

    Создайте свой ввод задания (например, список больших матриц) для работы в родительском процессе в глобально доступной переменной. Затем создайте рабочие процессы, вызвав multiprocessing.Process() самостоятельно. У детей возьмите задание от глобальной переменной. Проще говоря, это позволяет ребенку получить доступ к памяти родителя без каких-либо затрат на связь (*, пояснение ниже). Отправьте результат обратно родителям, например, a multiprocessing.Queue. Это сэкономит много коммуникационных издержек, особенно если выход мал по сравнению с входом. Этот метод не будет работать, например, Windows, потому что multiprocessing.Process() создает совершенно новый процесс Python, который не наследует состояние родителя.

  • Использовать многопоточность numpy. В зависимости от вашей реальной задачи расчета может случиться так, что включение multiprocessing вообще не поможет. Если вы скомпилируете numpy самостоятельно и включите директивы OpenMP, операции с крупными матрицами могут стать очень эффективными многопоточными (и распределены по многим ядрам ЦП, а GIL здесь не является ограничивающим фактором). В принципе, это наиболее эффективное использование нескольких ядер процессора, которые вы можете получить в контексте numpy/scipy.

* Ребенок не может напрямую обращаться к родительской памяти в целом. Однако после fork() родительский и дочерний объекты находятся в эквивалентном состоянии. Было бы глупо копировать всю память родителя в другое место в ОЗУ. Именно поэтому переходите к принципу копирования на запись. Пока ребенок не изменяет свое состояние памяти, он фактически обращается к родительской памяти. Только после модификации соответствующие биты и куски копируются в пространство памяти дочернего элемента.

Основное изменение:

Позвольте мне добавить фрагмент кода, который сжимает большое количество входных данных с несколькими рабочими процессами и следует рекомендациям "1. Не распиливайте входные данные в POSIX-совместимых системах". Кроме того, объем информации, переданной менеджеру-работнику (родительский процесс), довольно низок. Тяжелая вычислительная часть этого примера представляет собой разложение по одному значению. Он может широко использовать OpenMP. Я выполнил этот пример несколько раз:

  • Один раз с 1, 2 или 4 рабочими процессами и OMP_NUM_THREADS=1, поэтому каждый рабочий процесс создает максимальную нагрузку 100%. Там упомянутое поведение "число рабочих" - вычисление времени - почти линейно, а коэффициент ускорения сети соответствует количеству задействованных рабочих.
  • Один раз с 1, 2 или 4 рабочими процессами и OMP_NUM_THREADS=4, так что каждый процесс создает максимальную нагрузку 400% (путем нереста 4 потоков OpenMP). Моя машина имеет 16 реальных ядер, поэтому 4 процесса с максимальной нагрузкой 400% каждый будет получать максимальную производительность от машины. Масштабирование не является полностью линейным, а коэффициент ускорения - это не количество задействованных рабочих, но абсолютное время вычисления значительно уменьшается по сравнению с OMP_NUM_THREADS=1, а время по-прежнему значительно уменьшается с числом рабочих процессов.
  • Один раз с большими входными данными, 4 ядрами и OMP_NUM_THREADS=4. Это приводит к средней загрузке системы 1253%.
  • Однажды с той же настройкой, что и последняя, ​​но OMP_NUM_THREADS=5. Это приводит к средней загрузке системы 1598%, что говорит о том, что мы получили все от этой 16-ядерной машины. Однако фактическое время стены вычислений не улучшается по сравнению с последним случаем.

Код:

import os
import time
import math
import numpy as np
from numpy.linalg import svd as svd
import multiprocessing


# If numpy is compiled for OpenMP, then make sure to control
# the number of OpenMP threads via the OMP_NUM_THREADS environment
# variable before running this benchmark.


MATRIX_SIZE = 1000
MATRIX_COUNT = 16


def rnd_matrix():
    offset = np.random.randint(1,10)
    stretch = 2*np.random.rand()+0.1
    return offset + stretch * np.random.rand(MATRIX_SIZE, MATRIX_SIZE)


print "Creating input matrices in parent process."
# Create input in memory. Children access this input.
INPUT = [rnd_matrix() for _ in xrange(MATRIX_COUNT)]


def worker_function(result_queue, worker_index, chunk_boundary):
    """Work on a certain chunk of the globally defined `INPUT` list.
    """
    result_chunk = []
    for m in INPUT[chunk_boundary[0]:chunk_boundary[1]]:
        # Perform single value decomposition (CPU intense).
        u, s, v = svd(m)
        # Build single numeric value as output.
        output =  int(np.sum(s))
        result_chunk.append(output)
    result_queue.put((worker_index, result_chunk))


def work(n_workers=1):
    def calc_chunksize(l, n):
        """Rudimentary function to calculate the size of chunks for equal 
        distribution of a list `l` among `n` workers.
        """
        return int(math.ceil(len(l)/float(n)))

    # Build boundaries (indices for slicing) for chunks of `INPUT` list.
    chunk_size = calc_chunksize(INPUT, n_workers)
    chunk_boundaries = [
        (i, i+chunk_size) for i in xrange(0, len(INPUT), chunk_size)]

    # When n_workers and input list size are of same order of magnitude,
    # the above method might have created less chunks than workers available. 
    if n_workers != len(chunk_boundaries):
        return None

    result_queue = multiprocessing.Queue()
    # Prepare child processes.
    children = []
    for worker_index in xrange(n_workers):
        children.append(
            multiprocessing.Process(
                target=worker_function,
                args=(
                    result_queue,
                    worker_index,
                    chunk_boundaries[worker_index],
                    )
                )
            )

    # Run child processes.
    for c in children:
        c.start()

    # Create result list of length of `INPUT`. Assign results upon arrival.
    results = [None] * len(INPUT)

    # Wait for all results to arrive.
    for _ in xrange(n_workers):
        worker_index, result_chunk = result_queue.get(block=True)
        chunk_boundary = chunk_boundaries[worker_index]
        # Store the chunk of results just received to the overall result list.
        results[chunk_boundary[0]:chunk_boundary[1]] = result_chunk

    # Join child processes (clean up zombies).
    for c in children:
        c.join()
    return results


def main():
    durations = []
    n_children = [1, 2, 4]
    for n in n_children:
        print "Crunching input with %s child(ren)." % n
        t0 = time.time()
        result = work(n)
        if result is None:
            continue
        duration = time.time() - t0
        print "Result computed by %s child process(es): %s" % (n, result)
        print "Duration: %.2f s" % duration
        durations.append(duration)
    normalized_durations = [durations[0]/d for d in durations]
    for n, normdur in zip(n_children, normalized_durations):
        print "%s-children speedup: %.2f" % (n, normdur)


if __name__ == '__main__':
    main()

Выход:

$ export OMP_NUM_THREADS=1
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 16.66 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 8.27 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 4.37 s
1-children speedup: 1.00
2-children speedup: 2.02
4-children speedup: 3.81
48.75user 1.75system 0:30.00elapsed 168%CPU (0avgtext+0avgdata 1007936maxresident)k
0inputs+8outputs (1major+809308minor)pagefaults 0swaps

$ export OMP_NUM_THREADS=4
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 8.62 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 4.92 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 2.95 s
1-children speedup: 1.00
2-children speedup: 1.75
4-children speedup: 2.92
106.72user 3.07system 0:17.19elapsed 638%CPU (0avgtext+0avgdata 1022240maxresident)k
0inputs+8outputs (1major+841915minor)pagefaults 0swaps

$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [21762, 26806, 10148, 22947, 20900, 8161, 20168, 17439, 23497, 26360, 6789, 11216, 12769, 23022, 26221, 20480, 19140, 13757, 23692, 19541, 24644, 21251, 21000, 21687, 32187, 5639, 23314, 14678, 18289, 12493, 29766, 14987, 12580, 17988, 20853, 4572, 16538, 13284, 18612, 28617, 19017, 23145, 11183, 21018, 10922, 11709, 27895, 8981]
Duration: 12.69 s
4-children speedup: 1.00
174.03user 4.40system 0:14.23elapsed 1253%CPU (0avgtext+0avgdata 2887456maxresident)k
0inputs+8outputs (1major+1211632minor)pagefaults 0swaps

$ export OMP_NUM_THREADS=5
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [19528, 17575, 21792, 24303, 6352, 22422, 25338, 18183, 15895, 19644, 20161, 22556, 24657, 30571, 13940, 18891, 10866, 21363, 20585, 15289, 6732, 10851, 11492, 29146, 12611, 15022, 18967, 25171, 10759, 27283, 30413, 14519, 25456, 18934, 28445, 12768, 28152, 24055, 9285, 26834, 27731, 33398, 10172, 22364, 12117, 14967, 18498, 8111]
Duration: 13.08 s
4-children speedup: 1.00
230.16user 5.98system 0:14.77elapsed 1598%CPU (0avgtext+0avgdata 2898640maxresident)k
0inputs+8outputs (1major+1219611minor)pagefaults 0swaps

Ответ 3

Ваш код верен. Я просто запустил систему (с двумя ядрами, гиперпоточность) и получил следующие результаты:

$ python test_multi.py 
30.8623809814
19.3914041519

Я просмотрел процессы и, как и ожидалось, параллельную часть, показывающую несколько процессов, работающих почти на 100%. Это должно быть что-то в вашей системе или установке python.

Ответ 4

По умолчанию Pool использует только n процессов, где n - количество процессоров на вашем компьютере. Вам нужно указать, сколько процессов вы хотите использовать, например Pool(5).

Подробнее см. здесь

Ответ 5

Измерение арифметической пропускной способности - очень сложная задача: в основном ваш тестовый пример слишком прост, и я вижу много проблем.

Сначала вы тестируете целочисленную арифметику: есть ли специальная причина? С плавающей точкой вы получаете результаты, сопоставимые по многим различным архитектурам.

Второй matrix = matrix*matrix перезаписывает входной параметр (матрицы передаются по ref, а не по значению), и каждый образец должен работать с разными данными...

Последние тесты должны проводиться в более широком диапазоне размеров проблем и количества рабочих, чтобы понять общие тенденции.

Итак, вот мой модифицированный тест script

import numpy as np
from timeit import timeit
from multiprocessing import Pool

def mmul(matrix):
    mymatrix = matrix.copy()
    for i in range(100):
        mymatrix *= mymatrix
    return mymatrix

if __name__ == '__main__':

    for n in (16, 32, 64):
        matrices = []
        for i in range(n):
            matrices.append(np.random.random_sample(size=(1000, 1000)))

        stmt = 'from __main__ import mmul, matrices'
        print 'testing with', n, 'matrices'
        print 'base',
        print '%5.2f' % timeit('r = map(mmul, matrices)', setup=stmt, number=1)

        stmt = 'from __main__ import mmul, matrices, pool'
        for i in (1, 2, 4, 8, 16):
            pool = Pool(i)
            print "%4d" % i, 
            print '%5.2f' % timeit('r = pool.map(mmul, matrices)', setup=stmt, number=1)
            pool.close()
            pool.join()

и мои результаты:

$ python test_multi.py 
testing with 16 matrices
base  5.77
   1  6.72
   2  3.64
   4  3.41
   8  2.58
  16  2.47
testing with 32 matrices
base 11.69
   1 11.87
   2  9.15
   4  5.48
   8  4.68
  16  3.81
testing with 64 matrices
base 22.36
   1 25.65
   2 15.60
   4 12.20
   8  9.28
  16  9.04

[UPDATE] Я запускаю этот пример дома на другом компьютере, получая последовательное замедление:

testing with 16 matrices
base  2.42
   1  2.99
   2  2.64
   4  2.80
   8  2.90
  16  2.93
testing with 32 matrices
base  4.77
   1  6.01
   2  5.38
   4  5.76
   8  6.02
  16  6.03
testing with 64 matrices
base  9.92
   1 12.41
   2 10.64
   4 11.03
   8 11.55
  16 11.59

Должен признаться, что я не знаю, кто виноват (numpy, python, компилятор, ядро)...

Ответ 6

Поскольку вы упоминаете, что у вас много файлов, я бы предложил следующее решение:

  • Составьте список имен файлов.
  • Напишите функцию, которая загружает и обрабатывает один файл, названный в качестве входного параметра.
  • Используйте Pool.map() для применения функции к списку файлов.

Поскольку каждый экземпляр теперь загружает свой собственный файл, единственными переданными данными являются имена файлов, а не (потенциально большие) массивы numpy.

Ответ 7

Я также заметил, что когда я запускал умножение матриц в функции Pool.map(), он работал намного медленнее на некоторых машинах. Моей целью было распараллелить мою работу с помощью Pool.map() и запустить процесс на каждом ядре моего компьютера. Когда дела шли быстро, умножение матриц-матриц было лишь небольшой частью всей параллельной работы. Когда я посмотрел на использование процессора процессами, я увидел, что каждый процесс может использовать, например, 400+% ЦП на машинах, где он работал медленно, но всегда & lt; = 100% на машинах, где он работал быстро. Для меня решение было остановить многопоточность. Оказывается, что numpy был настроен на многопоточность именно на тех машинах, где мой Pool.map() работал медленно. Очевидно, что если вы уже распараллеливаетесь с помощью Pool.map(), то наличие параллелизма и numpy также создает помехи. Я только что позвонил export MKL_NUM_THREADS=1 перед запуском своего кода на Python, и он везде работал быстро.

Ответ 8

Решение

Установите следующие переменные среды перед любым вычислением (вам может потребоваться установить их перед выполнением import numpy для некоторых более ранних версий numpy):

os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"

Как это работает

Реализация numpy уже использует многопоточность с библиотеками оптимизации, такими как OpenMP или MKL или OpenBLAS и т.д. Поэтому мы не видим значительных улучшений, внедряя многопроцессорность самостоятельно. Еще хуже, мы страдаем слишком много потоков. Например, если моя машина имеет 8 ядер ЦП, когда я пишу однопроцессный код, numpy может использовать 8 потоков для расчета. Затем я использую многопроцессорность для запуска 8 процессов, я получаю 64 потока. Это не выгодно, и переключение контекста между потоками и другими издержками может стоить больше времени. Устанавливая вышеупомянутые переменные среды, мы ограничиваем число потоков на процесс до 1, поэтому мы получаем наиболее эффективное количество всех потоков.

Пример кода

from timeit import timeit
from multiprocessing import Pool
import sys
import os

import numpy as np

def matmul(_):
    matrix = np.ones(shape=(1000, 1000))
    _ = np.matmul(matrix, matrix)

def mixed(_):
    matrix = np.ones(shape=(1000, 1000))
    _ = np.matmul(matrix, matrix)

    s = 0
    for i in range(1000000):
        s += i

if __name__ == '__main__':
    if sys.argv[1] == "--set-num-threads":
        os.environ["OMP_NUM_THREADS"] = "1"
        os.environ["MKL_NUM_THREADS"] = "1"
        os.environ["OPENBLAS_NUM_THREADS"] = "1"
        os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
        os.environ["NUMEXPR_NUM_THREADS"] = "1"

    if sys.argv[2] == "matmul":
        f = matmul
    elif sys.argv[2] == "mixed":
        f = mixed

    print("Serial:")
    print(timeit(lambda: list(map(f, [0] * 8)), number=20))

    with Pool(8) as pool:
        print("Multiprocessing:")
        print(timeit(lambda: pool.map(f, [0] * 8), number=20))

Я протестировал код на экземпляре AWS p3.2xlarge, который имеет 8 виртуальных ЦП (что не обязательно означает 8 ядер):

$ python test_multi.py --no-set-num-threads matmul
Serial:
3.3447616740000115
Multiprocessing:
3.5941055110000093

$ python test_multi.py --set-num-threads matmul
Serial:
9.464500446000102
Multiprocessing:
2.570238267999912

Перед установкой этих переменных среды серийная версия и многопроцессорная версия не имели большого значения, примерно за 3 секунды, часто многопроцессорная версия была медленнее, как это демонстрирует OP. После установки количества потоков, мы видим, что серийная версия заняла 9,46 секунды, становясь намного медленнее! Это доказательство того, что Numpy использует многопоточность, даже когда используется один процесс. Многопроцессорная версия заняла 2,57 секунды, немного улучшенная, это может быть потому, что в моей реализации было сэкономлено время передачи данных между потоками.

Этот пример не продемонстрировал много возможностей многопроцессорной обработки, поскольку numpy уже использует распараллеливание. Многопроцессорная обработка наиболее полезна, когда обычные вычисления с интенсивным использованием процессора в Python смешиваются с пустыми операциями. Например,

$ python test_multi.py --no-set-num-threads mixed
Serial:
12.380275611000116
Multiprocessing:
8.190792100999943

$ python test_multi.py --set-num-threads mixed
Serial:
18.512066430999994
Multiprocessing:
4.8058130150000125

Здесь многопроцессорная обработка с числом потоков, установленным в 1, является самой быстрой.

Примечание: это также работает для некоторых других библиотек вычислений ЦП, таких как PyTorch.