Страницы

Поиск по вопросам

среда, 11 декабря 2019 г.

Правильная организация многопоточной работы Java

#java #многопоточность #long_poll


Предисловие

Работаю с технологией longpoll-инга. То есть суть работы заключается в том, что я
делаю запросы к определенному серверу, и, в случае наличия обновлений, сервер возвращает
мне их список, если нет - через определенное время мне возвращается пустой ответ. Если
сказать немного конкретнее - я работаю с VK API и личными сообщениями, поэтому время
обработки каждого обновления из списка крайне критично.

Проблема

Когда я получаю большой список обновлений, они, естественно, обрабатываются каждое
по порядку. Их может быть до 256 за запрос, а запросов я могу делать до 3 в секунду
как минимум, итого - 750 обновлений каждую секунду вполне себе возможный вариант. Допустим,
время обработки одного обновления (которое оповещает о входящем сообщении) занимает
десятую часть секунды. Потом вспоминаем об общем их количестве и понимаем, что, в таком
случае, до следующего сообщения очередь дойдет очень нескоро. Нас это, конечно же,
не устраивает.

Как я пытался эту проблему решить

Естественно, подумал я, логичнее всего будет отдавать обработку всего этого в новый
поток, дабы каждое новое сообщение обрабатывалось мгновенно после получения, а значит
задержек быть не должно. 
Изначально я без задней мысли написал везде new Thread(() -> handle(...)).start();
и думал, что проблем теперь не будет. Лишь потом я задумался, что в таком случае на
каждое обновление будет создаваться новый анонимный поток, и таких потоков уже может
быть по 750 штук каждую секунду. 

Изначально (первые несколько минут) все работало отлично, затем задержки стали расти
и расти, в итоге дойдя до десятков минут. Вполне логично было предположить, что где-то
происходят косяки, засоряется память, сеть и всё прочее, видимо, потоки сами не очень-то
и хотели закрываться сразу после выполнения обработки события. Я подключил профайлер,
ничего особого не увидел, как ни странно - по нагрузке на память, процессор и прочее,
мои классы и объекты были где-то далеко в низу, а в топе были char[] и String, с непонятными
кракозябрами в содержании.  А запущенных потоков было всего не более 150, и созданных
объектов не более 3 миллионов.

Дабы исправить создание огромного количества "беспризорных" потоков, я организовал
действие примерно так: я делаю запрос к longpoll-серверу, получаю кучу обновлений,
отдаю их обработчику. Обработчик - класс, наследующийся от потока, который просто на
вечном цикле берёт обновления из очереди и обрабатывает их. По сути, тут всего один
поток, работающий параллельно с главным, и я просто убрал задержки между запросами
для получения обновлений, однако они всё также обрабатываются друг за другом. Как я
это организовал (исходный код) можно увидеть здесь - в классе LongPoll само взаимодействие
с сервером, в классе UpdatesHandler непосредственно обработка.

Вопрос

Как лучше в данном случае организовать работу или хотя бы отследить, из-за чего случаются
проблемы? В логе всё хорошо, веду полное логгирование через log4j. 

Основная проблема заключается в том, что спустя несколько минут, обработка сообщений
начинает занимать неприлично большое количество времени, хотя изначально при запуске
всё работает как часы. Даже нагрузка не так влияет на это, дело тут в чём-то другом,
и в чём - я не могу пока понять.

Мне предложили попробовать executor service, который, вроде бы, создает пул потоков,
сам ими управляет и переиспользует, но я не знаю, поможет ли это и вообще в этом ли
проблема? И если использовать его всё-таки, то как правильнее поступить - создать 750
потоков, которые будут переиспользоваться? А не многовато ли? А если нет, то какая
разница, один поток или десять будут обрабатывать сотни обновлений, пусть будет задержка
не в минуту, а немного меньше, это не тот результат, который нужен.

Желаемый результат

Чтобы обработка каждого обновления проходила асинхронно, и чтобы каждое обновление
не ожидало окончания обработки предыдущего, также и чтобы запрос за новым списком обновлений
не ожидал окончания обработки всех предыдущих обновлений. Но необходимо, чтобы не было
потрачено огромное количество ресурсов, которые ограничены, и чтобы был учтен большой
объем обновлений, и чтобы работало раз и на всё время стабильно :) Проблема ещё в том,
что обработка одного сообщения может занять как сотую часть секунды, так и секунд 5
в некоторых случаях, поэтому сделать всего потоков 10 не будет смысла. 
    


Ответы

Ответ 1



Как я указал в самом вопросе, мне посоветовали попробовать ExecutorService, и он действительно решил все проблемы. Логика решения была примерно такой: простая итерация по списку полученных событий времени не занимает, а вот обработка некоторых (сообщений) занимает, и порой немало. Значит, нужно пробегаться по массиву событий, и, если текущее событие оповещает о сообщении, отдавать его обработчику в новом потоке. Можно было бы использовать FixedThreadPool, но тогда мне было не очень понятно, сколько потоков было бы необходимо создать и сколько их вообще нужно. Как подсказала данная статья, есть ещё один хороший вариант — использовать CachedThreadPool. Он создаёт новые потоки, только если текущие заняты, и затем сам их подчищает. Таким образом, большое количество потоков будет создано только в случае большого количества сообщений, которые не успевали бы обрабатываться, но по окончании их обработки, все новые потоки будут "убиты". Конкретная реализация: Используем: ExecutorService service = Executors.newCachedThreadPool(); Отдаём наше "задание", которое требует обработки в новом потоке: service.submit(() -> { // код, который будет выполнен в новом потоке }); Всё, вот так просто всё оказалось. Проверил на практике, среднее количество запущенных потоков стало равно 7-8, а в максимуме доходило до 30, а это вполне приемлемо.

Комментариев нет:

Отправить комментарий