Страницы

Поиск по вопросам

воскресенье, 8 декабря 2019 г.

Одна переменная на несколько процессов в Python

#python #многопоточность #websocket #tornado #celery


По совету @andreymal из моего предыдущего вопроса:  


  Если разными частями приложения являются разные процессы, то для этого надо организовывать
межпроцессное взаимодействие, чтобы один процесс работал с соединениями, а другие процессы
отправляли этому процессу сообщения. Можно для этого написать свой велосипед на сокетах,
можно для этого использовать готовые решения вроде RabbitMQ и Redis.  


Пытаюсь реализовать межпроцессное взаимодействие при помощи celery.  

Исходные данные и какую задачу нужно решить:
К серверу по вебсокетам должны подключаться клиенты (люди, использующие браузер)
и ожидать сообщений от сервера.
На сервере время от времени срабатывает процесс, генерирующий это самое сообщение.
Это сообщение (которое нужно отдать клиенту) можно попросить у процесса передать в
указанный скрипт.  

Что для этого сделано:  


Обработчик сокет-соединений на Python tornado;
.py скрипт, которому на stdin подается сообщение от процесса, который генерирует
само сообщение;
функция-таск для celery, которая должна отправить сообщение в браузер.


То есть, процесс генерирует сообщение -> вызывает скрипт, передавая ему сообщение
на stdin -> скрипт вызывает celery таск -> таск берет массив соединений и рассылает
им сообщение.

Суть проблемы:
Сокет сервер складывает все соединения в массив, но этот массив оказывается пустым
для celery таска.   

Собственно, вопрос: как завести один массив на несколько процессов, или где я налажал,
и как это делать правильно? 

UPD добавил упрощенный код для понимания того, как и что есть.
Сокет-сервер на tornado: 

class SocketHandler(tornado.websocket.WebSocketHandler):
    connections = []

    def open(self):
        self.__class__.connections.append(self)

    @classmethod
    def get_connections(cls):
        return cls.connections


и celery таск, который должен отправлять сообщения клиентам: 

from handlers import SocketHandler

@celery.task
def send_msg(msg):
    for conn in SocketHandler.get_connections():
        conn.write_message(msg)


Но список полученный путем SocketHandler.get_connections() в файле с celery тасками
оказывается пустым.
Запускаю все это в разных терминалах так:   

(venv)$ python app.py  # это торнадо апп, импортирующий приведенный handlers
(venv)$ celery -A tasks worker

    


Ответы

Ответ 1



Из описания проблемы и документации Celery я так понимаю, что таски выполняются в отдельных воркерах, которые может запускать сам Celery, но суть в том, что таск должен выполняться в одном процессе с сокет-сервером, тогда массив с сокетами ему будет виден. Пробежавшсь по диагонали по документации Celery, я подобного не нашёл; если это на нём и правда невозможно, то, видимо, стоит взять что-то попроще. Вообще я представляю это как-то так (несколько кривенько, но, думаю, суть должна быть понятна): def queue_thread(): while thread_should_work(): msg = redis.blpop("websocket_queue", timeout=5) if not msg: continue data = pickle.loads(msg[1]) send_message_to_all_sockets(data) Threading.thread(target=queue_thread).start()

Комментариев нет:

Отправить комментарий