Страницы

Поиск по вопросам

вторник, 7 января 2020 г.

Thread-safe очередь, и все-все-все

#очередь #c #многопоточность #cpp


Опять у меня идиотский вопрос (простите, но впадаю в старческий маразм, "бабушка
ничего не помнит").
Дано: поток данных поступает в очередь (com-порт). Другой поток разгребает эту очередь,
складывает разобраные данные в базу. Всё в рамках одного приложения.
Вопрос: что юзать в качестве очереди на C (возможно, C++)    


Ответы

Ответ 1



Видимо что-то в таком духе (увидел знакомую тему и не удержался). #include #include #include #include #include struct qitem { struct qitem *next; int len; char data[1]; // реально здесь будет len+1 байт данных }; struct queue { struct qitem *head, *tail; // head == tail == 0 очередь пуста pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью pthread_cond_t cond; // этот cond "сигналим" когда очередь стала НЕ ПУСТОЙ pthread_t th; // tid обработчика }; void inqueue (const char *str, int len, struct queue *q) { struct qitem *p = (typeof(p))malloc(sizeof(*p) + len); strcpy(p->data, str); p->len = len; p->next = 0; pthread_mutex_lock(&q->lock); if (q->tail) q->tail->next = p; else { q->head = p; pthread_cond_broadcast(&q->cond); // теперь очередь не пуста, сигнализируем } q->tail = p; pthread_mutex_unlock(&q->lock); } void processit (struct qitem *p) { int t = atoi(p->data); if (t > 0) { printf ("Sleep %d\n", t); sleep(t); } } // обработчик очереди в отдельном потоке void * consumer (void *arg) { struct queue *q = (typeof(q))arg; for (;;) { pthread_mutex_lock(&q->lock); if (!q->head) { // очередь пуста, делать нечего, ждем... pthread_cond_wait(&q->cond, &q->lock); //pthread_mutex_unlock(&q->lock); // это для нескольких обработчиков очереди //continue; // в нашем случае не нужно // для нескольких "concurrent" обработчиков нужна последовательность // lock/check/wait/unlock/continue/lock/check... } struct qitem *p = q->head; q->head = q->head->next; q->nitems--; if (!q->head) q->tail = q->head; pthread_mutex_unlock(&q->lock); printf ("consume: %s", p->data); if (strcmp(p->data, "STOP") == 0) break; processit(p); free(p); } return 0; } // сделаем пустую очередь и запустим ее обработчик в новом потоке struct queue * run_consumer() { struct queue *q = (typeof(q))malloc(sizeof(*q)); q->head = q->tail = 0; q->nitems = 0; pthread_mutex_init(&q->lock, 0); pthread_cond_init(&q->cond, 0); pthread_create(&q->th, 0, consumer, (void *)q); return q; } int main () { void *res = 0; char *in = NULL; size_t sz; int l; struct queue *q = run_consumer(); while ((l = getline(&in, &sz, stdin)) > 0) { inqueue(in, l, q); in = NULL; } inqueue("STOP", 5, q); if (pthread_join(q->th, &res)) perror("join"); return (long)res; } Для тестирования просто читаем строки с клавиатуры, и ставим их в очередь. Обрааботчик их печатает. Если в начале строки число, то обработчик делает sleep, позволяя написать в очередь несколько строк.

Ответ 2



Я бы использовал просто queue. Поскольку у вас доступ к очереди из нескольких потоков, вам нужно синхронизировать обращения к очереди. Для синхронизации подошёл бы std::mutex, если он доступен на вашем компиляторе. В любом случае, какая-то синхронизация вам всё равно нужна, т.к. в её отсутствие у разных потоков может быть разное представление о содержимом памяти (например потому, что в многопроцессорной системе каждый процессор сбрасывает кэш независимо).

Ответ 3



Используйте boost::lockfree::queue

Комментариев нет:

Отправить комментарий