#cpp #многопоточность
Уже несколько часов ломаю голову, не могу придумать, как реализовать обработку файлов, выполняемую двумя потоками. Создание потоков (2) for (int i = 0; i < threadCount; i++) { threads.push_back(boost::thread(readFile)); } Функция readFile void readFile() { while (true) { fs::path filePath = getFilePath(); fs::ifstream ifs(filePath); if (!ifs) break; if (!ifs.is_open()) { break; } *обработка* } Функция getFilePath fs::path getFilePath() { if (it != fs::directory_iterator() && it->path().extension() == ".txt") { fs::path itPath = it->path(); ++it; return itPath; } } Т.е. сейчас я создаю поток, который сразу обращается к getFilePath и обрабатывает файл. Это повторяется до тех пор, пока в папке есть необработанные файлы. Если запустить один поток - то всё нормально, но при двух - всё плохо. Дайте пару примеров работающего кода, или укажите на мои ошибки. UPD: Добился нормальной работы, изменив getFilePath string getFilePath() { { lock_guardlock(m); if (it != fs::directory_iterator() && it->path().extension() == ".txt") { string itPath = it->path().string(); ++it; bHaveFiles = true; return itPath; } else { return "null"; } } } Но вопрос не сильно изменился. Сейчас, если я создам условные 8 потоков, то из-за мьютекста только один поток получит файл на обработку, а остальные 7 потоков вынуждены ждать своей очереди. Реально как-либо организовать асинхронную работу?
Ответы
Ответ 1
Еще один ответ - поскольку он посвящен не советам, а экспериментальной их проверке. Проведя некоторые эксперименты, должен признать, что мое предположение, что дисковые операции не дадут ускорения за счет многопоточности, не оправдались. Вот эти эксперименты. Для начала создаем 20000 файлов по 200K каждый - #include#include #include int main() { for(int i = 0; i < 20000; ++i) { char buf[200]; sprintf(buf,"g:\\tmp\\test\\%06d.dat",i); FILE * f = fopen(buf,"wb"); if (f) { for(int j = 0; j < 50000; ++j) fwrite(&i,sizeof(i),1,f); fwrite(buf,strlen(buf)+1,1,f); fclose(f); } } } После этого в дело вступает основная программа. #include #include #include #include #include #include #include #include #include #include using namespace std; mutex sumMtx; // Мьютекс для защиты аккумулятора unsigned long long sum = 0; // Аккумулятор (сумма чисел, считанных из файла) mutex filesMtx; // Мьютекс для защиты списка файлов vector files; // Список имен файлов size_t filesIdx; // Текущий индекс в файле vector getDirFiles(string dirName) // Сбор всех имен файлов в каталоге { vector fileNames; _finddata_t info; intptr_t handle = _findfirst((dirName + "\\*.*").c_str(),&info); if (handle == -1) return fileNames; do { if (info.attrib & _A_SUBDIR) continue; fileNames.push_back(string(dirName) + "\\" + info.name); } while(_findnext(handle,&info) == 0); _findclose(handle); return fileNames; } void handleOneFile(const char * name) // Обработка одного файла { fstream f(name,ios::binary|ios::in|ios::out); if (f) { unsigned int v = 0; // Чтение около 40К for(int j = 0; j < 10000; ++j) f.read((char*)&v,sizeof(v)); time_t t; time(&t); f.seekp(f.tellg()); f.write((char*)&t,sizeof(t)); // Запись 4 байт lock_guard lk(sumMtx); // Увеличение аккумулятора sum += v; } } void asyncHandle() // Главная функция потока { for(;;) { const char * name; // Чтение очередного { // файла из списка, lock_guard lk(filesMtx); // если уже все считаны - if (filesIdx >= files.size()) break; // выход name = files[filesIdx++].c_str(); } handleOneFile(name); // Обработка файла } } int main(int argc, const char * argv[]) { files = getDirFiles("G:\\Tmp\\Test"); // Сбор списка файлов filesIdx = 0; // Сброс и пару проходов asyncHandle(); // приведения кешей в filesIdx = 0; // стабильное состояние asyncHandle(); for(int threadCount = 1; threadCount < 20; ++threadCount) // Для разного числа потоков { sum = 0; // Сброс аккумулятора и индекса filesIdx = 0; cout << "threadCount = " << setw(3) << threadCount << ": "; muTimer mu; // Мой таймер для хронометража vector > tasks; for(int i = 0; i < threadCount; ++i) // Создаем threadCount потоков { tasks.push_back(async(asyncHandle)); } for(int i = 0; i < threadCount; ++i) // Дожидаемся завершения { tasks[i].get(); } cout << sum << " "; // Выводим накопленную сумму } // и затраченное время } Результат оказался следующим (я старался максимально убрать все задачи на машине, чтоб никто в этот момент не полез к диску): threadCount = 1: 199990000 46108 ms threadCount = 2: 199990000 23587 ms threadCount = 3: 199990000 9802 ms threadCount = 4: 199990000 8409 ms threadCount = 5: 199990000 8492 ms threadCount = 6: 199990000 8575 ms threadCount = 7: 199990000 8332 ms threadCount = 8: 199990000 8507 ms threadCount = 9: 199990000 8585 ms threadCount = 10: 199990000 8254 ms threadCount = 11: 199990000 8326 ms threadCount = 12: 199990000 8218 ms threadCount = 13: 199990000 8784 ms threadCount = 14: 199990000 8359 ms threadCount = 15: 199990000 8528 ms threadCount = 16: 199990000 8481 ms threadCount = 17: 199990000 8531 ms threadCount = 18: 199990000 8772 ms threadCount = 19: 199990000 8525 ms Компилировал VC++ 2015 x86, машина под Windows 7 x64, с четырехъядерным процессором, так что совет запускать потоков не более, чем имеется ядер, в целом обоснован :) Update При изменении в main() на такой код vector tasks; for(int i = 0; i < threadCount; ++i) { tasks.push_back(thread(asyncHandle)); } for(int i = 0; i < threadCount; ++i) { tasks[i].join(); } результаты поменялись не особенно: threadCount = 1: 199990000 41373 ms threadCount = 2: 199990000 24758 ms threadCount = 3: 199990000 9544 ms threadCount = 4: 199990000 8165 ms threadCount = 5: 199990000 7911 ms threadCount = 6: 199990000 7970 ms threadCount = 7: 199990000 7807 ms threadCount = 8: 199990000 7942 ms threadCount = 9: 199990000 8064 ms threadCount = 10: 199990000 7858 ms threadCount = 11: 199990000 8361 ms threadCount = 12: 199990000 8157 ms threadCount = 13: 199990000 8550 ms threadCount = 14: 199990000 8001 ms threadCount = 15: 199990000 8392 ms threadCount = 16: 199990000 8346 ms threadCount = 17: 199990000 8558 ms threadCount = 18: 199990000 8410 ms threadCount = 19: 199990000 8398 ms А вот при изменении void asyncHandle(int start, int stop) { for(;start < stop; ++start) { handleOneFile(files[start].c_str()); } } и vector tasks; int count = (files.size()+20)/threadCount; for(int i = 0; i < threadCount; ++i) { tasks.push_back(thread(asyncHandle,i*count, std::min((size_t)(i+1)*count,files.size()))); } for(int i = 0; i < threadCount; ++i) { tasks[i].join(); } (т.е. каждому потоку выдавался примерно равный кусок работы изначально) результаты получались threadCount = 1: 199990000 57153 ms threadCount = 2: 199990000 27474 ms threadCount = 3: 199990000 16320 ms threadCount = 4: 199990000 13041 ms threadCount = 5: 199990000 8656 ms threadCount = 6: 199990000 8811 ms threadCount = 7: 199990000 8943 ms threadCount = 8: 199990000 10088 ms threadCount = 9: 199990000 9069 ms threadCount = 10: 199990000 8360 ms threadCount = 11: 199990000 8578 ms threadCount = 12: 199990000 8839 ms threadCount = 13: 199990000 8435 ms threadCount = 14: 199990000 8957 ms threadCount = 15: 199990000 8718 ms threadCount = 16: 199990000 10704 ms threadCount = 17: 199990000 10382 ms threadCount = 18: 199990000 10500 ms threadCount = 19: 199990000 11576 ms Т.е. рост с увеличением числа потоков появился, но не такой уж большой, и при небольших количествах потоков результаты оказались явно хуже. Почему - особых идей нет, разве что первые потоки успевали до запуска следующих сделать кучу работы, так что заканчивались первыми и ожидали, пока последние доделают свою часть. Так что мне кажется, что более равномерная работа - при выборке файлов из коллекции по одному - более эффективный способ работы. Ну и последнее - если перенести накопление после всей выполненной работы, а не для каждого файла: int handleOneFile(const char * name) { fstream f(name,ios::binary|ios::in|ios::out); if (f) { unsigned int v = 0; for(int j = 0; j < 10000; ++j) f.read((char*)&v,sizeof(v)); time_t t; time(&t); f.seekp(f.tellg()); f.write((char*)&t,sizeof(t)); return v; } return 0; } void asyncHandle(int start, int stop) { long long s = 0; for(;start < stop; ++start) { s += handleOneFile(files[start].c_str()); } lock_guard lk(sumMtx); sum += s; } получилось примерно так: threadCount = 1: 199990000 38168 ms threadCount = 2: 199990000 23800 ms threadCount = 3: 199990000 10787 ms threadCount = 4: 199990000 9065 ms threadCount = 5: 199990000 8403 ms threadCount = 6: 199990000 8077 ms threadCount = 7: 199990000 8410 ms threadCount = 8: 199990000 9656 ms threadCount = 9: 199990000 8038 ms threadCount = 10: 199990000 8209 ms threadCount = 11: 199990000 8722 ms threadCount = 12: 199990000 8561 ms threadCount = 13: 199990000 8524 ms threadCount = 14: 199990000 8524 ms threadCount = 15: 199990000 8000 ms threadCount = 16: 199990000 8511 ms threadCount = 17: 199990000 8463 ms threadCount = 18: 199990000 9431 ms threadCount = 19: 199990000 8456 ms Откровенно говоря, сделать какие-то однозначные (далеко ведущие :)) выводы и дать какие-то рекомендации несколько затрудняюсь... Мьютексы явно играют роль, хотя и не кардинальную, но вполне заметную. Сейчас сделаю еще один эксперимент с async и на этом закруглюсь. Вот с async вновь. Отличие в том, что здесь накопление в глобальную переменную идет не после обработки файла, а по окончании всей работы; кроме того, чтобы еще уменьшить количество обращений к мьютексу, я брал из коллекции сразу по два имени файла. Результат не сказать чтоб сильно отличающийся. Почему для малого количества потоков при этом больше, чем в самой первой версии - для меня загадка :( threadCount = 1: 199990000 44203 ms threadCount = 2: 199990000 39898 ms threadCount = 3: 199990000 23217 ms threadCount = 4: 199990000 7808 ms threadCount = 5: 199990000 8043 ms threadCount = 6: 199990000 7668 ms threadCount = 7: 199990000 8168 ms threadCount = 8: 199990000 7762 ms threadCount = 9: 199990000 7675 ms threadCount = 10: 199990000 8221 ms threadCount = 11: 199990000 7910 ms threadCount = 12: 199990000 8056 ms threadCount = 13: 199990000 7888 ms threadCount = 14: 199990000 8058 ms threadCount = 15: 199990000 7741 ms threadCount = 16: 199990000 8075 ms threadCount = 17: 199990000 7812 ms threadCount = 18: 199990000 8489 ms threadCount = 19: 199990000 8346 ms Ответ 2
В комментарий просто не влез, так что простите, выскажу ответом несколько своих соображений. Дисковые операции существенно более долгие, чем получение одного значения из контейнера с помощью итератора, так что ожидание мьютекса тормозить потоки не будет - это не главный тормоз. Что касается мьютексов - то опять же вам нужно как-то реализовывать подсчет запущенных потоков, а для этого использовать тот же мьютекс (вот тут мне давали совет, как это реализовать). Список файлов я бы собрал заранее, а потом просто выдавал потокам по одному - вплоть до собрать их все в каком-то векторе, и выдавать ссылку или указатель на имя, даже не копируя - вряд ли ожидание такой короткой операции будет длинным. А вот подгадить несинхронизированное обращение может сильно. Да и собирать список по ходу дела плохо по двум причинам: это тоже дисковая операция, и вот тут могут быть тормоза, это раз, и список файлов во время работы может меняться - это два... Еще - я бы не использовал потоки. Все авторитеты в один голос говорят, что лучше использовать async - если они пишут правду, то его реализации работают с пулом потоков, а значит, это существенно быстрее, чем создавать-убивать потоки. С моей же точки зрения, даже важнее то, что async корректно обработает даже исключение в потоке, не уложив при этом программу. Еще одно замечание - при большом количестве файлов и большом количестве потоков начнет работать против вас ограничение на количество одновременно открытых файлов. А в общем - повторюсь: лично я не жду большого эффекта от распараллеливания дисковых операций. Если распараллеливать, скажем, вычисления и дисковые операции - то это разумно: медленная дисковая операция не заставляет в таком случае тормозиться вычислениям. Но когда несколько потоков одновременно требуют от диска что-то сделать - есть у меня подозрение, что операционка такие требования просто поставит в одну очередь, и вся многопоточность станет самообманом... Вопрос вы задали интересный, надо бы посидеть и прохронометрировать...
Комментариев нет:
Отправить комментарий