Страницы

Поиск по вопросам

четверг, 5 декабря 2019 г.

Блокирующий элемент очереди

#java #многопоточность #concurrency


Есть таблица sites
Есть таблица URL - urls

url обрабатываются неким образом: с определенной периодичностью ставятся задачи с
желаемым временем выполнения (desired_time) для каждого url и с помощью ScheduledThreadPool
выполняются.
Поток (назовем его A), в бесконечном цикле проверяет активные url (с флагом url.active
== 1 в БД) и если нужно - ставит новую задачу, которая будет обработана другим потоком
(назовем такие потоки B); или закрывает открытую задачу, так как ее не успел забрать
ни один поток B и желаемое время desired_time ее выполнения истекло.

Поставленные задачи ждут своего выполнения (когда их заберет один из выполняющих
потоков B) в BlockingQueue.

Возможна такая ситуация, когда B обработал задачу в тот момент, когда desired_time
задачи вот-вот бы истекло. Пока поток B будет предпринимать действия по подготовке
к передаче результата на сохранение в БД (или некую пост обработку) - desired_time
окончательно истечет. Но задача уже выполнена и не может считаться просроченной. Однако,
поток A в этот момент может как раз проверять задачу в БД и увидеть, что ее desired_time
истек. Но так как результат еще не записан в БД - статус фактически выполненной задачи
еще не изменен на DONE и A может закрыть задачу со статусом ADDLED (протух), после
чего B завершит пост обработку и результат задачи будет сохранен в БД. В базе появится
выполненная задача с корректным результатом, но со статусом ADDLED.

Есть поток С, который выполняет роль некоего колбека для B. Если при обработке задачи
в B возникла ошибка, не позволяющая продолжить обработку задачи, B передает задачу
C. что бы она была закрыта со статусом ERROR. И того, мы имеем три не синхронизированных
потока.

Понятно, что надо блокировать задачу для других потоков, когда:


B получил результат и начал пост обработку до момента сохранения результата в БД
С сохраняет задачу со статусом ERROR


Мониторов должно быть ровно столько, сколько URL обрабатывается. Т.е. один монитор
на все задачи одного URL.

Как можно реализовать такие блокировки?
    


Ответы

Ответ 1



Я обычно использую кэш, в котором держу объекты для синхронизации. Ehcache idLockingCache = ..... ; private Lock idLockingCacheLock = new ReentrantLock(); public Object getLock(String key) { Element element = idLockingCache.get(key); if (element == null) { idLockingCacheLock.lock(); try { element = idLockingCache.get(key); if (element == null) { element = new Element(key, new Object()); idLockingCache.put(element); } } finally { idLockingCacheLock.unlock(); } } return element.getObjectValue(); } Использование //key в вашем случае это либо id url-а из БД, либо сам URL Object lock = getLock(key); synchronized (lock) { //work } И соответственно синхронизируете во всех местах. P.S. Для распределенных блокировок использую Hazelcast IMap.lock(key)/unlock(key)

Ответ 2



Я не понимаю, зачем здесь вообще нужны какие то блокировки. Нужно завести дополнительную структуру данных java.util.concurrent.ConcurrentHashMap. Ключем будем выступать url, а значением статус задачи, которая обрабатывает данный url. Теперь в момент, когда поток A забирает задачу, он выставляет ей статус PROCESSED и больше ее не трогает, передавая потоку B. Поток B занимается обработкой, после того, как задача завершилась, он просто сохраняет полученный результат в базу данных и удаляет задачу из map. Если произошла ошибка, аналогичным образом поступает поток C. Если хотите жестко регламентировать задачи по времени, т.е. не сохранять результат, если задача не выполнилась в уставленное время, есть следующий вариант. Вместо статуса хранить ссылку на поток Worker, занимающийся обработкой. Теперь когда поток B попытается сохранить результат он должен сделать проверку и если поток не прерван то сохраняем результат. Код примерно такой будет: private static class Worker extends Thread { private final AtomicBoolean isAddled = new AtomicBoolean(false); private void saveToDatabase(Result result) { if (!isAddled.get()) { //логика сохранения } workers.remove(result.url); } } а в потоке A делать следующую проверку: @AllArgsConstructor private static class Task { private final int desired_time ; private final String url ; } private static boolean isReadyForProcessing(Task task) { Worker updatedWorker = workers.computeIfPresent(task.url, (url, worker) -> { worker.isAddled.set(System.currentTimeMillis() > task.desired_time); return worker; }); return updatedWorker == null; } Получилось немного сумбурно и запутанно, но надеюсь суть ясна.

Комментариев нет:

Отправить комментарий