Подтвердить что ты не робот

Многопоточность для Python Django

Некоторые функции должны запускаться асинхронно на веб-сервере. Отправка электронных писем или пост-обработка данных являются типичными случаями использования.

Каков наилучший (или самый питонический) способ написать функцию декоратора для асинхронного запуска функции?

Моя установка обычная: Python, Django, Gunicorn или Waitress, стандарт AWS EC2 Linux

Например, вот начало:

from threading import Thread

def postpone(function):
    def decorator(*args, **kwargs):
        t = Thread(target = function, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()
    return decorator

желаемое использование:

@postpone
def foo():
    pass #do stuff
4b9b3361

Ответ 1

Я продолжил использовать эту реализацию в масштабе и в производстве без проблем.

Определение декоратора:

def start_new_thread(function):
    def decorator(*args, **kwargs):
        t = Thread(target = function, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()
    return decorator

Пример использования:

@start_new_thread
def foo():
  #do stuff

Со временем стек обновился и перешел без сбоев.

Первоначально Python 2.4.7, Django 1.4, Gunicorn 0.17.2, теперь Python 3.6, Django 2.1, Waitress 1.1.

Если вы используете какие-либо транзакции базы данных, Django создаст новое соединение, и его необходимо закрыть вручную:

from django.db import connection

@postpone
def foo():
  #do stuff
  connection.close()

Ответ 2

Celery является асинхронной очередью очереди задач/заданий. Это хорошо документировано и идеально подходит для вас. Я предлагаю вам начать здесь

Ответ 3

Наиболее распространенным способом обработки асинхронной обработки в Django является использование Celery и django-celery.

Ответ 4

Подход TomCounsell работает хорошо, если не слишком много входящих рабочих мест. Если много длительных заданий выполняются за короткий промежуток времени, что порождает много потоков, основной процесс пострадает. В этом случае вы можете использовать пул потоков с сопрограммой,

# in my_utils.py

from concurrent.futures import ThreadPoolExecutor

MAX_THREADS = 10


def run_thread_pool():
    """
    Note that this is not a normal function, but a coroutine.
    All jobs are enqueued first before executed and there can be
    no more than 10 threads that run at any time point.
    """
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        while True:
            func, args, kwargs = yield
            executor.submit(func, *args, **kwargs)


pool_wrapper = run_thread_pool()

# Advance the coroutine to the first yield (priming)
next(pool_wrapper)
from my_utils import pool_wrapper

def job(*args, **kwargs):
    # do something

def handle(request):
    # make args and kwargs
    pool_wrapper.send((job, args, kwargs))
    # return a response

Ответ 5

GET/api/mx_catalog/v1/ismyagenda/? Content_id = 1 HTTP 404 не найден Тип содержимого: application/json Варьировать: Принять Разрешить: ОПЦИИ, GET

{"status": false} GET/api/mx_catalog/v1/ismyagenda/? content_id = 1 HTTP 404 Не найдено Тип содержимого: приложение /json Зависит: Принять, разрешить: OPTIONS, GET

{"status": false}