Я пишу инструмент, который подключается к X-ряду UNIX-сокетов, отправляет команду и сохраняет результат в локальной файловой системе. Он запускается через каждые X секунд. Чтобы выполнить некоторую очистку, когда инструмент принимает сигналы окончания, я регистрирую функцию (выключение) на сигнал. Сигналы SIGHUP и signal.SIGTERM. Эта функция отменяет все задачи, а затем закрывает цикл событий.
Моя проблема в том, что я получаю
RuntimeError: цикл событий остановлен до завершения в будущем
когда я посылаю сигнал .SIGTERM(kill 'pid'). Я прочитал документацию об отмене задач дважды, но я не заметил, что я делаю неправильно здесь.
Я также заметил что-то странное, когда я посылаю сигнал завершения, программа находится в спящем режиме, и я вижу в журнале, что он просыпает сопрограмму pull_stats(), вы можете увидеть это в первых двух строках журнала.
Log:
21:53:44,194 [23031] [MainThread:supervisor ] DEBUG **sleeping for 9.805s secs**
21:53:45,857 [23031] [MainThread:pull_stats ] INFO pull statistics
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,859 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,859 [23031] [MainThread:shutdown ] INFO received stop signal, cancelling tasks...
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,860 [23031] [MainThread:shutdown ] INFO True
21:53:45,860 [23031] [MainThread:shutdown ] INFO stopping event loop
21:53:45,860 [23031] [MainThread:shutdown ] INFO bye, exiting...
Traceback (most recent call last):
File "./pull.py", line 249, in <module>
main()
File "./pull.py", line 245, in main
supervisor(loop, config)
File "./pull.py", line 161, in supervisor
config['pull']['socket-dir'], storage_dir, loop))
File "/usr/lib/python3.4/asyncio/base_events.py", line 274, in run_until_complete
raise RuntimeError('Event loop stopped before Future completed.')
RuntimeError: Event loop stopped before Future completed.
Вот код:
def shutdown(loop):
LOGGER.info('received stop signal, cancelling tasks...')
for task in asyncio.Task.all_tasks():
LOGGER.info(task.cancel())
LOGGER.info('stopping event loop')
loop.stop()
LOGGER.info('bye, exiting...')
def write_file(filename, data):
try:
with open(filename, 'w') as file_handle:
file_handle.write(data.decode())
except OSError as exc:
return False
else:
return True
@asyncio.coroutine
def get(socket_file, cmd, storage_dir, loop):
connect = asyncio.open_unix_connection(socket_file)
reader, writer = yield from asyncio.wait_for(connect, 1)
writer.write('{c}\n'.format(c=cmd).encode())
data = yield from reader.read()
writer.close()
filename = os.path.basename(socket_file) + '_' + cmd.split()[1]
filename = os.path.join(storage_dir, filename)
result = yield from loop.run_in_executor(None, write_file, filename, data)
return result
@asyncio.coroutine
def pull_stats(socket_dir, storage_dir, loop):
socket_files = glob.glob(socket_dir + '/*sock*')
coroutines = [get(socket_file, cmd, storage_dir, loop)
for socket_file in socket_files
for cmd in CMDS]
status = yield from asyncio.gather(*coroutines)
if len(set(status)) == 1 and True in set(status):
return True
else:
return False
def supervisor(loop, config):
dst_dir = config.get('pull', 'dst-dir')
tmp_dst_dir = config.get('pull', 'tmp-dst-dir')
while True:
start_time = int(time.time())
storage_dir = os.path.join(tmp_dst_dir, str(start_time))
try:
os.makedirs(storage_dir)
except OSError as exc:
msg = "failed to create directory {d}:{e}".format(d=storage_dir,
e=exc)
LOGGER.critical(msg)
# Launch all connections.
result = loop.run_until_complete(pull_stats(
config['pull']['socket-dir'], storage_dir, loop))
if result:
try:
shutil.move(storage_dir, dst_dir)
except OSError as exc:
LOGGER.critical("failed to move %s to %s: %s", storage_dir,
dst_dir, exc)
break
else:
LOGGER.info('statistics are saved in %s', os.path.join(
dst_dir, os.path.basename(storage_dir)))
else:
LOGGER.critical('failed to pull stats')
shutil.rmtree(storage_dir)
sleep = config.getint('pull', 'pull-interval') - (time.time() -
start_time)
if 0 < sleep < config.getint('pull', 'pull-interval'):
time.sleep(sleep)
loop.close()
sys.exit(1)
def main():
args = docopt(__doc__, version=VERSION)
config = ConfigParser(interpolation=ExtendedInterpolation())
config.read_dict(copy.copy(DEFAULT_OPTIONS))
config.read(args['--file'])
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop))
loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop))
num_level = getattr(logging, config.get('pull', 'loglevel').upper(), None)
LOGGER.setLevel(num_level)
supervisor(loop, config)
# This is the standard boilerplate that calls the main() function.
if __name__ == '__main__':
main()