#c_sharp #многопоточность #backgroundworker
Имеется такой код. // Коллекция буферов на запись List> writeReadyList = new List
>(); private void backgroundWorker1_DoWork(object sender, DoWorkEventArgs e) { // Локальный буфер данного потока List
localBufer = new List (); Random random = new Random(); while (!backgroundWorker1.CancellationPending) { // Забиваем локальный буфер значениями localBufer.Add(random.Next(0, 100)); if (localBufer.Count > 10) { // Передаем буфер в очередь на запись и обнуляем его writeReadyList.Add(localBufer); localBufer = new List (); } Thread.Sleep(50); } } private void backgroundWorker2_DoWork(object sender, DoWorkEventArgs e) { while (!backgroundWorker2.CancellationPending) { // Коллекция буферов, которые не удалось записать List > failList = new List
>(); foreach (List
currentBufer in writeReadyList) { try { // Пишем значения из перебираемого буфера в базу данных } catch { failList.Add(currentBufer); } } writeReadyList = new List >(failList); Thread.Sleep(450); } } Немного пояснений по коду - первый поток читает некие значения, копит их в локальном буфере. При достижении определенного количества элементов локальный буфер ставится в очередь на запись и сбрасывается. Второй поток смотрит, что у нас есть в очереди на запись и, собственно, пишет значения в базу данных. Изменять коллекцию в foreach нельзя, поэтому, чтобы исключить все, что записалось успешно, формирую список из неудачно записанных буферов. Насколько понимаю, при таком подходе возможны несколько вариантов некорректной работы. в момент работы foreach во втором потоке она будет изменена из первого потока - получаем исключение? второй поток выполнил перебор очереди, но еще не выполнил перезаписи в соответствии с неудавшимися попытками записи. В этот момент первый добавляет запись, но, так как второй ничего об этом не знает, то запись будет потеряна. Как правильно организовать работу с очередью на запись между этими потоками? Причем организовать надо так, чтобы если второй поток в данный момент обрабатывает очередь, то первый поток не ждал окончания работы с ресурсом, а продолжал складывать значения в свой локальный буфер. В какую сторону копать? ______UPD: По совету VladD использовал мьютексы. Mutex writeReadyAccess = new Mutex(); private void backgroundWorker1_DoWork(object sender, DoWorkEventArgs e) { if (localBufer.Count > 10) // Таймаут в 1 секунду тк нельзя тормозить первый поток на время обработки очереди if (writeReadyAccess.WaitOne(1)) { // Если второй поток не обрабатывает очередь, то добавляем в нее локальный буфер и обнуляем его writeReadyAccess.ReleaseMutex(); } // Если очередь была в обработке на момент запроса, то продолжаем писать в локальный буфер } private void backgroundWorker2_DoWork(object sender, DoWorkEventArgs e) { // Этот поток можно тормозить по времени, поэтому ждем без таймаута if (writeReadyAccess.WaitOne()) { // Обработка очереди, в том числе ее перезапись writeReadyAccess.ReleaseMutex(); } }
Ответы
Ответ 1
using System.Collections.Concurrent; using System.Threading; class Dto { public int State; public int Index; } class Test { BlockingCollectionlist = new BlockingCollection (); public void Run() { Task.Run(() => backgroundWorker2()); Task.Run(() => backgroundWorker1()); } void backgroundWorker1() { for(int i=0; i < 3; i++) list.Add(new Dto() { Index = i }); } void backgroundWorker2() { foreach (var v in list.GetConsumingEnumerable()) { // сохраняем данные Console.WriteLine("index=" + v.Index + " state=" + v.State); // если сохранить не удалось, то возвращаем в list if (v.Index == 0 && v.State++ < 2) list.Add(v); if(list.Count == 0) break; } Console.WriteLine("completed"); } } var t = new Test(); t.Run(); Результат index=0 state=0 index=1 state=0 index=2 state=0 index=0 state=1 вторая попытка записи index=0 state=2 еще одна completed Если надо передавать данные порциями, то определите коллекцию, например, так BlockingCollection UPDATE Другая версия, в которой backgroundWorker1 работает медленно class Dto { public int Index; } class Test { BlockingCollection list = new BlockingCollection (); public void Run() { Task.Run(() => backgroundWorker2()); Task.Run(() => backgroundWorker1()); } void backgroundWorker1() { for (int i = 0; i < 3; i++) { list.Add(new Dto() { Index = i }); // Thread.Sleep(1000); } list.CompleteAdding(); } void backgroundWorker2() { var retry = new Queue (); foreach (var v in list.GetConsumingEnumerable()) { // сохраняем данные Console.WriteLine("index=" + v.Index); // если сохранить не удалось, то ставим в очередь if (v.Index == 0) retry.Enqueue(v); } // повторяем попытку записи while(retry.Count > 0) Console.WriteLine("retry=" + retry.Dequeue().Index); Console.WriteLine("completed"); } } var t = new Test(); t.Run(); Результат index=0 index=1 index=2 retry=0 повторная попытка записи completed UPDATE Для конвейеризации обработки данных в разных потоках предназначена библиотека потоков данных - TPL Dataflow. Описание с примерами есть в MSDN и есть nuget-пакет Microsoft TPL Dataflow.
Комментариев нет:
Отправить комментарий