#c_sharp #многопоточность #очередь #очередь_задач #инспекция_кода
Мне потребовалась очередь команд, где множество потоков может добавлять команды на выполнение и один поток по очереди их выполняет. Тк я не знаю какие-либо стандартные реализации подобного, то пришлось делать самому. Можете оценить мой класс для этого, тк у меня сомнения на счет его качества, хотя он работает. И если кто-либо даст ссылку на реализации подобного, то тоже будет приятно. public class ConcurrentCommandsQueue: IDisposable { private readonly object _lockForSyncExecuting; private readonly Queue _commandQueue; private readonly Action _executeCommand; private readonly Action _onStartCommandsExecution; private readonly Action _onEndCommandsExecution; private volatile bool _isDisposed; private volatile bool _isExecuting; private volatile Task _taskForAction; public bool IsExecuting { get { return _isExecuting; } } public ConcurrentCommandsQueue(Action executeCommand) : this(executeCommand, null, null) { } public ConcurrentCommandsQueue(Action executeCommand, Action onStartCommandsExecution, Action onEndCommandsExecution) { _lockForSyncExecuting = new object(); _commandQueue = new Queue (10); _isDisposed = false; _isExecuting = false; _executeCommand = executeCommand; _onStartCommandsExecution = onStartCommandsExecution; _onEndCommandsExecution = onEndCommandsExecution; _taskForAction = new TaskCompletionSource ().Task; } private void ExecuteCommands() { if (_onStartCommandsExecution != null) _onStartCommandsExecution(); T command = default(T); while (true) { lock (_lockForSyncExecuting) { if (_commandQueue.Count == 0) { _isExecuting = false; break; } command = _commandQueue.Dequeue(); } _executeCommand(command); } if (_onEndCommandsExecution != null) _onEndCommandsExecution(command); } public void AddCommand(T command) { if (_isDisposed) throw new ObjectDisposedException(GetType().ToString()); lock (_lockForSyncExecuting) { if (_isDisposed) return; _commandQueue.Enqueue(command); if (!_isExecuting) { _isExecuting = true; _taskForAction = Task.Run(new Action(ExecuteCommands)); } } } public void ClearCommandQueue() { if (_isDisposed) throw new ObjectDisposedException(GetType().ToString()); lock (_lockForSyncExecuting) _commandQueue.Clear(); } public void Dispose() { if (_isDisposed) return; _isDisposed = true; lock (_lockForSyncExecuting) _commandQueue.Clear(); if (_taskForAction != null) _taskForAction.Wait(); } }
Ответы
Ответ 1
У вас получился код в стиле .NET 1.0, который слегка эволюционировал с приходом лямбд (Delegate механически заменён на Action) и задач (new Thread().Start() механически заменён на Task.Run()), но по сути совершенно не изменился: вы TPL с async/await толком не используете, а существование concurrent коллекций вообще упустили из виду. В .NET 4.5 это всё ненужно в принципе. Если вам нужно выполнение задач строго в одном потоке, то можно просто кидать задачи в однопоточный планировщик задач (task scheduler). Собственно, на этой первой строчке весь код и заканчивается. Если нужно выполнение кода перед всеми операциями и после всех операций, то код перед и после этой строчки и пишется. Отмечу, что при этой однострочной реализации вы ещё имеете бонусом возможность использования токена отмены (cancellation token) и прочих радостей жизни. Рассмотрим пример. Допустим, надо выполнить две очереди задач: в каждой очереди задачи выполняются последовательно, но очереди выполняются параллельно. Создаём два планировщика задач с ограничением параллелизма, затем создаём задачи с указанием нужного нам планировщика. class Program { static readonly Random _rnd = new Random(); static readonly LimitedConcurrencyTaskScheduler _schedulerFoo = new LimitedConcurrencyTaskScheduler(1); static readonly LimitedConcurrencyTaskScheduler _schedulerBar = new LimitedConcurrencyTaskScheduler(1); static void Main () => new Program().Run().Wait(); async Task Run () { Task queueFoo = RunQueue("Foo", _schedulerFoo, Enumerable.Range(0, 3).Select(i => (Action)(() => Foo("Foo")))); Task queueBar = RunQueue("Bar", _schedulerBar, Enumerable.Range(0, 3).Select(i => (Action)(() => Foo("Bar")))); await Task.WhenAll(queueFoo, queueBar); Console.WriteLine("Done!"); Console.ReadKey(); } async Task RunQueue (string name, TaskScheduler scheduler, IEnumerablecommands) { Console.WriteLine($"{name}: Start"); await Task.WhenAll(commands.Select(c => RunTask(c, scheduler))); Console.WriteLine($"{name}: Finish"); } async Task RunTask (Action task, TaskScheduler scheduler) { await Task.Factory.StartNew(task, CancellationToken.None, TaskCreationOptions.None, scheduler); } void Foo (string name) { int timeout = _rnd.Next(200); Console.WriteLine($"{name}: Start {timeout}"); Thread.Sleep(timeout); Console.WriteLine($"{name}: Finish {timeout}"); } } Пример вывода: Foo: Start Bar: Start Foo: Start 165 Bar: Start 50 Bar: Finish 50 Bar: Start 39 Bar: Finish 39 Bar: Start 115 Foo: Finish 165 Foo: Start 116 Bar: Finish 115 Bar: Finish Foo: Finish 116 Foo: Start 0 Foo: Finish 0 Foo: Finish Done! Хотя в этом примере все задачи и кидаются одновременно, задачи можно добавлять в любой момент. Правда тогда смысл "начала" и "конца" несколько теряется. Если вы объясните, что вы собираетесь с ними делать, тогда можно будет добавить. Здесь я воспользовался планировщиком LimitedConcurrencyTaskScheduler из примеров на MSDN: Samples for Parallel Programming with the .NET Framework (статья с описанием: ParallelExtensionsExtras Tour - #7 - Additional TaskSchedulers). /// /// Provides a task scheduler that ensures a maximum concurrency level while running on top of the ThreadPool. /// Source: http://code.msdn.microsoft.com/ParExtSamples /// Documentation: http://blogs.msdn.com/b/pfxteam/archive/2010/04/09/9990424.aspx /// License: MS-LPL /// public class LimitedConcurrencyTaskScheduler : TaskScheduler { [ThreadStatic] private static bool _currentThreadIsProcessingItems; private readonly int _maxDegreeOfParallelism; private readonly LinkedList_tasks = new LinkedList (); // protected by lock(_tasks) private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the specified degree of parallelism. /// The maximum degree of parallelism provided by this scheduler. public LimitedConcurrencyTaskScheduler (int maxDegreeOfParallelism) { if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism)); _maxDegreeOfParallelism = maxDegreeOfParallelism; } ///Gets the maximum concurrency level supported by this scheduler. public override sealed int MaximumConcurrencyLevel => _maxDegreeOfParallelism; protected override sealed IEnumerableGetScheduledTasks () { bool lockTaken = false; try { Monitor.TryEnter(_tasks, ref lockTaken); if (lockTaken) return _tasks.ToArray(); else throw new NotSupportedException(); } finally { if (lockTaken) Monitor.Exit(_tasks); } } protected override sealed void QueueTask (Task task) { // Add the task to the list of tasks to be processed. If there aren't enough // delegates currently queued or running to process tasks, schedule another. lock (_tasks) { _tasks.AddLast(task); if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) { ++_delegatesQueuedOrRunning; NotifyThreadPoolOfPendingWork(); } } } /// Informs the ThreadPool that there's work to be executed for this scheduler. private void NotifyThreadPoolOfPendingWork () { ThreadPool.UnsafeQueueUserWorkItem(_ => { // Note that the current thread is now processing work items. // This is necessary to enable inlining of tasks into this thread. _currentThreadIsProcessingItems = true; try { // Process all available items in the queue. while (true) { Task item; lock (_tasks) { // When there are no more items to be processed, // note that we're done processing, and get out. if (_tasks.Count == 0) { --_delegatesQueuedOrRunning; break; } // Get the next item from the queue item = _tasks.First.Value; _tasks.RemoveFirst(); } // Execute the task we pulled out of the queue TryExecuteTask(item); } } finally { // We're done processing items on the current thread _currentThreadIsProcessingItems = false; } }, null); } protected override sealed bool TryExecuteTaskInline (Task task, bool taskWasPreviouslyQueued) { // If this thread isn't already processing a task, we don't support inlining if (!_currentThreadIsProcessingItems) return false; // If the task was previously queued, remove it from the queue if (taskWasPreviouslyQueued) TryDequeue(task); // Try to run the task. return TryExecuteTask(task); } protected override sealed bool TryDequeue (Task task) { lock (_tasks) return _tasks.Remove(task); } } Ещё можно воспользоваться BlockingCollection<> с ConcurrentQueue<> внутри. По окончанию добавления надо будет не забыть вызвать CompleteAdding. Ещё есть TPL Dataflow. Там параллелизмы и прочие настраиваются без кастомных планировщиков. Можно ещё Rx добавить в качестве вишенки на торте. Вариантов море. Вообще, сейчас ещё @Vlad прибежит, расскажет про хитрые мозговыносящие способы решить эту задачу. :)Ответ 2
Вам нужен MailboxProcessor - агент для обработки сообщений, который выполняет асинхронные операции. Реализация паттерна "много писателей, один читатель".
Комментариев нет:
Отправить комментарий