Предисловие
Работаю с технологией longpoll-инга. То есть суть работы заключается в том, что я делаю запросы к определенному серверу, и, в случае наличия обновлений, сервер возвращает мне их список, если нет - через определенное время мне возвращается пустой ответ. Если сказать немного конкретнее - я работаю с VK API и личными сообщениями, поэтому время обработки каждого обновления из списка крайне критично.
Проблема
Когда я получаю большой список обновлений, они, естественно, обрабатываются каждое по порядку. Их может быть до 256 за запрос, а запросов я могу делать до 3 в секунду как минимум, итого - 750 обновлений каждую секунду вполне себе возможный вариант. Допустим, время обработки одного обновления (которое оповещает о входящем сообщении) занимает десятую часть секунды. Потом вспоминаем об общем их количестве и понимаем, что, в таком случае, до следующего сообщения очередь дойдет очень нескоро. Нас это, конечно же, не устраивает.
Как я пытался эту проблему решить
Естественно, подумал я, логичнее всего будет отдавать обработку всего этого в новый поток, дабы каждое новое сообщение обрабатывалось мгновенно после получения, а значит задержек быть не должно.
Изначально я без задней мысли написал везде new Thread(() -> handle(...)).start(); и думал, что проблем теперь не будет. Лишь потом я задумался, что в таком случае на каждое обновление будет создаваться новый анонимный поток, и таких потоков уже может быть по 750 штук каждую секунду.
Изначально (первые несколько минут) все работало отлично, затем задержки стали расти и расти, в итоге дойдя до десятков минут. Вполне логично было предположить, что где-то происходят косяки, засоряется память, сеть и всё прочее, видимо, потоки сами не очень-то и хотели закрываться сразу после выполнения обработки события. Я подключил профайлер, ничего особого не увидел, как ни странно - по нагрузке на память, процессор и прочее, мои классы и объекты были где-то далеко в низу, а в топе были char[] и String, с непонятными кракозябрами в содержании. А запущенных потоков было всего не более 150, и созданных объектов не более 3 миллионов.
Дабы исправить создание огромного количества "беспризорных" потоков, я организовал действие примерно так: я делаю запрос к longpoll-серверу, получаю кучу обновлений, отдаю их обработчику. Обработчик - класс, наследующийся от потока, который просто на вечном цикле берёт обновления из очереди и обрабатывает их. По сути, тут всего один поток, работающий параллельно с главным, и я просто убрал задержки между запросами для получения обновлений, однако они всё также обрабатываются друг за другом. Как я это организовал (исходный код) можно увидеть здесь - в классе LongPoll само взаимодействие с сервером, в классе UpdatesHandler непосредственно обработка.
Вопрос
Как лучше в данном случае организовать работу или хотя бы отследить, из-за чего случаются проблемы? В логе всё хорошо, веду полное логгирование через log4j.
Основная проблема заключается в том, что спустя несколько минут, обработка сообщений начинает занимать неприлично большое количество времени, хотя изначально при запуске всё работает как часы. Даже нагрузка не так влияет на это, дело тут в чём-то другом, и в чём - я не могу пока понять.
Мне предложили попробовать executor service, который, вроде бы, создает пул потоков, сам ими управляет и переиспользует, но я не знаю, поможет ли это и вообще в этом ли проблема? И если использовать его всё-таки, то как правильнее поступить - создать 750 потоков, которые будут переиспользоваться? А не многовато ли? А если нет, то какая разница, один поток или десять будут обрабатывать сотни обновлений, пусть будет задержка не в минуту, а немного меньше, это не тот результат, который нужен.
Желаемый результат
Чтобы обработка каждого обновления проходила асинхронно, и чтобы каждое обновление не ожидало окончания обработки предыдущего, также и чтобы запрос за новым списком обновлений не ожидал окончания обработки всех предыдущих обновлений. Но необходимо, чтобы не было потрачено огромное количество ресурсов, которые ограничены, и чтобы был учтен большой объем обновлений, и чтобы работало раз и на всё время стабильно :) Проблема ещё в том, что обработка одного сообщения может занять как сотую часть секунды, так и секунд 5 в некоторых случаях, поэтому сделать всего потоков 10 не будет смысла.
Ответ
Как я указал в самом вопросе, мне посоветовали попробовать ExecutorService, и он действительно решил все проблемы.
Логика решения была примерно такой: простая итерация по списку полученных событий времени не занимает, а вот обработка некоторых (сообщений) занимает, и порой немало.
Значит, нужно пробегаться по массиву событий, и, если текущее событие оповещает о сообщении, отдавать его обработчику в новом потоке.
Можно было бы использовать FixedThreadPool, но тогда мне было не очень понятно, сколько потоков было бы необходимо создать и сколько их вообще нужно.
Как подсказала данная статья, есть ещё один хороший вариант — использовать CachedThreadPool. Он создаёт новые потоки, только если текущие заняты, и затем сам их подчищает. Таким образом, большое количество потоков будет создано только в случае большого количества сообщений, которые не успевали бы обрабатываться, но по окончании их обработки, все новые потоки будут "убиты".
Конкретная реализация:
Используем: ExecutorService service = Executors.newCachedThreadPool();
Отдаём наше "задание", которое требует обработки в новом потоке:
service.submit(() -> {
// код, который будет выполнен в новом потоке
});
Всё, вот так просто всё оказалось. Проверил на практике, среднее количество запущенных потоков стало равно 7-8, а в максимуме доходило до 30, а это вполне приемлемо.
Комментариев нет:
Отправить комментарий