Страницы

Поиск по вопросам

среда, 30 января 2019 г.

Многопоточность, блокировка доступа

Есть приведенный ниже код, суть в том, что функции Add(), Read(), Modify(), Remove(), вызываются извне и в хаотичном порядке, с разным периодом во времени.
Уже сломал голову, подскажите с помощью какой технологии языка C# мне организовать одновременный доступ к разным элементам массива MyList? И чтобы во время удаления/добавления новых элементов, текущие процессы не сбивались с толку, например в потоке ReadThread в данную секунду обрабатывается 5ый элемент массива и в туже секунду потоком RemoveThread уничтожается 4ый, индекс 5ого измениться, как быть?
Делал объект Lock для каждой структуры свой и блокировал отдельные элементы MyList, но тоже не решило проблемы добавления, удаления.
class test { //переменные List MyList = new List(); //блокировка доступа Object Lock = new Object();
//буфер и поток для обработки событий Add BlockingCollection AddBuffer = new BlockingCollection(); Thread AddThread;
//буфер и поток для обработки событий Read BlockingCollection ReadBuffer = new BlockingCollection(); Thread ReadThread;
//буфер и поток для обработки событий Modify BlockingCollection ModifyBuffer = new BlockingCollection(); Thread ModifyThread;
//буфер и поток для обработки событий Remove BlockingCollection RemoveBuffer = new BlockingCollection(); Thread RemoveThread;
//конструктор public test() { if(!AddThread.IsAlive) { AddThread = new Thread(new ThreadStart(AddBufferReader)); AddThread.IsBackground = true; AddThread.Start(); }
if(!ReadThread.IsAlive) { ReadThread = new Thread(new ThreadStart(ReadBufferReader)); ReadThread.IsBackground = true; ReadThread.Start(); }
if(!ModifyThread.IsAlive) { ModifyThread = new Thread(new ThreadStart(ModifyBufferReader)); ModifyThread.IsBackground = true; ModifyThread.Start(); }
if(!RemoveThread.IsAlive) { RemoveThread = new Thread(new ThreadStart(RemoveBufferReader)); RemoveThread.IsBackground = true; RemoveThread.Start(); } }
//Добавляем internal void Add(string Name, string InputString) { //добавляем имя структуры которую надо добавить AddBuffer.Add(new MyStruct(Name, InputString)); } void AddBufferReader() { foreach(var MyStruct in AddBuffer.GetConsumingEnumerable()) { lock(Lock) { //некоторые условия, не все будет добавлено в MyList if(true) { //add, add MyList.Add(new MyStruct(MyStruct.Name, MyStruct.MyString)); } } } }
//Читаем internal void Read(string Name) { //добавляем имя структуры которую надо прочитать и вывести/обработать ReadBuffer.Add(Name); } void ReadBufferReader() { foreach(var Name in ReadBuffer.GetConsumingEnumerable()) { lock(Lock) { for(int i = 0; i < MyList.Count; i++) { if(MyList[i].Name == Name) { //долгие вычисления break; } } } } }
//Изменяем internal void Modify(string Name, string NewString) { //добавляем имя структуры которую надо прочитать и вывести/обработать ModifyBuffer.Add(new MyStruct(Name, NewString)); } void ModifyBufferReader() { foreach(var MyStruct in ModifyBuffer.GetConsumingEnumerable()) { lock(Lock) { for(int i = 0; i < MyList.Count; i++) { if(MyList[i].Name == MyStruct.Name) { //долгие вычисления MyList[i] = new MyStruct(MyList[i].Name, MyStruct.MyString); break; } } } } }
//Удаляем internal void Remove(string Name) { //добавляем имя структуры которую надо удалить из MyList RemoveBuffer.Add(Name); } void RemoveBufferReader() { foreach(var Name in RemoveBuffer.GetConsumingEnumerable()) { lock(Lock) { for(int i = 0; i < MyList.Count; i++) { if(MyList[i].Name == Name) { //remove, remove MyList.RemoveAt(i); break; } } } } }
struct MyStruct { internal string Name; internal string MyString;
public MyStruct(string Name, string MyString) { this.Name = Name; this.MyString = MyString; } } }


Ответ

Я бы начал с ReaderWriterLockSlim или ReaderWriterLock
В этой документации приведён полноценный пример использования. Краткая суть в том, что читать могут сразу несколько потоков. Писать может только один. Читающий поток может захотеть писать и тогда он запрашивает блокировку на запись , что предотвращает появление новых читателей, пока объект не освободится и не завершится запись. Читатели, на время, пока объект заблокирован и происходит запись, ждут. Максимальное время ожидания регулируется вами.
Для реализации более гранулированной блокировки вы можете объявить "корзины" для элементов, которые находятся в процессе чтения и "корзину" для элементов, находящихся в процессе правки, то есть использовать не один а два или более экземпляров ReaderWriterLockSlim. Однако, будьте осторожны. Грануляция блокировок всегда черевата мёртвыми блокировками (deadlock).
Представьте, что в вашем MyList два объекта и вы решили блокировать их отдельно. К вам одновременно приходят два запроса. Один - на чтение, другой на запись.
Первый поток забрал первый элемент MyList на чтение. Второй поток забрать второй элемент MyList на запись. Первый поток начал ждать освобождения второго элемента. Он хочет его почитать. Второй поток начал ждать окончания чтения первого элемента. Он хочет его исправить.
Вы в ж... я хотел сказать в deadlock-е. Именно по этому так полезно указывать конечное значение timeout при обращении к ReaderWriterLockSlim
Ну или сразу согласиться, что экземпляр ReaderWriterLockSlim будет один и на запись будет блокироваться весь ресурс целиком. Тогда deadlock-и вам не страшны.

Комментариев нет:

Отправить комментарий