Страницы

Поиск по вопросам

четверг, 2 января 2020 г.

Python + multiprocessing + signal == “Полный бред” # ?

#python #многопоточность #python_32


Добрый день. Недавно приобрел себе замечательную штуку rPI, и, соответственно, увлекся
питоном. Сейчас пилю небольшую софтину, которая должна стягивать некоторые данные с
определенного сайта. Так как объем работы немалый, то решено было использовать модуль
multiprocessing со всеми вытекающими.
Реализовал главный метод, который запускает воркеров и помещает в общую очередь материалы,
которые они должны обработать. После обработки воркер либо просто помечает элемент
отработанным (.task_done(), либо же, в случае успеха, воркер помечает элемент отработанным,
помещает результат в еще одну очередь (от воркеров к главному), устанавливает флаг
событию и завершается. Другие воркеры, увидев этот флаг, тоже завершаются. Главный
же процесс пишет результат в лог, выводит сообщение о завершении в консоль и завершается.
Казалось бы все идеально. Ан нет.
Дело в том, что иногда объем работы может быть очень большим (десятки часов). Соответственно,
нужно предусмотреть возможность отключения по требованию пользователя. Вот тут то и
начинается самое интересное.
Сначала я решаю использовать самый обычный keyboard interrupt в сочетании с try/exept.
Но не тут то было. В результате исключение срабатывает, но процесс не только не останавливается,
но еще и выдает пару тысяч строк traceback'a. Тогда я еще не понял причины произошедшего
и решил пойти другим способом. Установить обработчик на SIGTERM и не мучиться.
Но и это не сработало. И более того, я заметил странную особенность: функция termHandler
помимо выполнения инструкций завершения выводила в консоль еще и уведомление вроде
"--TERMINATE--". Так вот в консоли этих сообщений оказывалось ровно 21.
Из этого я делаю вывод, что каждый воркер - копия процесса-родителя в момент создания.
Соответственно, они тоже получают сигналы от терминала или системы. Тут я попытался
заставить воркеров вообще не обрабатывать этот сигнал. Но и это не спасло. Теперь этот
сигнал добрался до библиотеки, с которым работают воркеры. Как понимаете, результат
плачевен: все те же 2к+ строк trceback'a и никакого корректного завершения.
Копание интернета ничего не дало. В связи с этим я и прошу помощи у вас. Подскажите,
можно ли заставить процесс-родитель не передавать сигнал от терминала/системы дочерним
процессам? Если нет, то подскажите способ передать в главный процесс команду от юзера
так, чтобы это не повлияло ни на воркеров, ни на используемые библиотеки/фреймворки.
p.s. У меня была идея вынести генератор очереди в отдельный поток управления, а главный
повесить в режиме input. Пусть ждет себе ввода команды стоп от пользователя. Но тут
мне не понятно, как сделать так, чтобы на последнюю строку (где будет поле для ввода)
не залезал вывод от воркеров. А они в реальном времени должны выводить промежуточные
результаты своей работы.
p.s.s В дополнение скажу, что все это добро предназначено исключительно для личного
использования. Притом использования внутри screen'a на raspberry pi, который будет
преспокойно лежать в куче проводов около роутера. Из-за этого варианты с использованием
иксов отпадают.
p>s.*3 И да. Python 3.2 raspbian linux.    


Ответы

Ответ 1



Вы можете просто игнорировать SIGINT в воркерах, тогда ^C в них просто не будет обрабатываться. def worker(): signal.signal(signal.SIGINT, signal.SIG_IGN) # do work def main(): process = multiprocessing.Process(target=worker) process.start() # ... try: # ... process.join() except KeyboardInterrupt: process.terminate() Можете также добавить в worker обработку SIGTERM, если вам нужен graceful exit.

Ответ 2



В общем, я не стал заморачиваться и просто написал костыль, который висит в отдельном потоке управления и чекает каждую введенную строку. Если там находится слово STOP, то он возбуждает событие, которое убивает воркеров и корректно завершает скрипт. В принципе задача выполнена, однако, если кто-то подскажет более лаконичное и в тоже время несложное решение, то я буду только благодарен. Спасибо AVP за поддержку.

Ответ 3



Отвечу своим кодом, тут правда делится треды, а не на процессы, но суть у вас будет такая же. У меня проблема была того же плана, оказалась она вот в чем: обработчики сигналов должны остаться в главном треде, а ждать результата работы нужно в отдельном. join() как оказалось блокирует сигналы в треде, в котором находится. while running: time.sleep(1) использовал чтоб избежать блокировки. except KeyboardInterrupt - не нужен, он также ловится сигналом. #coding:utf8 __author__ = 'eri' import sys import threading import Queue import simplejson import grab.port import grab.protocol import signal,time class Reader(object): def __init__(self,port): self.wq = Queue.Queue() self.rq = Queue.Queue() self.reader = grab.port.Port_reader(port['port'], self.wq, self.rq) #TODO: differ drivers Parser = getattr(grab.protocol, port['driver']) self.parser = Parser(port['counters'], self.rq, self.wq) def start(self): self.reader.start() self.parser.start() def join(self): self.parser.join() self.reader.join() def stop(self): self.reader.stop() self.parser.stop() self.join() readers = [] running = True def term(*args,**kwargs): sys.stderr.write('\nStopping...\nTerminating readers:\n') for r in readers: r.stop() def shutdown(*args,**kwargs): term(*args,**kwargs) while readers: time.sleep(1) global running running = False def restart(*args,**kwargs): term(*args,**kwargs) while readers: time.sleep(1) time.sleep(1) sys.stderr.write('\nStarting:') start() signal.signal(signal.SIGTERM, shutdown) signal.signal(signal.SIGINT, shutdown) signal.signal(signal.SIGQUIT, shutdown) signal.signal(signal.SIGHUP, restart) signal.signal(signal.SIGUSR1, restart) def configure(): with open(path.get_base_path('etc/ports.json'), 'r') as f: ports = simplejson.load(f, encoding='utf-8') for port in ports: r = Reader(port) readers.append(r) r.start() sys.stderr.write('\nStarted worker #{}. '.format(id(r))) def join_all(): sys.stderr.write('\nWaiting {} workers. '.format(len(readers))) while readers: r=readers[0] r.join() readers.remove(r) sys.stderr.write('\nStopped worker #{}. '.format(id(r))) def start(): configure() t = threading.Thread(target=join_all) t.start() def main(): start() while running: time.sleep(1) sys.stderr.write('\nShutdown complete.\n') if __name__ == "__main__": main() В каждой читалке есть 2 потока: ридер работает с внешним миром, а парсер работает с базой данных, сделанно так, потому что во внешнем мире данные очень быстро меняются и не хотелось бы потерять их при блокировке в базе.

Ответ 4



Поднимите вглавном процессе локальный веб-сервер, который будет хостить простенькую страничку с кнопкой "стоп". Можно еще на этой страничке что-нибудь полезное показывать, логи например.

Комментариев нет:

Отправить комментарий