Страницы

Поиск по вопросам

воскресенье, 12 января 2020 г.

Организация доступа к ресурсу из разных потоков

#c_sharp #многопоточность #backgroundworker


Имеется такой код.

// Коллекция буферов на запись
List> writeReadyList = new List>();

private void backgroundWorker1_DoWork(object sender, DoWorkEventArgs e)     
{
    // Локальный буфер данного потока
    List localBufer = new List();
    Random random = new Random();

    while (!backgroundWorker1.CancellationPending)
    {
        // Забиваем локальный буфер значениями
        localBufer.Add(random.Next(0, 100));

        if (localBufer.Count > 10)            
        {
            // Передаем буфер в очередь на запись и обнуляем его
            writeReadyList.Add(localBufer);
            localBufer = new List();
        }

        Thread.Sleep(50);
    }
}

private void backgroundWorker2_DoWork(object sender, DoWorkEventArgs e)
{
    while (!backgroundWorker2.CancellationPending) 
    {
        // Коллекция буферов, которые не удалось записать
        List> failList = new List>(); 

        foreach (List currentBufer in writeReadyList)
        {
            try
            {
                // Пишем значения из перебираемого буфера в базу данных
            }
            catch { failList.Add(currentBufer); }
        }

        writeReadyList = new List>(failList);

        Thread.Sleep(450);
    }
}


Немного пояснений по коду - первый поток читает некие значения, копит их в локальном
буфере. При достижении определенного количества элементов локальный буфер ставится
в очередь на запись и сбрасывается. Второй поток смотрит, что у нас есть в очереди
на запись и, собственно, пишет значения в базу данных. Изменять коллекцию в foreach
нельзя, поэтому, чтобы исключить все, что записалось успешно, формирую список из неудачно
записанных буферов.

Насколько понимаю, при таком подходе возможны несколько вариантов некорректной работы.


в момент работы foreach во втором потоке она будет изменена из первого потока - получаем
исключение?
второй поток выполнил перебор очереди, но еще не выполнил перезаписи в соответствии
с неудавшимися попытками записи. В этот момент первый добавляет запись, но, так как
второй ничего об этом не знает, то запись будет потеряна.


Как правильно организовать работу с очередью на запись между этими потоками? Причем
организовать надо так, чтобы если второй поток в данный момент обрабатывает очередь,
то первый поток не ждал окончания работы с ресурсом, а продолжал складывать значения
в свой локальный буфер. В какую сторону копать?

______UPD:

По совету VladD использовал мьютексы.

    Mutex writeReadyAccess = new Mutex();

    private void backgroundWorker1_DoWork(object sender, DoWorkEventArgs e)
    {
        if (localBufer.Count > 10)
            // Таймаут в 1 секунду тк нельзя тормозить первый поток на время обработки
очереди
            if (writeReadyAccess.WaitOne(1))
            {
                // Если второй поток не обрабатывает очередь, то добавляем в нее
локальный буфер и обнуляем его
                writeReadyAccess.ReleaseMutex();
            }
        // Если очередь была в обработке на момент запроса, то продолжаем писать
в локальный буфер        
    }

    private void backgroundWorker2_DoWork(object sender, DoWorkEventArgs e)
    {
        // Этот поток можно тормозить по времени, поэтому ждем без таймаута
        if (writeReadyAccess.WaitOne())
        {
            // Обработка очереди, в том числе ее перезапись
            writeReadyAccess.ReleaseMutex();
        }
    }

    


Ответы

Ответ 1



using System.Collections.Concurrent; using System.Threading; class Dto { public int State; public int Index; } class Test { BlockingCollection list = new BlockingCollection(); public void Run() { Task.Run(() => backgroundWorker2()); Task.Run(() => backgroundWorker1()); } void backgroundWorker1() { for(int i=0; i < 3; i++) list.Add(new Dto() { Index = i }); } void backgroundWorker2() { foreach (var v in list.GetConsumingEnumerable()) { // сохраняем данные Console.WriteLine("index=" + v.Index + " state=" + v.State); // если сохранить не удалось, то возвращаем в list if (v.Index == 0 && v.State++ < 2) list.Add(v); if(list.Count == 0) break; } Console.WriteLine("completed"); } } var t = new Test(); t.Run(); Результат index=0 state=0 index=1 state=0 index=2 state=0 index=0 state=1 вторая попытка записи index=0 state=2 еще одна completed Если надо передавать данные порциями, то определите коллекцию, например, так BlockingCollection UPDATE Другая версия, в которой backgroundWorker1 работает медленно class Dto { public int Index; } class Test { BlockingCollection list = new BlockingCollection(); public void Run() { Task.Run(() => backgroundWorker2()); Task.Run(() => backgroundWorker1()); } void backgroundWorker1() { for (int i = 0; i < 3; i++) { list.Add(new Dto() { Index = i }); // Thread.Sleep(1000); } list.CompleteAdding(); } void backgroundWorker2() { var retry = new Queue(); foreach (var v in list.GetConsumingEnumerable()) { // сохраняем данные Console.WriteLine("index=" + v.Index); // если сохранить не удалось, то ставим в очередь if (v.Index == 0) retry.Enqueue(v); } // повторяем попытку записи while(retry.Count > 0) Console.WriteLine("retry=" + retry.Dequeue().Index); Console.WriteLine("completed"); } } var t = new Test(); t.Run(); Результат index=0 index=1 index=2 retry=0 повторная попытка записи completed UPDATE Для конвейеризации обработки данных в разных потоках предназначена библиотека потоков данных - TPL Dataflow. Описание с примерами есть в MSDN и есть nuget-пакет Microsoft TPL Dataflow.

Комментариев нет:

Отправить комментарий