Страницы

Поиск по вопросам

пятница, 12 октября 2018 г.

Одна переменная на несколько процессов в Python

По совету @andreymal из моего предыдущего вопроса:
Если разными частями приложения являются разные процессы, то для этого надо организовывать межпроцессное взаимодействие, чтобы один процесс работал с соединениями, а другие процессы отправляли этому процессу сообщения. Можно для этого написать свой велосипед на сокетах, можно для этого использовать готовые решения вроде RabbitMQ и Redis.
Пытаюсь реализовать межпроцессное взаимодействие при помощи celery.
Исходные данные и какую задачу нужно решить: К серверу по вебсокетам должны подключаться клиенты (люди, использующие браузер) и ожидать сообщений от сервера. На сервере время от времени срабатывает процесс, генерирующий это самое сообщение. Это сообщение (которое нужно отдать клиенту) можно попросить у процесса передать в указанный скрипт.
Что для этого сделано:
Обработчик сокет-соединений на Python tornado; .py скрипт, которому на stdin подается сообщение от процесса, который генерирует само сообщение; функция-таск для celery, которая должна отправить сообщение в браузер.
То есть, процесс генерирует сообщение -> вызывает скрипт, передавая ему сообщение на stdin -> скрипт вызывает celery таск -> таск берет массив соединений и рассылает им сообщение.
Суть проблемы: Сокет сервер складывает все соединения в массив, но этот массив оказывается пустым для celery таска.
Собственно, вопрос: как завести один массив на несколько процессов, или где я налажал, и как это делать правильно?
UPD добавил упрощенный код для понимания того, как и что есть. Сокет-сервер на tornado:
class SocketHandler(tornado.websocket.WebSocketHandler): connections = []
def open(self): self.__class__.connections.append(self)
@classmethod def get_connections(cls): return cls.connections
и celery таск, который должен отправлять сообщения клиентам:
from handlers import SocketHandler
@celery.task def send_msg(msg): for conn in SocketHandler.get_connections(): conn.write_message(msg)
Но список полученный путем SocketHandler.get_connections() в файле с celery тасками оказывается пустым. Запускаю все это в разных терминалах так:
(venv)$ python app.py # это торнадо апп, импортирующий приведенный handlers (venv)$ celery -A tasks worker


Ответ

Из описания проблемы и документации Celery я так понимаю, что таски выполняются в отдельных воркерах, которые может запускать сам Celery, но суть в том, что таск должен выполняться в одном процессе с сокет-сервером, тогда массив с сокетами ему будет виден. Пробежавшсь по диагонали по документации Celery, я подобного не нашёл; если это на нём и правда невозможно, то, видимо, стоит взять что-то попроще.
Вообще я представляю это как-то так (несколько кривенько, но, думаю, суть должна быть понятна):
def queue_thread(): while thread_should_work(): msg = redis.blpop("websocket_queue", timeout=5) if not msg: continue data = pickle.loads(msg[1]) send_message_to_all_sockets(data)
Threading.thread(target=queue_thread).start()

Комментариев нет:

Отправить комментарий