Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,14 @@ class client_pool : public std::enable_shared_from_this<
ELOG_WARN << "reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
<< "} out of max limit, stop retry. connect failed";
alive_detect(client->get_config(), std::move(self)).start([](auto&&) {
});
typename client_t::config config_copy = client->get_config();
client = nullptr;
alive_detect(std::move(config_copy), std::move(self)).start([](auto&&) {
});
}

static async_simple::coro::Lazy<void> alive_detect(
const typename client_t::config& client_config,
typename client_t::config client_config,
std::weak_ptr<client_pool> watcher) {
std::shared_ptr<client_pool> self = watcher.lock();
using namespace std::chrono_literals;
Expand Down
17 changes: 17 additions & 0 deletions include/ylt/coro_io/socket_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ struct socket_wrapper_t {
socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec);
socket_->close(ignored_ec);
}
#ifdef YLT_ENABLE_SSL
// Do NOT destroy ssl_stream_ here. Closing the socket cancels pending
// async SSL operations, but on Windows IOCP their completion handlers
// may be dequeued AFTER any asio::post handler (LIFO ordering). If we
// destroy ssl_stream_ (even via asio::post), the cancellation completion
// handler would access freed SSL memory. Instead, leave ssl_stream_ alive
// so pending completions can safely reference it. It will be cleaned up
// later by init_ssl() (during reset) or by the destructor.
#endif
}

coro_io::endpoint remote_endpoint() {
Expand Down Expand Up @@ -196,6 +205,14 @@ struct socket_wrapper_t {
using tcp_socket_t = asio::ip::tcp::socket;
#ifdef YLT_ENABLE_SSL
void init_ssl(asio::ssl::context &ssl_ctx) {
// If an old ssl_stream_ exists, destroy it directly.
// This is safe because init_ssl() is only called from reset(), which
// happens after the cancelled async_connect coroutine has already
// completed (its completion handler has run), so no pending async
// operations reference the old ssl_stream_.
if (ssl_stream_) {
ssl_stream_.reset();
}
ssl_stream_ = std::make_unique<asio::ssl::stream<asio::ip::tcp::socket &>>(
*socket_, ssl_ctx);
}
Expand Down
58 changes: 53 additions & 5 deletions include/ylt/coro_rpc/impl/common_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <ylt/easylog.hpp>

#ifdef YLT_ENABLE_SSL
#include <openssl/ssl.h>

#include <asio/ssl.hpp>
#endif

Expand All @@ -31,10 +33,14 @@ namespace coro_rpc {
* SSL config
*/
struct ssl_configure {
std::string base_path; //!< all config files base path
std::string cert_file; //!< relative path of certificate chain file
std::string key_file; //!< relative path of private key file
std::string dh_file; //!< relative path of tmp dh file (optional)
std::string base_path; //!< all config files base path
std::string cert_file; //!< relative path of certificate chain file
std::string key_file; //!< relative path of private key file
std::string dh_file; //!< relative path of tmp dh file (optional)
std::string ca_cert_file; //!< relative path of CA certificate file for
//!< client verification (optional)
bool enable_client_verify =
false; //!< enable client certificate verification
};
#ifdef YLT_ENABLE_NTLS
///*!
Expand Down Expand Up @@ -172,6 +178,47 @@ inline bool init_ssl_context_helper(asio::ssl::context &context,
ELOG_INFO << "no temp dh file " << dh_file.string();
}

// Set lower security level for test certificates (OpenSSL 3.0
// compatibility)
SSL_CTX_set_security_level(context.native_handle(), 0);

// Load CA certificate for client verification if provided
asio::error_code ec;
if (!conf.ca_cert_file.empty()) {
auto ca_cert_file = fs::path(conf.base_path).append(conf.ca_cert_file);
if (file_exists(ca_cert_file)) {
context.load_verify_file(ca_cert_file.string(), ec);
if (ec) {
ELOG_ERROR << "failed to load CA certificate: " << ec.message();
return false;
}
ELOG_INFO << "loaded CA certificate: " << ca_cert_file.string();
}
else {
ELOG_ERROR << "CA certificate file not found: "
<< ca_cert_file.string();
return false;
}
}

// Set verification mode based on client verification configuration
if (conf.enable_client_verify) {
context.set_verify_mode(
asio::ssl::verify_peer | asio::ssl::verify_fail_if_no_peer_cert, ec);
if (ec) {
ELOG_WARN << "failed to set verify mode: " << ec.message();
}
else {
ELOG_INFO << "client certificate verification enabled (mandatory)";
}
}
else {
context.set_verify_mode(asio::ssl::verify_none, ec);
if (ec) {
ELOG_WARN << "failed to set verify mode: " << ec.message();
}
}

return true;
} catch (std::exception &e) {
ELOG_INFO << e.what();
Expand Down Expand Up @@ -409,7 +456,8 @@ inline bool init_ntls_context_helper(asio::ssl::context &context,

// Set verification mode
if (conf.enable_client_verify) {
context.set_verify_mode(asio::ssl::verify_peer, ec);
context.set_verify_mode(
asio::ssl::verify_peer | asio::ssl::verify_fail_if_no_peer_cert, ec);
}
else {
context.set_verify_mode(asio::ssl::verify_none, ec);
Expand Down
132 changes: 117 additions & 15 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@
#include "inject_action.hpp"
#endif

#ifdef YLT_ENABLE_SSL
#include <openssl/ssl.h>
#endif

#ifdef GENERATE_BENCHMARK_DATA
#include <fstream>
#endif
Expand Down Expand Up @@ -196,6 +200,10 @@ class coro_rpc_client {
bool enable_tcp_no_delay = true;
std::filesystem::path ssl_cert_path{};
std::string ssl_domain{};
std::filesystem::path
client_cert_file{}; // Client certificate for mutual authentication
std::filesystem::path
client_key_file{}; // Client private key for mutual authentication
};
#ifdef YLT_ENABLE_NTLS
struct tcp_with_ntls_config {
Expand Down Expand Up @@ -303,6 +311,11 @@ class coro_rpc_client {
ELOG_INFO << "init ssl: " << config.ssl_domain;
auto &cert_file = config.ssl_cert_path;
ELOG_INFO << "current path: " << std::filesystem::current_path().string();

// Set lower security level for test certificates (OpenSSL 3.0
// compatibility)
SSL_CTX_set_security_level(ssl_ctx_.native_handle(), 0);

if (file_exists(cert_file)) {
ELOG_INFO << "load " << cert_file.string();
ssl_ctx_.load_verify_file(cert_file.string());
Expand All @@ -311,9 +324,48 @@ class coro_rpc_client {
ELOG_INFO << "no certificate file " << cert_file.string();
return ssl_init_ret_;
}

// Load client certificate and key for mutual authentication
if (!config.client_cert_file.empty() || !config.client_key_file.empty()) {
if (config.client_cert_file.empty() || config.client_key_file.empty()) {
ELOG_ERROR << "Both client certificate and key must be provided for "
"mutual authentication";
return ssl_init_ret_;
}

if (file_exists(config.client_cert_file)) {
ELOG_INFO << "load client certificate: "
<< config.client_cert_file.string();
ssl_ctx_.use_certificate_chain_file(config.client_cert_file.string());
}
else {
ELOG_ERROR << "client certificate file not found: "
<< config.client_cert_file.string();
return ssl_init_ret_;
}

if (file_exists(config.client_key_file)) {
ELOG_INFO << "load client private key: "
<< config.client_key_file.string();
ssl_ctx_.use_private_key_file(config.client_key_file.string(),
asio::ssl::context::pem);
}
else {
ELOG_ERROR << "client key file not found: "
<< config.client_key_file.string();
return ssl_init_ret_;
}
ELOG_INFO << "client certificate loaded for mutual authentication";
}

ssl_ctx_.set_verify_mode(asio::ssl::verify_peer);
ssl_ctx_.set_verify_callback(
asio::ssl::host_name_verification(config.ssl_domain));
// Set hostname verification for DNS names (skip for IP addresses and
// empty)
if (!config.ssl_domain.empty() && config.ssl_domain != "127.0.0.1" &&
config.ssl_domain != "localhost") {
ssl_ctx_.set_verify_callback(
asio::ssl::host_name_verification(config.ssl_domain));
}
auto init_result = control_->socket_wrapper_.init_client(
ssl_ctx_, config.enable_tcp_no_delay);
if (!init_result) {
Expand Down Expand Up @@ -509,13 +561,15 @@ class coro_rpc_client {
}
}

// Set verification mode - use same approach as HTTP client
// Set verification mode
if (config.enable_client_verify) {
ssl_ctx_.set_verify_mode(asio::ssl::verify_peer);
// Note: Skip host_name_verification for NTLS as it may not be
// compatible The server certificate will still be verified against CA
// ssl_ctx_.set_verify_callback(
// asio::ssl::host_name_verification(config.ssl_domain));
// Set hostname verification for DNS names (skip for IP addresses)
if (!config.ssl_domain.empty() && config.ssl_domain != "127.0.0.1" &&
config.ssl_domain != "localhost") {
ssl_ctx_.set_verify_callback(
asio::ssl::host_name_verification(config.ssl_domain));
}
}
else {
ssl_ctx_.set_verify_mode(asio::ssl::verify_none);
Expand Down Expand Up @@ -650,13 +704,55 @@ class coro_rpc_client {
.ssl_domain = std::move(ssl_domain)};
}
else {
auto &conf = std::get<tcp_with_ssl_config>(config_.socket_config);
auto& conf = std::get<tcp_with_ssl_config>(config_.socket_config);
conf.ssl_cert_path = std::move(ssl_cert_path);
conf.ssl_domain = domain = std::move(ssl_domain);
}
return init_socket_wrapper(
std::get<tcp_with_ssl_config>(config_.socket_config));
}

/*!
* Initialize SSL with client certificate for mutual authentication
* @param cert_base_path Base path for certificate files
* @param cert_file_name CA certificate file name for server verification
* @param client_cert_file Client certificate file name for mutual
* authentication
* @param client_key_file Client private key file name for mutual
* authentication
* @param domain Server domain name
* @return true if initialization successful
*/
[[nodiscard]] bool init_ssl(std::string_view cert_base_path,
std::string_view cert_file_name,
std::string_view client_cert_file,
std::string_view client_key_file,
std::string_view domain = "localhost") {
std::string ssl_domain = std::string{domain};
std::string ssl_cert_path =
std::filesystem::path(cert_base_path).append(cert_file_name).string();
std::string ssl_client_cert_path =
std::filesystem::path(cert_base_path).append(client_cert_file).string();
std::string ssl_client_key_path =
std::filesystem::path(cert_base_path).append(client_key_file).string();

if (config_.socket_config.index() != 1) {
config_.socket_config = tcp_with_ssl_config{
.ssl_cert_path = std::move(ssl_cert_path),
.ssl_domain = std::move(ssl_domain),
.client_cert_file = std::move(ssl_client_cert_path),
.client_key_file = std::move(ssl_client_key_path)};
}
else {
auto &conf = std::get<tcp_with_ssl_config>(config_.socket_config);
conf.ssl_cert_path = std::move(ssl_cert_path);
conf.ssl_domain = std::move(ssl_domain);
conf.client_cert_file = std::move(ssl_client_cert_path);
conf.client_key_file = std::move(ssl_client_key_path);
}
return init_socket_wrapper(
std::get<tcp_with_ssl_config>(config_.socket_config));
}
#ifdef YLT_ENABLE_NTLS
[[nodiscard]] bool init_ntls(const ssl_ntls_configure &conf) {
if (conf.mode == ntls_mode::tls13_single_cert) {
Expand Down Expand Up @@ -1056,7 +1152,13 @@ class coro_rpc_client {
<< ", the first endpoint is: " << (*eps)[0].address().to_string()
<< ":" << std::to_string((*eps)[0].port())
<< ", client_id: " << config_.client_id;
ec = co_await coro_io::async_connect(soc, *eps);
// Use socket_wrapper_.visit() to get a fresh socket reference instead of
// the `soc` parameter, which may dangle if reset() destroyed and
// recreated ssl_stream_ above.
ec = co_await control_->socket_wrapper_.visit(
[eps](auto &fresh_soc) {
return coro_io::async_connect(fresh_soc, *eps);
});
std::error_code ignore_ec;
timer_->cancel(ignore_ec);
if (control_->is_timeout_) {
Expand Down Expand Up @@ -1138,11 +1240,11 @@ class coro_rpc_client {

/*
* buffer layout
* ┌────────────────┬────────────────┐
* │req_header │args
* ├────────────────┼────────────────┤
* │REQ_HEADER_LEN │variable length
* └────────────────┴────────────────┘
* 閳瑰备鏀㈤埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞顑芥敘閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳?
* 閳逛拷eq_header 閳逛繘rgs 閳?
* 閳规壕鏀㈤埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞灏栨敘閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳?
* 閳逛縼EQ_HEADER_LEN 閳瑰€俛riable length 閳?
* 閳规柡鏀㈤埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞鈧埞绮规敘閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳光偓閳?
*/
template <auto func, typename... Args>
std::vector<std::byte> prepare_buffer(uint32_t &id,
Expand Down Expand Up @@ -1361,7 +1463,7 @@ class coro_rpc_client {
co_await coro_io::post(
[]() {
},
control->executor_); // post to control ioc
control->executor_); // drain: wait for any pending close_socket_async dispatch
co_return;
}
co_await coro_io::post(
Expand Down
Loading
Loading