#python #python_3x #async_await #asyncio
В питоне хочу сделать не блокирующую работу с 2мя сокетами. Для упрощения примера представим Proxy. Пытаюсь на asincio реализовать. Клиент_1 устанавливает соединение с сервером - получаю reader и writer. Сервер устанавливает новое соединение с внутренним процессом через файловый сокет (в данном проекте, но может понадобится и другие типы соединения). Процесс может создавать запросы клиенту, а клиент к процессу. Очень давно на втором питоне делал несколько тредов которые общались через multiprocessing.Queue. Эти схемы надо бы исключить потому как передача большого количества данных займет много оперативной памяти. С клиента читаю 4 байта заголовка, и по размеру тело сообщения. Тело обрабатывается и отправляется к процессу. С процесса читаю по мере поступления отдельные, их также надо перепаковать. Запускаю читалки в циклах. Та что с клиента отрабатывает одно сообщение, а вторая продолжает дергать процесс. Такое чувство что второй блокирует первый и вставляет awaitы в LIFO, не давая первому читать сокет клиента.
Ответы
Ответ 1
Давайте попробуем реализовать асинхронность по классике =) Асинхронность в отличии от синхронности заключается в том, что для каждого соединения мы должны создать контекст: некую структуру данных, которая будет хранить состояние соединения, информацию о том, что сейчас происходит на этом соединении. Действия же с соединением (чтение, запись и т.д.) происходят только тогда, когда соединение к этому готово. Поэтому, ни одно действие не вызывает ожидания. Ожидания в таком дизайне становятся недопустимы, так как все обработчики будут ждать одного. Когда-то давно мы в универе на сетевом программировании постигли дзен и обрели знание о методе select. Метод select предназначен для опроса блокирующих потоковых классов: стандартный ввод (stdin), запуск приложения с ожидание результата (popen), получение данных с сокета (socket). В Python она тоже есть и выглядит следующим образом: readst,writest,errorst = select(rlist, wlist, xlist[ , timeout]) Вместо того, чтобы запускать отдельный поток для каждого прокси-соединения, программа "крутится" в цикле событий и ждет активности на каждом из соединений, перемещая данные всякий раз, когда соединение будет готово. Этот подход особенно подходит для программ, которые делают большое количество операций ввода-вывода. #!/usr/bin/python3 import select import socket import sys import os import fcntl import logging USAGE = "usage: python selectproxy.py proxyhost:proxyport desthost:destport" class ProxyConnection(object): # определим максимальный размер буфера MAX_BUFFER_SIZE = 1024 # ProxyConnection - класс, который пересылает данные между клиентским и целевым сокетом def __init__(self,proxyserver,listensock,servaddr): self.proxyserver = proxyserver self.servaddr = servaddr # адрес сервера # открываем сокеты self.clisock, self.cliaddr = listensock.accept() # ждем подключения клиента self.clisock.setblocking(0) self.servsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # сокет сервера self.servsock.setblocking(0) # буферы для данных, полученных из сокета self.buffers = { self.clisock:bytes(), self.servsock:bytes() } self.connected = False # подключен ли серверный сокет? # регистрируем сокеты и разрешать операции чтения self.proxyserver.registerSocket(self.clisock,self) self.proxyserver.registerSocket(self.servsock,self) self.proxyserver.activateRead(self.clisock) self.proxyserver.activateRead(self.servsock) # метод, возвращающий сокет на «другом конце» соединения def other(self,socket): if socket == self.clisock: return self.servsock else: return self.clisock # подключаемся к серверу def connect(self): # мы должны использовать метод connect_ex, потому что соединение является асинхронным и не будет выполнено немедленно self.servsock.connect_ex(self.servaddr) # метод, читающий данные из сокета def readfrom(self,s): if s == self.servsock and not self.connected: self.proxyserver.connection_count += 1 logging.getLogger("selectproxy") \ .info("соединение с установлено %s, количество текущих соединений %d"%(str(self.cliaddr),self.proxyserver.connection_count)) self.connected = True return # читаем из сокета capacity = ProxyConnection.MAX_BUFFER_SIZE - len(self.buffers[s]) try: data = s.recv(capacity) except Exception as ex: data = b"" # если чтение не удалось, закрываем сокет (это происходит, когда клиент или сервер закрывает соединение) if len(data) == 0: self.close() return self.buffers[s] += data self.proxyserver.activateWrite(self.other(s)) # перестаем читать, если буфер полон capacity -= len(data) if capacity <= 0: self.proxyserver.deactivateRead(s) # метод, который пишет в сокет def writeto(self,s): # получаем буфер, содержащий данные для чтения buf = self.buffers[self.other(s)] try: # пишем его в сокет written = s.send(buf) except Exception as ex: self.close() return # удаляем записанные данные из буфера buf = buf[written:] self.buffers[self.other(s)] = buf if len(buf) == 0: self.proxyserver.deactivateWrite(s) # разрешаем чтение, если все записали if written: self.proxyserver.activateRead(self.other(s)) # метод, закрывающий соединение def close(self): for sock in [self.clisock,self.servsock]: if sock: self.proxyserver.deactivateRead(sock) self.proxyserver.deactivateWrite(sock) sock.close() self.proxyserver.unregisterSocket(sock,self) self.proxyserver.connection_count -= 1 logging.getLogger("selectproxy") \ .info("соединение с %s разорвано, количество текущих соединений %d"%(self.cliaddr,self.proxyserver.connection_count)) class ProxyServer(object): def __init__(self,host,port,serverhost,serverport): self.address = (host,port) self.server = (serverhost,serverport) self.listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.listensock.bind(self.address) self.listensock.listen(5) self.connections = {} # словарь сокет <-> ProxyConnection self.readsockets = [] self.writesockets = [] self.allsockets = [self.listensock] self.connection_count = 0 # количество активных соединений def run(self): loop = 0 while True: # Блокировать до тех пор, пока в одном из сокетов не будет активности, тайм-аут каждые 60 секунд по умолчанию r, w, e = select.select( [self.listensock]+self.readsockets, self.writesockets, self.allsockets, 60) loop += 1 # обрабатываем любые чтения for s in r: if s is self.listensock: self.open() else: if s in self.connections: self.connections[s].readfrom(s) # обрабатываем запись for s in w: if s in self.connections: self.connections[s].writeto(s) # обрабатываем ошибки for s in e: if s in self.connections: self.connections[s].close() self.sock.close() self.sock = None def activateRead(self,sock): if not sock in self.readsockets: self.readsockets.append(sock) def deactivateRead(self,sock): if sock in self.readsockets: self.readsockets.remove(sock) def activateWrite(self,sock): if not sock in self.writesockets: self.writesockets.append(sock) def deactivateWrite(self,sock): if sock in self.writesockets: self.writesockets.remove(sock) def registerSocket(self,sock,conn): self.connections[sock] = conn self.allsockets.append(sock) def unregisterSocket(self,sock,conn): del self.connections[sock] self.allsockets.remove(sock) def open(self): conn = ProxyConnection(self,self.listensock,self.server) conn.connect() if __name__ == '__main__': try: proxy = sys.argv[1].split(":") dest = sys.argv[2].split(":") proxyhost = proxy[0] proxyport = int(proxy[1]) serverhost = dest[0] serverport = int(dest[1]) except: print(USAGE) sys.exit(-1) logger = logging.getLogger('selectproxy') logger.setLevel(logging.INFO) hdlr = logging.StreamHandler() hdlr.setLevel(logging.INFO) hdlr.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logger.addHandler(hdlr) server = ProxyServer(proxyhost,proxyport,serverhost,serverport) server.run()
Комментариев нет:
Отправить комментарий