Задача: Есть клиенты, которые подключаются к серверу и периодически выполняют какие-то операции. Если за некоторое время клиент не провел ни одной операции, то в базе его статус меняется на оффлайн. При возобновлении операция статус в БД меняется на онлайн. Клиенты живут в разных потоках.
Нагрузка:
Клиентов порядка 100-200
средняя частота операций порядка секунды-двух
время бездействия: 5 минут
Алгоритм:
Есть Map у которого ключ - это ID клиента, а значение - время истечения ожидания. Клиенты сдвигают это время при каждом обращении к серверу.
Есть таймер, который срабатывает раз в минуту, пробегает по списку клиентов, находит тех, у которых истекло время ожидания, удаляет их из списка и меняет статус в базе.
Реализация:
public class ActiveClientManager implements Consumer
public ActiveClientManager(PoolDataSource pds, long timeout) {
clients = new ConcurrentHashMap<>();
this.pds = pds;
this.timeout = timeout;
dbMonitor = new Object();
}
// Действие клиента
@Override
public void accept(Integer clientID) {
// Время после которого, клиент уходит в оффлайн
long offlineTime = System.currentTimeMillis() + timeout;
// Вставляем нового клиента, или обновляем существующего
if (clients.put(clientID, offlineTime) == null) {
// Если была произведена вставка, то меняем статус в базе
try {
try (Connection con = pds.getConnection()) {
try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?")) {
stmt.setInt(1, clientID);
// Перекрываем кислород таймеру
synchronized (dbMonitor) {
stmt.executeUpdate();
}
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
// Обработка события таймера
public void testTimeout() throws SQLException {
long currentTime = System.currentTimeMillis();
// Сюда занесем всех клиентов, которые уже отвалились
List
Собственно вопрос - не упустил ли я, что-либо в синхронизации?
Update
Выяснилось, что конструкция
clients.replaceAll((key, val) -> {
if (val <= currentTime) {
// если время уже прошло, то сохраняем нашего клиента
removeClients.add(key);
// и удаляем его из общего списка
return null;
}
// иначе ничего не трогаем
return val;
});
не работает. Если из лямбды вернуть null, то метод бросает NullPointerException. Переписал так
clients.entrySet().removeIf((entry) -> {
if (entry.getValue() <= currentTime) {
removeClients.add(entry.getKey());
return true;
}
return false;
});
Ответ
Синхронизируясь по одному объекту dbMonitor вы ограничиваете паралелльность вашего решения. То есть, клиенты будут "ждать" базу данных, даже если они с разным clientId. Если использовать встроенную синхронизацию в ConcurrentHashMap, то получится большая "паралленость" ActiveClientManager'а.
Но конечно же лучше проверить производительность тестами. Попробуйте создать менеджер, и вызвать методы accept, testTimeout из разных потоков.
Не претендую на абсолютную истинность решения, указал лишь на то, что бросилось в глаза.
Пример:
public class ActiveClientManager implements Consumer
public ActiveClientManager(PoolDataSource pds, long timeout) {
this.pds = pds;
this.timeout = timeout;
}
// Действие клиента
@Override
public void accept(Integer clientID) {
// Время после которого, клиент уходит в оффлайн
long offlineTime = System.currentTimeMillis() + timeout;
// Вставляем нового клиента, или обновляем существующего
// синхронизируемся только если один и тот же clientID
clients.compute(clientID, (oldVal, newVal) -> {
if (oldVal == null) {
try {
try (Connection con = pds.getConnection()) {
try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?")) {
stmt.setInt(1, clientID);
stmt.executeUpdate();
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
return offlineTime;
});
}
// Обработка события таймера
public void testTimeout() throws SQLException {
long currentTime = System.currentTimeMillis();
// Сюда занесем всех клиентов, которые уже отвалились
// синхронизируемся на конерктном client
clients.replaceAll((client, val) -> {
if (val <= currentTime) {
// если время уже прошло, то сохраняем нашего клиента
try (Connection con = pds.getConnection()) {
try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = NULL WHERE id = ?")) {
// блокируем добавление нового клиента
stmt.setInt(1, client);
stmt.executeUpdate();
}
} catch (SQLException e) {
e.printStackTrace();
}
// и удаляем его из общего списка
return null;
}
// иначе ничего не трогаем
return val;
});
}
}
Update
По второй реализации бросается к глаза, что Вы не определились, что делать если база не работает(или возвращает ошибки). В первом случае - accept вы собираете список исключений, и пытаетесь обновлять "хоть каких-то" клиентов, даже если база начинает "сбоить". А в втором случае - testTimeout просто логируете сообщение в консоль.
На вашем месте, я бы в обоих случаях бросал RuntimeException, потому что продолжать работать при не работающей базе кажется не логично. Но это зависит от того как Вы работаете с исключениями выше по коду.
Насчет двух циклов, логично не создавать коннект к базе, если он не нужен. Но два идентичных условия проверки - лишь усложняют код. И в будущем если Вам понадобится изменить условие придется менять в двух местах. Я бы оставил создание коннекта внутри.
Так же кажется, что код по работе с базой логично вынести в два отдельных метода или даже объединить в один.
Есть ощущение, что интерфейс Consumer здесь лишний. Напрашивается интерфейс с двумя методами get(), put(). Так же из кода не ясно кто будет вызывать методtestTimeout. Кажется, что этот метод должен вызывать сам менеджер, через какой-то промежуток времени, ведь это его зона отвественности - поддержка в актуальном состоянии статусов клиентов. Тогда следует завести например Executors.newScheduledThreadPool(1), и в нем поток который будет вызывать приватный метод testTimeout.
Обновленная версия
public class ActiveClientManager implements Consumer
public ActiveClientManager(PoolDataSource pds, long timeoutMSec) throws SQLException {
this.pds = pds;
this.timeoutMSec = timeoutMSec;
executorService.scheduleWithFixedDelay(this::testTimeout, 5 ,5, TimeUnit.MINUTES);
}
private void testTimeout() {
long currentTime = System.currentTimeMillis();
clients.entrySet().removeIf((entry) -> {
// если время уже прошло
if (entry.getValue() < currentTime) {
// обновляем БД
updateState(entry.getKey(), State.notactive);
// удаляем этот элемент
return true;
}
return false;
});
}
private void updateState(Integer clientId, State state) {
try (Connection con = pds.getConnection();
PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = " + state.getSqlState() + " WHERE id = ?");) {
stmt.setInt(1, clientId);
stmt.executeUpdate();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public void accept(Integer clientID) {
long offlineTime = System.currentTimeMillis() + timeoutMSec;
clients.compute(clientID, (key, oldval) -> {
// Если запись новая - обновляем БД
if (oldval == null) {
updateState(clientID, State.active);
}
return offlineTime;
});
}
public enum State {
active("1"), notactive("NULL");
private final String sqlState;
State(String sqlState) {
this.sqlState = sqlState;
}
public String getSqlState() {
return sqlState;
}
}
}
Комментариев нет:
Отправить комментарий