Паттерн producer/consumer достаточно часто встречается в многопоточном программировании
Его смысл состоит в том, что один или несколько потоков производят данные, и параллельно этому один или несколько потоков потребляют их.
Как правильно имплементировать этот паттерн в популярных языках программирования
Задача сама по себе нетривиальна, поскольку включает синхронизацию между потоками, и потенциальную гонку между несколькими производителями и потребителями.
Справка.
Производящий поток (или потоки) называется «производитель», «поставщик» или просто «producer», потребляющий (-ие) — «потребитель» или «consumer».
Нетривиальность проблемы заключается в том, что потенциально как создание новых данных
так и их потребление могут занимать длительное время, и хотелось бы, чтобы обработка шла без простоев, на максимально возможной скорости.
Примеры:
Произведённые данные могут представлять вычислительно интенсивное задание. В это
случае разумно иметь единственный производящий поток, и несколько выполняющих потоков (например, столько, сколько в системе ядер процессора, если узкое место обработки — вычисления).
Или производящие потоки загружают данные из сети, а по окончанию загрузки выполняющи
потоки производит разбор загруженных данных. В этом случае разумно иметь по одному производителю на сайт и, и ограничивать число производителей, если предел доступной скорости сети исчерпан.
Этот вопрос — адаптация одноименного исследования с Хэшкода.
Ответы
Ответ 1
Реализация на C#
Для современных версий языка (начиная с C# 4.0), имеет смысл не писать реализаци
вручную, а (руководствуясь советом @Flammable), воспользоваться классом BlockingCollection, представляющим нужную функциональность.
Для чтения в consumer-потоках используем просто циклы по последовательности, котору
даёт GetConsumingEnumerable(). В producer-потоках пользуемся Add, и в конце не забываем CompleteAdding, чтобы consumer-потоки смогли остановиться.
Пример:
class Program
{
static public void Main()
{
new Program().Run();
}
BlockingCollection q = new BlockingCollection();
void Run()
{
var threads = new [] { new Thread(Consumer), new Thread(Consumer) };
foreach (var t in threads)
t.Start();
string s;
while ((s = Console.ReadLine()).Length != 0)
q.Add(s);
q.CompleteAdding(); // останавливаем
foreach (var t in threads)
t.Join();
}
void Consumer()
{
foreach (var s in q.GetConsumingEnumerable())
{
Console.WriteLine("Processing: {0}", s);
Thread.Sleep(2000);
Console.WriteLine("Processed: {0}", s);
}
}
}
BlockingCollection позволяет ограничить количество элементов, так что попытк
добавить элемент в переполненную очередь также может быть заблокирована до освобождения места.
Заметьте, что GetConsumingEnumerable корректно работает даже в случае, когда у вас много консьюмеров. Это не так уж и очевидно.
Если вы работаете со старой версией C#, вам придётся писать нужную функциональност
вручную. Вы можете воспользоваться встроенным классом Monitor (который является аналогом mutex + condition variable из pthreads).
public class ProducerConsumer where T : class
{
object mutex = new object();
Queue queue = new Queue();
bool isDead = false;
public void Enqueue(T task)
{
if (task == null)
throw new ArgumentNullException("task");
lock (mutex)
{
if (isDead)
throw new InvalidOperationException("Queue already stopped");
queue.Enqueue(task);
Monitor.Pulse(mutex);
}
}
public T Dequeue()
{
lock (mutex)
{
while (queue.Count == 0 && !isDead)
Monitor.Wait(mutex);
if (queue.Count == 0)
return null;
return queue.Dequeue();
}
}
public void Stop()
{
lock (mutex)
{
isDead = true;
Monitor.PulseAll(mutex);
}
}
}
Использование (аналогичный пример):
class Program
{
static public void Main()
{
new Program().Run();
}
ProducerConsumer q = new ProducerConsumer();
void Run()
{
var threads = new [] { new Thread(Consumer), new Thread(Consumer) };
foreach (var t in threads)
t.Start();
string s;
while ((s = Console.ReadLine()).Length != 0)
q.Enqueue(s);
q.Stop();
foreach (var t in threads)
t.Join();
}
void Consumer()
{
while (true)
{
string s = q.Dequeue();
if (s == null)
break;
Console.WriteLine("Processing: {0}", s);
Thread.Sleep(2000);
Console.WriteLine("Processed: {0}", s);
}
}
}
Ответ 2
Реализация на C с библиотекой pthreads
В C, в соответствии с духом языка, нет встроенных высокоуровневых синхронизирующихс
коллекций. Наверное самой популярной и широко используемой библиотекой, реализующей многопоточность, является pthreads. С её помощью паттерн можно реализовать так:
#include
// объявляем структуру данных для одного задания
struct producer_consumer_queue_item {
struct producer_consumer_queue_item *next;
// здесь идут собственно данные. вы можете поменять этот кусок,
// использовав структуру, более специфичную для вашей задачи
void *data;
};
// объявляем очередь с дополнительными структурами для синхронизации.
// в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
struct producer_consumer_queue {
struct producer_consumer_queue_item *head, *tail;
// head == tail == 0, если очередь пуста
pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
int is_alive; // показывает, не закончила ли очередь свою работу
};
Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
void
enqueue (void *data, struct producer_consumer_queue *q)
{
// упакуем задание в новую структуру
struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
p->data = data;
p->next = 0;
// получим "эксклюзивный" доступ к очереди заданий
pthread_mutex_lock(&q->lock);
// ... и добавим новое задание туда:
if (q->tail)
q->tail->next = p;
else {
q->head = p;
// очередь была пуста, а теперь нет -- надо разбудить потребителей
pthread_cond_broadcast(&q->cond);
}
q->tail = p;
// разрешаем доступ всем снова
pthread_mutex_unlock(&q->lock);
}
void *
dequeue(struct producer_consumer_queue *q)
{
// получаем эксклюзивный доступ к очереди:
pthread_mutex_lock(&q->lock);
while (!q->head && q->is_alive) {
// очередь пуста, делать нечего, ждем...
pthread_cond_wait(&q->cond, &q->lock);
// wait разрешает доступ другим на время ожидания
}
// запоминаем текущий элемент или 0, если очередь умерла
struct producer_consumer_queue_item *p = q->head;
if (p)
{
// и удаляем его из очереди
q->head = q->head->next;
if (!q->head)
q->tail = q->head;
}
// возвращаем эксклюзивный доступ другим участникам
pthread_mutex_unlock(&q->lock);
// отдаём данные
void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
free(p);
return data;
}
Ещё нужна процедура для инициализации очереди:
struct producer_consumer_queue *
producer_consumer_queue_create()
{
struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
q->head = q->tail = 0;
q->is_alive = 1;
pthread_mutex_init(&q->lock, 0);
pthread_cond_init(&q->cond, 0);
return q;
}
И процедура для закрытия очереди:
void
producer_consumer_queue_stop(struct producer_consumer_queue *q)
{
// для обращения к разделяемым переменным необходим эксклюзивный доступ
pthread_mutex_lock(&q->lock);
q->is_alive = 0;
pthread_cond_broadcast(&q->cond);
pthread_mutex_unlock(&q->lock);
}
Отлично, у нас есть всё, что нам надо.
Как использовать это? Нужно:
запустить несколько потоков-«производителей» и несколько «потребителей»
придумать структуру данных для задания
Пример: (производитель — главный поток, потребители — 2 потока)
// это поток-потребитель
void *
consumer_thread (void *arg)
{
struct producer_consumer_queue *q = (typeof(q))arg;
for (;;) {
void *data = dequeue(q);
// это сигнал, что очередь окончена
if (!data)
break; // значит, пора закрывать поток
char *str = (char *)data;
// тут наша обработка данных
printf ("consuming: %s\n", str);
sleep(2);
printf ("consumed: %s\n", str);
free(str);
}
return 0;
}
int
main ()
{
pthread_t consumer_threads[2];
void *res = 0;
char *in = NULL;
size_t sz;
// создадим очередь:
struct producer_consumer_queue *q = producer_consumer_queue_create();
// и потоки-«потребители»
pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q);
// главный цикл
// получаем данные с клавиатуры:
while (getline(&in, &sz, stdin) > 0) {
enqueue(in, q);
in = NULL;
}
producer_consumer_queue_stop(q);
if (pthread_join(consumer_threads[0], &res) ||
pthread_join(consumer_threads[1], &res))
perror("join");
return (long)res;
}
Это реализация задачи с "бесконечной" очередью. На практике же иногда (или почт
всегда?) более полезно ограничить размер очереди и таким образом сбалансировать скорость производителей, иногда переводя их в спящее состояние, с возможностями потребителей.
Для этого немного изменим нашу producer_consumer_queue
struct producer_consumer_queue {
struct producer_consumer_queue_item *head, *tail;
// head == tail == 0, если очередь пуста
pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
pthread_cond_t condp; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
pthread_cond_t condc; // этот cond "сигналим", когда в очереди ПОЯВИЛОС
СВОБОДНОЕ МЕСТО
int is_alive; // показывает, не закончила ли очередь свою работу
int max, cnt, // максимальный размер очереди и число заданий в ней
pqcnt; // количество производителей, ждущих свободного места в очереди
};
Добавляем pthread_cond_t condc для "засыпания/пробуждения" потоков производителей
их счетчик в очереди на отправку сообщения и пару переменных, содержащих максимальный размер очереди и текущее количество заданий в ней.
Соответственно меняются функции для постановки задания в очередь (enqueue), выборк
его из очереди (dequeue), инициализации очереди (producer_consumer_queue_create) и ее остановки (producer_consumer_queue_stop):
void
enqueue (void *data, struct producer_consumer_queue *aq)
{
volatile struct producer_consumer_queue *q = aq;
// упакуем задание в новую структуру
struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
p->data = data;
p->next = 0;
// получим "эксклюзивный" доступ к очереди заданий
pthread_mutex_lock(&aq->lock);
// проверим не переполнена ли она
if (q->max <= q->cnt) {
q->pqcnt++;
asm volatile ("" : : : "memory");
// зафиксируем изменения очереди в памяти
// будем ждать пока потребители ее слегка не опустошат
while(q->max <= q->cnt & q->is_alive)
pthread_cond_wait(&aq->condc, &aq->lock);
q->pqcnt--;
asm volatile ("" : : : "memory");
}
// ... и добавим новое задание туда:
if (q->tail)
q->tail->next = p;
else {
q->head = p;
// очередь была пуста, а теперь нет -- надо разбудить потребителей
pthread_cond_broadcast(&aq->condp);
}
q->tail = p;
q->cnt++;
asm volatile ("" : : : "memory");
// разрешаем доступ всем снова
pthread_mutex_unlock(&aq->lock);
}
void *
dequeue(struct producer_consumer_queue *aq)
{
volatile struct producer_consumer_queue *q = aq;
// получаем эксклюзивный доступ к очереди:
pthread_mutex_lock(&aq->lock);
if (q->pqcnt && q->max > q->cnt)
// в очереди есть место, а кто-то спит, разбудим их
pthread_cond_broadcast(&aq->condc);
while (!q->head && q->is_alive) {
// очередь пуста, делать нечего, ждем...
pthread_cond_wait(&aq->condp, &aq->lock);
// wait разрешает доступ другим на время ожидания
}
// запоминаем текущий элемент или 0, если очередь умерла
struct producer_consumer_queue_item *p = q->head;
if (p) {
// и удаляем его из очереди
q->head = q->head->next;
if (!q->head)
q->tail = q->head;
q->cnt--;
asm volatile ("" : : : "memory");
// зафиксируем изменения очереди в памяти
// разбудим поставщиков в их очереди
pthread_cond_broadcast(&aq->condc);
}
// возвращаем эксклюзивный доступ другим участникам
pthread_mutex_unlock(&aq->lock);
// отдаём данные
void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
free(p);
return data;
}
struct producer_consumer_queue *
producer_consumer_queue_create(int max)
{
struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
q->head = q->tail = 0;
q->is_alive = 1;
q->max = max;
q->cnt = 0;
q->pqcnt = 0;
pthread_mutex_init(&q->lock, 0);
pthread_cond_init(&q->condc, 0);
pthread_cond_init(&q->condp, 0);
return q;
}
// И процедура для закрытия очереди:
void
producer_consumer_queue_stop(struct producer_consumer_queue *aq)
{
volatile struct producer_consumer_queue *q = aq;
// для обращения к разделяемым переменным необходим эксклюзивный доступ
pthread_mutex_lock(&aq->lock);
q->is_alive = 0;
asm volatile ("" : : : "memory");
pthread_cond_broadcast(&aq->condc);
pthread_cond_broadcast(&aq->condp);
pthread_mutex_unlock(&aq->lock);
}
Здесь же показан memory barrier (asm volatile ("" : : : "memory");), использование которого запрещает компилятору переупорядочивать операции чтения-записи из RAM.
Данная реализация не обеспечивает "упорядоченность" производителей, ожидающих свое
очереди для отправки сообщения. Т.е. поток производитель, "заснувший" первым из-за отсутствия свободного места в очереди не обязательно проснется первым.
Если такое поведение нас не устраивает, то придется внести некоторые изменения
наши данные, прежде всего добавив очередь поставщиков из структур producer_queue_item (которая будет частью структуры producer_consumer_queue.
Получаем следующие структуры данных:
// объявляем структуру данных для одного задания
struct producer_consumer_queue_item {
struct producer_consumer_queue_item *next;
// здесь идут собственно данные. вы можете поменять этот кусок,
// использовав структуру, более специфичную для вашей задачи
void *data;
};
// струкура данных для спящего (ждущего свободного места) потока-производителя
struct producer_queue_item {
struct producer_queue_item *next;
struct producer_consumer_queue_item *item; // данные для которых нет места
pthread_cond_t cond; // этот cond "сигналим", когда в очереди появилось место
#if DEBUG
pid_t tid; // linux thread id for debug print
int signaled; // индикатор "побудки" for debug print
#endif
};
// объявляем очередь данных с дополнительными структурами для синхронизации.
// в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
struct producer_consumer_queue {
struct producer_consumer_queue_item *head, *tail;
// head == tail == 0, если очередь пуста
pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
int is_alive; // показывает, не закончила ли очередь свою работу
int max, cnt; // максимальный размер очереди и число заданий в ней
// очередь потоков-производителей, ждущих свободного места для своих данных
struct producer_queue_item *pqhead,
*pqtail;
};
и реализацию основных функций:
void
enqueue (void *data, struct producer_consumer_queue *q)
{
volatile struct producer_consumer_queue *vq = q;
// упакуем задание в новую структуру
struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
p->data = data;
p->next = 0;
// получим "эксклюзивный" доступ к очереди заданий
pthread_mutex_lock(&q->lock);
#if DEBUG
printf("%ld (cnt: %d) ---> %s", (long)gettid(), vq->cnt, (char *)(p->data));
#endif
// ... и добавим новое задание туда:
if (vq->max <= vq->cnt || vq->pqtail) {// производитель должен ждать
#if DEBUG
if (vq->cnt < vq->max) {
puts("========================");
print_queue(q, 0);
puts("========================");
}
#endif
struct producer_queue_item *pq = (typeof(pq))malloc(sizeof(*pq));
pthread_cond_init(&pq->cond, 0); // cond по которому его разбудят
pq->next = 0;
pq->item = p; // сохраним данные на время сна
#if DEBUG
pq->tid = gettid();
#endif
// поместим себя в очередь спящих производителей
if (vq->pqtail)
vq->pqtail->next = pq;
else
vq->pqhead = pq;
vq->pqtail = pq;
asm volatile ("" : : : "memory");
// зафиксируем изменения очереди в памяти
#if DEBUG
int at = 0; // счетчик циклов пробуждения
#endif
do { // пойдем спать до появления свободного места в очереди данных
#if DEBUG
printf ("%ld prod cond wait (cnt: %d at: %d) %s",
(long)gettid(), vq->cnt, at++, (char *)(p->data));
pq->signaled = 0;
#endif
pthread_cond_wait(&pq->cond, &q->lock);
} while(vq->max <= vq->cnt && vq->is_alive);
// проснулись и владеем очередью
/*
Вот тонкий момент. Порядок активизации потоков не определен,
а нам надо соблюдать очередность данных.
Поэтому переустановим локальные переменные из очереди,
хотя это могут быть данные, положенные туда другим потоком.
*/
#if DEBUG
if (pq != vq->pqhead) {
printf ("BAAAD %ld (cnt: %d at: %d) %s",
(long)gettid(), vq->cnt, at, (char *)(p->data));
print_queue(q, 0);
if (vq->is_alive)
exit(1); // совсем плохо, такого быть не должно
else
puts("CONTINUE");
}
#endif
pq = vq->pqhead; // в любом случае берем голову очереди производителей
if ((vq->pqhead = pq->next) == 0) // и удаляем ее
vq->pqtail = 0;
asm volatile ("" : : : "memory");
p = pq->item;
free(pq);
#if DEBUG
printf ("%ld prod enqueued after wait (cnt: %d at: %d) %s",
(long)gettid(), vq->cnt, at, (char *)(p->data));
#endif
}
// вот тут реально кладем data в очередь для потребителей
if (vq->tail)
vq->tail->next = p;
else {
vq->head = p;
// очередь была пуста, а теперь нет -- надо разбудить потребителей
pthread_cond_broadcast(&q->cond);
}
vq->tail = p;
vq->cnt++;
asm volatile ("" : : : "memory");
// сбросим изменения очереди в память
// разрешаем доступ всем снова
pthread_mutex_unlock(&q->lock);
}
#if DEBUG
#define cond_signal_producer(q) ({ \
if ((q)->pqhead) { \
(q)->pqhead->signaled = 1; \
pthread_cond_signal(&(q)->pqhead->cond); \
} \
})
#else
#define cond_signal_producer(q) ({ \
if ((q)->pqhead) \
pthread_cond_signal(&(q)->pqhead->cond); \
})
#endif
void *
dequeue(struct producer_consumer_queue *q)
{
volatile struct producer_consumer_queue *vq = q;
// получаем эксклюзивный доступ к очереди:
pthread_mutex_lock(&q->lock);
// если есть спящие производители, то разбудим первого
cond_signal_producer(vq);
while (!vq->head && vq->is_alive) {
// очередь пуста, делать нечего, ждем...
pthread_cond_wait(&q->cond, &q->lock);
// wait разрешает доступ другим на время ожидания
}
// запоминаем текущий элемент или 0, если очередь умерла
struct producer_consumer_queue_item *p = vq->head;
if (p) {
// и удаляем его из очереди
vq->head = vq->head->next;
if (!vq->head)
vq->tail = vq->head;
vq->cnt--;
asm volatile ("" : : : "memory");
// сбросим изменения очереди в память
// разбудим первого поставщика в их очереди
cond_signal_producer(vq);
}
// возвращаем эксклюзивный доступ другим участникам
pthread_mutex_unlock(&q->lock);
// отдаём данные
void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
// согласно 7.20.3.2/2, можно не проверять на 0
free(p);
return data;
}
struct producer_consumer_queue *
producer_consumer_queue_create(int max)
{
struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
q->head = q->tail = 0;
q->pqhead = q->pqtail = 0;
q->is_alive = 1;
q->max = max;
q->cnt = 0;
pthread_mutex_init(&q->lock, 0);
pthread_cond_init(&q->cond, 0);
return q;
}
// И процедура для закрытия очереди:
void
producer_consumer_queue_stop(struct producer_consumer_queue *q)
{
volatile struct producer_consumer_queue *vq = q;
// для обращения к разделяемым переменным необходим эксклюзивный доступ
pthread_mutex_lock(&q->lock);
vq->is_alive = 0;
pthread_cond_broadcast(&q->cond); // разбудим потребителей
volatile struct producer_queue_item *pq;
for (pq = vq->pqhead; pq; pq = pq->next) {
#if DEBUG
pq->signaled = 1;
asm volatile ("" : : : "memory");
#endif
// будим каждого ждущего производителя
pthread_cond_signal((pthread_cond_t *)&pq->cond);
}
pthread_mutex_unlock(&q->lock);
}
Все три программы (pq1.c, pq2.c и pq3.c) вместе с функцией gettid() находятся в http://pastebin.com/E23r9DZk . Для экспериментов скопируйте их в разные файлы и компилируйте, например, gcc pq3.c -pthread gettid.o
Ответ 3
Реализация на C#, библиотека Dataflow
Ещё одной альтернативой является использование Майкрософтовской библиотеки Dataflow
которая собственно и создана для того, чтобы управлять потоками данных. Для использовани
кода в примерах, вам нужно подключить NuGet-пакет Microsoft.Tpl.Dataflow. Класс BufferBlock практически является готовым producer/consumer'ом, но, в отличие от блокирующего интерфейса BlockingCollection, он имеет async-интерфейс!
Для асинхронного добавления задания поставщик может воспользоваться SendAsync. Асинхронно
добавление нужно, так как очередь может быть ограниченной длины, а значит, добавление должно будет дожидаться наличия свободного места! По окончанию добавления нужно вызвать Complete.
async Task ProduceSingle(ITargetBlock queue, int howmuch)
{
Random r = new Random();
while (howmuch-- > 0)
{
// эмулируем длительную работу по подготовке следующего задания
// длительность выбираем случайно, чтобы задания приходили в
// непредсказуемые моменты времени
await Task.Delay(1000 * r.Next(1, 3));
var v = string.Format("automatic {0}", r.Next(1, 10));
await queue.SendAsync(v);
}
queue.Complete();
}
Если у вас несколько поставщиков, закрывать очередь нужно лишь когда они все отработают:
async Task Produce1(ITargetBlock queue, int howmuch)
{
Random r = new Random();
while (howmuch-- > 0)
{
await Task.Delay(1000 * r.Next(1, 3));
var v = string.Format("automatic {0}", r.Next(1, 10));
await queue.SendAsync(v);
}
}
// функция Console.ReadLine() -- блокирующая, поэтому выполняем её асинхронно
// (иначе она заблокирует вызывающий поток)
// у Console нет async-интерфейса.
Task ReadConsole()
{
// блокирующую функцию выгружаем в thread pool
return Task.Run(() => Console.ReadLine());
}
async Task Produce2(ITargetBlock queue)
{
string s;
while ((s = await ReadConsole()).Length != 0)
await queue.SendAsync("manual " + s);
}
async Task ProduceAll(ITargetBlock queue)
{
var p1 = Produce1(queue, 20);
var p2 = Produce2(queue);
await Task.WhenAll(p1, p2);
queue.Complete();
}
Теперь, потребитель. Если потребитель лишь один, всё просто:
async Task ConsumeSingle(ISourceBlock queue)
{
while (await queue.OutputAvailableAsync())
Console.WriteLine(await queue.ReceiveAsync());
}
Для случая нескольких потребителей использовать ReceiveAsync — неверно, так как задани
может быть взято из очереди другим потребителем! Функции TryReceiveAsync тоже нету, поэтому после асинхронного выяснения того, что очередь не пуста, используем TryReceive:
async Task ConsumeCooperative(IReceivableSourceBlock queue, int number)
{
Random r = new Random();
while (await queue.OutputAvailableAsync())
{
string v;
// в этой точке данные могут быть уже уйти другому потребителю
if (!queue.TryReceive(out v))
continue; // продолжаем ждать
// цветной вывод и прочие плюшки
// мне лень синхронизировать вывод на консоль, хотя конечно это разделяемый ресурс
if (Console.CursorLeft != 0)
Console.WriteLine();
var savedColor = Console.ForegroundColor;
Console.ForegroundColor = (ConsoleColor)(number + 8);
Console.WriteLine(string.Format("{0}[{1}]: {2}",
new string(' ', number * 4), number, v));
Console.ForegroundColor = savedColor;
// симулируем длительную обработку результата клиентом
await Task.Delay(1000 * r.Next(1, 3));
}
}
Task ConsumeAll(IReceivableSourceBlock queue)
{
var c1 = ConsumeCooperative(queue, 1);
var c2 = ConsumeCooperative(queue, 2);
return Task.WhenAll(c1, c2);
}
Осталась обвязка:
class Program
{
static void Main(string[] args)
{
new Program().RunAll().Wait();
}
async Task RunAll()
{
BufferBlock queue = new BufferBlock();
var p = ProduceAll(queue);
var c = ConsumeAll(queue);
await Task.WhenAll(p, c, queue.Completion);
}
// остальные методы
}
В своей статье Async Producer/Consumer Queue using Dataflow Stephen Cleary предлагае
другой подход, более в духе библиотеки Dataflow. В ней заложена симметрия между блоками-источникам
(ISourceBlock), блоками-приёмниками (ITargetBlock), и блоками-преобразователями (IPropagatorBlock). В соответствии с этой идеологией, мы применяем для поставщика блок-приёмник ActionBlock:
Task Consume2(ISourceBlock queue, int number)
{
var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
var consumer = new ActionBlock(v => ConsumeImpl2(v, number), consumerOptions);
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
queue.LinkTo(consumer, linkOptions);
return consumer.Completion;
}
void ConsumeImpl2(string v, int number)
{
Console.WriteLine(string.Format("[{0}]: {1}", number, v));
Thread.Sleep(1500);
}
Task ConsumeAll2(ISourceBlock queue)
{
var c1 = Consume2(queue, 1);
var c2 = Consume2(queue, 2);
return Task.WhenAll(c1, c2);
}
Для чего нужно BoundedCapacity = 1? Дел в том, что по умолчанию ActionBlock имее
«неограниченную» ёмкость, и таким образом за раз потребит все данные из очереди. Таким образом, если мы ввели ограничение на объём очереди
queue = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20 });
то данные будут всё равно накапливаться в ActionBlock'е. Чтобы роль хранилища исполнялас
BufferBlock'ом, а потребителя — ActionBlock, и нужно ограничить его объём. Заметьте также, что ограничение ёмкости ActionBlock'а позволяет библиотеке Dataflow балансировать нагрузку, отправляя данные свободному блоку.
Заметьте, что в этом случае то, в каком контексте (пул потоков? выделенный поток?
исполняется ActionBlock, управляется не через стандартный механизм async/await, а посредством TaskScheduler'а в настройках.
Подход со стыковкой блоков выполнения может быстро стать слишком тяжеловесным, поэтому я советовал бы использовать его лишь для достаточно сложных задач.
Ответ 4
Реализация на C#, библиотека Dataflow ("инверсный" алгоритм)
Отличие данного алгоритма от приведенного VladD "прямого" - в том, что используется очередь потребителей вместо очереди элементов.
Это позволяет избавиться от цикла приема в потребителе - ценой появления знания списк
потребителей поставщиком данных. Иными словами, в такой конфигурации потребители выходят не активными - а пассивными.
Кроме того, сам алгоритм получается очень простым.
Потребитель в таком алгоритме не имеет никакого алгоритма, это лишь класс (или интерфейс либо вовсе делегат), в котором можно вызвать метод:
class Consumer
{
public void Consume(string str)
{
Console.WriteLine(str);
}
}
На стороне поставщика алгоритм немного сложнее:
BufferBlock consumers = new BufferBlock();
public async void SendToConsumer(string str)
{
var consumer = await consumers.ReceiveAsync();
try
{
consumer.Consume(str);
}
finally
{
consumers.Post(consumer);
}
}
async Task Produce(int howmuch)
{
Random r = new Random();
while (howmuch-- > 0)
{
// эмулируем длительную работу по подготовке следующего задания
// длительность выбираем случайно, чтобы задания приходили в
// непредсказуемые моменты времени
await Task.Delay(1000 * r.Next(1, 3));
var v = string.Format("automatic {0}", r.Next(1, 10));
SendToConsumer(v);
}
}
К сожалению, это еще не все. Дело в том, что в таком виде любое исключение, возникшее в методе Consume, обрушит программу.
Поэтому надо сделать одну из двух вещей:
добавить в метод контракт класса Consumer требование обязательной обработки исключений;
или же установить глобальный обработчик неперехваченных исключений.
А еще лучше - воспользоваться сразу обоими вариантами.
И еще один важный момент. Пассивность потребителя означает, в частности, что потребител
будет запущен в контексте синхронизации поставщика. Иногда это может быть нежелательным - в таком случае в код SendToConsumer следует добавить вызов Task.Run.
Иногда же, напротив, производителю нужно дождаться полной обработки порции данных
В таком случае надо сменить возвращаемое значение у метода SendToConsumer с void на Task.
Комментариев нет:
Отправить комментарий