Страницы

Поиск по вопросам

понедельник, 23 декабря 2019 г.

Блокировать процесс пока не завершатся потоки/дочерние процессы

#cpp #многопоточность #cpp14 #pthread


Есть такой код:

if(pid_t pid = fork()) // spawn child process
{
    // parent process
    LOG("Try to execute smth. in child process");
    return;
}
else
{
    // child process
    doWork();
    std::terminate();
}


Тут порождается дочерний процесс в котором выполняется некоторая работа. Где-то в
другом месте(а именно в момент завершения процесса) есть такое:

// wait childern(some code may call fork())
const pid_t pid{::wait(nullptr)};


Здесь процесс ждет завершения дочерних процессов(которые нафоркались ранее). Так
вот, все это удовольствие не работает вместе ключом -fsanintize=thread. Код зависает
на wait, потому что компилятор добавляет некоторые дочерние процессы, которые сами
не завершаются.

Вопрос. Как добиться аналогичного поведения без wait? Чтобы задачи запускались асинхронно,
и главный процесс ждал завершения этих задач в случае прекращения работы. Может при
помощи потоков, а не процессов. Если бы у меня в распоряжении был Qt, я бы закидывал
эти doWork в QThreadPool и он бы в декструкторе ждал завершения всех задач. Но Qt у
меня нет, а есть C++14 и pthread. Может кто-то предложить какое-нибудь решение?
    


Ответы

Ответ 1



Накидал на скорую руку, но, думаю, идея понятна. #include #include #include #include #include #include #include #include class fork_storage { public: fork_storage() noexcept { m_pid = fork(); } fork_storage(const fork_storage&) = delete; fork_storage& operator=(const fork_storage&) = delete; fork_storage& operator=(fork_storage&& rhv) = delete; fork_storage(fork_storage&& src) noexcept { m_pid = src.pid(); src.reset(); } ~fork_storage() { if (is_parent()) { wait(); } } bool fail() const noexcept { return m_pid < 0; } bool is_parent() const noexcept { return m_pid > 0; } void reset() noexcept { m_pid = -1; } pid_t pid() const noexcept { return m_pid; } int wait() noexcept { int status = -1; if (is_parent()) { waitpid(m_pid, &status, WUNTRACED); } reset(); return status; } private: pid_t m_pid; }; std::mt19937 &random_gen() { static std::mt19937 gen(std::chrono::system_clock::now().time_since_epoch().count()); return gen; } int main() { std::vector processes; for (unsigned i = 0; i < 10; ++i) { fork_storage fork_obj; if (fork_obj.fail()) { std::cerr << "fork error\n"; } else { if (fork_obj.is_parent()) { std::string message = "process created: " + std::to_string(fork_obj.pid()) + "\n"; processes.emplace_back(std::move(fork_obj)); } else { for (auto &process: processes) { process.reset(); } processes.clear(); std::chrono::seconds sec(random_gen()() % 5 + 1); std::string message = "child " + std::to_string(getpid()) + " paused: " + std::to_string(sec.count()) + " sec\n"; std::cout << message; std::this_thread::sleep_for(sec); break; } } } while (!processes.empty()) { fork_storage &fork_obj = processes.back(); if(fork_obj.is_parent()) { std::string message = "wait process: " + std::to_string(fork_obj.pid()) + "\n"; std::cout << message; } processes.pop_back(); } } http://rextester.com/LUZI37730

Ответ 2



Можно использовать std::thread: #include #include void func1() //Функция для запуска в потоке { std::cout << "Executing func1" << std::endl; } void func2(int x) //Еще одна функция с аргументом { std::cout << "Executing func2 with x=" << x << std::endl; } int main() { //Запускаем функции в отдельных потоках std::thread thread1 (func1); std::thread thread2 (func2,1); //Параллельно можем исполнять код в главном процессе std::cout << "Execute some commands in main process" << std::endl; //Ждем завершения кода в потоках thread1.join(); thread2.join(); std::cout << "Thread execution completed" << std::endl; return 0; } Помимо этого для распараллеливания процесса можно использовать std::async: #include #include #include #include #include #include #include //Некоторая задача которую мы будем выполнять асинхронно template int parallel_sum(RandomIt beg, RandomIt end) { auto len = end - beg; //определяем длину вектора if (len < 1000) return std::accumulate(beg, end, 0); //короткий вектор суммируем сразу //Рекурсивно запускаем parallel_sum для получения большего числа потоков RandomIt mid = beg + len/2; auto handle = std::async(std::launch::async, parallel_sum, mid, end); int sum = parallel_sum(beg, mid); return sum + handle.get(); } int main() { //Для примера будем суммировать большой vector случайных чисел std::vector v(100000); for (auto& e: v){ e = std::rand(); } //Для интереса сравним время выполнения суммирования вектора с помощью async: auto t0 = std::chrono::high_resolution_clock::now(); std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n'; auto t1 = std::chrono::high_resolution_clock::now(); std::cout << std::chrono::duration_cast(t1-t0).count() << "msec\n"; //и суммирование в одном главном потоке: t0 = std::chrono::high_resolution_clock::now(); std::cout << std::accumulate(v.begin(), v.end(), 0); t1 = std::chrono::high_resolution_clock::now(); std::cout << std::chrono::duration_cast(t1-t0).count() << "msec\n"; } У меня результат получился следующий: The sum is 1746632618 9msec 17466326181msec std::async - высокоуровневый инструмент, он не гарантирует, что код обязательно будет выполнен в отдельном потоке. Далее привожу простейший пример управления потоками, хотя это уже совсем другая история, подчеркиваю, что это простейший пример, много еще надо решать, например совместное использование cout всеми потоками: #include #include #include using namespace std; void func(const int i) { // Функция, исполняемая в потоках cout << "Thread N " << i << " created" << endl; } int main() { vector thread_pool; //создаем пул потоков на основе вектора int threadNum = 10; //количество потоков for (int i=0; i < threadNum; i++) { thread_pool.push_back(thread(func,i)); //запускаем потоки } for (auto& t : thread_pool) { t.join(); //ожидаем окончание выполнения потоков } }

Комментариев нет:

Отправить комментарий