Подтвердить что ты не робот

Каков правильный способ выхода из потока?

У меня есть объект Connection, который используется для хранения потоков чтения и записи соединений asyncio:

class Connection(object):

    def __init__(self, stream_in, stream_out):
        object.__init__(self)

        self.__in = stream_in
        self.__out = stream_out

    def read(self, n_bytes : int = -1):
        return self.__in.read(n_bytes)

    def write(self, bytes_ : bytes):
        self.__out.write(bytes_)
        yield from self.__out.drain()

На стороне сервера connected создает объект Connection каждый раз, когда клиент подключается, затем читает 4 байта.

@asyncio.coroutine
def new_conection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    data = yield from conn.read(4)
    print(data)

И на стороне клиента выписываются 4 байта.

@asyncio.coroutine
def client(loop):
    ...
    conn = Connection(stream_in, stream_out)
    yield from conn.write(b'test')

Это работает почти так, как ожидалось, но я должен yield from каждый вызов read и write. Я пробовал yield from ing изнутри Connection:

def read(self, n_bytes : int = -1):
    data = yield from self.__in.read(n_bytes)
    return data

Но вместо получения данных я получаю вывод вроде

<generator object StreamReader.read at 0x1109983b8>

Если я вызываю read и write из нескольких мест, я бы предпочел не повторять yield from каждый раз; сохраняя их внутри Connection. Моя конечная цель - сократить мою функцию new_conection:

@asyncio.coroutine
def new_conection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    print(conn.read(4))
4b9b3361

Ответ 1

Поскольку StreamReader.read является сопрограммой, ваши единственные возможности для ее вызова: a) обернуть ее в Task или Future и запускать это через цикл событий, b) await используя его из сопрограммы, определенной с помощью async def, или c) используя yield from с ней из сопрограммы, определенной как функция, украшенная @asyncio.coroutine.

Так как Connection.read вызывается из цикла события (через сопрограмму new_connection), вы не можете повторно использовать этот цикл событий для запуска Task или Future для StreamReader.read: циклы событий не могут быть запущены, пока они уже запущены. Вы должны либо остановить цикл событий (катастрофично и, вероятно, не возможно сделать правильно), либо создать новый цикл событий (беспорядок и победить цель использования сопрограмм). Ни один из них не нужен, поэтому Connection.read должен быть сопрограммой или функцией async.

Остальные два параметра (await в сопрограмме async def или yield from в @asyncio.coroutine -декорированной функции) в основном эквивалентны. Единственное отличие состоит в том, что async def и await были добавлены в Python 3.5, поэтому для 3.4, используя yield from и @asyncio.coroutine, единственный вариант (сопрограммы и asyncio не существовали до 3.4, поэтому другие версии не имеют значения). Лично я предпочитаю использовать async def и await, потому что определение сопрограммы с async def более чистое и четкое, чем с декоратором.

Вкратце: Connection.read и new_connection - сопрограммы (используя либо декоратор, либо ключевое слово async), и используйте await (или yield from) при вызове других сопрограмм (await conn.read(4) in new_connection и await self.__in.read(n_bytes) в Connection.read).

Ответ 2

Я нашел фрагмент Исходный код StreamReader на линии 620 на самом деле является прекрасным примером использования функции.

В моем предыдущем ответе я упускал из виду тот факт, что self.__in.read(n_bytes) - это не только сопрограмма (которую я должен был знать, учитывая, что это был модуль asyncio XD), но он дает результат в строке. Так что это на самом деле генератор, и вам нужно будет уступить его.

Заимствуя этот цикл из исходного кода, ваша функция чтения должна выглядеть примерно так:

def read(self, n_bytes : int = -1):
    data = bytearray() #or whatever object you are looking for
    while 1:
        block = yield from self.__in.read(n_bytes)
        if not block:
            break
        data += block
    return data

Поскольку self.__in.read(n_bytes) является генератором, вы должны продолжать его получать, пока он не даст пустой результат, чтобы сигнализировать конец чтения. Теперь ваша функция чтения должна возвращать данные, а не генератор. Вам не придется выходить из этой версии conn.read().