Страницы

Поиск по вопросам

вторник, 28 января 2020 г.

Почему программа, написанная с потоками, работает гораздо медленнее обычной?

#linux #c #многопоточность


Программа сравнивает строки и выводит процент схожести

Без потоков:

#include 
#include 
#include 

static int res = 0;
static int col_words = 0;

int process(char *s, char *S2)
{ 
    char *instr;
    int col_in = 0;
    double proc; 
    instr = strstr(S2,s); 
    col_words++;
    if(instr!=NULL)
    {
     res += strlen(s);
    }
    else res += 0; 
    proc = (res+col_words-1)*100/strlen(S2);
    printf("%.1lf\n",proc);  

}

int main()
{

char S1[2000] = "aaa bvbb cc bb gg sdsdf", S2[2000] = "aaa bvbb";
    char sp[10]=" ";
    char *istr;
    //int res = 0, col_words=0, tmp;
    double proc; 

    clock_t time;
    time = clock();

    if(strlen(S1)<=strlen(S2))
    {
    istr = strtok(S1,sp);
    while(istr != NULL)
    {
       process(istr,S2);
       istr = strtok(NULL,sp);
       //col_words++;
    } 
    //proc = (res+col_words-1)*100/strlen(S2);
    }

    //printf("%.1lf\n",proc);  

    time = clock() - time;
    printf("time - %f\n", (double)time/CLOCKS_PER_SEC); 

    return 0;

}


С потоками:

#include 
#include 
#include 
#include 
#include 
#include 

pthread_mutex_t mtx; 
static int res = 0;
static int col_words = 0;

typedef struct{
   char slovo[20];
   char str[2000];
} pthrData;


void* process(void* thread_data)
{ 
    pthrData *data = (pthrData*) thread_data;

    //pthread_mutex_lock(&mtx);


    col_words++;
    char *instr;
    double proc;
    instr = strstr(data->str,data->slovo);
    if(instr!=NULL)
    {
      res += strlen(data->slovo);
    }  

    proc = (res+col_words-1)*100/strlen(data->str);  
    printf("%.2lf\n",proc); 


    //pthread_mutex_unlock(&mtx);

}

int main()
{

    pthread_mutex_init(&mtx,NULL);         

char S1[2000] = "aaa bvbb cc bb gg", S2[2000] = "aaa bvbb";
    char sp[10]=" ";
    char words[2000][2000];
    char *istr;
    int res = 0, col_w=0, tmp, j=0;
    double proc;  

//printf("Enter S1, S2\n");
    //gets(S1);
    //gets(S2);

clock_t time;
    time = clock();



    istr = strtok(S1,sp); 
    while(istr != NULL)
    {
     col_w++;
     strncpy(words[j],istr,2000);
     istr = strtok(NULL,sp);
     j++;
    }

    pthread_t* threads = (pthread_t*) malloc(col_w * sizeof(pthread_t));
    pthrData* threadData = (pthrData*) malloc(col_w * sizeof(pthrData));
    for(int i=0;i


Ответы

Ответ 1



По сути, данная программа подсчитывает количество слов для поиска и сумму длинн найденных слов, которые хотя бы один раз встретились в тексте. Наверное, для определения схожести текста с набором заданных слов, несколько лучше было бы искать все вхождения слов в текст, определяя таким образом, какая часть текста покрывается данными словами. Впрочем, вопрос, как мне кажется, скорее относится к эфективной реализации подобных действий в многопоточной программе. В общем случае, чем больше полезной работы выполняет каждый поток, тем более эффективна будет программа в целом. В частности, надо стараться уменьшить количество синхронизирующих взаимодействий (использование mutex, condition variables и т.п.) между потоками. К счастью, данная программа по своей сути именно такая, она позволяет показать, как можно независимо разделить всю работу по поиску вхождения слов в текст и подсчет некоторй статистики совершенно без взаимодействия потоков. Идея состоит в том, что перед поиском мы создадим массив слов и запустим заданное количество потоков, каждый из которых, основываясь на переданном ему его прядковом номере, будет вычислять какие слова из массива он будет искать в тексте, не пересекаясь с другими потоками. То же самое относится и к результатам работы каждого потока, которые тот создает в отведенном для его номера элементе массива, не взаимодействуя с другими потоками. Затем, первоначальный поток (main) дожидается завершения всех остальных и собирает частичные суммы, накопленные каждым из потоков вместе. #include #include #include #include #include #include // Вернем количество байт, которые word занимает в text // и увеличим общий счетчик вхождений слов в текст size_t lookup_word (char *word, size_t wlen, size_t *tot_cnt, char *text) { char *instr; size_t res = 0; if (wlen) while ((instr = strstr(text, word))) { res += wlen; text = instr + wlen; (*tot_cnt)++; } return res; } // аргументы и результат работы каждого потока // поток ищет в text слова из массива words с индексами кратными номеру потока struct thrdata { int nthr, // общее число потоков i_thr; // номер текущего потока char *text; // текст struct words *words; // массив слов для поиска // результат работы потока size_t n_words, // количество найденных слов tot_cnt, // сумма вхождений слов в текст s_len; // их общий размер в тексте pthread_t tid; }; // дескриптор массива слов struct words { struct wstr *warr; // массив дескрипторов слов size_t n_words; // его размер }; struct wstr { char *wtxt; // байты слова (nil-terminated) size_t wlen; // длина слова (strlen) }; /* В каждом потоке ищем слова с индексом кратным номеру потока Количество найденных слов и общая найденная длина возвращаются в переданной структуре (*arg) */ void * process (void *arg) { struct thrdata *d = (__typeof__(d))arg; d->s_len = d->n_words = d->tot_cnt = 0; size_t res; for (int i = d->i_thr; i < d->words->n_words; i += d->nthr) if ((res = lookup_word(d->words->warr[i].wtxt, d->words->warr[i].wlen, &d->tot_cnt, d->text))) { d->s_len += res; d->n_words++; } return (void *)0; } char * make_text (char *s, size_t nrep) { size_t l = strlen(s); char *t = malloc(l * nrep + 1); if (t) for (size_t i = 0; i < nrep; i++) strcpy(t + i * l, s); return t; } #define INICAP_WORDS 1 int main (int ac, char *av[]) { char S1[2000] = "aaa bvbb cc bb gg sdsdf 123 567", S2[2000] = "aaa bvbb"; char sp[10]=" "; char *istr; clock_t time; /* сделаем из S1[] массив слов (указатель на начало и длина слова) и найдем их количество Внимние(!) strtok() модифицирует содержимое S1[] */ struct words wrds = {0}; size_t wrds_capacity; if (!(wrds.warr = malloc((wrds_capacity = INICAP_WORDS) * sizeof(struct wstr)))) exit((perror("malloc"), 2)); for (istr = strtok(S1,sp); istr; wrds.n_words++, istr = strtok(NULL, sp)) { if (wrds.n_words == wrds_capacity) if (!(wrds.warr = realloc(wrds.warr, (wrds_capacity *= 2) * sizeof(struct wstr)))) exit((perror("realloc"), 2)); wrds.warr[wrds.n_words].wtxt = istr; wrds.warr[wrds.n_words].wlen = strlen(istr); } // определим число потоков int nthr = atoi(av[1] ? av[1] : "0"); if (nthr < 1) if ((nthr = sysconf(_SC_NPROCESSORS_ONLN)) < 1) nthr = 4; // не сконфигурировано или лимит не установлен // число повторений текста int nt = 1; if (av[1] && av[2]) if ((nt = atoi(av[2])) < 1) nt = 1; char *Text = make_text(S2, nt); printf("Text %zu bytes\n", strlen(Text)); // sequential test time = clock(); size_t res = 0, tot_cnt = 0, wcnt = 0; for (int i = 0; i < wrds.n_words; i++) { size_t old = res; res += lookup_word(wrds.warr[i].wtxt, wrds.warr[i].wlen, &tot_cnt, Text); if (res > old) wcnt++; } time = clock() - time; puts("Sequential"); printf("found %zu (%zu times) of total %zu words res = %zu bytes\n", wcnt, tot_cnt, wrds.n_words, res); printf("time: %f sec\n", (double)time/CLOCKS_PER_SEC); // multithread test printf("run %d threads\n", nthr); struct thrdata tdata[nthr]; time = clock(); // заполним аргументы для потоков и запустим их for (int i = 0; i < nthr; i++) { tdata[i].nthr = nthr; tdata[i].i_thr = i; tdata[i].text = Text; tdata[i].words = &wrds; pthread_create(&tdata[i].tid, NULL, process, tdata + i); } // соберем результаты всех потоков res = tot_cnt = wcnt = 0; for (int i = 0; i < nthr; i++) { if (pthread_join(tdata[i].tid, 0) == 0) { res += tdata[i].s_len; wcnt += tdata[i].n_words; tot_cnt += tdata[i].tot_cnt; } } time = clock() - time; free(wrds.warr); puts("Threading"); printf("found %zu (%zu times) of total %zu words res = %zu bytes\n", wcnt, tot_cnt, wrds.n_words, res); printf("time: %f sec\n", (double)time/CLOCKS_PER_SEC); return 0; } Что же касается демонстрации ускорения подобной многопоточной прграммы над однопоточной, то нужо взять побольше данных (особенно размер текста). (попозже попробую продемонстрировать это и найти точку равной эффективности) Update Модифицировал программку. Теперь она сначала выдает результат (время) выполнения в последовательном, а затем в многопоточном варианте. Первый параметр -- количество потоков, второй -- множитель для размера текста (переменная S2[]). Также добавил в S1[] пару слов для поиска, чтобы равномерно загружать потоки в 4-х (8-ми) ядерном CPU. Например: avp@avp-ubu1:hashcode$ gcc w-thr.c -pthread -O2 avp@avp-ubu1:hashcode$ ./a.out 4 1000 Text 8000 bytes Sequential found 3 (3000 times) of total 8 words res = 9000 bytes time: 0.000056 sec run 4 threads Threading found 3 (3000 times) of total 8 words res = 9000 bytes time: 0.000237 sec avp@avp-ubu1:hashcode$ ./a.out 4 10000 Text 80000 bytes Sequential found 3 (30000 times) of total 8 words res = 90000 bytes time: 0.000534 sec run 4 threads Threading found 3 (30000 times) of total 8 words res = 90000 bytes time: 0.000253 sec avp@avp-ubu1:hashcode$ ./a.out 4 50000 Text 400000 bytes Sequential found 3 (150000 times) of total 8 words res = 450000 bytes time: 0.002502 sec run 4 threads Threading found 3 (150000 times) of total 8 words res = 450000 bytes time: 0.000447 sec avp@avp-ubu1:hashcode$ grep CPU /proc/cpuinfo model name : Intel(R) Core(TM) i5-2500 CPU @ 3.30GHz model name : Intel(R) Core(TM) i5-2500 CPU @ 3.30GHz model name : Intel(R) Core(TM) i5-2500 CPU @ 3.30GHz model name : Intel(R) Core(TM) i5-2500 CPU @ 3.30GHz avp@avp-ubu1:hashcode$ uname -a Linux avp-ubu1 4.4.0-66-generic #87-Ubuntu SMP Fri Mar 3 15:29:05 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux avp@avp-ubu1:hashcode$ В общем, поиграйтесь с различной длиной текста и количеством потоков. (обратите внимание, что начиная с некоторого большого размера текста (на одной машине 1.5Мб, на другой 8Мбайт), результаты снова становятся практически равными (это играет роль скорость обмена между основной памятью и кэшем процессора)) Update 2 (упрощенный вариант, используем внешние массивы для данных потоков) #define NWORDS 2000 char *word[NWORDS]; size_t n_words = 0, nthr; char *text; struct thresult { // результат работы потока size_t n_words, // количество найденных слов tot_cnt, // сумма вхождений слов в текст s_len; // их общий размер в тексте } tres[NWORDS]; Очевидно, что не имеет смысла закладывать больше потоков, чем максимальное количество слов, поэтому константа NWORDS используется для обеих. Теперь функция потока process несколько меняется и в качестве аргумента получает сразу номер потока (приведенный к void *). void * process (void *a) { size_t tix = (size_t)a, res; tres[tix] = (struct thresult){0}; for (size_t i = tix; i < n_words; i += nthr) { size_t wlen = strlen(word[i]); if ((res = lookup_word(word[i], wlen, &tres[tix].tot_cnt, text))) { tres[tix].s_len += res; tres[tix].n_words++; } } return 0; } Также небольшие изменения в main, упрощается создание списка слов, меняются циклы создания потока и сборки результата. (для краткости я полностью выбросил вызов подсчета в одном (главном) потоке) int main (int ac, char *av[]) { char S1[2000] = "aaa bvbb cc bb gg sdsdf 123 567", S2[2000] = "aaa bvbb"; char sp[10]=" "; char *istr; clock_t time; /* сделаем из S1[] массив слов (указатель на начало и длина слова) и найдем их количество (в extern n_words !!!) Внимние(!) strtok() модифицирует содержимое S1[] */ for (istr = strtok(S1,sp); n_words < NWORDS && istr; n_words++, istr = strtok(NULL, sp)) word[n_words] = istr; // определим число потоков (в extern nthr !!!) nthr = atoi(av[1] ? av[1] : "0"); if (nthr < 1) if ((nthr = sysconf(_SC_NPROCESSORS_ONLN)) < 1) nthr = 4; // не сконфигурировано или лимит не установлен if (nthr >= NWORDS) nthr = NWORDS - 1; // число повторений текста int nt = 1; if (av[1] && av[2]) if ((nt = atoi(av[2])) < 1) nt = 1; text = make_text(S2, nt); printf("Text %zu bytes\n", strlen(text)); printf("run %zu threads\n", nthr); time = clock(); pthread_t tid[nthr]; for (size_t i = 0; i < nthr; i++) pthread_create(tid + i, NULL, process, (void *)i); size_t res = 0, tot_cnt = 0, wcnt = 0; for (size_t i = 0; i < nthr; i++) if (pthread_join(tid[i], 0) == 0) { res += tres[i].s_len; wcnt += tres[i].n_words; tot_cnt += tres[i].tot_cnt; } time = clock() - time; printf("found %zu (%zu times) of total %zu words res = %zu bytes\n", wcnt, tot_cnt, n_words, res); printf("time: %f sec\n", (double)time/CLOCKS_PER_SEC); return 0; }

Ответ 2



Ну вот, все и решилось... у меня в ubuntu в настройках виртуалки стоит только 1 ядро Понимаете, бессмысленно делать параллельность на одном ядре. Это ядро должно выполнить все то, что у вас делалось в однопоточной программе (и только по очереди - оно же одно!), плюс выполнить всю работу по созданию потоков, плюс - если потоки длинные еще их и переключать... Так что все понятно. Выигрыша не будет никакого, кроме проигрыша... Потоков вообще не рекомендуется делать больше, чем ядер - с единственным исключением: когда поток что-то ожидает, скажем, от контроллера, и в это время работает другой поток. Случай не ваш. А вычислительные потоки будут только мешать друг другу, если их больше, чем число ядер.

Ответ 3



Во-первых, нет смысла сравнивать производительность относительно элементарных кусков кода, если они отличаются наличием относительно "тяжелых" операций. В вашем многопоточном варианте присутствуют вызовы функции strncpy с заведомо большим значением лимита размера: 2000 при том, что ваши слова состоят всего из 2-5 символов. Если вы представляете, что такое strncpy и что эта функция на самом деле делает, то вы должны понимать, что эти вызовы будут делать типичную для stncpy "дурную работу", т.е. заниматься обнулением сравнительно большого неиспользуемого хвоста массива. Учитывая, что ваши входные слова на порядки короче, чем 2000, уже этого вполне достаточно, чтобы замедлить вашу реализацию с strncpy в разы. Первый вариант не занимается тупым вываливанием в память почти N * 6K нулей (где N - количество слов), а второй - занимается. Неудивительно, что второй вариант работает намного медленнее. Во-вторых, вы делаете strncpy(threadData[i].slovo,words[i],2000); при том, что threadData[i].slovo - это просто char [20] (!). То есть каждый такой strncpy просто вылетает за пределы массива и тупо бомбит нулями аж чуть ли не 2K чужой памяти за пределами этого массива. О чем вообще можно говорить в такой программе? Программа мертва чуть менее, чем полностью и какие-то замеры ее производительности - бесмысленны.

Ответ 4



Не могу больше на это смотреть :) Вот, набросал вариантик. Надеюсь, не сильно накосячил. Мелочи вылизывать лень. Важное замечание: данные для всех потоков одни и те же. Изменяемая в них часть - только позиция текущего обрабатываемого потоком слова. Точно так же (в этой же структуре) можно расшарить и аналитику, что там и как считается не вникал. Очевидные обработки ошибок тоже пропущены. На самом деле тут просто реализация разгребания очереди несколькими потоками, где в качестве очереди выступает массив слов. То же самое можно сделать на основе очереди или стека (в смысле контейнеров таких). #include #include #include #include #include #include #include #define DELIMITERS " \t\r\n" /* ------------------------------------------------------------------------- */ static const char **split_2_words(char *string) { const char **words = NULL; size_t size = 0; char *word = strpbrk(string, DELIMITERS); /* * Считаем количество слов в строке. Может насчитаться больше * (из-за сдвоеных разделителей), но не меньше. */ while (word) { size++; word = strpbrk(word + 1, DELIMITERS); } if ((words = calloc(sizeof(const char *), size + 2))) { size = 0; word = strtok(string, DELIMITERS); while (word) { words[size++] = word; word = strtok( NULL, DELIMITERS); } } return words; } /* ------------------------------------------------------------------------- */ typedef struct _thread_data { const char **sentence_words; const char **words_2_find; size_t current_word; pthread_mutex_t mutex; } thread_data; /* ------------------------------------------------------------------------- */ static void * process(void *_data) { thread_data *data = (thread_data *) _data; while (data) { size_t current_word; pthread_mutex_lock(&data->mutex); if (data->words_2_find[data->current_word]) { current_word = data->current_word; ++data->current_word; pthread_mutex_unlock(&data->mutex); } else { pthread_mutex_unlock(&data->mutex); break; } for (size_t idx = 0; data->sentence_words[idx]; idx++) { if (strstr(data->sentence_words[idx], data->words_2_find[current_word])) { printf("Word '%s' found in '%s'\n", data->words_2_find[current_word], data->sentence_words[idx]); } } } return _data; } /* ------------------------------------------------------------------------- */ int main() { char *sentence = readline("Enter sentence: "); char *words = readline("Enter words to find: "); const char **sentence_words = split_2_words(sentence); const char **words_2_find = split_2_words(words); thread_data *data = malloc(sizeof(thread_data)); size_t nthreads = (size_t) sysconf(_SC_NPROCESSORS_ONLN); pthread_t *threads = calloc(nthreads, sizeof(pthread_t)); data->sentence_words = sentence_words; data->words_2_find = words_2_find; data->current_word = 0; pthread_mutex_init(&data->mutex, NULL); for (size_t i = 0; i < nthreads; i++) { pthread_create(threads + i, NULL, process, data); } for (size_t i = 0; i < nthreads; i++) { pthread_join(*(threads + i), NULL); } free(sentence); free(words); free(sentence_words); free(words_2_find); free(data); free(threads); printf("That's All, Folks!\n"); return 0; } /* ------------------------------------------------------------------------- */

Комментариев нет:

Отправить комментарий