Подтвердить что ты не робот

Как я могу обернуть синхронную функцию в async coroutine?

Я использую aiohttp для создания сервера API, который отправляет запросы TCP на отдельный сервер. Модуль, который отправляет запросы TCP, является синхронным и является черным ящиком для моих целей. Поэтому моя проблема заключается в том, что эти запросы блокируют весь API. Мне нужен способ обернуть запросы модуля в асинхронной сопрограмме, которая не будет блокировать остальную часть API.

Итак, просто используя sleep в качестве простого примера, есть ли способ каким-то образом обернуть временный синхронный код в неблокирующей сопрограмме, что-то вроде этого:

async def sleep_async(delay):
    # After calling sleep, loop should be released until sleep is done
    yield sleep(delay)
    return 'I slept asynchronously'
4b9b3361

Ответ 1

В конце концов я нашел ответ в этой теме. Метод, который я искал, это run_in_executor. Это позволяет асинхронно запускать синхронную функцию без блокировки цикла событий.

В примере sleep я разместил выше, это может выглядеть так:

import asyncio
from time import sleep
from concurrent.futures import ProcessPoolExecutor

async def sleep_async(loop, delay):
    # Can set executor to None if a default has been set for loop
    await loop.run_in_executor(ProcessPoolExecutor(), sleep, delay)
    return 'I slept asynchronously'

Также см. Следующий ответ → Как мы вызываем нормальную функцию, где ожидается сопрограмма?

Ответ 2

Вы можете использовать декоратор для переноса версии синхронизации в асинхронную версию.

import time
from functools import wraps, partial


def wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run

@wrap
def sleep_async(delay):
    time.sleep(delay)
    return 'I slept asynchronously'

или используйте aioify

% pip install aioify

затем

@aioify
def sleep_async(delay):
    pass

Ответ 3

Не уверен, что слишком поздно, но вы также можете использовать декоратор для выполнения своей функции в потоке. ХОТИТЕ, обратите внимание, что это все еще будет не-кооперативная блокировка в отличие от асинхронной, которая является кооперативной блокировкой.

def wrap(func):
    from concurrent.futures import ThreadPoolExecutor
    pool=ThreadPoolExecutor()
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        future=pool.submit(func, *args, **kwargs)
        return asyncio.wrap_future(future)
    return run