Подтвердить что ты не робот

Правильный способ отмены приема и закрытия обработки/многопроцессорности Python.

(Я использую модуль pyprocessing в этом примере, но замена обработки на многопроцессорность, вероятно, будет работать, если вы запустите python 2.6 или используйте многопроцессорный backport)

В настоящее время у меня есть программа, которая прослушивает сокет unix (используя обработчик .connection.Listener), принимает соединения и порождает поток, обрабатывающий запрос. В какой-то момент я хочу выйти из процесса изящно, но так как accept() - вызов блокируется, и я не вижу способа отменить его красивым способом. У меня есть один способ, который работает здесь (OS X), по крайней мере, установка обработчика сигнала и сигнализация процесса из другого потока следующим образом:

import processing
from processing.connection import Listener
import threading
import time
import os
import signal
import socket
import errno

# This is actually called by the connection handler.
def closeme():
    time.sleep(1)
    print 'Closing socket...'
    listener.close()
    os.kill(processing.currentProcess().getPid(), signal.SIGPIPE)

oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None)

listener = Listener('/tmp/asdf', 'AF_UNIX')
# This is a thread that handles one already accepted connection, left out for brevity
threading.Thread(target=closeme).start()
print 'Accepting...'
try:
    listener.accept()
except socket.error, e:
    if e.args[0] != errno.EINTR:
        raise
# Cleanup here...
print 'Done...'

Единственный другой способ, о котором я думал, - это глубоко проникнуть в соединение (listener._listener._socket) и установить параметр блокировки... но это, вероятно, имеет некоторые побочные эффекты и, как правило, очень страшно.

Есть ли у кого-нибудь более элегантный (и, возможно, даже правильный!) способ сделать это? Он должен быть портативным для OS X, Linux и BSD, но переносимость Windows и т.д. Не требуется.

Разъяснение: Спасибо всем! Как обычно, обнаруживаются двусмысленности в моем первоначальном вопросе:)

  • Мне нужно выполнить очистку после того, как я отменил прослушивание, и я не всегда хочу выйти из этого процесса.
  • Мне нужно иметь доступ к этому процессу из других процессов, не созданных из одного и того же родителя, что делает Queues неудобными
  • Причины для потоков:
    • Они получают доступ к общему состоянию. На самом деле более или менее общая база данных в памяти, поэтому я полагаю, что это можно сделать по-другому.
    • Я должен иметь возможность одновременного подключения нескольких соединений, но фактические потоки блокируют что-то большую часть времени. Каждое принятое соединение создает новый поток; это, чтобы не блокировать всех клиентов в операциях ввода-вывода.

Что касается потоков против процессов, я использую потоки для того, чтобы блокировать операции блокировки и процессы, чтобы обеспечить многопроцессорность.

4b9b3361

Ответ 1

Разве это не то, что выбрано для?

Разрешить только прием в сокете, если выбор указывает, что он не будет блокировать...

У выбора есть тайм-аут, поэтому вы иногда можете периодически выходить, чтобы проверить если его время для закрытия....

Ответ 2

Я думал, что могу избежать этого, но мне кажется, что я должен сделать что-то вроде этого:

from processing import connection
connection.Listener.fileno = lambda self: self._listener._socket.fileno()

import select

l = connection.Listener('/tmp/x', 'AF_UNIX')
r, w, e = select.select((l, ), (), ())
if l in r:
  print "Accepting..."
  c = l.accept()
  # ...

Я знаю, что это нарушает закон demeter и вводит какую-то вредоносную атаку обезьян, но кажется, что это будет самый простой способ сделать это. Если у кого-то будет более элегантное решение, я был бы рад услышать его:)

Ответ 3

Я новичок в модуле многопроцессорности, но мне кажется, что смешивание модуля обработки и потокового модуля противоречиво, не нацелены ли они на решение одной и той же проблемы?

В любом случае, как обматывать ваши функции прослушивания в самом процессе? Я не понимаю, как это влияет на остальную часть вашего кода, но это может быть более чистая альтернатива.

from multiprocessing import Process
from multiprocessing.connection import Listener


class ListenForConn(Process):

    def run(self):
        listener = Listener('/tmp/asdf', 'AF_UNIX')
        listener.accept()

        # do your other handling here


listen_process = ListenForConn()
listen_process.start()

print listen_process.is_alive()

listen_process.terminate()
listen_process.join()

print listen_process.is_alive()
print 'No more listen process.'

Ответ 4

Вероятно, не идеальный, но вы можете освободить блок, отправив сокету некоторые данные из обработчика сигнала или поток, который завершает процесс.

EDIT: Другой способ реализовать это может заключаться в использовании Connection Queues, поскольку они кажутся для поддержки тайм-аутов (извинения, я неправильно читаю ваш код в первом чтении).