Страницы

Поиск по вопросам

пятница, 1 марта 2019 г.

Потокобезопасная проверка состояния

Задача: Есть клиенты, которые подключаются к серверу и периодически выполняют какие-то операции. Если за некоторое время клиент не провел ни одной операции, то в базе его статус меняется на оффлайн. При возобновлении операция статус в БД меняется на онлайн. Клиенты живут в разных потоках. Нагрузка:
Клиентов порядка 100-200 средняя частота операций порядка секунды-двух время бездействия: 5 минут Алгоритм:
Есть Map у которого ключ - это ID клиента, а значение - время истечения ожидания. Клиенты сдвигают это время при каждом обращении к серверу. Есть таймер, который срабатывает раз в минуту, пробегает по списку клиентов, находит тех, у которых истекло время ожидания, удаляет их из списка и меняет статус в базе. Реализация:
public class ActiveClientManager implements Consumer { private final Map clients; private final PoolDataSource pds; private final long timeout; private final Object dbMonitor;
public ActiveClientManager(PoolDataSource pds, long timeout) { clients = new ConcurrentHashMap<>(); this.pds = pds; this.timeout = timeout; dbMonitor = new Object(); }
// Действие клиента @Override public void accept(Integer clientID) { // Время после которого, клиент уходит в оффлайн long offlineTime = System.currentTimeMillis() + timeout; // Вставляем нового клиента, или обновляем существующего if (clients.put(clientID, offlineTime) == null) { // Если была произведена вставка, то меняем статус в базе try { try (Connection con = pds.getConnection()) { try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?")) { stmt.setInt(1, clientID); // Перекрываем кислород таймеру synchronized (dbMonitor) { stmt.executeUpdate(); } } } } catch (SQLException e) { e.printStackTrace(); } } }
// Обработка события таймера public void testTimeout() throws SQLException { long currentTime = System.currentTimeMillis(); // Сюда занесем всех клиентов, которые уже отвалились List removeClients = new ArrayList<>(); clients.replaceAll((key, val) -> { if (val <= currentTime) { // если время уже прошло, то сохраняем нашего клиента removeClients.add(key); // и удаляем его из общего списка return null; } // иначе ничего не трогаем return val; }); // если есть кого удалять if (removeClients.size() > 0) { try (Connection con = pds.getConnection()) { try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = NULL WHERE id = ?")) { // блокируем добавление нового клиента synchronized (dbMonitor) { for (Integer client : removeClients) { // если клиент не появился опять в общем списке if (!clients.containsKey(client)) { // то сбрасываем ему статус stmt.setInt(1, client); stmt.executeUpdate(); } } } } } } } }
Собственно вопрос - не упустил ли я, что-либо в синхронизации?
Update
Выяснилось, что конструкция
clients.replaceAll((key, val) -> { if (val <= currentTime) { // если время уже прошло, то сохраняем нашего клиента removeClients.add(key); // и удаляем его из общего списка return null; } // иначе ничего не трогаем return val; });
не работает. Если из лямбды вернуть null, то метод бросает NullPointerException. Переписал так
clients.entrySet().removeIf((entry) -> { if (entry.getValue() <= currentTime) { removeClients.add(entry.getKey()); return true; } return false; });


Ответ

Синхронизируясь по одному объекту dbMonitor вы ограничиваете паралелльность вашего решения. То есть, клиенты будут "ждать" базу данных, даже если они с разным clientId. Если использовать встроенную синхронизацию в ConcurrentHashMap, то получится большая "паралленость" ActiveClientManager'а.
Но конечно же лучше проверить производительность тестами. Попробуйте создать менеджер, и вызвать методы accept, testTimeout из разных потоков.
Не претендую на абсолютную истинность решения, указал лишь на то, что бросилось в глаза.
Пример:
public class ActiveClientManager implements Consumer { private final Map clients = new ConcurrentHashMap<>(); private final PoolDataSource pds; private final long timeout;
public ActiveClientManager(PoolDataSource pds, long timeout) { this.pds = pds; this.timeout = timeout; }
// Действие клиента @Override public void accept(Integer clientID) { // Время после которого, клиент уходит в оффлайн long offlineTime = System.currentTimeMillis() + timeout; // Вставляем нового клиента, или обновляем существующего // синхронизируемся только если один и тот же clientID clients.compute(clientID, (oldVal, newVal) -> { if (oldVal == null) { try { try (Connection con = pds.getConnection()) { try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?")) { stmt.setInt(1, clientID); stmt.executeUpdate(); } } } catch (SQLException e) { e.printStackTrace(); } } return offlineTime; }); }
// Обработка события таймера public void testTimeout() throws SQLException { long currentTime = System.currentTimeMillis(); // Сюда занесем всех клиентов, которые уже отвалились // синхронизируемся на конерктном client clients.replaceAll((client, val) -> { if (val <= currentTime) { // если время уже прошло, то сохраняем нашего клиента try (Connection con = pds.getConnection()) { try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = NULL WHERE id = ?")) { // блокируем добавление нового клиента stmt.setInt(1, client); stmt.executeUpdate(); } } catch (SQLException e) { e.printStackTrace(); } // и удаляем его из общего списка return null; } // иначе ничего не трогаем return val; }); } }
Update По второй реализации бросается к глаза, что Вы не определились, что делать если база не работает(или возвращает ошибки). В первом случае - accept вы собираете список исключений, и пытаетесь обновлять "хоть каких-то" клиентов, даже если база начинает "сбоить". А в втором случае - testTimeout просто логируете сообщение в консоль. На вашем месте, я бы в обоих случаях бросал RuntimeException, потому что продолжать работать при не работающей базе кажется не логично. Но это зависит от того как Вы работаете с исключениями выше по коду.
Насчет двух циклов, логично не создавать коннект к базе, если он не нужен. Но два идентичных условия проверки - лишь усложняют код. И в будущем если Вам понадобится изменить условие придется менять в двух местах. Я бы оставил создание коннекта внутри.
Так же кажется, что код по работе с базой логично вынести в два отдельных метода или даже объединить в один.
Есть ощущение, что интерфейс Consumer здесь лишний. Напрашивается интерфейс с двумя методами get(), put(). Так же из кода не ясно кто будет вызывать методtestTimeout. Кажется, что этот метод должен вызывать сам менеджер, через какой-то промежуток времени, ведь это его зона отвественности - поддержка в актуальном состоянии статусов клиентов. Тогда следует завести например Executors.newScheduledThreadPool(1), и в нем поток который будет вызывать приватный метод testTimeout.
Обновленная версия
public class ActiveClientManager implements Consumer { private final Map clients = new ConcurrentHashMap<>();; private final PoolDataSource pds; private final long timeoutMSec; private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
public ActiveClientManager(PoolDataSource pds, long timeoutMSec) throws SQLException { this.pds = pds; this.timeoutMSec = timeoutMSec; executorService.scheduleWithFixedDelay(this::testTimeout, 5 ,5, TimeUnit.MINUTES); }
private void testTimeout() { long currentTime = System.currentTimeMillis();
clients.entrySet().removeIf((entry) -> { // если время уже прошло if (entry.getValue() < currentTime) { // обновляем БД updateState(entry.getKey(), State.notactive); // удаляем этот элемент return true; } return false; }); }
private void updateState(Integer clientId, State state) { try (Connection con = pds.getConnection(); PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = " + state.getSqlState() + " WHERE id = ?");) { stmt.setInt(1, clientId); stmt.executeUpdate(); } catch (SQLException e) { throw new RuntimeException(e); } }
@Override public void accept(Integer clientID) { long offlineTime = System.currentTimeMillis() + timeoutMSec; clients.compute(clientID, (key, oldval) -> { // Если запись новая - обновляем БД if (oldval == null) { updateState(clientID, State.active); } return offlineTime; }); }
public enum State { active("1"), notactive("NULL");
private final String sqlState;
State(String sqlState) { this.sqlState = sqlState; }
public String getSqlState() { return sqlState; } } }

Комментариев нет:

Отправить комментарий