суббота, 30 ноября 2019 г.

Ревью класса для очереди команд

#c_sharp #многопоточность #очередь #очередь_задач #инспекция_кода

Мне потребовалась очередь команд, где множество потоков может добавлять команды на
выполнение и один поток по очереди их выполняет. Тк я не знаю какие-либо стандартные
реализации подобного, то пришлось делать самому. Можете оценить мой класс для этого,
тк у меня сомнения на счет его качества, хотя он работает. И если кто-либо даст ссылку
на реализации подобного, то тоже будет приятно.    

public class ConcurrentCommandsQueue : IDisposable
    private readonly object _lockForSyncExecuting;
    private readonly Queue _commandQueue;

    private readonly Action _executeCommand;
    private readonly Action _onStartCommandsExecution;
    private readonly Action _onEndCommandsExecution;

    private volatile bool _isDisposed;
    private volatile bool _isExecuting;
    private volatile Task _taskForAction;

    public bool IsExecuting
        get { return _isExecuting; }

    public ConcurrentCommandsQueue(Action executeCommand) : this(executeCommand,
null, null) { }
    public ConcurrentCommandsQueue(Action executeCommand, 
        Action onStartCommandsExecution, Action onEndCommandsExecution)
        _lockForSyncExecuting = new object();
        _commandQueue = new Queue(10);

        _isDisposed = false;
        _isExecuting = false;
        _executeCommand = executeCommand;
        _onStartCommandsExecution = onStartCommandsExecution;
        _onEndCommandsExecution = onEndCommandsExecution;
        _taskForAction = new TaskCompletionSource().Task;

    private void ExecuteCommands()
        if (_onStartCommandsExecution != null)
        T command = default(T);
        while (true)
            lock (_lockForSyncExecuting)
                if (_commandQueue.Count == 0)
                    _isExecuting = false;
                command = _commandQueue.Dequeue();
        if (_onEndCommandsExecution != null)

    public void AddCommand(T command)
        if (_isDisposed)
            throw new ObjectDisposedException(GetType().ToString());

        lock (_lockForSyncExecuting)
            if (_isDisposed)
            if (!_isExecuting)
                _isExecuting = true;
                _taskForAction = Task.Run(new Action(ExecuteCommands));
    public void ClearCommandQueue()
        if (_isDisposed)
            throw new ObjectDisposedException(GetType().ToString());

        lock (_lockForSyncExecuting)
    public void Dispose()
        if (_isDisposed)

        _isDisposed = true;
        lock (_lockForSyncExecuting)
        if (_taskForAction != null)



Ответ 1

У вас получился код в стиле .NET 1.0, который слегка эволюционировал с приходом лямбд (Delegate механически заменён на Action) и задач (new Thread().Start() механически заменён на Task.Run()), но по сути совершенно не изменился: вы TPL с async/await толком не используете, а существование concurrent коллекций вообще упустили из виду. В .NET 4.5 это всё ненужно в принципе. Если вам нужно выполнение задач строго в одном потоке, то можно просто кидать задачи в однопоточный планировщик задач (task scheduler). Собственно, на этой первой строчке весь код и заканчивается. Если нужно выполнение кода перед всеми операциями и после всех операций, то код перед и после этой строчки и пишется. Отмечу, что при этой однострочной реализации вы ещё имеете бонусом возможность использования токена отмены (cancellation token) и прочих радостей жизни. Рассмотрим пример. Допустим, надо выполнить две очереди задач: в каждой очереди задачи выполняются последовательно, но очереди выполняются параллельно. Создаём два планировщика задач с ограничением параллелизма, затем создаём задачи с указанием нужного нам планировщика. class Program { static readonly Random _rnd = new Random(); static readonly LimitedConcurrencyTaskScheduler _schedulerFoo = new LimitedConcurrencyTaskScheduler(1); static readonly LimitedConcurrencyTaskScheduler _schedulerBar = new LimitedConcurrencyTaskScheduler(1); static void Main () => new Program().Run().Wait(); async Task Run () { Task queueFoo = RunQueue("Foo", _schedulerFoo, Enumerable.Range(0, 3).Select(i => (Action)(() => Foo("Foo")))); Task queueBar = RunQueue("Bar", _schedulerBar, Enumerable.Range(0, 3).Select(i => (Action)(() => Foo("Bar")))); await Task.WhenAll(queueFoo, queueBar); Console.WriteLine("Done!"); Console.ReadKey(); } async Task RunQueue (string name, TaskScheduler scheduler, IEnumerable commands) { Console.WriteLine($"{name}: Start"); await Task.WhenAll(commands.Select(c => RunTask(c, scheduler))); Console.WriteLine($"{name}: Finish"); } async Task RunTask (Action task, TaskScheduler scheduler) { await Task.Factory.StartNew(task, CancellationToken.None, TaskCreationOptions.None, scheduler); } void Foo (string name) { int timeout = _rnd.Next(200); Console.WriteLine($"{name}: Start {timeout}"); Thread.Sleep(timeout); Console.WriteLine($"{name}: Finish {timeout}"); } } Пример вывода: Foo: Start Bar: Start Foo: Start 165 Bar: Start 50 Bar: Finish 50 Bar: Start 39 Bar: Finish 39 Bar: Start 115 Foo: Finish 165 Foo: Start 116 Bar: Finish 115 Bar: Finish Foo: Finish 116 Foo: Start 0 Foo: Finish 0 Foo: Finish Done! Хотя в этом примере все задачи и кидаются одновременно, задачи можно добавлять в любой момент. Правда тогда смысл "начала" и "конца" несколько теряется. Если вы объясните, что вы собираетесь с ними делать, тогда можно будет добавить. Здесь я воспользовался планировщиком LimitedConcurrencyTaskScheduler из примеров на MSDN: Samples for Parallel Programming with the .NET Framework (статья с описанием: ParallelExtensionsExtras Tour - #7 - Additional TaskSchedulers). /// /// Provides a task scheduler that ensures a maximum concurrency level while running on top of the ThreadPool. /// Source: http://code.msdn.microsoft.com/ParExtSamples /// Documentation: http://blogs.msdn.com/b/pfxteam/archive/2010/04/09/9990424.aspx /// License: MS-LPL /// public class LimitedConcurrencyTaskScheduler : TaskScheduler { [ThreadStatic] private static bool _currentThreadIsProcessingItems; private readonly int _maxDegreeOfParallelism; private readonly LinkedList _tasks = new LinkedList(); // protected by lock(_tasks) private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the specified degree of parallelism. /// The maximum degree of parallelism provided by this scheduler. public LimitedConcurrencyTaskScheduler (int maxDegreeOfParallelism) { if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism)); _maxDegreeOfParallelism = maxDegreeOfParallelism; } /// Gets the maximum concurrency level supported by this scheduler. public override sealed int MaximumConcurrencyLevel => _maxDegreeOfParallelism; protected override sealed IEnumerable GetScheduledTasks () { bool lockTaken = false; try { Monitor.TryEnter(_tasks, ref lockTaken); if (lockTaken) return _tasks.ToArray(); else throw new NotSupportedException(); } finally { if (lockTaken) Monitor.Exit(_tasks); } } protected override sealed void QueueTask (Task task) { // Add the task to the list of tasks to be processed. If there aren't enough // delegates currently queued or running to process tasks, schedule another. lock (_tasks) { _tasks.AddLast(task); if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) { ++_delegatesQueuedOrRunning; NotifyThreadPoolOfPendingWork(); } } } /// Informs the ThreadPool that there's work to be executed for this scheduler. private void NotifyThreadPoolOfPendingWork () { ThreadPool.UnsafeQueueUserWorkItem(_ => { // Note that the current thread is now processing work items. // This is necessary to enable inlining of tasks into this thread. _currentThreadIsProcessingItems = true; try { // Process all available items in the queue. while (true) { Task item; lock (_tasks) { // When there are no more items to be processed, // note that we're done processing, and get out. if (_tasks.Count == 0) { --_delegatesQueuedOrRunning; break; } // Get the next item from the queue item = _tasks.First.Value; _tasks.RemoveFirst(); } // Execute the task we pulled out of the queue TryExecuteTask(item); } } finally { // We're done processing items on the current thread _currentThreadIsProcessingItems = false; } }, null); } protected override sealed bool TryExecuteTaskInline (Task task, bool taskWasPreviouslyQueued) { // If this thread isn't already processing a task, we don't support inlining if (!_currentThreadIsProcessingItems) return false; // If the task was previously queued, remove it from the queue if (taskWasPreviouslyQueued) TryDequeue(task); // Try to run the task. return TryExecuteTask(task); } protected override sealed bool TryDequeue (Task task) { lock (_tasks) return _tasks.Remove(task); } } Ещё можно воспользоваться BlockingCollection<> с ConcurrentQueue<> внутри. По окончанию добавления надо будет не забыть вызвать CompleteAdding. Ещё есть TPL Dataflow. Там параллелизмы и прочие настраиваются без кастомных планировщиков. Можно ещё Rx добавить в качестве вишенки на торте. Вариантов море. Вообще, сейчас ещё @Vlad прибежит, расскажет про хитрые мозговыносящие способы решить эту задачу. :)

Ответ 2

Вам нужен MailboxProcessor - агент для обработки сообщений, который выполняет асинхронные операции. Реализация паттерна "много писателей, один читатель".

