Страницы

Поиск по вопросам

суббота, 4 января 2020 г.

Многопоточность, блокировка доступа

#c_sharp #многопоточность


Есть приведенный ниже код, суть в том, что функции Add(), Read(), Modify(), Remove(),
вызываются извне и в хаотичном порядке, с разным периодом во времени.

Уже сломал голову, подскажите с помощью какой технологии языка C# мне организовать
одновременный доступ к разным элементам массива MyList? И чтобы во время удаления/добавления
новых элементов, текущие процессы не сбивались с толку, например в потоке ReadThread
в данную секунду обрабатывается 5ый элемент массива и в туже секунду потоком RemoveThread
уничтожается 4ый, индекс 5ого измениться, как быть?

Делал объект Lock для каждой структуры свой и блокировал отдельные элементы MyList,
но тоже не решило проблемы добавления, удаления.

class test
{
    //переменные
    List MyList = new List();
    //блокировка доступа
    Object Lock = new Object();

    //буфер и поток для обработки событий Add
    BlockingCollection AddBuffer = new BlockingCollection();
    Thread AddThread;

    //буфер и поток для обработки событий Read
    BlockingCollection ReadBuffer = new BlockingCollection();
    Thread ReadThread;

    //буфер и поток для обработки событий Modify
    BlockingCollection ModifyBuffer = new BlockingCollection();
    Thread ModifyThread;

    //буфер и поток для обработки событий Remove
    BlockingCollection RemoveBuffer = new BlockingCollection();
    Thread RemoveThread;

    //конструктор
    public test()
    {
        if(!AddThread.IsAlive)
        {
            AddThread = new Thread(new ThreadStart(AddBufferReader));
            AddThread.IsBackground = true;
            AddThread.Start();
        }

        if(!ReadThread.IsAlive)
        {
            ReadThread = new Thread(new ThreadStart(ReadBufferReader));
            ReadThread.IsBackground = true;
            ReadThread.Start();
        }

        if(!ModifyThread.IsAlive)
        {
            ModifyThread = new Thread(new ThreadStart(ModifyBufferReader));
            ModifyThread.IsBackground = true;
            ModifyThread.Start();
        }

        if(!RemoveThread.IsAlive)
        {
            RemoveThread = new Thread(new ThreadStart(RemoveBufferReader));
            RemoveThread.IsBackground = true;
            RemoveThread.Start();
        }
    }

    //Добавляем 
    internal void Add(string Name, string InputString)
    {
        //добавляем имя структуры которую надо добавить
        AddBuffer.Add(new MyStruct(Name, InputString));
    }
    void AddBufferReader()
    {
        foreach(var MyStruct in AddBuffer.GetConsumingEnumerable())
        {
            lock(Lock)
            {
                //некоторые условия, не все будет добавлено в MyList
                if(true)
                {
                    //add, add
                    MyList.Add(new MyStruct(MyStruct.Name, MyStruct.MyString));
                }
            }
        }
    }

    //Читаем
    internal void Read(string Name)
    {
        //добавляем имя структуры которую надо прочитать и вывести/обработать
        ReadBuffer.Add(Name);
    }
    void ReadBufferReader()
    {
        foreach(var Name in ReadBuffer.GetConsumingEnumerable())
        {
            lock(Lock)
            {
                for(int i = 0; i < MyList.Count; i++)
                {
                    if(MyList[i].Name == Name)
                    {
                        //долгие вычисления
                        break;
                    }
                }
            }
        }
    }

    //Изменяем
    internal void Modify(string Name, string NewString)
    {
        //добавляем имя структуры которую надо прочитать и вывести/обработать
        ModifyBuffer.Add(new MyStruct(Name, NewString));
    }
    void ModifyBufferReader()
    {
        foreach(var MyStruct in ModifyBuffer.GetConsumingEnumerable())
        {
            lock(Lock)
            {
                for(int i = 0; i < MyList.Count; i++)
                {
                    if(MyList[i].Name == MyStruct.Name)
                    {
                        //долгие вычисления
                        MyList[i] = new MyStruct(MyList[i].Name, MyStruct.MyString);
                        break;
                    }
                }
            }
        }
    }

    //Удаляем
    internal void Remove(string Name)
    {
        //добавляем имя структуры которую надо удалить из MyList
        RemoveBuffer.Add(Name);
    }
    void RemoveBufferReader()
    {
        foreach(var Name in RemoveBuffer.GetConsumingEnumerable())
        {
            lock(Lock)
            {
                for(int i = 0; i < MyList.Count; i++)
                {
                    if(MyList[i].Name == Name)
                    {
                        //remove, remove
                        MyList.RemoveAt(i);
                        break;
                    }
                }
            }
        }
    }

    struct MyStruct
    {
        internal string Name;
        internal string MyString;

        public MyStruct(string Name, string MyString)
        {
            this.Name = Name;
            this.MyString = MyString;
        }
    }
}

    


Ответы

Ответ 1



Я бы начал с ReaderWriterLockSlim или ReaderWriterLock: В этой документации приведён полноценный пример использования. Краткая суть в том, что читать могут сразу несколько потоков. Писать может только один. Читающий поток может захотеть писать и тогда он запрашивает блокировку на запись , что предотвращает появление новых читателей, пока объект не освободится и не завершится запись. Читатели, на время, пока объект заблокирован и происходит запись, ждут. Максимальное время ожидания регулируется вами. Для реализации более гранулированной блокировки вы можете объявить "корзины" для элементов, которые находятся в процессе чтения и "корзину" для элементов, находящихся в процессе правки, то есть использовать не один а два или более экземпляров ReaderWriterLockSlim. Однако, будьте осторожны. Грануляция блокировок всегда черевата мёртвыми блокировками (deadlock). Представьте, что в вашем MyList два объекта и вы решили блокировать их отдельно. К вам одновременно приходят два запроса. Один - на чтение, другой на запись. Первый поток забрал первый элемент MyList на чтение. Второй поток забрать второй элемент MyList на запись. Первый поток начал ждать освобождения второго элемента. Он хочет его почитать. Второй поток начал ждать окончания чтения первого элемента. Он хочет его исправить. Вы в ж... я хотел сказать в deadlock-е. Именно по этому так полезно указывать конечное значение timeout при обращении к ReaderWriterLockSlim. Ну или сразу согласиться, что экземпляр ReaderWriterLockSlim будет один и на запись будет блокироваться весь ресурс целиком. Тогда deadlock-и вам не страшны.

Ответ 2



Окей, вам по идее нужен fine-grained lock. То есть блокировка не на уровне всего списка, а на уровне одного элемента. Кроме того, раз вы ищете элемент по ключу, имеет смысл воспользоваться Dictionary. Для этого давайте оснастим элемент списка объектом для блокировки. Выйдет что-то такое: class Wrapper // это будет одновременно и обёртка, и объект для блокировки { public T Value; public bool IsAlive = true; } Dictionary> MyDict = new Dictionary>; object Lock = new object(); void AddItem(string key, MyStruct value) { lock(Lock) { MyDict.Add(key, new Wrapper() { Value = value }); } } void ModifyItem(string key) { Wrapper wrapper; lock(Lock) { if (!MyDict.TryGetValue(key, out wrapper)) return; // нет такого (уже) } // получить данные можно под внутренние блокировкой: MyStruct oldValue; lock (wrapper) { if (!wrapper.IsAlive) return; oldValue = wrapper.Value; } // теперь у нас есть данные, можно с ними долго работать /* тут длинные вычисления */ // для модификации нужно залочить снова wrapper lock (wrapper) { if (!wrapper.IsAlive) return; // упс, наш элемент тем временем удалили wrapper.Value = newValue; } } void DeleteItem(string key) { Wrapper wrapper; lock(Lock) { if (!MyDict.TryGetValue(key, out wrapper)) return; // нет такого (уже) } // модифицируем lock (wrapper) { if (!wrapper.IsAlive) return; wrapper.IsAlive = false; } lock (Lock) { MyDict.RemoveKey(key); } } Если у вас зависимые данные, вам нужно для обработки залочить несколько элементов. Для этого, чтобы избежать deadlock'ов, имеет смысл лочить их в фиксированном порядке (например, по возрастанию ключа). void ModifyItems(IEnumerable keys) { var materialKeys = keys.OrderBy(k => k).ToList(); List> wrappers = new List>(); bool lockSuccessful = true; lock(Lock) { Wrapper wrapper; foreach (var key in materialKeys) { if (!MyDict.TryGetValue(key, out wrapper)) { lockSuccessful = false; break; } wrappers.Add(wrapper); Monitor.Enter(wrapper); } } if (lockSuccessful) { // мы получили данные, работаем с ними /* тут длинные вычисления */ } lock (Lock) { foreach (var key in materialKeys.Reverse()) { Monitor.Exit(wrapper); } } } Если вам неинтересно в конце ModifyItem убедиться, что значение ещё живое, можно всё упростить. Dictionary MyDict = new Dictionary(); object Lock = new object(); void AddItem(string key, MyStruct value) { lock(Lock) { MyDict.Add(key, value); } } void ModifyItem(string key) { MyStruct oldValue; lock(Lock) { if (!MyDict.TryGetValue(key, out oldValue)) return; // нет такого (уже) } // теперь у нас есть данные, можно с ними долго работать /* тут длинные вычисления */ // для модификации нужно залочить снова wrapper lock(Lock) { if (!MyDict.ContainsKey(key)) return; // удалили тем временем MyDict[key] = newValue; } } void DeleteItem(string key) { lock(Lock) { MyDict.RemoveKey(key); } }

Комментариев нет:

Отправить комментарий