Подтвердить что ты не робот

Gevent monkeypatching прерывает многопроцессорность

Я пытаюсь использовать многопроцессорный пул для запуска группы процессов, каждый из которых будет запускать пул gevent из greenlets. Причина этого в том, что существует много активности в сети, но также много активности процессора, поэтому, чтобы максимизировать пропускную способность и все мои ядра процессора, мне нужно несколько процессов и gentent async monkey patching. Я использую многопроцессорный менеджер для создания очереди, к которой процессы будут обращаться, чтобы получить данные для обработки.

Вот упрощенный фрагмент кода:

import multiprocessing

from gevent import monkey
monkey.patch_all(thread=False)

manager = multiprocessing.Manager()
q = manager.Queue()

Вот его исключение:

Traceback (most recent call last):
  File "multimonkeytest.py", line 7, in <module>
    q = manager.Queue()
  File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 667, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 565, in _create
    conn = self._Client(self._address, authkey=self._authkey)
  File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 175, in Client
    answer_challenge(c, authkey)
  File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 409, in answer_challenge
    message = connection.recv_bytes(256)         # reject large message
 IOError: [Errno 35] Resource temporarily unavailable

Я считаю, что это должно быть связано с некоторой разницей между поведением нормального модуля сокета и модулем сокета gevent.

Если я обезвреживаю в подпроцессе, очередь создается успешно, но когда подпроцесс пытается получить() из очереди, возникает очень подобное исключение. Сокет должен быть обезврежен из-за большого количества сетевых запросов в подпроцессах.

Моя версия gevent, которая, как мне кажется, самая последняя:

>>> gevent.version_info
(1, 0, 0, 'alpha', 3)

Любые идеи?

4b9b3361

Ответ 1

использовать monkey.patch_all(thread=False, socket=False)

Я столкнулся с той же проблемой в аналогичной ситуации и проследил ее до строки 115 в gevent/monkey.py в функции patch_socket(): _socket.socket = socket.socket. Комментирование этой строки предотвращает поломку.

Здесь gevent заменяет библиотеку stdlib socket своей собственной. multiprocessing.connection использует библиотеку socket довольно широко и, по-видимому, не допускает этого изменения.

В частности, вы увидите это в любом сценарии, когда импортируемый модуль выполняет вызов gevent.monkey.patch_all() без установки socket=False. В моем случае это было grequests, и я должен был переопределить исправление модуля сокета, чтобы исправить эту ошибку.

Ответ 2

Известно, что применение многопроцессорности в контексте gevent вызывает проблемы. Однако ваше обоснование разумно ( "много активности сети, но также и много активности процессора" ). Если хотите, просмотрите http://gehrcke.de/gipc. Это предназначено в первую очередь для вашего случая использования. С помощью gipc вы можете легко создать несколько полностью управляемых gevent дочерних процессов и позволить им взаимодействовать друг с другом и/или с родителем через каналы.

Если у вас есть конкретные вопросы, вы можете вернуться ко мне.

Ответ 3

Если вы будете использовать исходную очередь, то код будет работать нормально даже с патчей-патчей с обезьяной.

import multiprocessing

from gevent import monkey
monkey.patch_all(thread=False)

q= multiprocessing.Queue()

Ответ 4

Ваш предоставленный код работает для меня в Windows 7.

EDIT:

Удален предыдущий ответ, потому что я пробовал свой код на Ubuntu 11.10 VPS, и я получаю ту же ошибку.

Посмотрите, как У этой проблемы тоже есть эта проблема

Ответ 5

Написал новый плагин Nose Multiprocess - он должен хорошо играть со всеми типами сумасшедших патчей на основе Gevent.

https://pypi.python.org/pypi/nose-gevented-multiprocess/

https://github.com/dvdotsenko/nose_gevent_multiprocess

  • Переключает с multiprocess.fork в обычный subprocess.popen для рабочих процессов (исправляет проблемы с ошибочно разделяемыми модульными уровнями для меня)
  • Переключено с multiprocess.Queue на JSON-RPC через HTTP для master-to-клиентов RPC
  • Теперь это теоретически позволяет распределять тесты на несколько машин.