Подтвердить что ты не робот

Убедитесь, что только один рабочий запускает событие apscheduler в веб-приложении пирамиды, в котором работают несколько сотрудников

У нас есть веб-приложение, сделанное с пирамидой и поданное через gunicorn + nginx. Он работает с 8 рабочими потоками/процессами

Нам нужно было работать, мы выбрали apscheduler. вот как мы его запускаем

from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.scheduler import Scheduler

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

Проблема заключается в том, что все рабочие процессы стрельбы выбирают планировщика. Мы попытались реализовать блокировку файлов, но это не похоже на достаточно хорошее решение. Каким будет лучший способ убедиться, что в любой момент времени только один из рабочих процессов выбирает запланированное событие вверх, и ни один другой поток не поднимает его до следующего JOB_INTERVAL?

Решение должно работать даже с mod_wsgi, если мы решили перейти на apache2 + modwsgi позже. Он должен работать с сервером разработки одного процесса, который является официанткой.

Обновление от спонсора bounty

Я столкнулся с той же проблемой, описанной OP, только с приложением Django. Я в основном уверен, что добавление этой детали не сильно изменится, если исходный вопрос. По этой причине и для получения большей видимости я также отметил этот вопрос с помощью django.

4b9b3361

Ответ 1

Поскольку Gunicorn начинается с 8 рабочих (в вашем примере), это forks приложение 8 раз в 8 процессов. Эти 8 процессов разворачиваются из процесса Мастер, который контролирует каждый их статус и имеет возможность добавлять/удалять работников.

Каждый процесс получает копию вашего объекта APScheduler, который изначально является точной копией ваших APScheduler ваших мастер-процессов. Это приводит к тому, что каждый "n-й" рабочий (процесс) выполняет каждое задание в общей сложности "n" раз.

Взлом вокруг этого заключается в том, чтобы запустить стрельбу из-за следующих вариантов:

env/bin/gunicorn module_containing_app:app -b 0.0.0.0:8080 --workers 3 --preload

Флаг --preload сообщает Gunicorn "загружать приложение перед тем, как развернуть рабочие процессы". Таким образом, каждому работнику предоставляется "экземпляр приложения, уже созданный мастером, а не экземпляр самого приложения". Это означает, что следующий код выполняется только один раз в Мастер-процессе:

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

Кроме того, нам нужно установить jobstore как нечто, отличное от : memory:. Этот способ, хотя каждый рабочий - это собственный независимый процесс, неспособный общаться с другой 7, используя локальную базу данных (а не память), мы гарантируем одноточечную истину для операций CRUD на сервере заданий.

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

rerun_monitor = Scheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

Наконец, мы хотим использовать BackgroundScheduler из-за его реализации start(). Когда мы вызываем start() в BackgroundScheduler, в фоновом режиме создается новый поток, который отвечает за планирование/выполнение заданий. Это важно, потому что помните на шаге (1), из-за нашего флага --preload мы выполняем только функцию start() один раз, в процессе Master Gunicorn. По определению forked процессы не наследуют потоки их родителя,, поэтому каждый рабочий не запускает поток BackgroundScheduler.

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

rerun_monitor = BackgroundScheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

В результате всего этого каждый рабочий Gunicorn имеет APScheduler, который был обманут в состояние "НАЧАЛО", но на самом деле не работает, потому что он отбрасывает потоки его родителя! Каждый экземпляр также может обновлять базу данных storestore, просто не выполняя никаких заданий!

Зайдите в flask-APScheduler, чтобы быстро запустить APScheduler на веб-сервере (например, Gunicorn) и включить операции CRUD для каждого работа.

Ответ 2

Я нашел исправление, которое работало с проектом Django, имеющим очень схожую проблему. Я просто привязываю TCP-сокет при первом запуске планировщика и затем проверяет его. Я думаю, что следующий код может работать и для вас, и с небольшими трюками.

import sys, socket

try:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 47200))
except socket.error:
    print "!!!scheduler already started, DO NOTHING"
else:
    from apscheduler.schedulers.background import BackgroundScheduler
    scheduler = BackgroundScheduler()
    scheduler.start()
    print "scheduler started"

Ответ 3

в вашем коде для Django выше это работает, если Django запускается в нескольких процессах, таких как Apache mod_wsgi с включенным многопроцессорным режимом? Проверка связанного сокета предотвратит запуск планировщика несколько раз, но как процесс, запускающий Django без планировщика, может взаимодействовать с процессом Django, который запускает планировщик?

(извиняюсь за публикацию этого в качестве ответа - у меня недостаточно репутации, чтобы оставлять комментарии к ответу Паоло)