Страницы

Поиск по вопросам

среда, 4 марта 2020 г.

Как использовать сопрограммы С++ с Boost.Asio?

#cpp #boost #сопрограмма


Есть прокси-сервер, написанный на асинхронном API Boost.Asio - async_* функции и
коллбеки.

Полный код есть в этом ответе.
Схематично его можно описать так:

Цикл приема входящих соединений, установка соединения с сервером назначения:

void accept_loop() {
   acceptor.async_accept(src_socket, [](auto err) {
     accept_loop();  // повторяем accept

     dst_socket.async_connect(dst_endpoint, [](auto err) {
       proxy_loop(src_socket, dst_socket);
       proxy_loop(dst_socket, src_socket);
     });
  });
}


Цикл передачи данных, по 2 шт. на каждое соединение:

void proxy_loop(socket src, socket dst) {
  async_read(src, buf, [](auto error, auto n) {
    async_write(dst, buf, [](auto error, auto n) {
      proxy_loop(src, dst);  // повторяем read
    });
  });
}


Как переписать этот сервер с использованием сопрограмм С++?
    


Ответы

Ответ 1



В настоящий момент нет стандартного класса Future, который был бы совместим с сопрограммами и co_await. Также Boost.Asio еще не поддерживает co_await из коробки. Поэтому мы напишем и то и другое, всего за сто строк кода. Начнем с универсального Future, который можно вернуть из сопрограммы, и который можно ждать в co_await. template struct Future { // На памяти экономить не будем, // поэтому данные будем хранить в некотором "общем состоянии", // результат будем копировать (или перемещать). struct SharedState { T value; std::experimental::coroutine_handle<> h; std::atomic is_ready; }; // Поддержка использования Future как результата сопрограммы. struct promise_type { std::shared_ptr s = std::make_shared(); Future get_return_object() { return {s}; } std::experimental::suspend_never initial_suspend() { return {}; } // SharedState переживет удаление promise_type в конце работы сопрограммы std::experimental::suspend_never final_suspend() { return {}; } void return_value(T value) const { s->value = std::move(value); if (s->is_ready.exchange(true)) s->h.resume(); } }; std::shared_ptr s; // Поддержка co_await. bool await_ready() noexcept { return false; } bool await_suspend(std::experimental::coroutine_handle<> h) noexcept { s->h = h; return !s->is_ready.exchange(true); } T await_resume() { return std::move(s->value); } }; Теперь пишем всё то же самое, но для случая когда сопрограмма не возвращает значений. template<> struct Future { struct SharedState { std::experimental::coroutine_handle<> h; std::atomic is_ready; }; struct promise_type { std::shared_ptr s = std::make_shared(); Future get_return_object() { return {s}; } std::experimental::suspend_never initial_suspend() { return {}; } std::experimental::suspend_never final_suspend() { return {}; } void return_void() const { if (s->is_ready.exchange(true)) s->h.resume(); } }; std::shared_ptr s; bool await_ready() noexcept { return false; } bool await_suspend(std::experimental::coroutine_handle<> h) noexcept { s->h = h; return !s->is_ready.exchange(true); } void await_resume() {} }; Это был наш Future. Теперь пишем обертки над async_* функциями Boost.Asio. Мы можем использовать тот же Future::promise_type, как будто это сопрограмма. Future coro_accept(boost::asio::ip::tcp::acceptor& acceptor, boost::asio::ip::tcp::socket& socket) { Future::promise_type p; acceptor.async_accept(socket, [p](auto error) { p.return_value(error); }); return p.get_return_object(); } Future coro_connect(boost::asio::ip::tcp::socket& socket, boost::asio::ip::tcp::endpoint endpoint) { Future::promise_type p; socket.async_connect(endpoint, [p](auto error) { p.return_value(error); }); return p.get_return_object(); } Если callback принимает больше одного параметра, то их можно сделать out-параметрами. В С++17 можно будет использовать tuple и structured bindings для распаковки. template Future coro_read(boost::asio::ip::tcp::socket& socket, Buffers bufs, std::size_t& bytes_read) { Future::promise_type p; socket.async_read_some(bufs, [p, &bytes_read](auto error, auto n) { bytes_read = n; p.return_value(error); }); return p.get_return_object(); } template Future coro_write_all(boost::asio::ip::tcp::socket& socket, Buffers bufs, std::size_t& bytes_written) { Future::promise_type p; async_write(socket, bufs, boost::asio::transfer_all(), [p, &bytes_written](auto error, auto n) { bytes_written = n; p.return_value(error); }); return p.get_return_object(); } И наконец сам код сервера, те же ~50 строк что и в оригинале boost::asio::io_service io_service; boost::asio::ip::tcp::resolver resolver(io_service); boost::asio::ip::tcp::resolver::query dst_query("arrowd.name", "80"); boost::asio::ip::tcp::resolver::iterator dst_iterator = resolver.resolve(dst_query); boost::asio::ip::tcp::endpoint dst_endpoint = *dst_iterator; Future proxy(boost::asio::ip::tcp::socket& src, boost::asio::ip::tcp::socket& dst) { char buf[4096]; for (;;) { std::size_t bytes_read; auto error = co_await coro_read(src, boost::asio::buffer(buf), bytes_read); std::cout << "read " << bytes_read << ' ' << error << '\n'; if (error) break; std::size_t bytes_written; error = co_await coro_write_all(dst, boost::asio::buffer(buf, bytes_read), bytes_written); std::cout << "write " << bytes_written << ' ' << error << '\n'; if (error) break; } // Закрытие сокетов вызовет ошибку в сопрограмме которая качает в другую сторону src.close(); dst.close(); } Future connect(boost::asio::ip::tcp::socket src) { boost::asio::ip::tcp::socket dst(io_service); auto error = co_await coro_connect(dst, dst_endpoint); std::cout << "connect " << error << '\n'; if (error) co_return; auto _ = proxy(src, dst); // Запускаем первую сопрограмму без ожидания co_await proxy(dst, src); // Запускаем вторую и ждем // Мы вышли из второй с какой-то ошибкой co_await _; // Ждем завершения первой } Future accept_loop() { boost::asio::ip::tcp::endpoint src_endpoint( boost::asio::ip::address_v4::loopback(), 8080); // localhost:8080 boost::asio::ip::tcp::acceptor acceptor{io_service, src_endpoint}; for (;;) { boost::asio::ip::tcp::socket src(io_service); auto error = co_await coro_accept(acceptor, src); std::cout << "accept " << error << '\n'; if (error) co_return; connect(std::move(src)); } } int main() { accept_loop(); io_service.run(); // Можно запустить параллельно в нескольких потоках } Код стал гораздо чище - пропали коллбеки. connect пришлось вынести из accept_loop в отдельную функцию, т.к. это отдельная сопрограмма. Всё что явно выделялось в динамической памяти теперь выглядит как локальные переменные. При этом оно всеравно живет в динамической памяти, в coroutine-state. Это позволяет передавать сокеты в proxy по ссылке - их время жизни привязно к connect. В данном коде отсутствует обработка исключений. Для поддержки исключений, Future должно уметь перебрасывать исключения при помощи std::exception_ptr.

Комментариев нет:

Отправить комментарий