Страницы

Поиск по вопросам

воскресенье, 24 ноября 2019 г.

Имплементация Producer/Consumer pattern


Паттерн producer/consumer достаточно часто встречается в многопоточном программировании
Его смысл состоит в том, что один или несколько потоков производят данные, и параллельно этому один или несколько потоков потребляют их.

Как правильно имплементировать этот паттерн в популярных языках программирования
Задача сама по себе нетривиальна, поскольку включает синхронизацию между потоками, и потенциальную гонку между несколькими производителями и потребителями.



Справка.

Производящий поток (или потоки) называется «производитель», «поставщик» или просто «producer», потребляющий (-ие) — «потребитель» или «consumer».

Нетривиальность проблемы заключается в том, что потенциально как создание новых данных
так и их потребление могут занимать длительное время, и хотелось бы, чтобы обработка шла без простоев, на максимально возможной скорости.

Примеры:

Произведённые данные могут представлять вычислительно интенсивное задание. В это
случае разумно иметь единственный производящий поток, и несколько выполняющих потоков (например, столько, сколько в системе ядер процессора, если узкое место обработки — вычисления).

Или производящие потоки загружают данные из сети, а по окончанию загрузки выполняющи
потоки производит разбор загруженных данных. В этом случае разумно иметь по одному производителю на сайт и, и ограничивать число производителей, если предел доступной скорости сети исчерпан.



Этот вопрос — адаптация одноименного исследования с Хэшкода.
    


Ответы

Ответ 1



Реализация на C# Для современных версий языка (начиная с C# 4.0), имеет смысл не писать реализаци вручную, а (руководствуясь советом @Flammable), воспользоваться классом BlockingCollection, представляющим нужную функциональность. Для чтения в consumer-потоках используем просто циклы по последовательности, котору даёт GetConsumingEnumerable(). В producer-потоках пользуемся Add, и в конце не забываем CompleteAdding, чтобы consumer-потоки смогли остановиться. Пример: class Program { static public void Main() { new Program().Run(); } BlockingCollection q = new BlockingCollection(); void Run() { var threads = new [] { new Thread(Consumer), new Thread(Consumer) }; foreach (var t in threads) t.Start(); string s; while ((s = Console.ReadLine()).Length != 0) q.Add(s); q.CompleteAdding(); // останавливаем foreach (var t in threads) t.Join(); } void Consumer() { foreach (var s in q.GetConsumingEnumerable()) { Console.WriteLine("Processing: {0}", s); Thread.Sleep(2000); Console.WriteLine("Processed: {0}", s); } } } BlockingCollection позволяет ограничить количество элементов, так что попытк добавить элемент в переполненную очередь также может быть заблокирована до освобождения места. Заметьте, что GetConsumingEnumerable корректно работает даже в случае, когда у вас много консьюмеров. Это не так уж и очевидно. Если вы работаете со старой версией C#, вам придётся писать нужную функциональност вручную. Вы можете воспользоваться встроенным классом Monitor (который является аналогом mutex + condition variable из pthreads). public class ProducerConsumer where T : class { object mutex = new object(); Queue queue = new Queue(); bool isDead = false; public void Enqueue(T task) { if (task == null) throw new ArgumentNullException("task"); lock (mutex) { if (isDead) throw new InvalidOperationException("Queue already stopped"); queue.Enqueue(task); Monitor.Pulse(mutex); } } public T Dequeue() { lock (mutex) { while (queue.Count == 0 && !isDead) Monitor.Wait(mutex); if (queue.Count == 0) return null; return queue.Dequeue(); } } public void Stop() { lock (mutex) { isDead = true; Monitor.PulseAll(mutex); } } } Использование (аналогичный пример): class Program { static public void Main() { new Program().Run(); } ProducerConsumer q = new ProducerConsumer(); void Run() { var threads = new [] { new Thread(Consumer), new Thread(Consumer) }; foreach (var t in threads) t.Start(); string s; while ((s = Console.ReadLine()).Length != 0) q.Enqueue(s); q.Stop(); foreach (var t in threads) t.Join(); } void Consumer() { while (true) { string s = q.Dequeue(); if (s == null) break; Console.WriteLine("Processing: {0}", s); Thread.Sleep(2000); Console.WriteLine("Processed: {0}", s); } } }

Ответ 2



Реализация на C с библиотекой pthreads В C, в соответствии с духом языка, нет встроенных высокоуровневых синхронизирующихс коллекций. Наверное самой популярной и широко используемой библиотекой, реализующей многопоточность, является pthreads. С её помощью паттерн можно реализовать так: #include // объявляем структуру данных для одного задания struct producer_consumer_queue_item { struct producer_consumer_queue_item *next; // здесь идут собственно данные. вы можете поменять этот кусок, // использовав структуру, более специфичную для вашей задачи void *data; }; // объявляем очередь с дополнительными структурами для синхронизации. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания. struct producer_consumer_queue { struct producer_consumer_queue_item *head, *tail; // head == tail == 0, если очередь пуста pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ int is_alive; // показывает, не закончила ли очередь свою работу }; Теперь нам нужны процедуры добавления и извлечения заданий из очереди. void enqueue (void *data, struct producer_consumer_queue *q) { // упакуем задание в новую структуру struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p)); p->data = data; p->next = 0; // получим "эксклюзивный" доступ к очереди заданий pthread_mutex_lock(&q->lock); // ... и добавим новое задание туда: if (q->tail) q->tail->next = p; else { q->head = p; // очередь была пуста, а теперь нет -- надо разбудить потребителей pthread_cond_broadcast(&q->cond); } q->tail = p; // разрешаем доступ всем снова pthread_mutex_unlock(&q->lock); } void * dequeue(struct producer_consumer_queue *q) { // получаем эксклюзивный доступ к очереди: pthread_mutex_lock(&q->lock); while (!q->head && q->is_alive) { // очередь пуста, делать нечего, ждем... pthread_cond_wait(&q->cond, &q->lock); // wait разрешает доступ другим на время ожидания } // запоминаем текущий элемент или 0, если очередь умерла struct producer_consumer_queue_item *p = q->head; if (p) { // и удаляем его из очереди q->head = q->head->next; if (!q->head) q->tail = q->head; } // возвращаем эксклюзивный доступ другим участникам pthread_mutex_unlock(&q->lock); // отдаём данные void *data = p ? p->data : 0; // 0 означает, что данных больше не будет free(p); return data; } Ещё нужна процедура для инициализации очереди: struct producer_consumer_queue * producer_consumer_queue_create() { struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q)); q->head = q->tail = 0; q->is_alive = 1; pthread_mutex_init(&q->lock, 0); pthread_cond_init(&q->cond, 0); return q; } И процедура для закрытия очереди: void producer_consumer_queue_stop(struct producer_consumer_queue *q) { // для обращения к разделяемым переменным необходим эксклюзивный доступ pthread_mutex_lock(&q->lock); q->is_alive = 0; pthread_cond_broadcast(&q->cond); pthread_mutex_unlock(&q->lock); } Отлично, у нас есть всё, что нам надо. Как использовать это? Нужно: запустить несколько потоков-«производителей» и несколько «потребителей» придумать структуру данных для задания Пример: (производитель — главный поток, потребители — 2 потока) // это поток-потребитель void * consumer_thread (void *arg) { struct producer_consumer_queue *q = (typeof(q))arg; for (;;) { void *data = dequeue(q); // это сигнал, что очередь окончена if (!data) break; // значит, пора закрывать поток char *str = (char *)data; // тут наша обработка данных printf ("consuming: %s\n", str); sleep(2); printf ("consumed: %s\n", str); free(str); } return 0; } int main () { pthread_t consumer_threads[2]; void *res = 0; char *in = NULL; size_t sz; // создадим очередь: struct producer_consumer_queue *q = producer_consumer_queue_create(); // и потоки-«потребители» pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q); pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q); // главный цикл // получаем данные с клавиатуры: while (getline(&in, &sz, stdin) > 0) { enqueue(in, q); in = NULL; } producer_consumer_queue_stop(q); if (pthread_join(consumer_threads[0], &res) || pthread_join(consumer_threads[1], &res)) perror("join"); return (long)res; } Это реализация задачи с "бесконечной" очередью. На практике же иногда (или почт всегда?) более полезно ограничить размер очереди и таким образом сбалансировать скорость производителей, иногда переводя их в спящее состояние, с возможностями потребителей. Для этого немного изменим нашу producer_consumer_queue struct producer_consumer_queue { struct producer_consumer_queue_item *head, *tail; // head == tail == 0, если очередь пуста pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью pthread_cond_t condp; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ pthread_cond_t condc; // этот cond "сигналим", когда в очереди ПОЯВИЛОС СВОБОДНОЕ МЕСТО int is_alive; // показывает, не закончила ли очередь свою работу int max, cnt, // максимальный размер очереди и число заданий в ней pqcnt; // количество производителей, ждущих свободного места в очереди }; Добавляем pthread_cond_t condc для "засыпания/пробуждения" потоков производителей их счетчик в очереди на отправку сообщения и пару переменных, содержащих максимальный размер очереди и текущее количество заданий в ней. Соответственно меняются функции для постановки задания в очередь (enqueue), выборк его из очереди (dequeue), инициализации очереди (producer_consumer_queue_create) и ее остановки (producer_consumer_queue_stop): void enqueue (void *data, struct producer_consumer_queue *aq) { volatile struct producer_consumer_queue *q = aq; // упакуем задание в новую структуру struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p)); p->data = data; p->next = 0; // получим "эксклюзивный" доступ к очереди заданий pthread_mutex_lock(&aq->lock); // проверим не переполнена ли она if (q->max <= q->cnt) { q->pqcnt++; asm volatile ("" : : : "memory"); // зафиксируем изменения очереди в памяти // будем ждать пока потребители ее слегка не опустошат while(q->max <= q->cnt & q->is_alive) pthread_cond_wait(&aq->condc, &aq->lock); q->pqcnt--; asm volatile ("" : : : "memory"); } // ... и добавим новое задание туда: if (q->tail) q->tail->next = p; else { q->head = p; // очередь была пуста, а теперь нет -- надо разбудить потребителей pthread_cond_broadcast(&aq->condp); } q->tail = p; q->cnt++; asm volatile ("" : : : "memory"); // разрешаем доступ всем снова pthread_mutex_unlock(&aq->lock); } void * dequeue(struct producer_consumer_queue *aq) { volatile struct producer_consumer_queue *q = aq; // получаем эксклюзивный доступ к очереди: pthread_mutex_lock(&aq->lock); if (q->pqcnt && q->max > q->cnt) // в очереди есть место, а кто-то спит, разбудим их pthread_cond_broadcast(&aq->condc); while (!q->head && q->is_alive) { // очередь пуста, делать нечего, ждем... pthread_cond_wait(&aq->condp, &aq->lock); // wait разрешает доступ другим на время ожидания } // запоминаем текущий элемент или 0, если очередь умерла struct producer_consumer_queue_item *p = q->head; if (p) { // и удаляем его из очереди q->head = q->head->next; if (!q->head) q->tail = q->head; q->cnt--; asm volatile ("" : : : "memory"); // зафиксируем изменения очереди в памяти // разбудим поставщиков в их очереди pthread_cond_broadcast(&aq->condc); } // возвращаем эксклюзивный доступ другим участникам pthread_mutex_unlock(&aq->lock); // отдаём данные void *data = p ? p->data : 0; // 0 означает, что данных больше не будет free(p); return data; } struct producer_consumer_queue * producer_consumer_queue_create(int max) { struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q)); q->head = q->tail = 0; q->is_alive = 1; q->max = max; q->cnt = 0; q->pqcnt = 0; pthread_mutex_init(&q->lock, 0); pthread_cond_init(&q->condc, 0); pthread_cond_init(&q->condp, 0); return q; } // И процедура для закрытия очереди: void producer_consumer_queue_stop(struct producer_consumer_queue *aq) { volatile struct producer_consumer_queue *q = aq; // для обращения к разделяемым переменным необходим эксклюзивный доступ pthread_mutex_lock(&aq->lock); q->is_alive = 0; asm volatile ("" : : : "memory"); pthread_cond_broadcast(&aq->condc); pthread_cond_broadcast(&aq->condp); pthread_mutex_unlock(&aq->lock); } Здесь же показан memory barrier (asm volatile ("" : : : "memory");), использование которого запрещает компилятору переупорядочивать операции чтения-записи из RAM. Данная реализация не обеспечивает "упорядоченность" производителей, ожидающих свое очереди для отправки сообщения. Т.е. поток производитель, "заснувший" первым из-за отсутствия свободного места в очереди не обязательно проснется первым. Если такое поведение нас не устраивает, то придется внести некоторые изменения наши данные, прежде всего добавив очередь поставщиков из структур producer_queue_item (которая будет частью структуры producer_consumer_queue. Получаем следующие структуры данных: // объявляем структуру данных для одного задания struct producer_consumer_queue_item { struct producer_consumer_queue_item *next; // здесь идут собственно данные. вы можете поменять этот кусок, // использовав структуру, более специфичную для вашей задачи void *data; }; // струкура данных для спящего (ждущего свободного места) потока-производителя struct producer_queue_item { struct producer_queue_item *next; struct producer_consumer_queue_item *item; // данные для которых нет места pthread_cond_t cond; // этот cond "сигналим", когда в очереди появилось место #if DEBUG pid_t tid; // linux thread id for debug print int signaled; // индикатор "побудки" for debug print #endif }; // объявляем очередь данных с дополнительными структурами для синхронизации. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания. struct producer_consumer_queue { struct producer_consumer_queue_item *head, *tail; // head == tail == 0, если очередь пуста pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ int is_alive; // показывает, не закончила ли очередь свою работу int max, cnt; // максимальный размер очереди и число заданий в ней // очередь потоков-производителей, ждущих свободного места для своих данных struct producer_queue_item *pqhead, *pqtail; }; и реализацию основных функций: void enqueue (void *data, struct producer_consumer_queue *q) { volatile struct producer_consumer_queue *vq = q; // упакуем задание в новую структуру struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p)); p->data = data; p->next = 0; // получим "эксклюзивный" доступ к очереди заданий pthread_mutex_lock(&q->lock); #if DEBUG printf("%ld (cnt: %d) ---> %s", (long)gettid(), vq->cnt, (char *)(p->data)); #endif // ... и добавим новое задание туда: if (vq->max <= vq->cnt || vq->pqtail) {// производитель должен ждать #if DEBUG if (vq->cnt < vq->max) { puts("========================"); print_queue(q, 0); puts("========================"); } #endif struct producer_queue_item *pq = (typeof(pq))malloc(sizeof(*pq)); pthread_cond_init(&pq->cond, 0); // cond по которому его разбудят pq->next = 0; pq->item = p; // сохраним данные на время сна #if DEBUG pq->tid = gettid(); #endif // поместим себя в очередь спящих производителей if (vq->pqtail) vq->pqtail->next = pq; else vq->pqhead = pq; vq->pqtail = pq; asm volatile ("" : : : "memory"); // зафиксируем изменения очереди в памяти #if DEBUG int at = 0; // счетчик циклов пробуждения #endif do { // пойдем спать до появления свободного места в очереди данных #if DEBUG printf ("%ld prod cond wait (cnt: %d at: %d) %s", (long)gettid(), vq->cnt, at++, (char *)(p->data)); pq->signaled = 0; #endif pthread_cond_wait(&pq->cond, &q->lock); } while(vq->max <= vq->cnt && vq->is_alive); // проснулись и владеем очередью /* Вот тонкий момент. Порядок активизации потоков не определен, а нам надо соблюдать очередность данных. Поэтому переустановим локальные переменные из очереди, хотя это могут быть данные, положенные туда другим потоком. */ #if DEBUG if (pq != vq->pqhead) { printf ("BAAAD %ld (cnt: %d at: %d) %s", (long)gettid(), vq->cnt, at, (char *)(p->data)); print_queue(q, 0); if (vq->is_alive) exit(1); // совсем плохо, такого быть не должно else puts("CONTINUE"); } #endif pq = vq->pqhead; // в любом случае берем голову очереди производителей if ((vq->pqhead = pq->next) == 0) // и удаляем ее vq->pqtail = 0; asm volatile ("" : : : "memory"); p = pq->item; free(pq); #if DEBUG printf ("%ld prod enqueued after wait (cnt: %d at: %d) %s", (long)gettid(), vq->cnt, at, (char *)(p->data)); #endif } // вот тут реально кладем data в очередь для потребителей if (vq->tail) vq->tail->next = p; else { vq->head = p; // очередь была пуста, а теперь нет -- надо разбудить потребителей pthread_cond_broadcast(&q->cond); } vq->tail = p; vq->cnt++; asm volatile ("" : : : "memory"); // сбросим изменения очереди в память // разрешаем доступ всем снова pthread_mutex_unlock(&q->lock); } #if DEBUG #define cond_signal_producer(q) ({ \ if ((q)->pqhead) { \ (q)->pqhead->signaled = 1; \ pthread_cond_signal(&(q)->pqhead->cond); \ } \ }) #else #define cond_signal_producer(q) ({ \ if ((q)->pqhead) \ pthread_cond_signal(&(q)->pqhead->cond); \ }) #endif void * dequeue(struct producer_consumer_queue *q) { volatile struct producer_consumer_queue *vq = q; // получаем эксклюзивный доступ к очереди: pthread_mutex_lock(&q->lock); // если есть спящие производители, то разбудим первого cond_signal_producer(vq); while (!vq->head && vq->is_alive) { // очередь пуста, делать нечего, ждем... pthread_cond_wait(&q->cond, &q->lock); // wait разрешает доступ другим на время ожидания } // запоминаем текущий элемент или 0, если очередь умерла struct producer_consumer_queue_item *p = vq->head; if (p) { // и удаляем его из очереди vq->head = vq->head->next; if (!vq->head) vq->tail = vq->head; vq->cnt--; asm volatile ("" : : : "memory"); // сбросим изменения очереди в память // разбудим первого поставщика в их очереди cond_signal_producer(vq); } // возвращаем эксклюзивный доступ другим участникам pthread_mutex_unlock(&q->lock); // отдаём данные void *data = p ? p->data : 0; // 0 означает, что данных больше не будет // согласно 7.20.3.2/2, можно не проверять на 0 free(p); return data; } struct producer_consumer_queue * producer_consumer_queue_create(int max) { struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q)); q->head = q->tail = 0; q->pqhead = q->pqtail = 0; q->is_alive = 1; q->max = max; q->cnt = 0; pthread_mutex_init(&q->lock, 0); pthread_cond_init(&q->cond, 0); return q; } // И процедура для закрытия очереди: void producer_consumer_queue_stop(struct producer_consumer_queue *q) { volatile struct producer_consumer_queue *vq = q; // для обращения к разделяемым переменным необходим эксклюзивный доступ pthread_mutex_lock(&q->lock); vq->is_alive = 0; pthread_cond_broadcast(&q->cond); // разбудим потребителей volatile struct producer_queue_item *pq; for (pq = vq->pqhead; pq; pq = pq->next) { #if DEBUG pq->signaled = 1; asm volatile ("" : : : "memory"); #endif // будим каждого ждущего производителя pthread_cond_signal((pthread_cond_t *)&pq->cond); } pthread_mutex_unlock(&q->lock); } Все три программы (pq1.c, pq2.c и pq3.c) вместе с функцией gettid() находятся в http://pastebin.com/E23r9DZk . Для экспериментов скопируйте их в разные файлы и компилируйте, например, gcc pq3.c -pthread gettid.o

Ответ 3



Реализация на C#, библиотека Dataflow Ещё одной альтернативой является использование Майкрософтовской библиотеки Dataflow которая собственно и создана для того, чтобы управлять потоками данных. Для использовани кода в примерах, вам нужно подключить NuGet-пакет Microsoft.Tpl.Dataflow. Класс BufferBlock практически является готовым producer/consumer'ом, но, в отличие от блокирующего интерфейса BlockingCollection, он имеет async-интерфейс! Для асинхронного добавления задания поставщик может воспользоваться SendAsync. Асинхронно добавление нужно, так как очередь может быть ограниченной длины, а значит, добавление должно будет дожидаться наличия свободного места! По окончанию добавления нужно вызвать Complete. async Task ProduceSingle(ITargetBlock queue, int howmuch) { Random r = new Random(); while (howmuch-- > 0) { // эмулируем длительную работу по подготовке следующего задания // длительность выбираем случайно, чтобы задания приходили в // непредсказуемые моменты времени await Task.Delay(1000 * r.Next(1, 3)); var v = string.Format("automatic {0}", r.Next(1, 10)); await queue.SendAsync(v); } queue.Complete(); } Если у вас несколько поставщиков, закрывать очередь нужно лишь когда они все отработают: async Task Produce1(ITargetBlock queue, int howmuch) { Random r = new Random(); while (howmuch-- > 0) { await Task.Delay(1000 * r.Next(1, 3)); var v = string.Format("automatic {0}", r.Next(1, 10)); await queue.SendAsync(v); } } // функция Console.ReadLine() -- блокирующая, поэтому выполняем её асинхронно // (иначе она заблокирует вызывающий поток) // у Console нет async-интерфейса. Task ReadConsole() { // блокирующую функцию выгружаем в thread pool return Task.Run(() => Console.ReadLine()); } async Task Produce2(ITargetBlock queue) { string s; while ((s = await ReadConsole()).Length != 0) await queue.SendAsync("manual " + s); } async Task ProduceAll(ITargetBlock queue) { var p1 = Produce1(queue, 20); var p2 = Produce2(queue); await Task.WhenAll(p1, p2); queue.Complete(); } Теперь, потребитель. Если потребитель лишь один, всё просто: async Task ConsumeSingle(ISourceBlock queue) { while (await queue.OutputAvailableAsync()) Console.WriteLine(await queue.ReceiveAsync()); } Для случая нескольких потребителей использовать ReceiveAsync — неверно, так как задани может быть взято из очереди другим потребителем! Функции TryReceiveAsync тоже нету, поэтому после асинхронного выяснения того, что очередь не пуста, используем TryReceive: async Task ConsumeCooperative(IReceivableSourceBlock queue, int number) { Random r = new Random(); while (await queue.OutputAvailableAsync()) { string v; // в этой точке данные могут быть уже уйти другому потребителю if (!queue.TryReceive(out v)) continue; // продолжаем ждать // цветной вывод и прочие плюшки // мне лень синхронизировать вывод на консоль, хотя конечно это разделяемый ресурс if (Console.CursorLeft != 0) Console.WriteLine(); var savedColor = Console.ForegroundColor; Console.ForegroundColor = (ConsoleColor)(number + 8); Console.WriteLine(string.Format("{0}[{1}]: {2}", new string(' ', number * 4), number, v)); Console.ForegroundColor = savedColor; // симулируем длительную обработку результата клиентом await Task.Delay(1000 * r.Next(1, 3)); } } Task ConsumeAll(IReceivableSourceBlock queue) { var c1 = ConsumeCooperative(queue, 1); var c2 = ConsumeCooperative(queue, 2); return Task.WhenAll(c1, c2); } Осталась обвязка: class Program { static void Main(string[] args) { new Program().RunAll().Wait(); } async Task RunAll() { BufferBlock queue = new BufferBlock(); var p = ProduceAll(queue); var c = ConsumeAll(queue); await Task.WhenAll(p, c, queue.Completion); } // остальные методы } В своей статье Async Producer/Consumer Queue using Dataflow Stephen Cleary предлагае другой подход, более в духе библиотеки Dataflow. В ней заложена симметрия между блоками-источникам (ISourceBlock), блоками-приёмниками (ITargetBlock), и блоками-преобразователями (IPropagatorBlock). В соответствии с этой идеологией, мы применяем для поставщика блок-приёмник ActionBlock: Task Consume2(ISourceBlock queue, int number) { var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }; var consumer = new ActionBlock(v => ConsumeImpl2(v, number), consumerOptions); var linkOptions = new DataflowLinkOptions { PropagateCompletion = true }; queue.LinkTo(consumer, linkOptions); return consumer.Completion; } void ConsumeImpl2(string v, int number) { Console.WriteLine(string.Format("[{0}]: {1}", number, v)); Thread.Sleep(1500); } Task ConsumeAll2(ISourceBlock queue) { var c1 = Consume2(queue, 1); var c2 = Consume2(queue, 2); return Task.WhenAll(c1, c2); } Для чего нужно BoundedCapacity = 1? Дел в том, что по умолчанию ActionBlock имее «неограниченную» ёмкость, и таким образом за раз потребит все данные из очереди. Таким образом, если мы ввели ограничение на объём очереди queue = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20 }); то данные будут всё равно накапливаться в ActionBlock'е. Чтобы роль хранилища исполнялас BufferBlock'ом, а потребителя — ActionBlock, и нужно ограничить его объём. Заметьте также, что ограничение ёмкости ActionBlock'а позволяет библиотеке Dataflow балансировать нагрузку, отправляя данные свободному блоку. Заметьте, что в этом случае то, в каком контексте (пул потоков? выделенный поток? исполняется ActionBlock, управляется не через стандартный механизм async/await, а посредством TaskScheduler'а в настройках. Подход со стыковкой блоков выполнения может быстро стать слишком тяжеловесным, поэтому я советовал бы использовать его лишь для достаточно сложных задач.

Ответ 4



Реализация на C#, библиотека Dataflow ("инверсный" алгоритм) Отличие данного алгоритма от приведенного VladD "прямого" - в том, что используется очередь потребителей вместо очереди элементов. Это позволяет избавиться от цикла приема в потребителе - ценой появления знания списк потребителей поставщиком данных. Иными словами, в такой конфигурации потребители выходят не активными - а пассивными. Кроме того, сам алгоритм получается очень простым. Потребитель в таком алгоритме не имеет никакого алгоритма, это лишь класс (или интерфейс либо вовсе делегат), в котором можно вызвать метод: class Consumer { public void Consume(string str) { Console.WriteLine(str); } } На стороне поставщика алгоритм немного сложнее: BufferBlock consumers = new BufferBlock(); public async void SendToConsumer(string str) { var consumer = await consumers.ReceiveAsync(); try { consumer.Consume(str); } finally { consumers.Post(consumer); } } async Task Produce(int howmuch) { Random r = new Random(); while (howmuch-- > 0) { // эмулируем длительную работу по подготовке следующего задания // длительность выбираем случайно, чтобы задания приходили в // непредсказуемые моменты времени await Task.Delay(1000 * r.Next(1, 3)); var v = string.Format("automatic {0}", r.Next(1, 10)); SendToConsumer(v); } } К сожалению, это еще не все. Дело в том, что в таком виде любое исключение, возникшее в методе Consume, обрушит программу. Поэтому надо сделать одну из двух вещей: добавить в метод контракт класса Consumer требование обязательной обработки исключений; или же установить глобальный обработчик неперехваченных исключений. А еще лучше - воспользоваться сразу обоими вариантами. И еще один важный момент. Пассивность потребителя означает, в частности, что потребител будет запущен в контексте синхронизации поставщика. Иногда это может быть нежелательным - в таком случае в код SendToConsumer следует добавить вызов Task.Run. Иногда же, напротив, производителю нужно дождаться полной обработки порции данных В таком случае надо сменить возвращаемое значение у метода SendToConsumer с void на Task.

Комментариев нет:

Отправить комментарий