Как я могу написать asyncio сопрограммы, которые могут действовать как обычные функции? - программирование

Как я могу написать asyncio сопрограммы, которые могут действовать как обычные функции?

Я пишу библиотеку, которую я бы хотел, чтобы конечные пользователи могли произвольно использовать, как если бы ее методы и функции не были сопрограммами.

Например, с учетом этой функции:

@asyncio.coroutine
def blah_getter():
    return (yield from http_client.get('http://blahblahblah'))

Конечный пользователь, который не хочет использовать какие-либо асинхронные функции в своем собственном коде, все равно должен импортировать asyncio и запустить это:

>>> response = asyncio.get_event_loop().run_until_complete(blah_getter())

Было бы здорово, если бы я мог, внутри blah_getter определить, был ли я вызван как сопрограмма или нет, и отреагировать соответственно.

Так что-то вроде:

@asyncio.coroutine
def blah_getter():
    if magically_determine_if_being_yielded_from():
        return (yield from http_client.get('http://blahblahblah'))
    else:
        el = asyncio.get_event_loop()
        return el.run_until_complete(http_client.get('http://blahblahblah'))
4b9b3361

Ответ 1

Вам нужны две функции: асинхронная сопрограмма и синхронная регулярная функция:

@asyncio.coroutine
def async_gettter():
    return (yield from http_client.get('http://example.com'))

def sync_getter()
    return asyncio.get_event_loop().run_until_complete(async_getter())

magically_determine_if_being_yielded_from() на самом деле event_loop.is_running(), но я настоятельно не рекомендую смешивать синхронизацию и асинхронный код в одной и той же функции.

Ответ 2

Я согласен с ответом Andrew, я просто хочу добавить, что если вы имеете дело с объектами, а не с функциями верхнего уровня, вы можете использовать метакласс для автоматического добавления синхронных версий ваших асинхронных методов. См. Этот пример:

import asyncio
import aiohttp

class SyncAdder(type):
    """ A metaclass which adds synchronous version of coroutines.

    This metaclass finds all coroutine functions defined on a class
    and adds a synchronous version with a '_s' suffix appended to the
    original function name.

    """
    def __new__(cls, clsname, bases, dct, **kwargs):
        new_dct = {}
        for name,val in dct.items():
            # Make a sync version of all coroutine functions
            if asyncio.iscoroutinefunction(val):
                meth = cls.sync_maker(name)
                syncname = '{}_s'.format(name)
                meth.__name__ = syncname
                meth.__qualname__ = '{}.{}'.format(clsname, syncname)
                new_dct[syncname] = meth
        dct.update(new_dct)
        return super().__new__(cls, clsname, bases, dct)

    @staticmethod
    def sync_maker(func):
        def sync_func(self, *args, **kwargs):
            meth = getattr(self, func)
            return asyncio.get_event_loop().run_until_complete(meth(*args, **kwargs))
        return sync_func

class Stuff(metaclass=SyncAdder):
    @asyncio.coroutine
    def getter(self, url):
        return (yield from aiohttp.request('GET', url))

Использование:

>>> import aio, asyncio
>>> aio.Stuff.getter_s
<function Stuff.getter_s at 0x7f90459c2bf8>
>>> aio.Stuff.getter
<function Stuff.getter at 0x7f90459c2b70>
>>> s = aio.Stuff()
>>> s.getter_s('http://example.com')
<ClientResponse(http://example.com) [200 OK]>
<CIMultiDictProxy {'ACCEPT-RANGES': 'bytes', 'CACHE-CONTROL': 'max-age=604800', 'DATE': 'Mon, 11 May 2015 15:13:21 GMT', 'ETAG': '"359670651"', 'EXPIRES': 'Mon, 18 May 2015 15:13:21 GMT', 'SERVER': 'ECS (ewr/15BD)', 'X-CACHE': 'HIT', 'X-EC-CUSTOM-ERROR': '1', 'CONTENT-LENGTH': '1270', 'CONTENT-TYPE': 'text/html', 'LAST-MODIFIED': 'Fri, 09 Aug 2013 23:54:35 GMT', 'VIA': '1.1 xyz.com:80', 'CONNECTION': 'keep-alive'}>
>>> asyncio.get_event_loop().run_until_complete(s.getter('http://example.com'))
<ClientResponse(http://example.com) [200 OK]>
<CIMultiDictProxy {'ACCEPT-RANGES': 'bytes', 'CACHE-CONTROL': 'max-age=604800', 'DATE': 'Mon, 11 May 2015 15:25:09 GMT', 'ETAG': '"359670651"', 'EXPIRES': 'Mon, 18 May 2015 15:25:09 GMT', 'SERVER': 'ECS (ewr/15BD)', 'X-CACHE': 'HIT', 'X-EC-CUSTOM-ERROR': '1', 'CONTENT-LENGTH': '1270', 'CONTENT-TYPE': 'text/html', 'LAST-MODIFIED': 'Fri, 09 Aug 2013 23:54:35 GMT', 'VIA': '1.1 xys.com:80', 'CONNECTION': 'keep-alive'}>

Ответ 3

Также вы можете создать простой декоратор, который сделает вашу функцию синхронной. Этот подход может быть применен к глобальным функциям и методам.

Пример.

# the decorator
def sync(f):
    ASYNC_KEY = 'async'

    def f_in(*args, **kwargs):
        if ASYNC_KEY in kwargs:
            async = kwargs.get(ASYNC_KEY)
            del kwargs[ASYNC_KEY]
        else:
            async = True

        if async:
            return f(*args, **kwargs)           
        else:
            return asyncio.get_event_loop().run_until_complete(f())

    return f_in

# below: the usage    
@sync
async def test():
    print('In sleep...')
    await asyncio.sleep(1)
    print('After sleep')    


# below: or asyncio.get_event_loop().create_task(test())
asyncio.get_event_loop().run_until_complete(test()) 
# and here is your syncronious version
test(async=False)

Кроме того: у него, вероятно, есть смысл создать специальный класс-оболочку, чтобы не передавать async каждому вызову метода. Пример ниже.

class SyncCallerWrapper(object):
    def __init__(self, obj, is_async=True):
        self._obj = obj 
        self._is_async = is_async


    def __getattr__(self, name):
        def sync_wrapper(obj_attr):
            def f(*args, **kwargs):
                return asyncio.get_event_loop().run_until_complete(obj_attr(*args, **kwargs))

            return f

        obj_attr = getattr(self._obj, name)

        if not self._is_async and asyncio.iscoroutinefunction(obj_attr):
            return sync_wrapper(obj_attr)           

        return obj_attr


class C(object):
    async def sleep1(self):
        print('In sleep1...')
        await asyncio.sleep(1)
        print('After sleep1')


    async def sleep2(self):
        print('In sleep2...')
        await asyncio.sleep(1)
        print('After sleep2')       


# you don't want any concurrency in your code
c_sync = SyncCallerWrapper(C(), is_async=False)
c_sync.sleep1()
c_sync.sleep2()

# here you want concurrency: class methods are coroutines
c_async = SyncCallerWrapper(C(), is_async=True)
asyncio.get_event_loop().run_until_complete(c_async.sleep1())
asyncio.get_event_loop().run_until_complete(c_async.sleep2())

Чтобы быть более элегантным, вы можете заменить свой класс функцией (глобальным конструктором). Затем пользователь может создать класс C, передающий параметр is_async и имеющий желаемое поведение: методы будут действовать как обычные (is_async=False) или как async функции (is_async=True).

def C(*args, **kwargs):
    KEY_ISASYNC = 'is_async'
    if KEY_ISASYNC in kwargs:
        is_async = kwargs.get(KEY_ISASYNC)
        del kwargs[KEY_ISASYNC]
    else:
        is_async = False
    return SyncCallerWrapper(_C(*args, **kwargs), is_async=is_async)

# you don't want any concurrency in your code
c_sync = C(is_async=False)
c_sync.sleep1()
c_sync.sleep2()

# here you want concurrency: class methods are coroutines
c_async = C(is_async=True)
asyncio.get_event_loop().run_until_complete(c_async.sleep1())
asyncio.get_event_loop().run_until_complete(c_async.sleep2())