#java #многопоточность #concurrency
Есть таблица sites Есть таблица URL - urls url обрабатываются неким образом: с определенной периодичностью ставятся задачи с желаемым временем выполнения (desired_time) для каждого url и с помощью ScheduledThreadPool выполняются. Поток (назовем его A), в бесконечном цикле проверяет активные url (с флагом url.active == 1 в БД) и если нужно - ставит новую задачу, которая будет обработана другим потоком (назовем такие потоки B); или закрывает открытую задачу, так как ее не успел забрать ни один поток B и желаемое время desired_time ее выполнения истекло. Поставленные задачи ждут своего выполнения (когда их заберет один из выполняющих потоков B) в BlockingQueue. Возможна такая ситуация, когда B обработал задачу в тот момент, когда desired_time задачи вот-вот бы истекло. Пока поток B будет предпринимать действия по подготовке к передаче результата на сохранение в БД (или некую пост обработку) - desired_time окончательно истечет. Но задача уже выполнена и не может считаться просроченной. Однако, поток A в этот момент может как раз проверять задачу в БД и увидеть, что ее desired_time истек. Но так как результат еще не записан в БД - статус фактически выполненной задачи еще не изменен на DONE и A может закрыть задачу со статусом ADDLED (протух), после чего B завершит пост обработку и результат задачи будет сохранен в БД. В базе появится выполненная задача с корректным результатом, но со статусом ADDLED. Есть поток С, который выполняет роль некоего колбека для B. Если при обработке задачи в B возникла ошибка, не позволяющая продолжить обработку задачи, B передает задачу C. что бы она была закрыта со статусом ERROR. И того, мы имеем три не синхронизированных потока. Понятно, что надо блокировать задачу для других потоков, когда: B получил результат и начал пост обработку до момента сохранения результата в БД С сохраняет задачу со статусом ERROR Мониторов должно быть ровно столько, сколько URL обрабатывается. Т.е. один монитор на все задачи одного URL. Как можно реализовать такие блокировки?
Ответы
Ответ 1
Я обычно использую кэш, в котором держу объекты для синхронизации. Ehcache idLockingCache = ..... ; private Lock idLockingCacheLock = new ReentrantLock(); public Object getLock(String key) { Element element = idLockingCache.get(key); if (element == null) { idLockingCacheLock.lock(); try { element = idLockingCache.get(key); if (element == null) { element = new Element(key, new Object()); idLockingCache.put(element); } } finally { idLockingCacheLock.unlock(); } } return element.getObjectValue(); } Использование //key в вашем случае это либо id url-а из БД, либо сам URL Object lock = getLock(key); synchronized (lock) { //work } И соответственно синхронизируете во всех местах. P.S. Для распределенных блокировок использую Hazelcast IMap.lock(key)/unlock(key)Ответ 2
Я не понимаю, зачем здесь вообще нужны какие то блокировки. Нужно завести дополнительную структуру данных java.util.concurrent.ConcurrentHashMap. Ключем будем выступать url, а значением статус задачи, которая обрабатывает данный url. Теперь в момент, когда поток A забирает задачу, он выставляет ей статус PROCESSED и больше ее не трогает, передавая потоку B. Поток B занимается обработкой, после того, как задача завершилась, он просто сохраняет полученный результат в базу данных и удаляет задачу из map. Если произошла ошибка, аналогичным образом поступает поток C. Если хотите жестко регламентировать задачи по времени, т.е. не сохранять результат, если задача не выполнилась в уставленное время, есть следующий вариант. Вместо статуса хранить ссылку на поток Worker, занимающийся обработкой. Теперь когда поток B попытается сохранить результат он должен сделать проверку и если поток не прерван то сохраняем результат. Код примерно такой будет: private static class Worker extends Thread { private final AtomicBoolean isAddled = new AtomicBoolean(false); private void saveToDatabase(Result result) { if (!isAddled.get()) { //логика сохранения } workers.remove(result.url); } } а в потоке A делать следующую проверку: @AllArgsConstructor private static class Task { private final int desired_time ; private final String url ; } private static boolean isReadyForProcessing(Task task) { Worker updatedWorker = workers.computeIfPresent(task.url, (url, worker) -> { worker.isAddled.set(System.currentTimeMillis() > task.desired_time); return worker; }); return updatedWorker == null; } Получилось немного сумбурно и запутанно, но надеюсь суть ясна.
Комментариев нет:
Отправить комментарий