#java #многопоточность #инспекция_кода
Задача: Есть клиенты, которые подключаются к серверу и периодически выполняют какие-то операции. Если за некоторое время клиент не провел ни одной операции, то в базе его статус меняется на оффлайн. При возобновлении операция статус в БД меняется на онлайн. Клиенты живут в разных потоках. Нагрузка: Клиентов порядка 100-200 средняя частота операций порядка секунды-двух время бездействия: 5 минут Алгоритм: Есть Map у которого ключ - это ID клиента, а значение - время истечения ожидания. Клиенты сдвигают это время при каждом обращении к серверу. Есть таймер, который срабатывает раз в минуту, пробегает по списку клиентов, находит тех, у которых истекло время ожидания, удаляет их из списка и меняет статус в базе. Реализация: public class ActiveClientManager implements Consumer{ private final Map clients; private final PoolDataSource pds; private final long timeout; private final Object dbMonitor; public ActiveClientManager(PoolDataSource pds, long timeout) { clients = new ConcurrentHashMap<>(); this.pds = pds; this.timeout = timeout; dbMonitor = new Object(); } // Действие клиента @Override public void accept(Integer clientID) { // Время после которого, клиент уходит в оффлайн long offlineTime = System.currentTimeMillis() + timeout; // Вставляем нового клиента, или обновляем существующего if (clients.put(clientID, offlineTime) == null) { // Если была произведена вставка, то меняем статус в базе try { try (Connection con = pds.getConnection()) { try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?")) { stmt.setInt(1, clientID); // Перекрываем кислород таймеру synchronized (dbMonitor) { stmt.executeUpdate(); } } } } catch (SQLException e) { e.printStackTrace(); } } } // Обработка события таймера public void testTimeout() throws SQLException { long currentTime = System.currentTimeMillis(); // Сюда занесем всех клиентов, которые уже отвалились List removeClients = new ArrayList<>(); clients.replaceAll((key, val) -> { if (val <= currentTime) { // если время уже прошло, то сохраняем нашего клиента removeClients.add(key); // и удаляем его из общего списка return null; } // иначе ничего не трогаем return val; }); // если есть кого удалять if (removeClients.size() > 0) { try (Connection con = pds.getConnection()) { try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = NULL WHERE id = ?")) { // блокируем добавление нового клиента synchronized (dbMonitor) { for (Integer client : removeClients) { // если клиент не появился опять в общем списке if (!clients.containsKey(client)) { // то сбрасываем ему статус stmt.setInt(1, client); stmt.executeUpdate(); } } } } } } } } Собственно вопрос - не упустил ли я, что-либо в синхронизации? Update Выяснилось, что конструкция clients.replaceAll((key, val) -> { if (val <= currentTime) { // если время уже прошло, то сохраняем нашего клиента removeClients.add(key); // и удаляем его из общего списка return null; } // иначе ничего не трогаем return val; }); не работает. Если из лямбды вернуть null, то метод бросает NullPointerException. Переписал так clients.entrySet().removeIf((entry) -> { if (entry.getValue() <= currentTime) { removeClients.add(entry.getKey()); return true; } return false; });
Ответы
Ответ 1
Синхронизируясь по одному объекту dbMonitor вы ограничиваете паралелльность вашего решения. То есть, клиенты будут "ждать" базу данных, даже если они с разным clientId. Если использовать встроенную синхронизацию в ConcurrentHashMap, то получится большая "паралленость" ActiveClientManager'а. Но конечно же лучше проверить производительность тестами. Попробуйте создать менеджер, и вызвать методы accept, testTimeout из разных потоков. Не претендую на абсолютную истинность решения, указал лишь на то, что бросилось в глаза. Пример: public class ActiveClientManager implements Consumer{ private final Map clients = new ConcurrentHashMap<>(); private final PoolDataSource pds; private final long timeout; public ActiveClientManager(PoolDataSource pds, long timeout) { this.pds = pds; this.timeout = timeout; } // Действие клиента @Override public void accept(Integer clientID) { // Время после которого, клиент уходит в оффлайн long offlineTime = System.currentTimeMillis() + timeout; // Вставляем нового клиента, или обновляем существующего // синхронизируемся только если один и тот же clientID clients.compute(clientID, (oldVal, newVal) -> { if (oldVal == null) { try { try (Connection con = pds.getConnection()) { try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?")) { stmt.setInt(1, clientID); stmt.executeUpdate(); } } } catch (SQLException e) { e.printStackTrace(); } } return offlineTime; }); } // Обработка события таймера public void testTimeout() throws SQLException { long currentTime = System.currentTimeMillis(); // Сюда занесем всех клиентов, которые уже отвалились // синхронизируемся на конерктном client clients.replaceAll((client, val) -> { if (val <= currentTime) { // если время уже прошло, то сохраняем нашего клиента try (Connection con = pds.getConnection()) { try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = NULL WHERE id = ?")) { // блокируем добавление нового клиента stmt.setInt(1, client); stmt.executeUpdate(); } } catch (SQLException e) { e.printStackTrace(); } // и удаляем его из общего списка return null; } // иначе ничего не трогаем return val; }); } } Update По второй реализации бросается к глаза, что Вы не определились, что делать если база не работает(или возвращает ошибки). В первом случае - accept вы собираете список исключений, и пытаетесь обновлять "хоть каких-то" клиентов, даже если база начинает "сбоить". А в втором случае - testTimeout просто логируете сообщение в консоль. На вашем месте, я бы в обоих случаях бросал RuntimeException, потому что продолжать работать при не работающей базе кажется не логично. Но это зависит от того как Вы работаете с исключениями выше по коду. Насчет двух циклов, логично не создавать коннект к базе, если он не нужен. Но два идентичных условия проверки - лишь усложняют код. И в будущем если Вам понадобится изменить условие придется менять в двух местах. Я бы оставил создание коннекта внутри. Так же кажется, что код по работе с базой логично вынести в два отдельных метода или даже объединить в один. Есть ощущение, что интерфейс Consumer здесь лишний. Напрашивается интерфейс с двумя методами get(), put(). Так же из кода не ясно кто будет вызывать методtestTimeout. Кажется, что этот метод должен вызывать сам менеджер, через какой-то промежуток времени, ведь это его зона отвественности - поддержка в актуальном состоянии статусов клиентов. Тогда следует завести например Executors.newScheduledThreadPool(1), и в нем поток который будет вызывать приватный метод testTimeout. Обновленная версия public class ActiveClientManager implements Consumer { private final Map clients = new ConcurrentHashMap<>();; private final PoolDataSource pds; private final long timeoutMSec; private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); public ActiveClientManager(PoolDataSource pds, long timeoutMSec) throws SQLException { this.pds = pds; this.timeoutMSec = timeoutMSec; executorService.scheduleWithFixedDelay(this::testTimeout, 5 ,5, TimeUnit.MINUTES); } private void testTimeout() { long currentTime = System.currentTimeMillis(); clients.entrySet().removeIf((entry) -> { // если время уже прошло if (entry.getValue() < currentTime) { // обновляем БД updateState(entry.getKey(), State.notactive); // удаляем этот элемент return true; } return false; }); } private void updateState(Integer clientId, State state) { try (Connection con = pds.getConnection(); PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = " + state.getSqlState() + " WHERE id = ?");) { stmt.setInt(1, clientId); stmt.executeUpdate(); } catch (SQLException e) { throw new RuntimeException(e); } } @Override public void accept(Integer clientID) { long offlineTime = System.currentTimeMillis() + timeoutMSec; clients.compute(clientID, (key, oldval) -> { // Если запись новая - обновляем БД if (oldval == null) { updateState(clientID, State.active); } return offlineTime; }); } public enum State { active("1"), notactive("NULL"); private final String sqlState; State(String sqlState) { this.sqlState = sqlState; } public String getSqlState() { return sqlState; } } } Ответ 2
С учетом замечания @volyx получился такой класс public class ActiveClientManager implements Consumer{ private final Map clients; private final PoolDataSource pds; private final long timeoutMSec; public ActiveClientManager(PoolDataSource pds, long timeoutMSec) throws SQLException { clients = new ConcurrentHashMap<>(); this.pds = pds; this.timeoutMSec = timeoutMSec; } private boolean isOffline(long currentTime, long limitTime) { return limitTime <= currentTime; } // Ищем, есть ли хоть один клиент с просроченным таймаутом private boolean searchTimeout(long currentTime) { for (Long val : clients.values()) { if (isOffline(currentTime, val)) return true; }; return false; } protected void updateState(Integer clientID, PreparedStatement stmt) throws SQLException { stmt.setInt(1, clientID); stmt.executeUpdate(); } public void testTimeout() throws SQLException { long currentTime = System.currentTimeMillis(); if (!searchTimeout(currentTime)) return; // Есть ли хоть один клиент с просроченным таймаутом. Создаем соединение List > exceptions = new ArrayList<>(); try ( Connection con = pds.getConnection(); PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = NULL WHERE id = ?"); ) { clients.entrySet().removeIf((entry) -> { // если время уже прошло if (isOffline(currentTime, entry.getValue())) { // обновляем БД try { updateState(entry.getKey(), stmt); } catch (SQLException e) { exceptions.add(new Pair<>(entry.getKey(), e)); } // удаляем этот элемент return true; } return false; }); } // если в лямбде были исключения, обрабатываем if (exceptions.size() > 0) throw exceptions.get(0).getVal(); } @Override public void accept(Integer clientID) { long offlineTime = System.currentTimeMillis() + timeoutMSec; clients.compute(clientID, (key, oldval) -> { // Если запись новая - обновляем БД if (oldval == null) { try ( Connection con = pds.getConnection(); PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?"); ) { updateState(entry.getKey(), stmt); } catch (SQLException e) { throw new RuntimeException(e); } } return offlineTime; }); } }
Комментариев нет:
Отправить комментарий