Мне потребовалась очередь команд, где множество потоков может добавлять команды на выполнение и один поток по очереди их выполняет. Тк я не знаю какие-либо стандартные реализации подобного, то пришлось делать самому. Можете оценить мой класс для этого, тк у меня сомнения на счет его качества, хотя он работает. И если кто-либо даст ссылку на реализации подобного, то тоже будет приятно.
public class ConcurrentCommandsQueue
private readonly Action
private volatile bool _isDisposed;
private volatile bool _isExecuting;
private volatile Task _taskForAction;
public bool IsExecuting
{
get { return _isExecuting; }
}
public ConcurrentCommandsQueue(Action
_isDisposed = false;
_isExecuting = false;
_executeCommand = executeCommand;
_onStartCommandsExecution = onStartCommandsExecution;
_onEndCommandsExecution = onEndCommandsExecution;
_taskForAction = new TaskCompletionSource
private void ExecuteCommands()
{
if (_onStartCommandsExecution != null)
_onStartCommandsExecution();
T command = default(T);
while (true)
{
lock (_lockForSyncExecuting)
{
if (_commandQueue.Count == 0)
{
_isExecuting = false;
break;
}
command = _commandQueue.Dequeue();
}
_executeCommand(command);
}
if (_onEndCommandsExecution != null)
_onEndCommandsExecution(command);
}
public void AddCommand(T command)
{
if (_isDisposed)
throw new ObjectDisposedException(GetType().ToString());
lock (_lockForSyncExecuting)
{
if (_isDisposed)
return;
_commandQueue.Enqueue(command);
if (!_isExecuting)
{
_isExecuting = true;
_taskForAction = Task.Run(new Action(ExecuteCommands));
}
}
}
public void ClearCommandQueue()
{
if (_isDisposed)
throw new ObjectDisposedException(GetType().ToString());
lock (_lockForSyncExecuting)
_commandQueue.Clear();
}
public void Dispose()
{
if (_isDisposed)
return;
_isDisposed = true;
lock (_lockForSyncExecuting)
_commandQueue.Clear();
if (_taskForAction != null)
_taskForAction.Wait();
}
}
Ответ
У вас получился код в стиле .NET 1.0, который слегка эволюционировал с приходом лямбд (Delegate механически заменён на Action) и задач (new Thread().Start() механически заменён на Task.Run()), но по сути совершенно не изменился: вы TPL с async/await толком не используете, а существование concurrent коллекций вообще упустили из виду.
В .NET 4.5 это всё ненужно в принципе. Если вам нужно выполнение задач строго в одном потоке, то можно просто кидать задачи в однопоточный планировщик задач (task scheduler). Собственно, на этой первой строчке весь код и заканчивается. Если нужно выполнение кода перед всеми операциями и после всех операций, то код перед и после этой строчки и пишется. Отмечу, что при этой однострочной реализации вы ещё имеете бонусом возможность использования токена отмены (cancellation token) и прочих радостей жизни.
Рассмотрим пример. Допустим, надо выполнить две очереди задач: в каждой очереди задачи выполняются последовательно, но очереди выполняются параллельно. Создаём два планировщика задач с ограничением параллелизма, затем создаём задачи с указанием нужного нам планировщика.
class Program
{
static readonly Random _rnd = new Random();
static readonly LimitedConcurrencyTaskScheduler _schedulerFoo =
new LimitedConcurrencyTaskScheduler(1);
static readonly LimitedConcurrencyTaskScheduler _schedulerBar =
new LimitedConcurrencyTaskScheduler(1);
static void Main () => new Program().Run().Wait();
async Task Run ()
{
Task queueFoo = RunQueue("Foo", _schedulerFoo,
Enumerable.Range(0, 3).Select(i => (Action)(() => Foo("Foo"))));
Task queueBar = RunQueue("Bar", _schedulerBar,
Enumerable.Range(0, 3).Select(i => (Action)(() => Foo("Bar"))));
await Task.WhenAll(queueFoo, queueBar);
Console.WriteLine("Done!");
Console.ReadKey();
}
async Task RunQueue (string name, TaskScheduler scheduler,
IEnumerable
async Task RunTask (Action task, TaskScheduler scheduler)
{
await Task.Factory.StartNew(task, CancellationToken.None,
TaskCreationOptions.None, scheduler);
}
void Foo (string name)
{
int timeout = _rnd.Next(200);
Console.WriteLine($"{name}: Start {timeout}");
Thread.Sleep(timeout);
Console.WriteLine($"{name}: Finish {timeout}");
}
}
Пример вывода:
Foo: Start
Bar: Start
Foo: Start 165
Bar: Start 50
Bar: Finish 50
Bar: Start 39
Bar: Finish 39
Bar: Start 115
Foo: Finish 165
Foo: Start 116
Bar: Finish 115
Bar: Finish
Foo: Finish 116
Foo: Start 0
Foo: Finish 0
Foo: Finish
Done!
Хотя в этом примере все задачи и кидаются одновременно, задачи можно добавлять в любой момент. Правда тогда смысл "начала" и "конца" несколько теряется. Если вы объясните, что вы собираетесь с ними делать, тогда можно будет добавить.
Здесь я воспользовался планировщиком LimitedConcurrencyTaskScheduler из примеров на MSDN: Samples for Parallel Programming with the .NET Framework (статья с описанием: ParallelExtensionsExtras Tour - #7 - Additional TaskSchedulers).
///
private readonly int _maxDegreeOfParallelism;
private readonly LinkedList
///
///
protected override sealed IEnumerable
protected override sealed void QueueTask (Task task)
{
// Add the task to the list of tasks to be processed. If there aren't enough
// delegates currently queued or running to process tasks, schedule another.
lock (_tasks) {
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) {
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
///
protected override sealed bool TryExecuteTaskInline (Task task, bool taskWasPreviouslyQueued)
{
// If this thread isn't already processing a task, we don't support inlining
if (!_currentThreadIsProcessingItems)
return false;
// If the task was previously queued, remove it from the queue
if (taskWasPreviouslyQueued)
TryDequeue(task);
// Try to run the task.
return TryExecuteTask(task);
}
protected override sealed bool TryDequeue (Task task)
{
lock (_tasks)
return _tasks.Remove(task);
}
}
Ещё можно воспользоваться BlockingCollection<> с ConcurrentQueue<> внутри. По окончанию добавления надо будет не забыть вызвать CompleteAdding
Ещё есть TPL Dataflow. Там параллелизмы и прочие настраиваются без кастомных планировщиков.
Можно ещё Rx добавить в качестве вишенки на торте.
Вариантов море.
Вообще, сейчас ещё @Vlad прибежит, расскажет про хитрые мозговыносящие способы решить эту задачу. :)
Комментариев нет:
Отправить комментарий