Подтвердить что ты не робот

Асинхронная обработка исключений в Python

У меня есть следующий код, использующий asyncio и aiohttp для создания асинхронных HTTP-запросов.

import sys
import asyncio
import aiohttp

@asyncio.coroutine
def get(url):
    try:
        print('GET %s' % url)
        resp = yield from aiohttp.request('GET', url)
    except Exception as e:
        raise Exception("%s has error '%s'" % (url, e))
    else:
        if resp.status >= 400:
            raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))

    return (yield from resp.text())

@asyncio.coroutine
def fill_data(run):
    url = 'http://www.google.com/%s' % run['name']
    run['data'] = yield from get(url)

def get_runs():
    runs = [ {'name': 'one'}, {'name': 'two'} ]
    loop = asyncio.get_event_loop()
    task = asyncio.wait([fill_data(r) for r in runs])
    loop.run_until_complete(task)   
    return runs

try:
    get_runs()
except Exception as e:
    print(repr(e))
    sys.exit(1)

По какой-то причине исключения, возникающие внутри функции get, не попадают:

Future/Task exception was never retrieved
Traceback (most recent call last):
  File "site-packages/asyncio/tasks.py", line 236, in _step
    result = coro.send(value)
  File "mwe.py", line 25, in fill_data
    run['data'] = yield from get(url)
  File "mwe.py", line 17, in get
    raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))
Exception: http://www.google.com/two has error '404: Not Found'

Итак, каков правильный способ обработки исключений, вызванных курутами?

4b9b3361

Ответ 1

asyncio.wait фактически не использует переданный ему Futures, он просто ждет их завершения, а затем возвращает объекты Future:

coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

Подождите, пока объекты Futures и coroutine заданный фьючерсом последовательности для завершения. Корутины будут обернуты в Задачи. Возвращает два набора Future: (сделано, ожидается).

Пока вы на самом деле yield from элементы в списке done, они останутся неиспользованными. Поскольку ваша программа выходит без использования фьючерсов, вы видите, что сообщения "исключение никогда не были получены".

Для вашего прецедента, вероятно, имеет смысл использовать asyncio.gather, который фактически потребляет каждый Future, а затем возвратите одиночный Future, который агрегирует все их результаты (или поднимает первый Exception, брошенный будущим в списке ввода).

def get_runs():
    runs = [ {'name': 'one'}, {'name': 'two'} ]
    loop = asyncio.get_event_loop()
    tasks = asyncio.gather(*[fill_data(r) for r in runs])
    loop.run_until_complete(tasks)
    return runs

Вывод:

GET http://www.google.com/two
GET http://www.google.com/one
Exception("http://www.google.com/one has error '404: Not Found'",)

Обратите внимание, что asyncio.gather фактически позволяет настраивать его поведение, когда одно из фьючерсов вызывает исключение; поведение по умолчанию заключается в том, чтобы поднять первое исключение, которое оно нажимает, но оно также может просто вернуть каждый объект исключения в выходной список:

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

Возвращает будущие агрегирующие результаты из заданных объектов coroutine или фьючерсы.

Все фьючерсы должны иметь один и тот же цикл событий. Если все задачи выполнены успешно, результат возврата фьючерсов - это список результатов (в порядок исходной последовательности, не обязательно порядок результаты прибытия). Если return_exceptions есть True, исключения в задачи обрабатываются так же, как и успешные результаты, и список результатов; в противном случае первое поднятое исключение будет немедленно распространяется на возвращенное будущее.

Ответ 2

Отладка или "обработка" исключений в callback:

Coroutine, которые возвращают некоторые результаты или вызывают исключения:

@asyncio.coroutine
def async_something_entry_point(self):
    try:
        return self.real_stuff_which_throw_exceptions()
    except:
        raise Exception(some_identifier_here + ' ' + traceback.format_exc())

И обратный вызов:

def callback(self, future: asyncio.Future):
    exc = future.exception()
    if exc:
        # Handle wonderful empty TimeoutError exception
        if type(exc) == TimeoutError:
            self.logger('<Some id here> callback exception TimeoutError')
        else:
            self.logger("<Some id here> callback exception " + str(exc))

    # store your result where you want
    self.result.append(
        future.result()
    )