Страницы

Поиск по вопросам

суббота, 11 января 2020 г.

Потокобезопасная проверка состояния

#java #многопоточность #инспекция_кода



Задача: Есть клиенты, которые подключаются к серверу и периодически выполняют какие-то
операции. Если за некоторое время клиент не провел ни одной операции, то в базе его
статус меняется на оффлайн. При возобновлении операция статус в БД меняется на онлайн.
Клиенты живут в разных потоках.
Нагрузка: 


Клиентов порядка 100-200
средняя частота операций порядка секунды-двух
время бездействия: 5 минут

Алгоритм:


Есть Map у которого ключ - это ID клиента, а значение - время истечения ожидания.
Клиенты сдвигают это время при каждом обращении к серверу.
Есть таймер, который срабатывает раз в минуту, пробегает по списку клиентов, находит
тех, у которых истекло время ожидания, удаляет их из списка и меняет статус в базе.

Реализация:

public class ActiveClientManager implements Consumer {
  private final Map clients;
  private final PoolDataSource pds;
  private final long timeout;
  private final Object dbMonitor;

  public ActiveClientManager(PoolDataSource pds, long timeout) {
    clients = new ConcurrentHashMap<>();
    this.pds = pds;
    this.timeout = timeout;
    dbMonitor = new Object();
  }

  // Действие клиента
  @Override
  public void accept(Integer clientID) {
    // Время после которого, клиент уходит в оффлайн
    long offlineTime = System.currentTimeMillis() + timeout;
    // Вставляем нового клиента, или обновляем существующего
    if (clients.put(clientID, offlineTime) == null) {
        // Если была произведена вставка, то меняем статус в базе
        try {
            try (Connection con = pds.getConnection()) {
                try (PreparedStatement stmt = con.prepareStatement("UPDATE clients
SET state = 1 WHERE id = ?")) {
                    stmt.setInt(1, clientID);
                    // Перекрываем кислород таймеру
                    synchronized (dbMonitor) {
                        stmt.executeUpdate();
                    }
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
  }

  // Обработка события таймера
  public void testTimeout() throws SQLException {
    long currentTime = System.currentTimeMillis();
    // Сюда занесем всех клиентов, которые уже отвалились
    List removeClients = new ArrayList<>();
    clients.replaceAll((key, val) -> {
        if (val <= currentTime) {
            // если время уже прошло, то сохраняем нашего клиента
            removeClients.add(key);
            // и удаляем его из общего списка
            return null;
        }
        // иначе ничего не трогаем
        return val;
    });
    // если есть кого удалять
    if (removeClients.size() > 0) {
        try (Connection con = pds.getConnection()) {
            try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET
state = NULL WHERE id = ?")) {
                // блокируем добавление нового клиента
                synchronized (dbMonitor) {
                    for (Integer client : removeClients) {
                        // если клиент не появился опять в общем списке
                        if (!clients.containsKey(client)) {
                            // то сбрасываем ему статус
                            stmt.setInt(1, client);
                            stmt.executeUpdate();
                        }
                    }
                }
            }
        }
    }
  }
}



Собственно вопрос - не упустил ли я, что-либо в синхронизации?

Update

Выяснилось, что конструкция

clients.replaceAll((key, val) -> {
    if (val <= currentTime) {
        // если время уже прошло, то сохраняем нашего клиента
        removeClients.add(key);
        // и удаляем его из общего списка
        return null;
    }
    // иначе ничего не трогаем
    return val;
});


не работает. Если из лямбды вернуть null, то метод бросает NullPointerException.
Переписал так

clients.entrySet().removeIf((entry) -> {
    if (entry.getValue() <= currentTime) {
        removeClients.add(entry.getKey());
        return true;
    }
    return false;
});

    


Ответы

Ответ 1



Синхронизируясь по одному объекту dbMonitor вы ограничиваете паралелльность вашего решения. То есть, клиенты будут "ждать" базу данных, даже если они с разным clientId. Если использовать встроенную синхронизацию в ConcurrentHashMap, то получится большая "паралленость" ActiveClientManager'а. Но конечно же лучше проверить производительность тестами. Попробуйте создать менеджер, и вызвать методы accept, testTimeout из разных потоков. Не претендую на абсолютную истинность решения, указал лишь на то, что бросилось в глаза. Пример: public class ActiveClientManager implements Consumer { private final Map clients = new ConcurrentHashMap<>(); private final PoolDataSource pds; private final long timeout; public ActiveClientManager(PoolDataSource pds, long timeout) { this.pds = pds; this.timeout = timeout; } // Действие клиента @Override public void accept(Integer clientID) { // Время после которого, клиент уходит в оффлайн long offlineTime = System.currentTimeMillis() + timeout; // Вставляем нового клиента, или обновляем существующего // синхронизируемся только если один и тот же clientID clients.compute(clientID, (oldVal, newVal) -> { if (oldVal == null) { try { try (Connection con = pds.getConnection()) { try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?")) { stmt.setInt(1, clientID); stmt.executeUpdate(); } } } catch (SQLException e) { e.printStackTrace(); } } return offlineTime; }); } // Обработка события таймера public void testTimeout() throws SQLException { long currentTime = System.currentTimeMillis(); // Сюда занесем всех клиентов, которые уже отвалились // синхронизируемся на конерктном client clients.replaceAll((client, val) -> { if (val <= currentTime) { // если время уже прошло, то сохраняем нашего клиента try (Connection con = pds.getConnection()) { try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = NULL WHERE id = ?")) { // блокируем добавление нового клиента stmt.setInt(1, client); stmt.executeUpdate(); } } catch (SQLException e) { e.printStackTrace(); } // и удаляем его из общего списка return null; } // иначе ничего не трогаем return val; }); } } Update По второй реализации бросается к глаза, что Вы не определились, что делать если база не работает(или возвращает ошибки). В первом случае - accept вы собираете список исключений, и пытаетесь обновлять "хоть каких-то" клиентов, даже если база начинает "сбоить". А в втором случае - testTimeout просто логируете сообщение в консоль. На вашем месте, я бы в обоих случаях бросал RuntimeException, потому что продолжать работать при не работающей базе кажется не логично. Но это зависит от того как Вы работаете с исключениями выше по коду. Насчет двух циклов, логично не создавать коннект к базе, если он не нужен. Но два идентичных условия проверки - лишь усложняют код. И в будущем если Вам понадобится изменить условие придется менять в двух местах. Я бы оставил создание коннекта внутри. Так же кажется, что код по работе с базой логично вынести в два отдельных метода или даже объединить в один. Есть ощущение, что интерфейс Consumer здесь лишний. Напрашивается интерфейс с двумя методами get(), put(). Так же из кода не ясно кто будет вызывать методtestTimeout. Кажется, что этот метод должен вызывать сам менеджер, через какой-то промежуток времени, ведь это его зона отвественности - поддержка в актуальном состоянии статусов клиентов. Тогда следует завести например Executors.newScheduledThreadPool(1), и в нем поток который будет вызывать приватный метод testTimeout. Обновленная версия public class ActiveClientManager implements Consumer { private final Map clients = new ConcurrentHashMap<>();; private final PoolDataSource pds; private final long timeoutMSec; private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); public ActiveClientManager(PoolDataSource pds, long timeoutMSec) throws SQLException { this.pds = pds; this.timeoutMSec = timeoutMSec; executorService.scheduleWithFixedDelay(this::testTimeout, 5 ,5, TimeUnit.MINUTES); } private void testTimeout() { long currentTime = System.currentTimeMillis(); clients.entrySet().removeIf((entry) -> { // если время уже прошло if (entry.getValue() < currentTime) { // обновляем БД updateState(entry.getKey(), State.notactive); // удаляем этот элемент return true; } return false; }); } private void updateState(Integer clientId, State state) { try (Connection con = pds.getConnection(); PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = " + state.getSqlState() + " WHERE id = ?");) { stmt.setInt(1, clientId); stmt.executeUpdate(); } catch (SQLException e) { throw new RuntimeException(e); } } @Override public void accept(Integer clientID) { long offlineTime = System.currentTimeMillis() + timeoutMSec; clients.compute(clientID, (key, oldval) -> { // Если запись новая - обновляем БД if (oldval == null) { updateState(clientID, State.active); } return offlineTime; }); } public enum State { active("1"), notactive("NULL"); private final String sqlState; State(String sqlState) { this.sqlState = sqlState; } public String getSqlState() { return sqlState; } } }

Ответ 2



С учетом замечания @volyx получился такой класс public class ActiveClientManager implements Consumer { private final Map clients; private final PoolDataSource pds; private final long timeoutMSec; public ActiveClientManager(PoolDataSource pds, long timeoutMSec) throws SQLException { clients = new ConcurrentHashMap<>(); this.pds = pds; this.timeoutMSec = timeoutMSec; } private boolean isOffline(long currentTime, long limitTime) { return limitTime <= currentTime; } // Ищем, есть ли хоть один клиент с просроченным таймаутом private boolean searchTimeout(long currentTime) { for (Long val : clients.values()) { if (isOffline(currentTime, val)) return true; }; return false; } protected void updateState(Integer clientID, PreparedStatement stmt) throws SQLException { stmt.setInt(1, clientID); stmt.executeUpdate(); } public void testTimeout() throws SQLException { long currentTime = System.currentTimeMillis(); if (!searchTimeout(currentTime)) return; // Есть ли хоть один клиент с просроченным таймаутом. Создаем соединение List> exceptions = new ArrayList<>(); try ( Connection con = pds.getConnection(); PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = NULL WHERE id = ?"); ) { clients.entrySet().removeIf((entry) -> { // если время уже прошло if (isOffline(currentTime, entry.getValue())) { // обновляем БД try { updateState(entry.getKey(), stmt); } catch (SQLException e) { exceptions.add(new Pair<>(entry.getKey(), e)); } // удаляем этот элемент return true; } return false; }); } // если в лямбде были исключения, обрабатываем if (exceptions.size() > 0) throw exceptions.get(0).getVal(); } @Override public void accept(Integer clientID) { long offlineTime = System.currentTimeMillis() + timeoutMSec; clients.compute(clientID, (key, oldval) -> { // Если запись новая - обновляем БД if (oldval == null) { try ( Connection con = pds.getConnection(); PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?"); ) { updateState(entry.getKey(), stmt); } catch (SQLException e) { throw new RuntimeException(e); } } return offlineTime; }); } }

Комментариев нет:

Отправить комментарий