Страницы

Поиск по вопросам

суббота, 9 марта 2019 г.

asyncio: корректная остановка не своих сопрограмм

Я опять с той же проблемой. В предыдущем вопросе все таски были под моим контролем, и я мог их вручную закрывать. Но вот беда: мне тут потребовались вебсокеты. В примерах в документации к одноимённому модулю никакого корректного закрытия не производится:
start_server = websockets.serve(hello, 'localhost', 8765) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
Я поигрался с примерами, и да, при Ctrl+C ругается, если есть подключенный клиент:
$ python websocket_server.py ^C KeyboardInterrupt Exception ignored in: Task was destroyed but it is pending! task: wait_for= cb=[_wait.._on_completion() at asyncio/tasks.py:399]> Task was destroyed but it is pending! task: wait_for= cb=[_wait.._on_completion() at asyncio/tasks.py:399]> Task was destroyed but it is pending! task: wait_for=>
Так как корректного способа завершения в документации не представлено и быстрое тыкание наличия всяких server.close() ни к какому продуктивному результату не привело, снова появляется вопрос о корректном завершении приложения, если таски мной не контролируются.
Это просто эта библиотека кривая? Или это характерно для любых asyncio-библиотек и я чего-то не понимаю в самой сути asyncio и кто-нибудь ткнёт меня а какую-нибудь матчасть по этому поводу?
Как всё-таки корректно завершать всё это дело в общем случае?

С общим случаем принцип понятен, но websockets закрываться никак не хочет. Минимальный пример для повторения (на основе ответа @jfs):
#!/usr/bin/env python3 # -*- coding: utf-8 -*-
import asyncio import websockets
@asyncio.coroutine def echo(ws, path): print('client started') while True: data = yield from ws.recv() if data is None: break yield from ws.send(data) print('client finished')
loop = asyncio.get_event_loop() start_server = websockets.serve(echo, '127.0.0.1', 8888, loop=loop) server = loop.run_until_complete(start_server)
print('Listen') try: server = loop.run_forever() except KeyboardInterrupt: pass
server.close() loop.run_until_complete(server.wait_closed()) loop.close() print('Finished')
Достаточно просто подключить к нему любой клиент (хоть из браузера — ws = new WebSocket('ws://127.0.0.1:8888/');) и нажать Ctrl+C — отпечатается приведённая ранее ошибка (правда, уже без KeyboardInterrupt).


Ответ

asyncio реализует кооперативную многозадачность. Это значит, что websockets библиотека должна кооперировать (предоставлять возможность чисто завершить соединения). Был открыт баг на websockets Review cancellation management
Даже для обычных (preemptive) потоков не существует общего решения, которое бы корректно остановило бы потоки вне зависимости от кода, который они исполняют (Java выучила этот урок тяжёлым путём). Безопасным для остановки ресурсом является процесс операционной системы, если нас не волнует потеря данных из-за неочищенного файлового буфера, уведомления другой стороны о прекращении сетевого соединения, завершение транзакции базы данных итд. Если волнует, то процесс также обязан кооперировать, если мы хотим его завершить преждевременно.
Чтобы обработать Ctrl+C в Питоне, как обычно нужно поймать KeyboardInterrupt или определить свой обработчик для SIGINT сигнала. Пример echo сервера в asyncio документации показывает как это можно сделать (KeyboardInterrupt + wait_closed())
import asyncio
@asyncio.coroutine def handle_echo(reader, writer): data = yield from reader.read(100) message = data.decode() addr = writer.get_extra_info('peername') print("Received %r from %r" % (message, addr))
print("Send: %r" % message) writer.write(data) yield from writer.drain()
print("Close the client socket") writer.close()
loop = asyncio.get_event_loop() coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop) server = loop.run_until_complete(coro)
# Serve requests until CTRL+c is pressed print('Serving on {}'.format(server.sockets[0].getsockname())) try: loop.run_forever() except KeyboardInterrupt: pass
# Close the server server.close() loop.run_until_complete(server.wait_closed()) loop.close()
В gevent (кооперативная многозадачность) также необходимо KeyboardInterrtupt ловить.
Документация websockets.server.serve() явно упоминает:
serve() yields a Server which provides a close() method and a wait_closed() coroutine to stop serving requests.
то есть для websocket сервера можно точно такой же код использовать для корректной остановки:
try: loop.run_forever() except KeyboardInterrupt: pass
# Close the server server.close() loop.run_until_complete(server.wait_closed()) loop.close()

Комментариев нет:

Отправить комментарий