#python #многопоточность #websocket #tornado #celery
По совету @andreymal из моего предыдущего вопроса: Если разными частями приложения являются разные процессы, то для этого надо организовывать межпроцессное взаимодействие, чтобы один процесс работал с соединениями, а другие процессы отправляли этому процессу сообщения. Можно для этого написать свой велосипед на сокетах, можно для этого использовать готовые решения вроде RabbitMQ и Redis. Пытаюсь реализовать межпроцессное взаимодействие при помощи celery. Исходные данные и какую задачу нужно решить: К серверу по вебсокетам должны подключаться клиенты (люди, использующие браузер) и ожидать сообщений от сервера. На сервере время от времени срабатывает процесс, генерирующий это самое сообщение. Это сообщение (которое нужно отдать клиенту) можно попросить у процесса передать в указанный скрипт. Что для этого сделано: Обработчик сокет-соединений на Python tornado; .py скрипт, которому на stdin подается сообщение от процесса, который генерирует само сообщение; функция-таск для celery, которая должна отправить сообщение в браузер. То есть, процесс генерирует сообщение -> вызывает скрипт, передавая ему сообщение на stdin -> скрипт вызывает celery таск -> таск берет массив соединений и рассылает им сообщение. Суть проблемы: Сокет сервер складывает все соединения в массив, но этот массив оказывается пустым для celery таска. Собственно, вопрос: как завести один массив на несколько процессов, или где я налажал, и как это делать правильно? UPD добавил упрощенный код для понимания того, как и что есть. Сокет-сервер на tornado: class SocketHandler(tornado.websocket.WebSocketHandler): connections = [] def open(self): self.__class__.connections.append(self) @classmethod def get_connections(cls): return cls.connections и celery таск, который должен отправлять сообщения клиентам: from handlers import SocketHandler @celery.task def send_msg(msg): for conn in SocketHandler.get_connections(): conn.write_message(msg) Но список полученный путем SocketHandler.get_connections() в файле с celery тасками оказывается пустым. Запускаю все это в разных терминалах так: (venv)$ python app.py # это торнадо апп, импортирующий приведенный handlers (venv)$ celery -A tasks worker
Ответы
Ответ 1
Из описания проблемы и документации Celery я так понимаю, что таски выполняются в отдельных воркерах, которые может запускать сам Celery, но суть в том, что таск должен выполняться в одном процессе с сокет-сервером, тогда массив с сокетами ему будет виден. Пробежавшсь по диагонали по документации Celery, я подобного не нашёл; если это на нём и правда невозможно, то, видимо, стоит взять что-то попроще. Вообще я представляю это как-то так (несколько кривенько, но, думаю, суть должна быть понятна): def queue_thread(): while thread_should_work(): msg = redis.blpop("websocket_queue", timeout=5) if not msg: continue data = pickle.loads(msg[1]) send_message_to_all_sockets(data) Threading.thread(target=queue_thread).start()
Комментариев нет:
Отправить комментарий