Подтвердить что ты не робот

Python asyncio, как создавать и отменять задачи из другого потока

У меня есть многопоточное приложение python. Я хочу запустить цикл asyncio в потоке, а затем отправить кавычки и сопрограммы к нему из другого потока. Должно быть легко, но я не могу опуститься вокруг asyncio.

Я подошел к следующему решению, которое делает половину того, что я хочу, не стесняйтесь комментировать что-либо:

import asyncio
from threading import Thread

class B(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.loop = None

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop) #why do I need that??
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def add_task(self, coro):
        """this method should return a task object, that I
          can cancel, not a handle"""
        f = functools.partial(self.loop.create_task, coro)
        return self.loop.call_soon_threadsafe(f)

    def cancel_task(self, xx):
        #no idea

@asyncio.coroutine
def test():
    while True:
        print("running")
        yield from asyncio.sleep(1)

b.start()
time.sleep(1) #need to wait for loop to start
t = b.add_task(test())
time.sleep(10)
#here the program runs fine but how can I cancel the task?

b.stop()

Таким образом, запуск и остановка цикла работает нормально. Я думал о создании задачи с помощью create_task, но этот метод не является потоковым, поэтому я завернул его в call_soon_threadsafe. Но я хотел бы иметь возможность получить объект задачи, чтобы иметь возможность отменить задачу. Я мог бы сделать сложный материал, используя Future and Condition, но должен быть более простой способ, isnt'it?

4b9b3361

Ответ 1

Я думаю, вам может понадобиться сделать ваш метод add_task осведомленным о том, вызывается ли его из потока, отличного от цикла события. Таким образом, если он вызывается из одного потока, вы можете просто вызвать asyncio.async напрямую, в противном случае он может выполнить некоторую дополнительную работу, чтобы передать задачу из потока цикла в вызывающий поток. Вот пример:

import time
import asyncio
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future

class B(Thread):
    def __init__(self, start_event):
        Thread.__init__(self)
        self.loop = None
        self.tid = None
        self.event = start_event

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.tid = current_thread()
        self.loop.call_soon(self.event.set)
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def add_task(self, coro):
        """this method should return a task object, that I
          can cancel, not a handle"""
        def _async_add(func, fut):
            try:
                ret = func()
                fut.set_result(ret)
            except Exception as e:
                fut.set_exception(e)

        f = functools.partial(asyncio.async, coro, loop=self.loop)
        if current_thread() == self.tid:
            return f() # We can call directly if we're not going between threads.
        else:
            # We're in a non-event loop thread so we use a Future
            # to get the task from the event loop thread once
            # it ready.
            fut = Future()
            self.loop.call_soon_threadsafe(_async_add, f, fut)
            return fut.result()

    def cancel_task(self, task):
        self.loop.call_soon_threadsafe(task.cancel)


@asyncio.coroutine
def test():
    while True:
        print("running")
        yield from asyncio.sleep(1)

event = Event()
b = B(event)
b.start()
event.wait() # Let the loop thread signal us, rather than sleeping
t = b.add_task(test()) # This is a real task
time.sleep(10)
b.stop()

Сначала мы сохраняем идентификатор потока цикла событий в методе run, поэтому мы можем выяснить, поступают ли вызовы на add_task из других потоков позже. Если add_task вызывается из потока цикла без события, мы используем call_soon_threadsafe для вызова функции, которая будет как планировать сопрограмму, так и затем использовать concurrent.futures.Future, чтобы передать задачу обратно вызывающему потоку, который ждет результат Future.

Заметка об отмене задачи: вы, когда вы вызываете cancel в Task, a CancelledError будут подняты в сопрограмме при следующем запуске цикла события. Это означает, что сопрограмма, которая завершает задачу Task, будет прервана из-за исключения в следующий раз, когда она нажмет предел текучести - если сопрограмма не улавливает CancelledError и предотвращает ее прерывание. Также обратите внимание, что это работает только в том случае, если выполняемая функция фактически является прерывистой сопрограммой; a asyncio.Future, возвращаемый BaseEventLoop.run_in_executor, например, не может быть действительно отменен, потому что он фактически обернут вокруг concurrent.futures.Future, и они не могут быть отменены, как только их базовая функция начинает выполняться. В этом случае asyncio.Future скажет, что его отменено, но функция, выполняемая в исполнителе, будет продолжать работать.

Изменить: Обновлен первый пример использования concurrent.futures.Future вместо queue.Queue, по предложению Андрея Светлова.

Примечание: asyncio.async устарел с версии 3.4.4, используя asyncio.ensure_future.

Ответ 2

Вы все делаете правильно. Для остановки задачи make make

class B(Thread):
    # ...
    def cancel(self, task):
        self.loop.call_soon_threadsafe(task.cancel)

Кстати, вам нужно настроить цикл событий для созданного потока явно

self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

потому что asyncio создает неявный цикл событий только для основного потока.

Ответ 3

только для справки, здесь это код, который я, наконец, реализовал на основе той помощи, которую я получил на этом сайте, проще, поскольку мне не нужны все функции. Еще раз спасибо!

class B(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.loop = None

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def _add_task(self, future, coro):
        task = self.loop.create_task(coro)
        future.set_result(task)

    def add_task(self, coro):
        future = Future()
        p = functools.partial(self._add_task, future, coro)
        self.loop.call_soon_threadsafe(p)
        return future.result() #block until result is available

    def cancel(self, task):
        self.loop.call_soon_threadsafe(task.cancel)

Ответ 4

Так как версия 3.4.4 asyncio предоставляет функцию под названием run_coroutine_threadsafe, чтобы отправить сопроводительный объект из потока в цикл событий. Он возвращает concurrent.futures.Future, чтобы получить доступ к результату или отменить задачу.

Используя ваш пример:

@asyncio.coroutine
def test(loop):
    try:
        while True:
            print("Running")
            yield from asyncio.sleep(1, loop=loop)
    except asyncio.CancelledError:
        print("Cancelled")
        loop.stop()
        raise

loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever)
future = asyncio.run_coroutine_threadsafe(test(loop), loop)

thread.start()
time.sleep(5)
future.cancel()
thread.join()