Я занят написанием маленького игрового сервера, чтобы опробовать колбу. Игра предоставляет API через REST для пользователей. Пользователям легко выполнять действия и данные запроса, однако я бы хотел обслуживать "игровой мир" вне цикла app.run() для обновления игровых сущностей и т.д. Учитывая, что Flask настолько чисто реализован, я бы хотел чтобы увидеть, есть ли способ Flask сделать это.
Как добавить фоновой поток в колбу?
Ответ 1
Ваши дополнительные потоки должны быть инициированы из того же приложения, которое вызывается сервером WSGI.
В приведенном ниже примере создается фоновый поток, который выполняется каждые 5 секунд и управляет структурами данных, которые также доступны для маршрутизируемых функций Flask.
import threading
import atexit
from flask import Flask
POOL_TIME = 5 #Seconds
# variables that are accessible from anywhere
commonDataStruct = {}
# lock to control access to variable
dataLock = threading.Lock()
# thread handler
yourThread = threading.Thread()
def create_app():
app = Flask(__name__)
def interrupt():
global yourThread
yourThread.cancel()
def doStuff():
global commonDataStruct
global yourThread
with dataLock:
# Do your stuff with commonDataStruct Here
# Set the next thread to happen
yourThread = threading.Timer(POOL_TIME, doStuff, ())
yourThread.start()
def doStuffStart():
# Do initialisation stuff here
global yourThread
# Create your thread
yourThread = threading.Timer(POOL_TIME, doStuff, ())
yourThread.start()
# Initiate
doStuffStart()
# When you kill Flask (SIGTERM), clear the trigger for the next thread
atexit.register(interrupt)
return app
app = create_app()
Назовите его из Gunicorn с чем-то вроде этого:
gunicorn -b 0.0.0.0:5000 --log-config log.conf --pid=app.pid myfile:app
Ответ 2
Похоже, хакерский способ сделать это, но я не думаю, что это технически поддерживается.
Я также нашел этот ответ, в котором говорится об использовании для этого колба-сельдерея.
Ответ 3
В дополнение к использованию чистой нити или очереди сельдерея (обратите внимание, что колбочка-сельдерей больше не требуется), вы также можете взглянуть на колбу-аппроцессор:
https://github.com/viniciuschiele/flask-apscheduler
Простой пример, скопированный из https://github.com/viniciuschiele/flask-apscheduler/blob/master/examples/jobs.py:
from flask import Flask
from flask_apscheduler import APScheduler
class Config(object):
JOBS = [
{
'id': 'job1',
'func': 'jobs:job1',
'args': (1, 2),
'trigger': 'interval',
'seconds': 10
}
]
SCHEDULER_API_ENABLED = True
def job1(a, b):
print(str(a) + ' ' + str(b))
if __name__ == '__main__':
app = Flask(__name__)
app.config.from_object(Config())
scheduler = APScheduler()
# it is also possible to enable the API directly
# scheduler.api_enabled = True
scheduler.init_app(app)
scheduler.start()
app.run()