Страницы

Поиск по вопросам

среда, 20 февраля 2019 г.

Индекс в кольцевом буфере, поддерживающем многопоточность

Вводные данные: вектор элементов постоянного размера (кольцевой буфер), плюс атомарное число, указывающее на текущее положение в этом самом векторе.
const int MAXIMUM = 100; QVector _buffer(MAXIMUM);
QAtomicInt _atomic_index;
Далее метод, который позволяет многопоточно вставлять данные в вектор:
void insert(const Data &data) { _buffer[_atomic_index++] = data; }
Здесь очевидна проблема, что по достижению границы размера вектора необходимо вернуться к нулевому индексу:
void insert(const Data &data) { _atomic_index.testAndSetOrdered(MAXIMUM,0); _buffer[_atomic_index++] = data; }
Это не решение, т.к. если, например, два или более потоков одновременно пройдут проверку в первой строке и при этом значение _atomic_index будет неподалёку от границы вектора, то выход за неё и последующий сегфолт будут фактически гарантированы.
Получается, что в данной ситуации требуется атомарная операция, состоящая из проверки на равенство максимуму, плюс в ней же инкрементация индекса:
void insert(const Data &data) { mutex.lock(); if(index == MAXIMUM) index = 0; _buffer[index++] = data; mutex.unlock(); }
Но этот вариант полностью нивелирует профит при многопоточной вставке данных.
Я попытался самостоятельно решить проблему, однако нет полной уверенности в корректности решения:
void insert(const Data &data) { int index = _atomic_index++; if(index >= MAXIMUM) { const int new_index = index - MAXIMUM; _atomic_index.testAndSetOrdered(index+1,new_index); index = new_index; }
_buffer[index++] = data; }
Смысл в том, чтобы использовать локальную и индивидуальную для каждого из потоков переменную, содержащую значение текущего индекса:
int index = _atomic_index++;
Именно эту переменную проверяем на выход за пределы размера вектора:
if(index >= MAXIMUM) {...}
А уже внутри этого условия _atomic_index, по моему соображению, должен принять значение самой последней операции инкрементирования индекса. Прав ли я? Или, быть может, существует иной способ решения обозначенной задачи?
Обновление
Дабы акцентировать внимание только на моменте проверки индекса на выход за границу размера вектора, примем по умолчанию, что кол-во записывающих потоков в данном конкретном примере значительно (в разы) меньше, нежели чем кол-во элементов в векторе и, тем самым, исключим из внимания возможность перезаписи ещё не прочитанных (занятых) элементов вектора вновь вставляемыми элементами. Например:
записывающих потоков: 10 читающих потоков: 20 длина вектора: 100
Читающие потоки не отстают от записывающих, что означает, что все вновь заполненные ячейки будут практически сразу прочитаны. Или даже радикально: записанные ячейки можно хоть сразу перезаписывать, поскольку данные для чтения не важны.
Интересует только момент сброса значения счётчика текущего индекса записи при многопоточной вставке, когда, например, все 10 потоков одновременно будут пытаться его увеличить на единицу и текущий индекс будет равен 98 из 100 возможных элементов. В этом случае правильная обработка должна одному потоку назначить индекс 99, тогда как оставшимся девяти: от 0 до 8. При этом значение текущего индекса (в коде это _atomic_index) должно сравняться с именно последним индексом из присвоенных, т.е. также 8.
Обновление 2
В зачёркнутом утверждении - ошибка. Текущий индекс должен стать равным 9, т.е. на единицу больше от последнего вставленного, поскольку в строке:
int index = _atomic_index++;
... используется пост-инкремент.


Ответ

Мой ответ основывается на собственном опыте с java; с с++ у меня практически не было опыта, но семантика должна быть одна и та же.
Я бы хотел разобрать предложенный пример. Сделаем мысленный эксперимент со следующей ситуацией:
void insert(const Data &data) { int index = _atomic_index++; // Thread B if(index >= MAXIMUM) { const int new_index = index - MAXIMUM; _atomic_index.testAndSetOrdered(index+1,new_index); // Thread A index = new_index; }
_buffer[index++] = data; }
Thread A успел пройти несколько выражений быстрее, чем Thread B, и в тот момент, когда Thread A пытается сбросить индекс на ноль, Thread B его инкрементирует, поэтому сброс индекса на ноль провалится, и (так как операция не создает преград для ее повторения) это дает возможность для неограниченного увеличения _atomic_index - если ситуация постоянно повторяется. Как только _atomic_index достигнет значения MAXIMUM * 2 - 1, index будет вычислен со значением MAXIMUM, что приведет к попытке записи за пределы ожидаемого диапазона значений.
Первое решение, которое придет в голову - использовать оператор modulo вместо однократного вычитания MAXIMUM - позволит уменьшить вероятность возникновения такой ситуации, но не спасет от нее, потому что при переполнении int в _atomic_index вернется неожиданное значение.
Один из классических подходов для такой ситуации - реализовать spinlock вручную на основе оптимистичной блокировки. Дальше идет псевдокод, т.е. я не знаю реальных методов в используемой библиотеке, равно как и синтаксиса с++:
void insert(const Data &data) { // это все без проблем выносится в отдельный метод int current; int next; do { current = _atomic_index.get(); next = (current + 1) % MAXIMUM; } while (!_atomic_index.compareAndSet(current, next))
_buffer[current] = data; }
В этом случае можно говорить о следующем инварианте:
Следующее значение = текущее значение + 1 % MAXIMUM Следующее значение может быть установлено вместо текущего, только если текущее значение является верным (= не изменилось с момента чтения)
У алгоритма остается ровно два пути:
Установить следующее значение индекса, если он не изменялся Пойти на новый круг, если между двумя операциями индекс изменился
Если у нас есть конкурирующие потоки, это будет выглядеть примерно так:
Индекс: 1 Максимум: 3 Т1: пробует установить 2 вместо 1 Т2: пробует установить 2 вместо 1 Т3: пробует установить 2 вместо 1
(театральная пауза, магия за кулисами)
Т1: успех, установлен 2 Т2: провал, пробует установить 0 вместо 2 Т3: провал, пробует установить 0 вместо 2
(театральная пауза, магия за кулисами)
Т1: отработал Т2: провал, пробует установить 1 вместо 0 Т3: успех, установлен 0
(театральная пауза, магия за кулисами)
Т1: отработал Т2: успех, установлен 1 Т3: отработал
Что будет, если поток "заснул", и за время сна счетчик прошел полный цикл?
Если я ничего не напутал - ничего страшного. Воспроизведем алгоритм:
Индекс: 1 Максимум: 4 Т1: считывает 1 и засыпает Т2: считывает 1 и обновляет на 2 Т2: считывает 2 и обновляет на 3 Т2: считывает 3 и обновляет на 0 Т2: считывает 0 и обновляет на 1 Т2: считывает 1 Т1: просыпается и обновляет 1 на 2 Т2: терпит неудачу в обновлении, заходит на новый круг
Безопасен ли этот подход? Конкретно в этом случае - нет
Данный ответ подсказывает решение на проблему корректного инкрементирования счетчика, но не алгоритма целиком.
Выбор нового индекса и непосредственно установка значения - это две разных операции, и атомарность выполнения метода insert не гарантируется. ОС имеет право приостановить поток сразу после "резерва" индекса и разбудить его спустя произвольное время, поэтому в одну и ту же ячейку могут быть записаны данные в обратном порядке:
Т1: зарезервировал ячейку с индексом 1 Т1: заснул (некоторое время работы с алгоритмом, в течение которого индекс обнуляется и идет на новый круг) Т2: зарезервировал ячейку с индексом 1 Т2: положил данные Т1: проснулся Т1: положил устаревшие данные Кроме того, индекс инкрементируется до появления данных по адресу - это значит, что прочитавший индекс может обратиться к ячейке, в которой все еще находятся старые данные, считая их актуальными Кроме того, весь алгоритм нельзя считать рабочим, пока не учитывается положение хвоста, иначе хвост может дойти до головы, и в этом случае (!) потеряется MAXIMUM элементов.
Если хотите найти хорошую реализацию подобного алгоритма, стоит поискать в гугле lock-free circular buffer или аналогичную фразу; я, если честно, не могу сходу придумать реализаций кроме linked list с отслеживанием длины (в котором будет очень сложно гарантировать корректность алгоритма без пессимистичной блокировки) и непосредственно пессимистичную блокировку. Последняя может оказаться не настолько плохим варинатом, и, возможно, будет показывать себя даже лучше приведенного примера спинлока в окружении с большим количеством потоков.

Комментариев нет:

Отправить комментарий