Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 125 additions & 2 deletions include/ylt/coro_io/listen_endpoint.hpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#pragma once

#include <charconv>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>
#include <system_error>

#include "asio/error_code.hpp"
#include "asio/ip/address.hpp"
Expand All @@ -11,6 +14,71 @@

namespace coro_io::detail {

struct listen_address {
std::string address;
uint16_t port = 0;
};

inline listen_address parse_listen_address(std::string_view address,
uint16_t default_port = 0) {
listen_address parsed{std::string(address), default_port};
if (address.empty()) {
return parsed;
}

if (address.front() == '[') {
auto close = address.find(']');
if (close != std::string_view::npos) {
parsed.address = std::string(address.substr(1, close - 1));
if (close + 1 < address.size() && address[close + 1] == ':') {
auto port_sv = address.substr(close + 2);
uint16_t port = 0;
auto [ptr, ec] = std::from_chars(port_sv.data(),
port_sv.data() + port_sv.size(), port);
if (ec == std::errc{}) {
parsed.port = port;
}
}
return parsed;
}
}

asio::error_code ec;
(void)asio::ip::make_address(address, ec);
if (!ec) {
parsed.address = std::string(address);
return parsed;
}

if (size_t pos = address.rfind(':'); pos != std::string_view::npos) {
auto port_sv = address.substr(pos + 1);
uint16_t port = 0;
auto [ptr, parse_ec] =
std::from_chars(port_sv.data(), port_sv.data() + port_sv.size(), port);
if (parse_ec == std::errc{}) {
parsed.address = std::string(address.substr(0, pos));
parsed.port = port;
}
}

return parsed;
}

inline bool is_ipv6_any_endpoint(const asio::ip::tcp::endpoint& endpoint) {
return endpoint.address().is_v6() &&
endpoint.address().to_v6().is_unspecified();
}

inline bool should_create_dual_stack_acceptor(
const asio::ip::tcp::endpoint& endpoint) {
#if defined(__linux__)
return is_ipv6_any_endpoint(endpoint);
#else
(void)endpoint;
return false;
#endif
}

// Resolve a string address into a TCP endpoint.
// - If `address` is a numeric IP literal (e.g. "::", "0.0.0.0", "127.0.0.1"),
// it is parsed directly via `asio::ip::make_address` to avoid the ambiguity
Expand Down Expand Up @@ -49,14 +117,69 @@ inline std::optional<asio::ip::tcp::endpoint> resolve_listen_endpoint(
// cannot be set (e.g. on platforms that disallow toggling V6ONLY at runtime).
// Callers are expected to log a warning on failure rather than abort, since
// the listen socket can still serve IPv6 connections.
enum class ipv6_only_mode {
keep,
enable,
disable,
};

inline asio::error_code set_ipv6_only(asio::ip::tcp::acceptor& acceptor,
const asio::ip::tcp::endpoint& endpoint,
ipv6_only_mode mode) {
asio::error_code ec;
if (endpoint.protocol() == asio::ip::tcp::v6() &&
mode != ipv6_only_mode::keep) {
acceptor.set_option(asio::ip::v6_only(mode == ipv6_only_mode::enable), ec);
}
return ec;
}

inline asio::error_code set_ipv6_only_false(
asio::ip::tcp::acceptor& acceptor,
const asio::ip::tcp::endpoint& endpoint) {
return set_ipv6_only(acceptor, endpoint, ipv6_only_mode::disable);
}

inline void close_acceptor_now(asio::ip::tcp::acceptor& acceptor) {
asio::error_code ec;
acceptor.cancel(ec);
acceptor.close(ec);
}

inline asio::error_code init_tcp_acceptor(
asio::ip::tcp::acceptor& acceptor, const asio::ip::tcp::endpoint& endpoint,
ipv6_only_mode mode) {
using asio::ip::tcp;
asio::error_code ec;
if (endpoint.protocol() == asio::ip::tcp::v6()) {
acceptor.set_option(asio::ip::v6_only(false), ec);

acceptor.open(endpoint.protocol(), ec);
if (ec) {
return ec;
}
#ifdef __GNUC__
acceptor.set_option(tcp::acceptor::reuse_address(true), ec);
#endif
if (auto opt_ec = set_ipv6_only(acceptor, endpoint, mode); opt_ec) {
close_acceptor_now(acceptor);
return opt_ec;
}
acceptor.bind(endpoint, ec);
if (ec) {
close_acceptor_now(acceptor);
return ec;
}
#ifdef _MSC_VER
acceptor.set_option(tcp::acceptor::reuse_address(true));
#endif
acceptor.listen(asio::socket_base::max_listen_connections, ec);
if (ec) {
close_acceptor_now(acceptor);
}
return ec;
}

inline asio::ip::tcp::endpoint make_ipv4_any_endpoint(uint16_t port) {
return {asio::ip::make_address_v4("0.0.0.0"), port};
}

} // namespace coro_io::detail
118 changes: 57 additions & 61 deletions include/ylt/coro_io/server_acceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <async_simple/coro/Lazy.h>

#include <asio/ip/tcp.hpp>
#include <atomic>
#include <charconv>
#include <cstdint>
#include <optional>
Expand Down Expand Up @@ -32,6 +33,7 @@ struct server_acceptor_base {
uint16_t port() const noexcept { return port_; }
std::string_view address() const { return address_; }
virtual void close() = 0;
virtual void close_now() = 0;
virtual listen_errc listen() = 0;
virtual async_simple::coro::Lazy<
ylt::expected<coro_io::socket_wrapper_t, std::error_code>>
Expand All @@ -46,77 +48,52 @@ struct server_acceptor_base {
pool_ = pool;
}
void init_address(std::string_view address) {
if (port_ == 0) {
if (size_t pos = address.rfind(':'); pos != std::string::npos) {
auto port_sv = std::string_view(address).substr(pos + 1);

uint16_t port;
auto [ptr, ec] = std::from_chars(
port_sv.data(), port_sv.data() + port_sv.size(), port, 10);
if (ec != std::errc{}) {
address_ = std::string{address};
return;
}

port_ = port;
address = address.substr(0, pos);
if (address.front() == '[') {
if (address.size() > 2)
address = address.substr(1, address.size() - 2);
}
}
}

address_ = std::string{address};
auto parsed = detail::parse_listen_address(address, port_);
address_ = std::move(parsed.address);
port_ = parsed.port;
}
void set_ipv6_dual_stack(bool v) noexcept { ipv6_dual_stack_ = v; }
bool ipv6_dual_stack() const noexcept { return ipv6_dual_stack_; }
uint16_t port_;
std::string address_;
coro_io::io_context_pool* pool_ = nullptr;
bool ipv6_dual_stack_ = false;
};

struct tcp_server_acceptor : public server_acceptor_base {
virtual listen_errc listen() override {
acceptor_ =
asio::ip::tcp::acceptor{pool_->get_executor()->get_asio_executor()};
executor_ = pool_->get_executor();
acceptor_ = asio::ip::tcp::acceptor{executor_->get_asio_executor()};
ELOG_INFO << "begin to listen";
using asio::ip::tcp;
asio::error_code ec;

auto endpoint = detail::resolve_listen_endpoint(acceptor_->get_executor(),
address_, port_, ec);
auto endpoint = detail::resolve_listen_endpoint(
executor_->get_asio_executor(), address_, port_, ec);
if (!endpoint) {
ELOG_ERROR << "resolve address " << address_
<< " error: " << ec.message();
return listen_errc::bad_address;
}

acceptor_->open(endpoint->protocol(), ec);
if (ec) {
ELOG_ERROR << "open failed, error: " << ec.message();
return listen_errc::open_error;
}
#ifdef __GNUC__
acceptor_->set_option(tcp::acceptor::reuse_address(true), ec);
#endif
if (auto opt_ec = detail::set_ipv6_only_false(*acceptor_, *endpoint);
opt_ec) {
ELOG_WARN << "set v6_only(false) failed: " << opt_ec.message();
}
acceptor_->bind(*endpoint, ec);
if (ec) {
ELOG_ERROR << "bind port " << port_ << " error: " << ec.message();
acceptor_->cancel(ec);
acceptor_->close(ec);
return listen_errc::address_in_used;
}
#ifdef _MSC_VER
acceptor_->set_option(tcp::acceptor::reuse_address(true));
#endif
acceptor_->listen(asio::socket_base::max_listen_connections, ec);
auto mode = ipv6_dual_stack_ ? detail::ipv6_only_mode::enable
: detail::ipv6_only_mode::disable;
ec = detail::init_tcp_acceptor(*acceptor_, *endpoint, mode);
if (ec) {
ELOG_ERROR << "port " << port_ << " listen error: " << ec.message();
acceptor_->cancel(ec);
acceptor_->close(ec);
ELOG_ERROR << "listen init failed, error: " << ec.message();
if (ec == asio::error::address_in_use) {
return listen_errc::address_in_used;
}
if (ec == asio::error::invalid_argument ||
ec == asio::error::address_family_not_supported ||
ec == asio::error::fault) {
return listen_errc::bad_address;
}
if (ec == asio::error::already_open ||
ec == asio::error::operation_not_supported ||
ec == asio::error::no_descriptors) {
return listen_errc::open_error;
}
return listen_errc::listen_error;
}

Expand All @@ -134,10 +111,12 @@ struct tcp_server_acceptor : public server_acceptor_base {
virtual async_simple::coro::Lazy<
ylt::expected<coro_io::socket_wrapper_t, std::error_code>>
accept() override {
accept_started_.store(true, std::memory_order_release);
assert(acceptor_ != std::nullopt);
auto executor = pool_->get_executor();
asio::ip::tcp::socket socket(executor->get_asio_executor());
auto socket_executor = pool_->get_executor();
asio::ip::tcp::socket socket(socket_executor->get_asio_executor());
ELOG_TRACE << "start accepting from acceptor: " << address_ << ":" << port_;
co_await coro_io::dispatch(executor_->get_asio_executor());
auto error = co_await coro_io::async_accept(*acceptor_, socket);
ELOG_TRACE << "get connection from acceptor: " << address_ << ":" << port_;
if (error) {
Expand All @@ -155,22 +134,39 @@ struct tcp_server_acceptor : public server_acceptor_base {
ylt::unexpected<std::error_code>{error}};
}
else {
co_return coro_io::socket_wrapper_t{std::move(socket), executor};
co_return coro_io::socket_wrapper_t{std::move(socket), socket_executor};
}
}

virtual void close() override {
asio::dispatch(acceptor_->get_executor(), [this]() {
asio::error_code ec;
(void)acceptor_->cancel(ec);
(void)acceptor_->close(ec);
virtual void close_now() override {
if (!acceptor_) {
return;
}
if (!accept_started_.load(std::memory_order_acquire)) {
detail::close_acceptor_now(*acceptor_);
return;
}
asio::dispatch(executor_->get_asio_executor(), [this]() {
if (acceptor_) {
detail::close_acceptor_now(*acceptor_);
}
});
acceptor_close_waiter_.get_future().wait();
}

virtual void close() override {
close_now();
if (accept_started_.load(std::memory_order_acquire)) {
acceptor_close_future_.wait();
}
}
virtual ~tcp_server_acceptor() = default;
using server_acceptor_base::server_acceptor_base;

std::optional<asio::ip::tcp::acceptor> acceptor_;
coro_io::ExecutorWrapper<>* executor_ = nullptr;
std::promise<void> acceptor_close_waiter_;
std::future<void> acceptor_close_future_ =
acceptor_close_waiter_.get_future();
std::atomic<bool> accept_started_ = false;
};
} // namespace coro_io
10 changes: 5 additions & 5 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1242,11 +1242,11 @@ class coro_rpc_client {

/*
* buffer layout
* 閳瑰备鏀㈤埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞顑芥敘閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳?
* 閳逛拷eq_header 閳逛繘rgs 閳?
* 閳规壕鏀㈤埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞灏栨敘閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳?
* 閳逛縼EQ_HEADER_LEN 閳瑰€俛riable length 閳?
* 閳规柡鏀㈤埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞绮规敘閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳?
* ┌────────────────┬────────────────┐
* │req_header │args
* ├────────────────┼────────────────┤
* │REQ_HEADER_LEN │variable length
* └────────────────┴────────────────┘
*/
template <auto func, typename... Args>
std::vector<std::byte> prepare_buffer(uint32_t &id,
Expand Down
Loading
Loading