diff --git a/include/ylt/coro_io/listen_endpoint.hpp b/include/ylt/coro_io/listen_endpoint.hpp index 09b501100..7e75a2e3e 100644 --- a/include/ylt/coro_io/listen_endpoint.hpp +++ b/include/ylt/coro_io/listen_endpoint.hpp @@ -1,8 +1,11 @@ #pragma once +#include #include #include #include +#include +#include #include "asio/error_code.hpp" #include "asio/ip/address.hpp" @@ -11,6 +14,71 @@ namespace coro_io::detail { +struct listen_address { + std::string address; + uint16_t port = 0; +}; + +inline listen_address parse_listen_address(std::string_view address, + uint16_t default_port = 0) { + listen_address parsed{std::string(address), default_port}; + if (address.empty()) { + return parsed; + } + + if (address.front() == '[') { + auto close = address.find(']'); + if (close != std::string_view::npos) { + parsed.address = std::string(address.substr(1, close - 1)); + if (close + 1 < address.size() && address[close + 1] == ':') { + auto port_sv = address.substr(close + 2); + uint16_t port = 0; + auto [ptr, ec] = std::from_chars(port_sv.data(), + port_sv.data() + port_sv.size(), port); + if (ec == std::errc{}) { + parsed.port = port; + } + } + return parsed; + } + } + + asio::error_code ec; + (void)asio::ip::make_address(address, ec); + if (!ec) { + parsed.address = std::string(address); + return parsed; + } + + if (size_t pos = address.rfind(':'); pos != std::string_view::npos) { + auto port_sv = address.substr(pos + 1); + uint16_t port = 0; + auto [ptr, parse_ec] = + std::from_chars(port_sv.data(), port_sv.data() + port_sv.size(), port); + if (parse_ec == std::errc{}) { + parsed.address = std::string(address.substr(0, pos)); + parsed.port = port; + } + } + + return parsed; +} + +inline bool is_ipv6_any_endpoint(const asio::ip::tcp::endpoint& endpoint) { + return endpoint.address().is_v6() && + endpoint.address().to_v6().is_unspecified(); +} + +inline bool should_create_dual_stack_acceptor( + const asio::ip::tcp::endpoint& endpoint) { +#if defined(__linux__) + return is_ipv6_any_endpoint(endpoint); +#else + (void)endpoint; + return false; +#endif +} + // Resolve a string address into a TCP endpoint. // - If `address` is a numeric IP literal (e.g. "::", "0.0.0.0", "127.0.0.1"), // it is parsed directly via `asio::ip::make_address` to avoid the ambiguity @@ -49,14 +117,69 @@ inline std::optional resolve_listen_endpoint( // cannot be set (e.g. on platforms that disallow toggling V6ONLY at runtime). // Callers are expected to log a warning on failure rather than abort, since // the listen socket can still serve IPv6 connections. +enum class ipv6_only_mode { + keep, + enable, + disable, +}; + +inline asio::error_code set_ipv6_only(asio::ip::tcp::acceptor& acceptor, + const asio::ip::tcp::endpoint& endpoint, + ipv6_only_mode mode) { + asio::error_code ec; + if (endpoint.protocol() == asio::ip::tcp::v6() && + mode != ipv6_only_mode::keep) { + acceptor.set_option(asio::ip::v6_only(mode == ipv6_only_mode::enable), ec); + } + return ec; +} + inline asio::error_code set_ipv6_only_false( asio::ip::tcp::acceptor& acceptor, const asio::ip::tcp::endpoint& endpoint) { + return set_ipv6_only(acceptor, endpoint, ipv6_only_mode::disable); +} + +inline void close_acceptor_now(asio::ip::tcp::acceptor& acceptor) { + asio::error_code ec; + acceptor.cancel(ec); + acceptor.close(ec); +} + +inline asio::error_code init_tcp_acceptor( + asio::ip::tcp::acceptor& acceptor, const asio::ip::tcp::endpoint& endpoint, + ipv6_only_mode mode) { + using asio::ip::tcp; asio::error_code ec; - if (endpoint.protocol() == asio::ip::tcp::v6()) { - acceptor.set_option(asio::ip::v6_only(false), ec); + + acceptor.open(endpoint.protocol(), ec); + if (ec) { + return ec; + } +#ifdef __GNUC__ + acceptor.set_option(tcp::acceptor::reuse_address(true), ec); +#endif + if (auto opt_ec = set_ipv6_only(acceptor, endpoint, mode); opt_ec) { + close_acceptor_now(acceptor); + return opt_ec; + } + acceptor.bind(endpoint, ec); + if (ec) { + close_acceptor_now(acceptor); + return ec; + } +#ifdef _MSC_VER + acceptor.set_option(tcp::acceptor::reuse_address(true)); +#endif + acceptor.listen(asio::socket_base::max_listen_connections, ec); + if (ec) { + close_acceptor_now(acceptor); } return ec; } +inline asio::ip::tcp::endpoint make_ipv4_any_endpoint(uint16_t port) { + return {asio::ip::make_address_v4("0.0.0.0"), port}; +} + } // namespace coro_io::detail diff --git a/include/ylt/coro_io/server_acceptor.hpp b/include/ylt/coro_io/server_acceptor.hpp index a8fb38ced..7e46bb5d0 100644 --- a/include/ylt/coro_io/server_acceptor.hpp +++ b/include/ylt/coro_io/server_acceptor.hpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -32,6 +33,7 @@ struct server_acceptor_base { uint16_t port() const noexcept { return port_; } std::string_view address() const { return address_; } virtual void close() = 0; + virtual void close_now() = 0; virtual listen_errc listen() = 0; virtual async_simple::coro::Lazy< ylt::expected> @@ -46,77 +48,52 @@ struct server_acceptor_base { pool_ = pool; } void init_address(std::string_view address) { - if (port_ == 0) { - if (size_t pos = address.rfind(':'); pos != std::string::npos) { - auto port_sv = std::string_view(address).substr(pos + 1); - - uint16_t port; - auto [ptr, ec] = std::from_chars( - port_sv.data(), port_sv.data() + port_sv.size(), port, 10); - if (ec != std::errc{}) { - address_ = std::string{address}; - return; - } - - port_ = port; - address = address.substr(0, pos); - if (address.front() == '[') { - if (address.size() > 2) - address = address.substr(1, address.size() - 2); - } - } - } - - address_ = std::string{address}; + auto parsed = detail::parse_listen_address(address, port_); + address_ = std::move(parsed.address); + port_ = parsed.port; } + void set_ipv6_dual_stack(bool v) noexcept { ipv6_dual_stack_ = v; } + bool ipv6_dual_stack() const noexcept { return ipv6_dual_stack_; } uint16_t port_; std::string address_; coro_io::io_context_pool* pool_ = nullptr; + bool ipv6_dual_stack_ = false; }; struct tcp_server_acceptor : public server_acceptor_base { virtual listen_errc listen() override { - acceptor_ = - asio::ip::tcp::acceptor{pool_->get_executor()->get_asio_executor()}; + executor_ = pool_->get_executor(); + acceptor_ = asio::ip::tcp::acceptor{executor_->get_asio_executor()}; ELOG_INFO << "begin to listen"; using asio::ip::tcp; asio::error_code ec; - auto endpoint = detail::resolve_listen_endpoint(acceptor_->get_executor(), - address_, port_, ec); + auto endpoint = detail::resolve_listen_endpoint( + executor_->get_asio_executor(), address_, port_, ec); if (!endpoint) { ELOG_ERROR << "resolve address " << address_ << " error: " << ec.message(); return listen_errc::bad_address; } - acceptor_->open(endpoint->protocol(), ec); - if (ec) { - ELOG_ERROR << "open failed, error: " << ec.message(); - return listen_errc::open_error; - } -#ifdef __GNUC__ - acceptor_->set_option(tcp::acceptor::reuse_address(true), ec); -#endif - if (auto opt_ec = detail::set_ipv6_only_false(*acceptor_, *endpoint); - opt_ec) { - ELOG_WARN << "set v6_only(false) failed: " << opt_ec.message(); - } - acceptor_->bind(*endpoint, ec); - if (ec) { - ELOG_ERROR << "bind port " << port_ << " error: " << ec.message(); - acceptor_->cancel(ec); - acceptor_->close(ec); - return listen_errc::address_in_used; - } -#ifdef _MSC_VER - acceptor_->set_option(tcp::acceptor::reuse_address(true)); -#endif - acceptor_->listen(asio::socket_base::max_listen_connections, ec); + auto mode = ipv6_dual_stack_ ? detail::ipv6_only_mode::enable + : detail::ipv6_only_mode::disable; + ec = detail::init_tcp_acceptor(*acceptor_, *endpoint, mode); if (ec) { - ELOG_ERROR << "port " << port_ << " listen error: " << ec.message(); - acceptor_->cancel(ec); - acceptor_->close(ec); + ELOG_ERROR << "listen init failed, error: " << ec.message(); + if (ec == asio::error::address_in_use) { + return listen_errc::address_in_used; + } + if (ec == asio::error::invalid_argument || + ec == asio::error::address_family_not_supported || + ec == asio::error::fault) { + return listen_errc::bad_address; + } + if (ec == asio::error::already_open || + ec == asio::error::operation_not_supported || + ec == asio::error::no_descriptors) { + return listen_errc::open_error; + } return listen_errc::listen_error; } @@ -134,10 +111,12 @@ struct tcp_server_acceptor : public server_acceptor_base { virtual async_simple::coro::Lazy< ylt::expected> accept() override { + accept_started_.store(true, std::memory_order_release); assert(acceptor_ != std::nullopt); - auto executor = pool_->get_executor(); - asio::ip::tcp::socket socket(executor->get_asio_executor()); + auto socket_executor = pool_->get_executor(); + asio::ip::tcp::socket socket(socket_executor->get_asio_executor()); ELOG_TRACE << "start accepting from acceptor: " << address_ << ":" << port_; + co_await coro_io::dispatch(executor_->get_asio_executor()); auto error = co_await coro_io::async_accept(*acceptor_, socket); ELOG_TRACE << "get connection from acceptor: " << address_ << ":" << port_; if (error) { @@ -155,22 +134,39 @@ struct tcp_server_acceptor : public server_acceptor_base { ylt::unexpected{error}}; } else { - co_return coro_io::socket_wrapper_t{std::move(socket), executor}; + co_return coro_io::socket_wrapper_t{std::move(socket), socket_executor}; } } - virtual void close() override { - asio::dispatch(acceptor_->get_executor(), [this]() { - asio::error_code ec; - (void)acceptor_->cancel(ec); - (void)acceptor_->close(ec); + virtual void close_now() override { + if (!acceptor_) { + return; + } + if (!accept_started_.load(std::memory_order_acquire)) { + detail::close_acceptor_now(*acceptor_); + return; + } + asio::dispatch(executor_->get_asio_executor(), [this]() { + if (acceptor_) { + detail::close_acceptor_now(*acceptor_); + } }); - acceptor_close_waiter_.get_future().wait(); + } + + virtual void close() override { + close_now(); + if (accept_started_.load(std::memory_order_acquire)) { + acceptor_close_future_.wait(); + } } virtual ~tcp_server_acceptor() = default; using server_acceptor_base::server_acceptor_base; std::optional acceptor_; + coro_io::ExecutorWrapper<>* executor_ = nullptr; std::promise acceptor_close_waiter_; + std::future acceptor_close_future_ = + acceptor_close_waiter_.get_future(); + std::atomic accept_started_ = false; }; } // namespace coro_io diff --git a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp index 3c2144953..f86313a62 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp @@ -1242,11 +1242,11 @@ class coro_rpc_client { /* * buffer layout - * 閳瑰备鏀㈤埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞顑芥敘閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳? - * 閳逛拷eq_header 閳逛繘rgs 閳? - * 閳规壕鏀㈤埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞灏栨敘閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳? - * 閳逛縼EQ_HEADER_LEN 閳瑰€俛riable length 閳? - * 閳规柡鏀㈤埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞绮规敘閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳? + * ┌────────────────┬────────────────┐ + * │req_header │args │ + * ├────────────────┼────────────────┤ + * │REQ_HEADER_LEN │variable length │ + * └────────────────┴────────────────┘ */ template std::vector prepare_buffer(uint32_t &id, diff --git a/include/ylt/coro_rpc/impl/coro_rpc_server.hpp b/include/ylt/coro_rpc/impl/coro_rpc_server.hpp index be4ff00c6..c7cb51777 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_server.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_server.hpp @@ -70,6 +70,49 @@ class coro_rpc_server_base { stop // server is stopping/stopped }; + void add_acceptor(std::string_view address, uint16_t port, + bool ipv6_dual_stack = false) { + auto acc = std::make_unique(address, port); + acc->set_ipv6_dual_stack(ipv6_dual_stack); + acceptors_.push_back(std::move(acc)); + } + + void init_acceptors(std::string_view address, uint16_t port) { + auto parsed = coro_io::detail::parse_listen_address(address, port); +#if defined(__linux__) + asio::error_code ec; + asio::io_context ctx; + if (auto endpoint = coro_io::detail::resolve_listen_endpoint( + ctx.get_executor(), parsed.address, parsed.port, ec); + endpoint && + coro_io::detail::should_create_dual_stack_acceptor(*endpoint)) { + add_acceptor(parsed.address, parsed.port, true); + if (parsed.port > 0) { + add_acceptor("0.0.0.0", parsed.port); + ELOG_INFO << "Dual-stack: added IPv4 acceptor on 0.0.0.0:" + << parsed.port; + } + return; + } +#endif + add_acceptor(parsed.address, parsed.port); + } + + coro_rpc::err_code map_listen_error(coro_io::listen_errc ec) const noexcept { + switch (ec) { + case coro_io::listen_errc::address_in_used: + return coro_rpc::err_code{coro_rpc::errc::address_in_used}; + case coro_io::listen_errc::bad_address: + return coro_rpc::err_code{coro_rpc::errc::bad_address}; + case coro_io::listen_errc::open_error: + return coro_rpc::err_code{coro_rpc::errc::open_error}; + case coro_io::listen_errc::listen_error: + return coro_rpc::err_code{coro_rpc::errc::listen_error}; + default: + return coro_rpc::err_code{coro_rpc::errc::io_error}; + } + } + public: /*! * TODO: add doc @@ -90,8 +133,7 @@ class coro_rpc_server_base { flag_{stat::init}, is_enable_tcp_no_delay_(is_enable_tcp_no_delay), conn_timeout_duration_(conn_timeout_duration) { - acceptors_.push_back( - std::make_unique(address, port)); + init_acceptors(address, port); } coro_rpc_server_base(size_t thread_num, std::string address, @@ -102,8 +144,7 @@ class coro_rpc_server_base { flag_{stat::init}, is_enable_tcp_no_delay_(is_enable_tcp_no_delay), conn_timeout_duration_(conn_timeout_duration) { - acceptors_.push_back( - std::make_unique(address)); + init_acceptors(address, 0); } coro_rpc_server_base( @@ -137,8 +178,7 @@ class coro_rpc_server_base { acceptors_ = std::move(acceptors); } else { - acceptors_.push_back(std::make_unique( - config.address, config.port)); + init_acceptors(config.address, config.port); } } @@ -148,18 +188,18 @@ class coro_rpc_server_base { } #ifdef YLT_ENABLE_SSL - void init_ssl(const ssl_configure &conf) { + void init_ssl(const ssl_configure& conf) { use_ssl_ = init_ssl_context_helper(context_, conf); } #ifdef YLT_ENABLE_NTLS - void init_ntls(const ssl_ntls_configure &conf) { + void init_ntls(const ssl_ntls_configure& conf) { use_ssl_ = init_ntls_context_helper(context_, conf); } #endif // YLT_ENABLE_NTLS #endif #ifdef YLT_ENABLE_IBV void init_ibv( - const coro_io::ib_socket_t::config_t &conf = {}, + const coro_io::ib_socket_t::config_t& conf = {}, std::vector> ibv_dev_lists = {}) { ibv_config_ = conf; ibv_dev_lists_ = std::move(ibv_dev_lists); @@ -179,14 +219,14 @@ class coro_rpc_server_base { private: async_simple::Future make_error_future( - coro_rpc::err_code &&err) { + coro_rpc::err_code&& err) { async_simple::Promise p; p.setValue(std::move(err)); return p.getFuture(); } public: - const std::vector> & + const std::vector>& get_acceptors() const noexcept { return acceptors_; } @@ -203,34 +243,32 @@ class coro_rpc_server_base { return make_error_future( coro_rpc::err_code{coro_rpc::errc::server_has_ran}); } - for (auto &acceptor : acceptors_) { + for (size_t i = 0; i < acceptors_.size(); ++i) { + auto& acceptor = acceptors_[i]; acceptor->set_io_threads_pool(&pool_); auto ec = acceptor->listen(); - if (ec != coro_io::listen_errc ::ok) { - switch (ec) { - case coro_io::listen_errc::address_in_used: - errc_ = coro_rpc::err_code{coro_rpc::errc::address_in_used}; - break; - case coro_io::listen_errc::bad_address: - errc_ = coro_rpc::err_code{coro_rpc::errc::bad_address}; - break; - case coro_io::listen_errc::open_error: - errc_ = coro_rpc::err_code{coro_rpc::errc::open_error}; - break; - case coro_io::listen_errc::listen_error: - errc_ = coro_rpc::err_code{coro_rpc::errc::listen_error}; - break; - default: - errc_ = coro_rpc::err_code{coro_rpc::errc::io_error}; - break; + if (ec != coro_io::listen_errc::ok) { + errc_ = map_listen_error(ec); + for (auto& opened : acceptors_) { + opened->close_now(); } - } - if (errc_) { break; } +#if defined(__linux__) + bool needs_ipv4_acceptor = acceptor->ipv6_dual_stack() && + acceptor->port() > 0 && + acceptors_.size() == 1; + auto acceptor_port = acceptor->port(); + if (needs_ipv4_acceptor) { + add_acceptor("0.0.0.0", acceptor_port); + acceptors_.back()->set_io_threads_pool(&pool_); + ELOG_INFO << "Dual-stack: added IPv4 acceptor on 0.0.0.0:" + << acceptor_port; + } +#endif } if (!errc_) { - if constexpr (requires(typename server_config::executor_pool_t &pool) { + if constexpr (requires(typename server_config::executor_pool_t& pool) { pool.run(); }) { thd_ = std::thread([this] { @@ -247,12 +285,12 @@ class coro_rpc_server_base { async_simple::Promise promise; auto future = promise.getFuture(); accept().start([this, p = std::move(promise)]( - async_simple::Try &&res) mutable { + async_simple::Try&& res) mutable { ELOG_INFO << "server quit!"; if (res.hasError()) { try { std::rethrow_exception(res.getException()); - } catch (const std::exception &e) { + } catch (const std::exception& e) { ELOG_ERROR << "server quit with exception: " << e.what(); } stop(); @@ -260,7 +298,7 @@ class coro_rpc_server_base { p.setValue(errc_); } else { - auto &value = res.value(); + auto& value = res.value(); if (value.ec == coro_rpc::errc::operation_canceled) { ELOG_INFO << "server quit: " << value.message(); } @@ -291,11 +329,11 @@ class coro_rpc_server_base { ELOG_INFO << "begin to stop coro_rpc_server"; if (flag_ == stat::started) { - for (auto &acceptor : acceptors_) acceptor->close(); + for (auto& acceptor : acceptors_) acceptor->close(); { std::unique_lock lock(conns_mtx_); ELOG_INFO << "total connection count: " << conns_.size(); - for (auto &conn_weak : conns_) { + for (auto& conn_weak : conns_) { auto conn = conn_weak.second.lock(); if (conn && !conn->has_closed()) { conn->async_close(); @@ -331,17 +369,17 @@ class coro_rpc_server_base { template void add_subserver( - std::function + std::function dispatcher, std::unique_ptr... server) { connection_transfer_ = [dispatcher = std::move(dispatcher), server = std::make_tuple(std::move(server)...)]( - coro_io::socket_wrapper_t &&socket, + coro_io::socket_wrapper_t&& socket, std::string_view magic_number, int index = -1) mutable { std::apply( - [&dispatcher, &socket, magic_number](auto &...server) { + [&dispatcher, &socket, magic_number](auto&... server) { dispatcher(std::move(socket), magic_number, *server...); }, server); @@ -378,13 +416,13 @@ class coro_rpc_server_base { */ template - void register_handler(util::class_type_t *self) { + void register_handler(util::class_type_t* self) { router_.template register_handler(self); } template - void register_handler(util::class_type_t *self, - const auto &key) { + void register_handler(util::class_type_t* self, + const auto& key) { router_.template register_handler(self, key); } @@ -418,11 +456,11 @@ class coro_rpc_server_base { } template - void register_handler(const auto &key) { + void register_handler(const auto& key) { router_.template register_handler(key); } - auto &get_io_context_pool() noexcept { return pool_; } + auto& get_io_context_pool() noexcept { return pool_; } /*! * Set client filter callback @@ -430,7 +468,7 @@ class coro_rpc_server_base { * true to allow connection, false to reject */ void client_filter( - std::function filter) { + std::function filter) { client_filter_ = std::move(filter); } @@ -439,7 +477,7 @@ class coro_rpc_server_base { std::vector> tasks; acceptors_[0]->address(); acceptors_[0]->port(); - for (auto &acceptor : acceptors_) { + for (auto& acceptor : acceptors_) { tasks.emplace_back(accept_impl(*acceptor)); } auto results = co_await async_simple::coro::collectAny(std::move(tasks)); @@ -454,7 +492,7 @@ class coro_rpc_server_base { return ++global_conn_id; } async_simple::coro::Lazy accept_impl( - coro_io::server_acceptor_base &acceptor) { + coro_io::server_acceptor_base& acceptor) { ELOG_INFO << "begin to accept looping"; for (;;) { auto result = co_await acceptor.accept(); @@ -462,7 +500,7 @@ class coro_rpc_server_base { #ifdef UNIT_TEST_INJECT if (result.has_value()) { if (g_action == inject_action::force_inject_server_accept_error) { - coro_io::socket_wrapper_t &wrapper = result.value(); + coro_io::socket_wrapper_t& wrapper = result.value(); asio::error_code ignored_ec; wrapper.close(); g_action = inject_action::nothing; @@ -486,7 +524,7 @@ class coro_rpc_server_base { } continue; } - coro_io::socket_wrapper_t &wrapper = result.value(); + coro_io::socket_wrapper_t& wrapper = result.value(); // Client filter check if (client_filter_) { @@ -509,7 +547,7 @@ class coro_rpc_server_base { << wrapper.remote_endpoint() << "], local addr[" << wrapper.local_endpoint() << "]"; - if (auto &socket = wrapper.socket(); socket) { + if (auto& socket = wrapper.socket(); socket) { if (is_enable_tcp_no_delay_) { std::error_code error; socket->set_option(asio::ip::tcp::no_delay(true), error); @@ -524,7 +562,7 @@ class coro_rpc_server_base { auto conn = std::make_shared(std::move(wrapper), conn_timeout_duration_); conn->set_quit_callback( - [this](const uint64_t &id) { + [this](const uint64_t& id) { std::unique_lock lock(conns_mtx_); conns_.erase(id); }, @@ -536,7 +574,7 @@ class coro_rpc_server_base { ELOG_TRACE << "start new connection, conn_id:" << conn_id; start_one(std::move(conn)) .directlyStart( - [id = conn_id, this](async_simple::Try &&res) { + [id = conn_id, this](async_simple::Try&& res) { ELOG_INFO << "connection over, conn id:" << id; }, wrapper.get_executor()); @@ -552,9 +590,9 @@ class coro_rpc_server_base { ibv_dev_lists_.size()]; } - async_simple::coro::Lazy update_to_rdma(coro_connection *conn) { + async_simple::coro::Lazy update_to_rdma(coro_connection* conn) { bool init_ok = true; - auto &wrapper = conn->socket_wrapper(); + auto& wrapper = conn->socket_wrapper(); try { if (!ibv_dev_lists_.empty()) { ibv_config_->device = get_rr_device(); @@ -628,7 +666,7 @@ class coro_rpc_server_base { coro_rpc::err_code errc_ = {}; std::chrono::steady_clock::duration conn_timeout_duration_; - async_simple::util::move_only_function connection_transfer_; @@ -642,6 +680,6 @@ class coro_rpc_server_base { std::atomic rr_index_ = 0; #endif - std::function client_filter_; + std::function client_filter_; }; } // namespace coro_rpc diff --git a/include/ylt/standalone/cinatra/coro_http_server.hpp b/include/ylt/standalone/cinatra/coro_http_server.hpp index 1cf0eea59..df98b0d4b 100644 --- a/include/ylt/standalone/cinatra/coro_http_server.hpp +++ b/include/ylt/standalone/cinatra/coro_http_server.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include "cinatra/coro_http_client.hpp" #include "cinatra/coro_http_response.hpp" @@ -22,7 +23,7 @@ enum class file_resp_format_type { }; class coro_http_server { public: - coro_http_server(asio::io_context &ctx, unsigned short port, + coro_http_server(asio::io_context& ctx, unsigned short port, std::string address = "0.0.0.0") : out_ctx_(&ctx), port_(port), acceptor_(ctx), check_timer_(ctx) { out_executor_ = @@ -30,7 +31,7 @@ class coro_http_server { init_address(std::move(address)); } - coro_http_server(asio::io_context &ctx, + coro_http_server(asio::io_context& ctx, std::string address /* = "0.0.0.0:9001" */) : out_ctx_(&ctx), acceptor_(ctx), check_timer_(ctx) { out_executor_ = @@ -70,8 +71,8 @@ class coro_http_server { } #ifdef CINATRA_ENABLE_SSL - void init_ssl(const std::string &cert_file, const std::string &key_file, - const std::string &passwd) { + void init_ssl(const std::string& cert_file, const std::string& key_file, + const std::string& passwd) { cert_file_ = cert_file; key_file_ = key_file; passwd_ = passwd; @@ -108,13 +109,13 @@ class coro_http_server { * @param ca_cert_file CA certificate file path (optional) * @param enable_client_verify enable client certificate verification */ - void init_ntls(const std::string &sign_cert_file, - const std::string &sign_key_file, - const std::string &enc_cert_file, - const std::string &enc_key_file, - const std::string &ca_cert_file = "", + void init_ntls(const std::string& sign_cert_file, + const std::string& sign_key_file, + const std::string& enc_cert_file, + const std::string& enc_key_file, + const std::string& ca_cert_file = "", bool enable_client_verify = false, - const std::string &passwd = "") { + const std::string& passwd = "") { ntls_config_.sign_cert_file = sign_cert_file; ntls_config_.sign_key_file = sign_key_file; ntls_config_.enc_cert_file = enc_cert_file; @@ -130,10 +131,10 @@ class coro_http_server { * Initialize NTLS with base path and relative file paths */ void init_ntls( - const std::string &base_path, const std::string &sign_cert_file, - const std::string &sign_key_file, const std::string &enc_cert_file, - const std::string &enc_key_file, const std::string &ca_cert_file = "", - bool enable_client_verify = false, const std::string &cipher_suites = "", + const std::string& base_path, const std::string& sign_cert_file, + const std::string& sign_key_file, const std::string& enc_cert_file, + const std::string& enc_key_file, const std::string& ca_cert_file = "", + bool enable_client_verify = false, const std::string& cipher_suites = "", coro_http_connection::ntls_mode mode = coro_http_connection::ntls_mode::tlcp_dual_cert) { ntls_config_.base_path = base_path; @@ -167,11 +168,11 @@ class coro_http_server { /*! * Initialize NTLS with RFC 8998 TLS 1.3 + GM single certificate mode */ - void init_ntls(const std::string &base_path, const std::string &gm_cert_file, - const std::string &gm_key_file, - const std::string &ca_cert_file = "", + void init_ntls(const std::string& base_path, const std::string& gm_cert_file, + const std::string& gm_key_file, + const std::string& ca_cert_file = "", bool enable_client_verify = false, - const std::string &cipher_suites = "") { + const std::string& cipher_suites = "") { ntls_config_.base_path = base_path; ntls_config_.mode = coro_http_connection::ntls_mode::tls13_single_cert; ntls_config_.gm_cert_file = gm_cert_file; @@ -188,7 +189,7 @@ class coro_http_server { /*! * Set NTLS cipher suites */ - void set_ntls_cipher_suites(const std::string &cipher_suites) { + void set_ntls_cipher_suites(const std::string& cipher_suites) { ntls_config_.cipher_suites = cipher_suites; } #endif // YLT_ENABLE_NTLS @@ -216,15 +217,23 @@ class coro_http_server { }); } - accept().start([p = std::move(promise), this](auto &&res) mutable { - if (res.hasError()) { - errc_ = std::make_error_code(std::errc::io_error); - p.setValue(errc_); - } - else { - p.setValue(res.value()); - } - }); + if (acceptor_v4_) { + accept(*acceptor_v4_, acceptor_v4_close_waiter_) + .via(out_ctx_ == nullptr ? pool_->get_executor() + : out_executor_.get()) + .detach(); + } + + accept(acceptor_, acceptor_close_waiter_) + .start([p = std::move(promise), this](auto&& res) mutable { + if (res.hasError()) { + errc_ = std::make_error_code(std::errc::io_error); + p.setValue(errc_); + } + else { + p.setValue(res.value()); + } + }); } else { promise.setValue(errc_); @@ -249,7 +258,7 @@ class coro_http_server { // close current connections. { std::scoped_lock lock(*conn_mtx_); - for (auto &conn : connections_) { + for (auto& conn : connections_) { conn.second->close(false); } connections_.clear(); @@ -276,7 +285,7 @@ class coro_http_server { uint16_t port() const { return port_; } template - void set_http_handler(std::string key, Func handler, Aspects &&...asps) { + void set_http_handler(std::string key, Func handler, Aspects&&... asps) { static_assert(sizeof...(method) >= 1, "must set http_method"); if constexpr (sizeof...(method) == 1) { (router_.set_http_handler(std::move(key), std::move(handler), @@ -292,7 +301,7 @@ class coro_http_server { template void set_http_handler(std::string key, Func handler, - util::class_type_t &owner, Aspects &&...asps) { + util::class_type_t& owner, Aspects&&... asps) { static_assert(std::is_member_function_pointer_v, "must be member function"); using return_type = typename util::function_traits::return_type; @@ -319,7 +328,7 @@ class coro_http_server { coro_io::load_balance_algorithm type = coro_io::load_balance_algorithm::random, std::vector weights = {}, - Aspects &&...aspects) { + Aspects&&... aspects) { if (hosts.empty()) { throw std::invalid_argument("not config hosts yet!"); } @@ -330,11 +339,11 @@ class coro_http_server { hosts, {.lba = type}, weights)); auto handler = [this, load_balancer, type]( - coro_http_request &req, - coro_http_response &response) -> async_simple::coro::Lazy { + coro_http_request& req, + coro_http_response& response) -> async_simple::coro::Lazy { co_await load_balancer->send_request( [this, &req, &response]( - coro_http_client &client, + coro_http_client& client, std::string_view host) -> async_simple::coro::Lazy { co_await reply(client, host, req, response); }); @@ -359,7 +368,7 @@ class coro_http_server { coro_io::load_balance_algorithm type = coro_io::load_balance_algorithm::random, std::vector weights = {}, - Aspects &&...aspects) { + Aspects&&... aspects) { if (hosts.empty()) { throw std::invalid_argument("not config hosts yet!"); } @@ -371,7 +380,7 @@ class coro_http_server { set_http_handler( url_path, - [load_balancer](coro_http_request &req, coro_http_response &resp) + [load_balancer](coro_http_request& req, coro_http_response& resp) -> async_simple::coro::Lazy { websocket_result result{}; while (true) { @@ -386,7 +395,7 @@ class coro_http_server { } auto ret = co_await load_balancer->send_request( - [&req, result](coro_http_client &client, std::string_view host) + [&req, result](coro_http_client& client, std::string_view host) -> async_simple::coro::Lazy { auto r = co_await client.write_websocket(std::string(result.data)); @@ -415,7 +424,7 @@ class coro_http_server { void set_max_size_of_cache_files(size_t max_size = 3 * 1024 * 1024) { std::error_code ec; - for (const auto &file : + for (const auto& file : std::filesystem::recursive_directory_iterator(static_dir_, ec)) { if (ec) { continue; @@ -438,7 +447,7 @@ class coro_http_server { } } - const coro_http_router &get_router() const { return router_; } + const coro_http_router& get_router() const { return router_; } void set_file_resp_format_type(file_resp_format_type type) { format_type_ = type; @@ -454,7 +463,7 @@ class coro_http_server { template void set_static_res_dir(std::string_view uri_suffix = "", - std::string file_path = "www", Aspects &&...aspects) { + std::string file_path = "www", Aspects&&... aspects) { bool has_double_dot = (file_path.find("..") != std::string::npos) || (uri_suffix.find("..") != std::string::npos); if (std::filesystem::path(file_path).has_root_path() || @@ -484,7 +493,7 @@ class coro_http_server { files_.clear(); std::error_code ec; - for (const auto &file : + for (const auto& file : std::filesystem::recursive_directory_iterator(static_dir_, ec)) { if (ec) { continue; @@ -498,7 +507,7 @@ class coro_http_server { std::filesystem::path(static_dir_router_path_); std::string uri; - for (auto &file : files_) { + for (auto& file : files_) { auto relative_path = std::filesystem::path(file.substr(static_dir_.length())).string(); if (size_t pos = relative_path.find('\\') != std::string::npos) { @@ -518,8 +527,8 @@ class coro_http_server { set_http_handler( uri, [this, file_name = file]( - coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { std::string_view extension = get_extension(file_name); std::string_view mime = get_mime_type(extension); auto range_str = req.get_header_value("Range"); @@ -529,7 +538,7 @@ class coro_http_server { auto range_header = build_range_header( mime, file_name, std::to_string(fs::file_size(file_name))); resp.set_delay(true); - std::string &body = it->second; + std::string& body = it->second; std::array arr{asio::buffer(range_header), asio::buffer(body)}; co_await req.get_conn()->async_write(arr); @@ -631,7 +640,7 @@ class coro_http_server { } for (size_t i = 0; i < ranges.size(); i++) { - std::string &part_header = multi_heads[i]; + std::string& part_header = multi_heads[i]; r = co_await req.get_conn()->write_data(part_header); if (!r) { co_return; @@ -708,7 +717,7 @@ class coro_http_server { void set_shrink_to_fit(bool r) { need_shrink_every_time_ = r; } void set_default_handler(std::function( - coro_http_request &, coro_http_response &)> + coro_http_request&, coro_http_response&)> handler) { default_handler_ = std::move(handler); } @@ -727,11 +736,20 @@ class coro_http_server { * true to allow connection, false to reject */ void client_filter( - std::function filter) { + std::function filter) { client_filter_ = std::move(filter); } private: + std::error_code init_acceptor(asio::ip::tcp::acceptor& acceptor, + const asio::ip::tcp::endpoint& endpoint, + bool ipv6_only = false) { + return coro_io::detail::init_tcp_acceptor( + acceptor, endpoint, + ipv6_only ? coro_io::detail::ipv6_only_mode::enable + : coro_io::detail::ipv6_only_mode::disable); + } + std::error_code listen() { CINATRA_LOG_INFO << "begin to listen " << port_; using asio::ip::tcp; @@ -748,36 +766,13 @@ class coro_http_server { return std::make_error_code(std::errc::address_not_available); } - acceptor_.open(endpoint->protocol(), ec); - if (ec) { + bool need_dual_stack = + coro_io::detail::should_create_dual_stack_acceptor(*endpoint); + if (auto init_ec = init_acceptor(acceptor_, *endpoint, need_dual_stack); + init_ec) { CINATRA_LOG_ERROR << "acceptor open failed" - << " error: " << ec.message(); - return ec; - } -#ifdef __GNUC__ - acceptor_.set_option(tcp::acceptor::reuse_address(true), ec); -#endif - if (auto opt_ec = - coro_io::detail::set_ipv6_only_false(acceptor_, *endpoint); - opt_ec) { - CINATRA_LOG_WARNING << "set v6_only(false) failed: " << opt_ec.message(); - } - acceptor_.bind(*endpoint, ec); - if (ec) { - CINATRA_LOG_ERROR << "bind port: " << port_ << " error: " << ec.message(); - std::error_code ignore_ec; - acceptor_.cancel(ignore_ec); - acceptor_.close(ignore_ec); - return ec; - } -#ifdef _MSC_VER - acceptor_.set_option(tcp::acceptor::reuse_address(true)); -#endif - acceptor_.listen(asio::socket_base::max_listen_connections, ec); - if (ec) { - CINATRA_LOG_ERROR << "get local endpoint port: " << port_ - << " listen error: " << ec.message(); - return ec; + << " error: " << init_ec.message(); + return init_ec; } auto end_point = acceptor_.local_endpoint(ec); @@ -788,12 +783,26 @@ class coro_http_server { } port_ = end_point.port(); + if (need_dual_stack) { + acceptor_v4_.emplace(acceptor_.get_executor()); + if (auto init_ec = init_acceptor( + *acceptor_v4_, coro_io::detail::make_ipv4_any_endpoint(port_)); + init_ec) { + CINATRA_LOG_ERROR << "IPv4 acceptor init failed" + << " error: " << init_ec.message(); + coro_io::detail::close_acceptor_now(acceptor_); + acceptor_v4_.reset(); + return init_ec; + } + CINATRA_LOG_INFO << "listen IPv4 port " << port_ << " successfully"; + } + CINATRA_LOG_INFO << "listen port " << port_ << " successfully"; return {}; } public: - void transfer_connection(coro_io::socket_wrapper_t &&soc, + void transfer_connection(coro_io::socket_wrapper_t&& soc, std::string_view head_msg) { auto conn = accept_impl(std::move(soc), true); conn->add_head(head_msg); @@ -802,7 +811,7 @@ class coro_http_server { private: std::shared_ptr accept_impl( - coro_io::socket_wrapper_t &&socket, bool is_transfer_connect = false) { + coro_io::socket_wrapper_t&& socket, bool is_transfer_connect = false) { uint64_t conn_id = ++conn_id_; CINATRA_LOG_DEBUG << "new connection comming, id: " << conn_id; auto conn = std::make_shared( @@ -848,7 +857,7 @@ class coro_http_server { #endif std::weak_ptr weak(conn_mtx_); conn->set_quit_callback( - [this, weak](const uint64_t &id) { + [this, weak](const uint64_t& id) { auto mtx = weak.lock(); if (mtx) { std::scoped_lock lock(*mtx); @@ -864,25 +873,24 @@ class coro_http_server { return conn; } - async_simple::coro::Lazy accept() { + async_simple::coro::Lazy accept( + asio::ip::tcp::acceptor& acceptor, std::promise& close_waiter) { for (;;) { - coro_io::ExecutorWrapper<> *executor; + coro_io::ExecutorWrapper<>* executor; if (out_ctx_ == nullptr) { executor = pool_->get_executor(); } else { - out_executor_ = std::make_unique>( - out_ctx_->get_executor()); executor = out_executor_.get(); } asio::ip::tcp::socket socket(executor->get_asio_executor()); - auto error = co_await coro_io::async_accept(acceptor_, socket); + auto error = co_await coro_io::async_accept(acceptor, socket); if (error) { CINATRA_LOG_INFO << "accept failed, error: " << error.message(); if (error == asio::error::operation_aborted || error == asio::error::bad_descriptor) { - acceptor_close_waiter_.set_value(); + close_waiter.set_value(); co_return error; } continue; @@ -930,7 +938,17 @@ class coro_http_server { acceptor_.cancel(ec); acceptor_.close(ec); }); + if (acceptor_v4_) { + asio::dispatch(acceptor_v4_->get_executor(), [this]() { + asio::error_code ec; + acceptor_v4_->cancel(ec); + acceptor_v4_->close(ec); + }); + } acceptor_close_waiter_.get_future().wait(); + if (acceptor_v4_) { + acceptor_v4_close_waiter_.get_future().wait(); + } } void start_check_timer() { @@ -975,9 +993,9 @@ class coro_http_server { return header_str; } - std::vector build_part_heads(auto &ranges, std::string_view mime, + std::vector build_part_heads(auto& ranges, std::string_view mime, std::string_view file_size_str, - size_t &content_len) { + size_t& content_len) { std::vector multi_heads; for (auto [start, end] : ranges) { std::string part_header = "--"; @@ -1021,8 +1039,8 @@ class coro_http_server { return header_str; } - async_simple::coro::Lazy send_single_part(auto &in_file, auto &content, - auto &req, auto &resp, + async_simple::coro::Lazy send_single_part(auto& in_file, auto& content, + auto& req, auto& resp, size_t part_size, std::string_view more = "") { while (true) { @@ -1062,16 +1080,16 @@ class coro_http_server { } template - size_t erase_if(std::span &sp, Pred p) { + size_t erase_if(std::span& sp, Pred p) { auto it = std::remove_if(sp.begin(), sp.end(), p); size_t count = sp.end() - it; sp = std::span(sp.data(), sp.data() + count); return count; } - int remove_result_headers(resp_data &result, std::string_view value) { + int remove_result_headers(resp_data& result, std::string_view value) { bool r = false; - return erase_if(result.resp_headers, [&](http_header &header) { + return erase_if(result.resp_headers, [&](http_header& header) { if (r) { return false; } @@ -1082,13 +1100,13 @@ class coro_http_server { }); } - void handle_response_header(resp_data &result, std::string &length) { + void handle_response_header(resp_data& result, std::string& length) { int r = remove_result_headers(result, "chunked"); if (r == 0) { r = remove_result_headers(result, "multipart/form-data"); if (r) { length = std::to_string(result.resp_body.size()); - for (auto &[key, val] : result.resp_headers) { + for (auto& [key, val] : result.resp_headers) { if (key == "Content-Length") { val = length; break; @@ -1098,10 +1116,10 @@ class coro_http_server { } } - async_simple::coro::Lazy reply(coro_http_client &client, + async_simple::coro::Lazy reply(coro_http_client& client, std::string_view host, - coro_http_request &req, - coro_http_response &response) { + coro_http_request& req, + coro_http_response& response) { uri_t uri; std::string proxy_host; @@ -1113,7 +1131,7 @@ class coro_http_server { uri.parse_from(host.data()); } std::unordered_map req_headers; - for (auto &[k, v] : req.get_headers()) { + for (auto& [k, v] : req.get_headers()) { req_headers.emplace(k, v); } req_headers["Host"] = uri.host; @@ -1133,51 +1151,29 @@ class coro_http_server { response.set_delay(true); } - bool is_ip_v6(std::string_view address) { - asio::ip::address_v6::bytes_type bytes; - unsigned long scope_id = 0; - - asio::error_code ec; - return asio::detail::socket_ops::inet_pton(ASIO_OS_DEF(AF_INET6), - address.data(), &bytes[0], - &scope_id, ec) > 0; - } - void init_address(std::string address) { #if __has_include() easylog::logger<>::instance(); // init easylog singleton to make sure // server destruct before easylog. #endif - if (size_t pos = address.find(':'); - pos != std::string::npos && !is_ip_v6(address)) { - auto port_sv = std::string_view(address).substr(pos + 1); - - uint16_t port; - auto [ptr, ec] = std::from_chars( - port_sv.data(), port_sv.data() + port_sv.size(), port, 10); - if (ec != std::errc{}) { - address_ = std::move(address); - return; - } - - port_ = port; - address = address.substr(0, pos); - } - - address_ = std::move(address); + auto parsed = coro_io::detail::parse_listen_address(address, port_); + address_ = std::move(parsed.address); + port_ = parsed.port; } private: std::unique_ptr pool_; - asio::io_context *out_ctx_ = nullptr; + asio::io_context* out_ctx_ = nullptr; std::unique_ptr> out_executor_ = nullptr; uint16_t port_; std::string address_; std::error_code errc_ = {}; asio::ip::tcp::acceptor acceptor_; + std::optional acceptor_v4_; std::thread thd_; std::mutex thd_mtx_; std::promise acceptor_close_waiter_; + std::promise acceptor_v4_close_waiter_; bool no_delay_ = true; uint64_t conn_id_ = 0; @@ -1233,8 +1229,8 @@ class coro_http_server { #endif coro_http_router router_; bool need_shrink_every_time_ = false; - std::function(coro_http_request &, - coro_http_response &)> + std::function(coro_http_request&, + coro_http_response&)> default_handler_ = nullptr; int64_t max_http_body_len_ = INT64_MAX; #ifdef INJECT_FOR_HTTP_SEVER_TEST @@ -1242,7 +1238,7 @@ class coro_http_server { bool read_failed_forever_ = false; #endif - std::function client_filter_; + std::function client_filter_; }; using http_server = coro_http_server; diff --git a/src/coro_http/tests/test_cinatra.cpp b/src/coro_http/tests/test_cinatra.cpp index 0d7389321..0c69d6209 100644 --- a/src/coro_http/tests/test_cinatra.cpp +++ b/src/coro_http/tests/test_cinatra.cpp @@ -1631,15 +1631,17 @@ TEST_CASE("test coro_http_client connect/request timeout") { TEST_CASE("test out io_contex server") { auto executor = coro_io::get_global_executor()->get_asio_executor(); - coro_http_server server(executor.context(), "0.0.0.0:8002"); + coro_http_server server(executor.context(), "127.0.0.1:0"); server.set_no_delay(true); server.set_http_handler("/", [](request &req, response &res) { res.set_status_and_content(status_type::ok, "hello"); }); server.async_start(); + REQUIRE(server.port() > 0); coro_http_client client{}; - auto result = client.get("http://127.0.0.1:8002/"); + auto result = + client.get("http://127.0.0.1:" + std::to_string(server.port()) + "/"); CHECK(result.status == 200); server.stop(); } diff --git a/src/coro_http/tests/test_coro_http_server.cpp b/src/coro_http/tests/test_coro_http_server.cpp index e8bae2fcd..dc97ce19b 100644 --- a/src/coro_http/tests/test_coro_http_server.cpp +++ b/src/coro_http/tests/test_coro_http_server.cpp @@ -11,6 +11,8 @@ #include #include +#include "asio/io_context.hpp" +#include "asio/ip/v6_only.hpp" #include "async_simple/coro/Lazy.h" #include "async_simple/coro/SyncAwait.h" #include "doctest.h" @@ -21,6 +23,24 @@ using namespace coro_http; using namespace std::chrono_literals; +namespace { +asio::error_code bind_ipv6_only_probe(uint16_t port) { + asio::io_context ctx; + asio::ip::tcp::acceptor probe(ctx); + asio::error_code ec; + probe.open(asio::ip::tcp::v6(), ec); + if (ec) { + return ec; + } + probe.set_option(asio::ip::v6_only(true), ec); + if (ec) { + return ec; + } + probe.bind({asio::ip::address_v6::any(), port}, ec); + return ec; +} +} // namespace + TEST_CASE("test parse ranges") { bool is_valid = true; auto vec = parse_ranges("200-999", 10000, is_valid); @@ -89,7 +109,7 @@ TEST_CASE("coro_io post") { try { std::rethrow_exception(t4.getException()); - } catch (const std::exception &e) { + } catch (const std::exception& e) { CHECK(e.what() == std::string("e")); CINATRA_LOG_DEBUG << e.what() << "\n"; } @@ -99,7 +119,7 @@ TEST_CASE("coro_server example, will block") { return; // remove this line when you run the coro server. cinatra::coro_http_server server(std::thread::hardware_concurrency(), 9001); server.set_http_handler( - "/", [](coro_http_request &req, coro_http_response &resp) { + "/", [](coro_http_request& req, coro_http_response& resp) { // response in io thread. std::this_thread::sleep_for(10ms); resp.set_status_and_content(cinatra::status_type::ok, "hello world"); @@ -107,8 +127,8 @@ TEST_CASE("coro_server example, will block") { server.set_http_handler( "/coro", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { co_await coro_io::post([&]() { // coroutine in other thread. std::this_thread::sleep_for(10ms); @@ -118,7 +138,7 @@ TEST_CASE("coro_server example, will block") { }); server.set_http_handler( - "/echo", [](coro_http_request &req, coro_http_response &resp) { + "/echo", [](coro_http_request& req, coro_http_response& resp) { // response in io thread. resp.set_status_and_content(cinatra::status_type::ok, "hello world"); }); @@ -126,6 +146,92 @@ TEST_CASE("coro_server example, will block") { CHECK(server.port() > 0); } +TEST_CASE("http listen on ipv6 any accepts ipv4 and ipv6 connections") { + coro_http_server server(static_cast(1), + static_cast(0), std::string("::"), + false); + server.set_http_handler( + "/", [](coro_http_request& req, coro_http_response& resp) { + resp.set_status_and_content(status_type::ok, "dual stack ok"); + }); + + auto start_result = server.async_start(); + CHECK(!start_result.hasResult()); + REQUIRE(server.port() > 0); + + std::this_thread::sleep_for(100ms); + + coro_http_client client{}; + client.set_conn_timeout(std::chrono::seconds{1}); + client.set_req_timeout(std::chrono::seconds{1}); + + auto result_v4 = + client.get("http://127.0.0.1:" + std::to_string(server.port()) + "/"); + CHECK(result_v4.status == 200); + CHECK(result_v4.resp_body == "dual stack ok"); + + auto result_v6 = + client.get("http://[::1]:" + std::to_string(server.port()) + "/"); + CHECK(result_v6.status == 200); + CHECK(result_v6.resp_body == "dual stack ok"); + + server.stop(); +} + +TEST_CASE("http ipv6 any address string accepts ipv4 and ipv6 connections") { + coro_http_server server(static_cast(1), std::string("[::]:0"), false); + server.set_http_handler( + "/", [](coro_http_request& req, coro_http_response& resp) { + resp.set_status_and_content(status_type::ok, "dual stack string ok"); + }); + + auto start_result = server.async_start(); + CHECK(!start_result.hasResult()); + REQUIRE(server.port() > 0); + + std::this_thread::sleep_for(100ms); + + coro_http_client client{}; + client.set_conn_timeout(std::chrono::seconds{1}); + client.set_req_timeout(std::chrono::seconds{1}); + + auto result_v4 = + client.get("http://127.0.0.1:" + std::to_string(server.port()) + "/"); + CHECK(result_v4.status == 200); + CHECK(result_v4.resp_body == "dual stack string ok"); + + auto result_v6 = + client.get("http://[::1]:" + std::to_string(server.port()) + "/"); + CHECK(result_v6.status == 200); + CHECK(result_v6.resp_body == "dual stack string ok"); + + server.stop(); +} + +#if defined(__linux__) +TEST_CASE("http failed second acceptor closes primary acceptor") { + asio::io_context ctx; + asio::ip::tcp::acceptor ipv4_blocker(ctx); + asio::error_code ec; + ipv4_blocker.open(asio::ip::tcp::v4(), ec); + REQUIRE(!ec); + ipv4_blocker.bind({asio::ip::address_v4::any(), 0}, ec); + REQUIRE(!ec); + ipv4_blocker.listen(asio::socket_base::max_listen_connections, ec); + REQUIRE(!ec); + + auto port = ipv4_blocker.local_endpoint().port(); + + coro_http_server server(static_cast(1), port, std::string("::"), + false); + auto start_error = server.async_start().get(); + REQUIRE(start_error); + + auto probe_error = bind_ipv6_only_probe(port); + CHECK_MESSAGE(!probe_error, probe_error.message()); +} +#endif + template bool create_file(View filename, size_t file_size = 1024) { CINATRA_LOG_DEBUG << "begin to open file: " << filename << "\n"; @@ -143,12 +249,12 @@ bool create_file(View filename, size_t file_size = 1024) { TEST_CASE("test redirect") { coro_http_server server(1, 9001); server.set_http_handler( - "/", [](coro_http_request &req, coro_http_response &resp) { + "/", [](coro_http_request& req, coro_http_response& resp) { resp.redirect("/test"); }); server.set_http_handler( - "/test", [](coro_http_request &req, coro_http_response &resp) { + "/test", [](coro_http_request& req, coro_http_response& resp) { resp.set_status_and_content(status_type::ok, "redirect ok"); }); @@ -170,8 +276,8 @@ TEST_CASE("test post") { cinatra::coro_http_server server(1, 9001); server.set_http_handler( "/echo", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { resp.set_status_and_content(status_type::ok, std::string(req.get_body())); co_return; @@ -196,8 +302,8 @@ TEST_CASE("test multiple download") { coro_http_server server(1, 9001); server.set_http_handler( "/", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { // multipart_reader_t multipart(resp.get_conn()); bool ok; if (ok = co_await resp.get_conn()->begin_multipart(); !ok) { @@ -206,7 +312,7 @@ TEST_CASE("test multiple download") { std::vector vec{"hello", " world", " chunked"}; - for (auto &str : vec) { + for (auto& str : vec) { if (ok = co_await resp.get_conn()->write_multipart(str, "text/plain"); !ok) { co_return; @@ -217,8 +323,8 @@ TEST_CASE("test multiple download") { }); server.set_http_handler( "/multipart", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { bool ok; if (ok = co_await resp.get_conn()->begin_multipart(); !ok) { co_return; @@ -226,7 +332,7 @@ TEST_CASE("test multiple download") { std::vector vec{"hello", " world", " multipart"}; - for (auto &str : vec) { + for (auto& str : vec) { if (ok = co_await resp.get_conn()->write_multipart(str, "text/plain"); !ok) { co_return; @@ -328,12 +434,12 @@ TEST_CASE("test range download") { class my_object { public: - void normal(coro_http_request &req, coro_http_response &response) { + void normal(coro_http_request& req, coro_http_response& response) { response.set_status_and_content(status_type::ok, "ok"); } - async_simple::coro::Lazy lazy(coro_http_request &req, - coro_http_response &response) { + async_simple::coro::Lazy lazy(coro_http_request& req, + coro_http_response& response) { response.set_status_and_content(status_type::ok, "ok lazy"); co_return; } @@ -341,38 +447,38 @@ class my_object { TEST_CASE("set http handler") { cinatra::coro_http_server server(1, 9001); - auto &router = server.get_router(); - auto &handlers = router.get_handlers(); + auto& router = server.get_router(); + auto& handlers = router.get_handlers(); server.set_http_handler( - "/", [](coro_http_request &req, coro_http_response &response) { + "/", [](coro_http_request& req, coro_http_response& response) { response.set_status_and_content(status_type::ok, "ok"); }); CHECK(handlers.size() == 1); server.set_http_handler( - "/", [](coro_http_request &req, coro_http_response &response) { + "/", [](coro_http_request& req, coro_http_response& response) { response.set_status_and_content(status_type::ok, "ok"); }); CHECK(handlers.size() == 1); server.set_http_handler( - "/aa", [](coro_http_request &req, coro_http_response &response) { + "/aa", [](coro_http_request& req, coro_http_response& response) { response.set_status_and_content(status_type::ok, "ok"); }); CHECK(handlers.size() == 2); server.set_http_handler( - "/bb", [](coro_http_request &req, coro_http_response &response) { + "/bb", [](coro_http_request& req, coro_http_response& response) { response.set_status_and_content(status_type::ok, "ok"); }); CHECK(handlers.size() == 4); cinatra::coro_http_server server2(1, 9001); server2.set_http_handler( - "/", [](coro_http_request &req, coro_http_response &response) { + "/", [](coro_http_request& req, coro_http_response& response) { response.set_status_and_content(status_type::ok, "ok"); }); - auto &handlers2 = server2.get_router().get_handlers(); + auto& handlers2 = server2.get_router().get_handlers(); CHECK(handlers2.size() == 1); my_object o{}; @@ -382,13 +488,13 @@ TEST_CASE("set http handler") { CHECK(handlers2.size() == 2); auto coro_func = - [](coro_http_request &req, - coro_http_response &response) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& response) -> async_simple::coro::Lazy { response.set_status_and_content(status_type::ok, "ok"); co_return; }; - auto &coro_handlers = router.get_coro_handlers(); + auto& coro_handlers = router.get_coro_handlers(); server.set_http_handler("/", coro_func); CHECK(coro_handlers.size() == 1); server.set_http_handler("/", coro_func); @@ -411,8 +517,8 @@ TEST_CASE("test llm chunked stream api") { coro_http_server server(2, 9000); server.set_http_handler( "/llm_api", - [&](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [&](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { resp.set_format_type(format_type::chunked); resp.add_header("Content-Type", "text/event-stream"); resp.add_header("Cache-Control", "no-cache"); @@ -420,7 +526,7 @@ TEST_CASE("test llm chunked stream api") { co_await resp.get_conn()->begin_chunked(); - for (auto &msg : llm_msgs) { + for (auto& msg : llm_msgs) { co_await resp.get_conn()->write_chunked(msg); } co_await resp.get_conn()->end_chunked(); @@ -485,7 +591,7 @@ TEST_CASE("get post") { cinatra::coro_http_server server(1, 9001); server.set_shrink_to_fit(true); server.set_http_handler( - "/test", [](coro_http_request &req, coro_http_response &resp) { + "/test", [](coro_http_request& req, coro_http_response& resp) { auto value = req.get_header_value("connection"); CHECK(!value.empty()); @@ -512,7 +618,7 @@ TEST_CASE("get post") { }); server.set_http_handler( - "/test1", [](coro_http_request &req, coro_http_response &resp) { + "/test1", [](coro_http_request& req, coro_http_response& resp) { CHECK(req.get_method() == "POST"); CHECK(req.get_url() == "/test1"); CHECK(req.get_conn()->local_address() == "127.0.0.1:9001"); @@ -526,8 +632,8 @@ TEST_CASE("get post") { server.set_http_handler( "/test_coro", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { co_await coro_io::post([&] { resp.set_status(cinatra::status_type::ok); resp.set_content("hello world in coro"); @@ -535,13 +641,13 @@ TEST_CASE("get post") { }); server.set_http_handler( - "/empty", [](coro_http_request &req, coro_http_response &resp) { + "/empty", [](coro_http_request& req, coro_http_response& resp) { resp.add_header("Host", "Cinatra"); resp.set_status_and_content(cinatra::status_type::ok, ""); }); server.set_http_handler( - "/close", [](coro_http_request &req, coro_http_response &resp) { + "/close", [](coro_http_request& req, coro_http_response& resp) { resp.set_keepalive(false); resp.set_status_and_content(cinatra::status_type::ok, "hello"); resp.get_conn()->close(); @@ -572,9 +678,9 @@ TEST_CASE("get post") { result = client.get("http://127.0.0.1:9001/empty"); CHECK(result.status == 200); - auto &headers = result.resp_headers; + auto& headers = result.resp_headers; auto it = - std::find_if(headers.begin(), headers.end(), [](http_header &header) { + std::find_if(headers.begin(), headers.end(), [](http_header& header) { return header.name == "Host" && header.value == "Cinatra"; }); CHECK(it != headers.end()); @@ -589,7 +695,7 @@ TEST_CASE("get post") { TEST_CASE("test alias") { http_server server(1, 9001); - server.set_http_handler("/", [](request &req, response &resp) { + server.set_http_handler("/", [](request& req, response& resp) { resp.set_status_and_content(status_type::ok, "ok"); }); server.async_start(); @@ -601,12 +707,12 @@ TEST_CASE("test alias") { } struct log_t { - bool before(coro_http_request &, coro_http_response &) { + bool before(coro_http_request&, coro_http_response&) { CINATRA_LOG_DEBUG << "before log"; return true; } - bool after(coro_http_request &, coro_http_response &res) { + bool after(coro_http_request&, coro_http_response& res) { CINATRA_LOG_DEBUG << "after log"; res.add_header("aaaa", "bbcc"); return true; @@ -614,14 +720,14 @@ struct log_t { }; struct check_t { - bool before(coro_http_request &, coro_http_response &) { + bool before(coro_http_request&, coro_http_response&) { CINATRA_LOG_DEBUG << "check before"; return true; } }; struct check_t1 { - bool before(coro_http_request &, coro_http_response &resp) { + bool before(coro_http_request&, coro_http_response& resp) { CINATRA_LOG_DEBUG << "check1 before"; resp.set_status_and_content(status_type::bad_request, "check failed"); return false; @@ -629,7 +735,7 @@ struct check_t1 { }; struct get_data { - bool before(coro_http_request &req, coro_http_response &res) { + bool before(coro_http_request& req, coro_http_response& res) { req.set_aspect_data("hello", "world"); return true; } @@ -645,7 +751,7 @@ TEST_CASE("test aspects") { server.set_static_res_dir("", "", log_t{}, check_t{}); server.set_http_handler( "/", - [](coro_http_request &req, coro_http_response &resp) { + [](coro_http_request& req, coro_http_response& resp) { resp.add_header("aaaa", "bbcc"); resp.set_status_and_content(status_type::ok, "ok"); }, @@ -653,9 +759,9 @@ TEST_CASE("test aspects") { server.set_http_handler( "/aspect", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { - auto &val = req.get_aspect_data(); + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { + auto& val = req.get_aspect_data(); CHECK(val[0] == "hello"); CHECK(val[1] == "world"); resp.set_status_and_content(status_type::ok, "ok"); @@ -664,7 +770,7 @@ TEST_CASE("test aspects") { get_data{}); server.set_http_handler( "/check1", - [](coro_http_request &req, coro_http_response &resp) { + [](coro_http_request& req, coro_http_response& resp) { resp.set_status_and_content(status_type::ok, "ok"); }, check_t1{}, log_t{}); @@ -673,7 +779,7 @@ TEST_CASE("test aspects") { coro_http_client client{}; auto result = client.get("http://127.0.0.1:9001/"); - auto check = [](auto &result) { + auto check = [](auto& result) { bool has_str = false; for (auto [k, v] : result.resp_headers) { if (k == "aaaa") { @@ -705,7 +811,7 @@ TEST_CASE("use out context") { auto executor = coro_io::get_global_executor()->get_asio_executor(); cinatra::coro_http_server server(executor.context(), 9001); server.set_http_handler( - "/out_ctx", [](coro_http_request &req, coro_http_response &resp) { + "/out_ctx", [](coro_http_request& req, coro_http_response& resp) { resp.set_status_and_content(status_type::ok, "use out ctx"); }); @@ -727,8 +833,8 @@ TEST_CASE("delay reply, server stop, form-urlencode, qureies, throw") { server.set_http_handler( "/delay2", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { resp.set_delay(true); std::this_thread::sleep_for(200ms); resp.set_status_and_content(status_type::ok, "delay reply in coro"); @@ -736,7 +842,7 @@ TEST_CASE("delay reply, server stop, form-urlencode, qureies, throw") { }); server.set_http_handler( - "/form-urlencode", [](coro_http_request &req, coro_http_response &resp) { + "/form-urlencode", [](coro_http_request& req, coro_http_response& resp) { CHECK(req.get_body() == "theCityName=58367&aa=%22bbb%22"); CHECK(req.get_query_value("theCityName") == "58367"); CHECK(req.get_decode_query_value("aa") == "\"bbb\""); @@ -746,30 +852,30 @@ TEST_CASE("delay reply, server stop, form-urlencode, qureies, throw") { }); server.set_http_handler( - "/throw", [](coro_http_request &req, coro_http_response &resp) { + "/throw", [](coro_http_request& req, coro_http_response& resp) { CHECK(req.get_boundary().empty()); throw std::invalid_argument("invalid arguments"); resp.set_status_and_content(status_type::ok, "ok"); }); server.set_http_handler( "/coro_throw", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { CHECK(req.get_boundary().empty()); throw std::invalid_argument("invalid arguments"); resp.set_status_and_content(status_type::ok, "ok"); co_return; }); server.set_http_handler( - "/throw1", [](coro_http_request &req, coro_http_response &resp) { + "/throw1", [](coro_http_request& req, coro_http_response& resp) { CHECK(req.get_boundary().empty()); throw 1; resp.set_status_and_content(status_type::ok, "ok"); }); server.set_http_handler( "/coro_throw1", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { CHECK(req.get_boundary().empty()); throw 1; resp.set_status_and_content(status_type::ok, "ok"); @@ -805,7 +911,7 @@ TEST_CASE("delay reply, server stop, form-urlencode, qureies, throw") { server.stop(); } -async_simple::coro::Lazy chunked_upload1(coro_http_client &client) { +async_simple::coro::Lazy chunked_upload1(coro_http_client& client) { std::string filename = "test.txt"; create_file(filename, 1010); @@ -829,8 +935,8 @@ TEST_CASE("chunked request") { cinatra::coro_http_server server(1, 9001); server.set_http_handler( "/chunked", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { assert(req.get_content_type() == content_type::chunked); chunked_result result{}; std::string content; @@ -857,8 +963,8 @@ TEST_CASE("chunked request") { server.set_http_handler( "/write_chunked", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { resp.set_format_type(format_type::chunked); bool ok; if (ok = co_await resp.get_conn()->begin_chunked(); !ok) { @@ -867,7 +973,7 @@ TEST_CASE("chunked request") { std::vector vec{"hello", " world", " ok"}; - for (auto &str : vec) { + for (auto& str : vec) { if (ok = co_await resp.get_conn()->write_chunked(str); !ok) { co_return; } @@ -901,7 +1007,7 @@ TEST_CASE("test websocket with chunked") { cinatra::coro_http_server server(1, 9001); server.set_http_handler( "/ws_source", - [ws_chunk_size](coro_http_request &req, coro_http_response &resp) + [ws_chunk_size](coro_http_request& req, coro_http_response& resp) -> async_simple::coro::Lazy { CHECK(req.get_content_type() == content_type::websocket); std::string out_str; @@ -979,8 +1085,8 @@ TEST_CASE("test websocket") { cinatra::coro_http_server server(1, 8003); server.set_http_handler( "/ws_echo", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { CHECK(req.get_content_type() == content_type::websocket); std::ofstream out_file("test.temp", std::ios::binary); websocket_result result{}; @@ -1082,8 +1188,8 @@ TEST_CASE("test websocket binary data") { cinatra::coro_http_server server(1, 9001); server.set_http_handler( "/short_binary", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { CHECK(req.get_content_type() == content_type::websocket); websocket_result result{}; while (true) { @@ -1105,8 +1211,8 @@ TEST_CASE("test websocket binary data") { }); server.set_http_handler( "/medium_binary", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { CHECK(req.get_content_type() == content_type::websocket); websocket_result result{}; while (true) { @@ -1128,8 +1234,8 @@ TEST_CASE("test websocket binary data") { }); server.set_http_handler( "/long_binary", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { CHECK(req.get_content_type() == content_type::websocket); websocket_result result{}; while (true) { @@ -1185,7 +1291,7 @@ TEST_CASE("check connecton timeout") { server.set_check_duration(std::chrono::microseconds(600)); server.set_timeout_duration(std::chrono::microseconds(500)); server.set_http_handler( - "/", [](coro_http_request &req, coro_http_response &response) { + "/", [](coro_http_request& req, coro_http_response& response) { response.set_status_and_content(status_type::ok, "ok"); }); @@ -1204,8 +1310,8 @@ TEST_CASE("test websocket with different message size") { cinatra::coro_http_server server(1, 9008); server.set_http_handler( "/ws_echo1", - [](cinatra::coro_http_request &req, - cinatra::coro_http_response &resp) -> async_simple::coro::Lazy { + [](cinatra::coro_http_request& req, + cinatra::coro_http_response& resp) -> async_simple::coro::Lazy { REQUIRE(req.get_content_type() == cinatra::content_type::websocket); cinatra::websocket_result result{}; @@ -1273,7 +1379,7 @@ TEST_CASE("test ssl server") { server.init_ssl("../openssl_files/server.crt", "../openssl_files/server.key", "test"); server.set_http_handler( - "/ssl", [](coro_http_request &req, coro_http_response &resp) { + "/ssl", [](coro_http_request& req, coro_http_response& resp) { resp.set_status_and_content(status_type::ok, "ssl"); }); @@ -1337,8 +1443,8 @@ TEST_CASE("test restful api") { server.set_http_handler( "/test2/{}/test3/{}", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { co_await coro_io::post([&]() { // coroutine in other thread. CHECK(req.matches_.str(1) == "name"); @@ -1350,13 +1456,13 @@ TEST_CASE("test restful api") { server.set_http_handler( R"(/numbers/(\d+)/test/(\d+))", - [](coro_http_request &req, coro_http_response &response) { + [](coro_http_request& req, coro_http_response& response) { CHECK(req.matches_.str(1) == "100"); CHECK(req.matches_.str(2) == "200"); response.set_status_and_content(status_type::ok, "number regex ok"); }); server.set_http_handler( - "/test4/{}", [](coro_http_request &req, coro_http_response &response) { + "/test4/{}", [](coro_http_request& req, coro_http_response& response) { CHECK(req.matches_.str(1) == "100"); response.set_status_and_content(status_type::ok, "number regex ok"); }); @@ -1418,26 +1524,26 @@ TEST_CASE("test radix tree restful api") { cinatra::coro_http_server server(1, 9001); server.set_http_handler( - "/", [](coro_http_request &req, coro_http_response &response) { + "/", [](coro_http_request& req, coro_http_response& response) { response.set_status_and_content(status_type::ok, "ok"); }); server.set_http_handler( - "/user/:id", [](coro_http_request &req, coro_http_response &response) { + "/user/:id", [](coro_http_request& req, coro_http_response& response) { CHECK(req.params_["id"] == "cinatra"); response.set_status_and_content(status_type::ok, "ok"); }); server.set_http_handler( "/user/:id/subscriptions", - [](coro_http_request &req, coro_http_response &response) { + [](coro_http_request& req, coro_http_response& response) { CHECK(req.params_["id"] == "subid"); response.set_status_and_content(status_type::ok, "ok"); }); server.set_http_handler( "/users/:userid/subscriptions/:subid", - [](coro_http_request &req, coro_http_response &response) { + [](coro_http_request& req, coro_http_response& response) { CHECK(req.params_["userid"] == "ultramarines"); CHECK(req.params_["subid"] == "guilliman"); response.set_status_and_content(status_type::ok, "ok"); @@ -1445,7 +1551,7 @@ TEST_CASE("test radix tree restful api") { server.set_http_handler( "/values/:x/:y/:z", - [](coro_http_request &req, coro_http_response &response) { + [](coro_http_request& req, coro_http_response& response) { CHECK(req.params_["x"] == "guilliman"); CHECK(req.params_["y"] == "cawl"); CHECK(req.params_["z"] == "yvraine"); @@ -1476,8 +1582,8 @@ TEST_CASE("test coro radix tree restful api") { server.set_http_handler( "/", - [](coro_http_request &req, - coro_http_response &response) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& response) -> async_simple::coro::Lazy { co_await coro_io::post([&]() { response.set_status_and_content(status_type::ok, "ok"); }); @@ -1485,8 +1591,8 @@ TEST_CASE("test coro radix tree restful api") { server.set_http_handler( "/user/:id", - [](coro_http_request &req, - coro_http_response &response) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& response) -> async_simple::coro::Lazy { co_await coro_io::post([&]() { CHECK(req.params_["id"] == "cinatra"); response.set_status_and_content(status_type::ok, "ok"); @@ -1495,8 +1601,8 @@ TEST_CASE("test coro radix tree restful api") { server.set_http_handler( "/user/:id/subscriptions", - [](coro_http_request &req, - coro_http_response &response) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& response) -> async_simple::coro::Lazy { co_await coro_io::post([&] { CHECK(req.params_["id"] == "subid"); response.set_status_and_content(status_type::ok, "ok"); @@ -1505,8 +1611,8 @@ TEST_CASE("test coro radix tree restful api") { server.set_http_handler( "/users/:userid/subscriptions/:subid", - [](coro_http_request &req, - coro_http_response &response) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& response) -> async_simple::coro::Lazy { co_await coro_io::post([&] { CHECK(req.params_["userid"] == "ultramarines"); CHECK(req.params_["subid"] == "guilliman"); @@ -1516,8 +1622,8 @@ TEST_CASE("test coro radix tree restful api") { server.set_http_handler( "/values/:x/:y/:z", - [](coro_http_request &req, - coro_http_response &response) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& response) -> async_simple::coro::Lazy { co_await coro_io::post([&] { CHECK(req.params_["x"] == "guilliman"); CHECK(req.params_["y"] == "cawl"); @@ -1528,8 +1634,8 @@ TEST_CASE("test coro radix tree restful api") { server.set_http_handler( "/ai/robot/:messages", - [](coro_http_request &req, - coro_http_response &response) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& response) -> async_simple::coro::Lazy { co_await coro_io::post([&]() { CHECK(req.params_["messages"] == "android"); response.set_status_and_content(status_type::ok, "ok"); @@ -1584,8 +1690,8 @@ TEST_CASE("test reverse proxy") { web_one.set_http_handler( "/", - [](coro_http_request &req, - coro_http_response &response) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& response) -> async_simple::coro::Lazy { co_await coro_io::post([&]() { response.set_status_and_content(status_type::ok, "web1"); }); @@ -1597,8 +1703,8 @@ TEST_CASE("test reverse proxy") { web_two.set_http_handler( "/", - [](coro_http_request &req, - coro_http_response &response) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& response) -> async_simple::coro::Lazy { co_await coro_io::post([&]() { response.set_status_and_content(status_type::ok, "web2"); }); @@ -1609,7 +1715,7 @@ TEST_CASE("test reverse proxy") { cinatra::coro_http_server web_three(1, 9003); web_three.set_http_handler( - "/", [](coro_http_request &req, coro_http_response &response) { + "/", [](coro_http_request& req, coro_http_response& response) { response.set_status_and_content(status_type::ok, "web3"); }); @@ -1683,8 +1789,8 @@ TEST_CASE("test reverse proxy download") { cinatra::coro_http_server server(1, 9001); server.set_http_handler( "/test_chunked", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { resp.set_format_type(format_type::chunked); bool ok; if (ok = co_await resp.get_conn()->begin_chunked(); !ok) { @@ -1693,7 +1799,7 @@ TEST_CASE("test reverse proxy download") { std::vector vec{"hello", " world", " ok"}; - for (auto &str : vec) { + for (auto& str : vec) { if (ok = co_await resp.get_conn()->write_chunked(str); !ok) { co_return; } @@ -1702,13 +1808,13 @@ TEST_CASE("test reverse proxy download") { ok = co_await resp.get_conn()->end_chunked(); }); server.set_http_handler( - "/test", [](coro_http_request &req, coro_http_response &resp) { + "/test", [](coro_http_request& req, coro_http_response& resp) { resp.set_status_and_content(status_type::ok, "hello world"); }); server.set_http_handler( "/test_multipart", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { bool ok; if (ok = co_await resp.get_conn()->begin_multipart(); !ok) { co_return; @@ -1716,7 +1822,7 @@ TEST_CASE("test reverse proxy download") { std::vector vec{"hello", " world", " multipart"}; - for (auto &str : vec) { + for (auto& str : vec) { if (ok = co_await resp.get_conn()->write_multipart(str, "text/plain"); !ok) { co_return; @@ -1756,8 +1862,8 @@ TEST_CASE("test reverse proxy websocket") { coro_http_server server(1, 9005); server.set_http_handler( "/ws_echo", - [](coro_http_request &req, - coro_http_response &resp) -> async_simple::coro::Lazy { + [](coro_http_request& req, + coro_http_response& resp) -> async_simple::coro::Lazy { CHECK(req.get_content_type() == content_type::websocket); websocket_result result{}; while (true) { diff --git a/src/coro_io/tests/test_client_pool.cpp b/src/coro_io/tests/test_client_pool.cpp index 3504a94cf..105847370 100644 --- a/src/coro_io/tests/test_client_pool.cpp +++ b/src/coro_io/tests/test_client_pool.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -29,12 +30,37 @@ #include "async_simple/coro/Sleep.h" #include "async_simple/coro/SpinLock.h" #include "ylt/coro_http/coro_http_client.hpp" +#include "ylt/coro_http/coro_http_server.hpp" #include "ylt/coro_rpc/impl/coro_rpc_client.hpp" #include "ylt/coro_rpc/impl/default_config/coro_rpc_config.hpp" #include "ylt/coro_rpc/impl/expected.hpp" #include "ylt/struct_pack.hpp" using namespace std::chrono_literals; using namespace async_simple::coro; + +class local_http_server { + public: + local_http_server() : server_(1, std::string{"127.0.0.1:0"}) { + server_.set_http_handler( + "/", [](cinatra::request&, cinatra::response& res) { + res.set_status_and_content(cinatra::status_type::ok, "ok"); + }); + server_.async_start(); + REQUIRE(server_.port() > 0); + } + + ~local_http_server() { server_.stop(); } + + std::string url() const { + return "http://127.0.0.1:" + std::to_string(server_.port()); + } + + uint16_t port() const { return server_.port(); } + + private: + coro_http::coro_http_server server_; +}; + template async_simple::coro::Lazy event( int lim, coro_io::client_pool &pool, ConditionVariable &cv, @@ -246,9 +272,10 @@ TEST_CASE("test client pools parallel r/w") { } TEST_CASE("test client pools dns cache") { - async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + local_http_server server; + async_simple::coro::syncAwait([&]() -> async_simple::coro::Lazy { auto pool = coro_io::client_pool::create( - "http://www.baidu.com", + server.url(), coro_io::client_pool::pool_config{ .dns_cache_update_duration = 600s}); auto eps_init = pool->get_remote_endpoints(); @@ -261,7 +288,7 @@ TEST_CASE("test client pools dns cache") { auto eps = pool->get_remote_endpoints(); CHECK(!eps->empty()); CHECK(eps.get() != eps_init.get()); - CHECK(eps->front().port() == 80); + CHECK(eps->front().port() == server.port()); co_await pool->send_request( [](coro_http::coro_http_client &cli) -> Lazy { cli.close(); @@ -273,9 +300,10 @@ TEST_CASE("test client pools dns cache") { } TEST_CASE("test client pools dns refresh") { - async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + local_http_server server; + async_simple::coro::syncAwait([&]() -> async_simple::coro::Lazy { auto pool = coro_io::client_pool::create( - "http://www.baidu.com", + server.url(), coro_io::client_pool::pool_config{ .dns_cache_update_duration = 0s}); auto eps_init = pool->get_remote_endpoints(); @@ -288,7 +316,7 @@ TEST_CASE("test client pools dns refresh") { auto eps = pool->get_remote_endpoints(); CHECK(!eps->empty()); CHECK(eps.get() != eps_init.get()); - CHECK(eps->front().port() == 80); + CHECK(eps->front().port() == server.port()); co_await pool->send_request( [](coro_http::coro_http_client &cli) -> Lazy { co_return; @@ -299,9 +327,10 @@ TEST_CASE("test client pools dns refresh") { } TEST_CASE("test client pools dns parallel refresh") { - async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + local_http_server server; + async_simple::coro::syncAwait([&]() -> async_simple::coro::Lazy { auto pool = coro_io::client_pool::create( - "http://www.baidu.com", + server.url(), coro_io::client_pool::pool_config{ .dns_cache_update_duration = 0s}); auto eps_init = pool->get_remote_endpoints(); @@ -309,7 +338,7 @@ TEST_CASE("test client pools dns parallel refresh") { std::vector< async_simple::coro::RescheduleLazy>> results; - std::atomic err_cnt; + std::atomic err_cnt{0}; for (int i = 0; i < 100; ++i) { results.push_back( pool->send_request( @@ -328,9 +357,10 @@ TEST_CASE("test client pools dns parallel refresh") { } TEST_CASE("test client pools dns don't refresh") { - async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + local_http_server server; + async_simple::coro::syncAwait([&]() -> async_simple::coro::Lazy { auto pool = coro_io::client_pool::create( - "http://www.baidu.com", + server.url(), coro_io::client_pool::pool_config{ .dns_cache_update_duration = -1s}); auto eps_init = pool->get_remote_endpoints(); @@ -347,9 +377,10 @@ TEST_CASE("test client pools dns don't refresh") { } TEST_CASE("test client pools client pool") { - async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + local_http_server server; + async_simple::coro::syncAwait([&]() -> async_simple::coro::Lazy { auto pool = coro_io::client_pool::create( - "http://www.baidu.com", + server.url(), coro_io::client_pool::pool_config{ .max_connection_life_time = 0s}); co_await pool->send_request( @@ -359,4 +390,4 @@ TEST_CASE("test client pools client pool") { }); CHECK(pool->free_client_count() == 0); }()); -} \ No newline at end of file +} diff --git a/src/coro_rpc/tests/test_acceptor.cpp b/src/coro_rpc/tests/test_acceptor.cpp index e12aa4289..ef31f85d3 100644 --- a/src/coro_rpc/tests/test_acceptor.cpp +++ b/src/coro_rpc/tests/test_acceptor.cpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#include +#include #include #include #include @@ -26,6 +28,25 @@ #include "ylt/coro_rpc/impl/default_config/coro_rpc_config.hpp" using namespace coro_rpc; + +namespace { +asio::error_code bind_ipv6_only_probe(uint16_t port) { + asio::io_context ctx; + asio::ip::tcp::acceptor probe(ctx); + asio::error_code ec; + probe.open(asio::ip::tcp::v6(), ec); + if (ec) { + return ec; + } + probe.set_option(asio::ip::v6_only(true), ec); + if (ec) { + return ec; + } + probe.bind({asio::ip::address_v6::any(), port}, ec); + return ec; +} +} // namespace + #ifdef YLT_ENABLE_IBV std::string addr; void test_rdma_multi_dev_server() { @@ -44,6 +65,115 @@ void test_rdma_multi_dev_server() { #endif TEST_CASE("test server acceptor") { + SUBCASE("test ipv6 any acceptor layout") { + coro_rpc_server server(static_cast(1), + static_cast(8826), + std::string("::")); + const auto& acceptors = server.get_acceptors(); +#if defined(__linux__) + REQUIRE(acceptors.size() == 2); + CHECK(acceptors[0]->address() == "::"); + CHECK(acceptors[1]->address() == "0.0.0.0"); + CHECK(acceptors[0]->port() == 8826); + CHECK(acceptors[1]->port() == 8826); +#else + REQUIRE(acceptors.size() == 1); + CHECK(acceptors[0]->address() == "::"); + CHECK(acceptors[0]->port() == 8826); +#endif + } + + SUBCASE("test ipv6 any accepts ipv4 and ipv6 rpc clients") { + coro_rpc_server server(static_cast(1), + static_cast(0), std::string("::")); + server.register_handler(); + + auto res = server.async_start(); + CHECK_MESSAGE(!res.hasResult(), "server start timeout"); + REQUIRE(server.port() > 0); + + const auto port = std::to_string(server.port()); + + coro_rpc_client client_v4; + auto ec = syncAwait(client_v4.connect("127.0.0.1", port)); + CHECK_MESSAGE(!ec, ec.message()); + auto result = syncAwait(client_v4.call()); + CHECK_MESSAGE(result.has_value(), result.error().msg); + + coro_rpc_client client_v6; + ec = syncAwait(client_v6.connect("::1", port)); + CHECK_MESSAGE(!ec, ec.message()); + result = syncAwait(client_v6.call()); + CHECK_MESSAGE(result.has_value(), result.error().msg); + +#if defined(__linux__) + CHECK(server.get_acceptors().size() == 2); + CHECK(server.get_acceptors()[1]->address() == "0.0.0.0"); + CHECK(server.get_acceptors()[1]->port() == server.port()); +#endif + + server.stop(); + } + + SUBCASE("test ipv6 any address string creates dual acceptors") { + coro_rpc_server server(static_cast(1), std::string("[::]:8827")); + const auto& acceptors = server.get_acceptors(); +#if defined(__linux__) + REQUIRE(acceptors.size() == 2); + CHECK(acceptors[0]->address() == "::"); + CHECK(acceptors[1]->address() == "0.0.0.0"); + CHECK(acceptors[0]->port() == 8827); + CHECK(acceptors[1]->port() == 8827); +#else + REQUIRE(acceptors.size() == 1); + CHECK(acceptors[0]->address() == "::"); + CHECK(acceptors[0]->port() == 8827); +#endif + } + + SUBCASE("test ipv6 any address string with port 0 creates dual acceptors") { + coro_rpc_server server(static_cast(1), std::string("[::]:0")); + server.register_handler(); + + auto res = server.async_start(); + CHECK_MESSAGE(!res.hasResult(), "server start timeout"); + REQUIRE(server.port() > 0); + +#if defined(__linux__) + const auto& acceptors = server.get_acceptors(); + REQUIRE(acceptors.size() == 2); + CHECK(acceptors[0]->address() == "::"); + CHECK(acceptors[1]->address() == "0.0.0.0"); + CHECK(acceptors[0]->port() == server.port()); + CHECK(acceptors[1]->port() == server.port()); +#endif + + server.stop(); + } + +#if defined(__linux__) + SUBCASE("test failed second acceptor closes primary acceptor") { + asio::io_context ctx; + asio::ip::tcp::acceptor ipv4_blocker(ctx); + asio::error_code ec; + ipv4_blocker.open(asio::ip::tcp::v4(), ec); + REQUIRE(!ec); + ipv4_blocker.bind({asio::ip::address_v4::any(), 0}, ec); + REQUIRE(!ec); + ipv4_blocker.listen(asio::socket_base::max_listen_connections, ec); + REQUIRE(!ec); + + auto port = ipv4_blocker.local_endpoint().port(); + + coro_rpc_server server(static_cast(1), port, std::string("::")); + auto start_error = server.async_start().get(); + REQUIRE(start_error); + + auto probe_error = bind_ipv6_only_probe(port); + CHECK_MESSAGE(!probe_error, probe_error.message()); + } +#endif + SUBCASE("test multi server acceptor") { std::vector> acceptors; acceptors.emplace_back( @@ -76,7 +206,7 @@ TEST_CASE("test server acceptor") { #ifdef YLT_ENABLE_IBV SUBCASE("test multi rdma device for server") { std::vector> ibv_dev_lists; - for (auto &dev : coro_io::g_ib_device_manager()->get_dev_list()) { + for (auto& dev : coro_io::g_ib_device_manager()->get_dev_list()) { ibv_dev_lists.push_back(dev.second); } coro_rpc_server server( @@ -106,4 +236,4 @@ TEST_CASE("test server acceptor") { server.stop(); } #endif -} \ No newline at end of file +}