diff --git a/README.md b/README.md index b4f36c6..5daed2a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -# Harbr-Router: High-Performance Rust Reverse Proxy +# Harbr-Router: High-Performance Rust Reverse Proxy with TCP Support -A blazingly fast, memory-efficient reverse proxy built in Rust using async I/O and designed for high-scale production workloads. +A blazingly fast, memory-efficient reverse proxy built in Rust using async I/O and designed for high-scale production workloads. Harbr-Router now supports both HTTP and raw TCP traffic, making it perfect for database proxying and other non-HTTP protocols. ## Features @@ -12,6 +12,9 @@ A blazingly fast, memory-efficient reverse proxy built in Rust using async I/O a - 🔄 **Zero Downtime**: Graceful shutdown support - 🛡️ **Battle-tested**: Built on production-grade libraries - 🎯 **Path-based Routing**: Flexible route configuration +- 🌐 **Protocol Agnostic**: Support for both HTTP and TCP traffic +- 🗄️ **Database Support**: Automatic detection and handling of database protocols +- 🔌 **Connection Pooling**: Efficient reuse of TCP connections for better performance ## Quick Start @@ -26,12 +29,26 @@ listen_addr: "0.0.0.0:8080" global_timeout_ms: 5000 max_connections: 10000 +# TCP Proxy Configuration +tcp_proxy: + enabled: true + listen_addr: "0.0.0.0:9090" + connection_pooling: true + max_idle_time_secs: 60 + routes: + # HTTP Routes "/api": upstream: "http://backend-api:8080" - health_check_path: "/health" timeout_ms: 3000 retry_count: 2 + + # Database Route (automatically handled as TCP) + "postgres-db": + upstream: "postgresql://postgres-db:5432" + is_tcp: true + db_type: "postgresql" + timeout_ms: 10000 ``` 3. 
Run the proxy: @@ -45,13 +62,22 @@ harbr-router -c config.yml | Field | Type | Description | Default | |-------|------|-------------|---------| -| `listen_addr` | String | Address and port to listen on | Required | +| `listen_addr` | String | Address and port to listen on for HTTP | Required | | `global_timeout_ms` | Integer | Global request timeout in milliseconds | Required | | `max_connections` | Integer | Maximum number of concurrent connections | Required | -### Route Configuration +### TCP Proxy Configuration + +| Field | Type | Description | Default | +|-------|------|-------------|---------| +| `enabled` | Boolean | Enable TCP proxy functionality | `false` | +| `listen_addr` | String | Address and port for TCP listener | `0.0.0.0:9090` | +| `connection_pooling` | Boolean | Enable connection pooling for TCP | `true` | +| `max_idle_time_secs` | Integer | Max time to keep idle connections | `60` | -Each route is defined by a path prefix and its configuration: +### HTTP Route Configuration + +Each HTTP route is defined by a path prefix and its configuration: | Field | Type | Description | Default | |-------|------|-------------|---------| @@ -60,6 +86,16 @@ Each route is defined by a path prefix and its configuration: | `timeout_ms` | Integer | Route-specific timeout in ms | Global timeout | | `retry_count` | Integer | Number of retry attempts | 0 | +### TCP/Database Route Configuration + +TCP routes use the same configuration structure with additional fields: + +| Field | Type | Description | Default | +|-------|------|-------------|---------| +| `is_tcp` | Boolean | Mark route as TCP instead of HTTP | `false` | +| `db_type` | String | Database type (mysql, postgresql, etc.) 
| Optional | +| `tcp_listen_port` | Integer | Custom port for this TCP service | Optional | + ### Example Configuration ```yaml @@ -67,6 +103,10 @@ listen_addr: "0.0.0.0:8080" global_timeout_ms: 5000 max_connections: 10000 +tcp_proxy: + enabled: true + listen_addr: "0.0.0.0:9090" + routes: "/api": upstream: "http://backend-api:8080" @@ -77,49 +117,72 @@ routes: "/static": upstream: "http://static-server:80" timeout_ms: 1000 - retry_count: 1 "/": upstream: "http://default-backend:8080" timeout_ms: 2000 - retry_count: 2 + + # Database Routes + "mysql-primary": + upstream: "mysql://db-primary:3306" + is_tcp: true + db_type: "mysql" + timeout_ms: 10000 + retry_count: 3 + + "postgres-analytics": + upstream: "postgresql://analytics-db:5432" + is_tcp: true + db_type: "postgresql" + tcp_listen_port: 5433 # Custom listening port ``` -### Route Matching +## Database Support -- Routes are matched by prefix -- More specific routes take precedence -- The "/" route acts as a catch-all default -- Health checks run on the specified path for each upstream +Harbr-Router now includes automatic detection and support for common database protocols: + +- **MySQL/MariaDB** (ports 3306, 33060) +- **PostgreSQL** (port 5432) +- **MongoDB** (ports 27017, 27018, 27019) +- **Redis** (port 6379) +- **Oracle** (port 1521) +- **SQL Server** (port 1433) +- **Cassandra** (port 9042) +- **CouchDB** (port 5984) +- **InfluxDB** (port 8086) +- **Elasticsearch** (ports 9200, 9300) + +Database connections are automatically detected by: +1. Explicit configuration (`is_tcp: true` and `db_type: "..."`) +2. Port numbers in the upstream URL +3. Protocol prefixes (mysql://, postgresql://, etc.) + +## TCP Proxy Operation + +The TCP proxy operates by: + +1. Accepting connections on the configured TCP listening port +2. Forwarding traffic to the appropriate upstream +3. Maintaining connection pooling for better performance +4. 
Applying timeouts and retries as configured ## Metrics -The proxy exposes Prometheus-compatible metrics at `/metrics`: +The proxy now exposes additional TCP proxy metrics at `/metrics`: ### Counter Metrics | Metric | Labels | Description | |--------|--------|-------------| -| `proxy_request_total` | `status=success\|error` | Total requests | -| `proxy_attempt_total` | `result=success\|failure\|timeout` | Request attempts | -| `proxy_timeout_total` | - | Total timeouts | +| `tcp_proxy.connection.new` | - | New TCP connections created | +| `tcp_proxy.connection.completed` | - | Completed TCP connections | +| `tcp_proxy.timeout` | - | TCP connection timeouts | ### Histogram Metrics | Metric | Description | |--------|-------------| -| `proxy_request_duration_seconds` | Request duration histogram | - -## Health Checks - -Health checks are performed on the specified `health_check_path` for each upstream: - -- Method: GET -- Interval: 10 seconds -- Success: 200-299 status code -- Timeout: 5 seconds - -Failed health checks will remove the upstream from the pool until it recovers. 
+| `tcp_proxy.connection.duration_seconds` | TCP connection duration histogram | ## Production Deployment @@ -144,7 +207,8 @@ services: proxy: image: harbr-router:latest ports: - - "8080:8080" + - "8080:8080" # HTTP + - "9090:9090" # TCP volumes: - ./config.yml:/etc/harbr-router/config.yml command: ["-c", "/etc/harbr-router/config.yml"] @@ -152,6 +216,8 @@ services: ### Kubernetes +A ConfigMap example with both HTTP and TCP configuration: + ```yaml apiVersion: v1 kind: ConfigMap @@ -162,41 +228,22 @@ data: listen_addr: "0.0.0.0:8080" global_timeout_ms: 5000 max_connections: 10000 + tcp_proxy: + enabled: true + listen_addr: "0.0.0.0:9090" routes: "/api": upstream: "http://backend-api:8080" health_check_path: "/health" timeout_ms: 3000 - retry_count: 2 - ---- -apiVersion: apps/v1 -kind: Deployment -metadata: - name: harbr-router -spec: - replicas: 3 - template: - spec: - containers: - - name: harbr-router - image: harbr-router:latest - ports: - - containerPort: 8080 - volumeMounts: - - name: config - mountPath: /etc/harbr-router - volumes: - - name: config - configMap: - name: harbr-router-config + "postgres-db": + upstream: "postgresql://postgres-db:5432" + is_tcp: true ``` ## Performance Tuning -### System Limits - -For high-performance deployments, adjust system limits: +For high-performance deployments with TCP traffic, adjust system limits: ```bash # /etc/sysctl.conf @@ -204,16 +251,17 @@ net.core.somaxconn = 65535 net.ipv4.tcp_max_syn_backlog = 65535 net.ipv4.ip_local_port_range = 1024 65535 net.ipv4.tcp_tw_reuse = 1 +fs.file-max = 2097152 # Increased for high TCP connection count ``` -### Runtime Configuration - -Set these environment variables for optimal performance: +## Use Cases -```bash -export RUST_MIN_THREADS=4 -export RUST_MAX_THREADS=32 -``` +- **Database Load Balancing**: Distribute database connections across multiple nodes +- **Database Connection Limiting**: Control the maximum connections to your database +- **Database Proxying**: Put your 
databases behind a secure proxy layer +- **Protocol Conversion**: Use as a bridge between different network protocols +- **Edge Proxy**: Use as an edge proxy for both HTTP and non-HTTP traffic +- **Multi-Protocol Gateway**: Handle mixed HTTP/TCP traffic at the edge ## Contributing diff --git a/config.yml b/config.yml index 2dce63b..f0ed46a 100644 --- a/config.yml +++ b/config.yml @@ -1,20 +1,56 @@ +# config.yml listen_addr: "0.0.0.0:8081" global_timeout_ms: 5000 max_connections: 10000 +# TCP Proxy Configuration +tcp_proxy: + enabled: true + listen_addr: "0.0.0.0:9090" + connection_pooling: true + max_idle_time_secs: 60 + routes: + # HTTP Routes "/api/critical": upstream: "http://critical-backend:8080" priority: 100 timeout_ms: 1000 retry_count: 3 + "/api": - upstream: "http://localhost:8080/docs" + upstream: "http://backend-api:8080" priority: 50 timeout_ms: 3000 retry_count: 2 + + # Default HTTP route "/": - upstream: "http://localhost:8080/" + upstream: "http://default-backend:8080" priority: 0 timeout_ms: 5000 - retry_count: 1 \ No newline at end of file + retry_count: 1 + + # Database Routes (automatically detected as TCP) + "mysql-primary": + upstream: "mysql://db-primary:3306" + is_tcp: true + db_type: "mysql" + timeout_ms: 10000 + retry_count: 3 + + "postgres-analytics": + upstream: "postgresql://analytics-db:5432" + is_tcp: true + db_type: "postgresql" + timeout_ms: 15000 + retry_count: 2 + tcp_listen_port: 5433 # Custom listening port + + # Generic TCP proxy (non-database) + "custom-tcp": + upstream: "tcp-service:9000" + is_tcp: true + timeout_ms: 5000 + retry_count: 1 + tcp_listen_port: 9001 \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index a1823d4..393263b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -9,6 +9,38 @@ pub struct ProxyConfig { pub routes: HashMap, pub global_timeout_ms: u64, pub max_connections: usize, + + // New TCP proxy specific configuration + #[serde(default)] + pub tcp_proxy: TcpProxyConfig, +} + 
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct TcpProxyConfig {
+    #[serde(default = "default_tcp_enabled")]
+    pub enabled: bool,
+    #[serde(default = "default_tcp_listen_addr")]
+    pub listen_addr: String,
+    #[serde(default = "default_tcp_connection_pooling")]
+    pub connection_pooling: bool,
+    #[serde(default = "default_tcp_max_idle_time_secs")]
+    pub max_idle_time_secs: u64,
+}
+
+fn default_tcp_enabled() -> bool {
+    false
+}
+
+fn default_tcp_listen_addr() -> String {
+    "0.0.0.0:9090".to_string()
+}
+
+fn default_tcp_connection_pooling() -> bool {
+    true
+}
+
+fn default_tcp_max_idle_time_secs() -> u64 {
+    60
 }
 
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@@ -16,9 +48,29 @@ pub struct RouteConfig {
     pub upstream: String,
     pub timeout_ms: Option,
     pub retry_count: Option,
-    #[serde(default)] // This makes priority optional with a default of 0
+    #[serde(default)]
     pub priority: Option,
     pub preserve_host_header: Option,
+
+    // New TCP-specific configuration
+    #[serde(default = "default_is_tcp")]
+    pub is_tcp: bool,
+    #[serde(default = "default_tcp_port")]
+    pub tcp_listen_port: Option<u16>,
+    #[serde(default = "default_db_type")]
+    pub db_type: Option<String>,
+}
+
+fn default_is_tcp() -> bool {
+    false
+}
+
+fn default_tcp_port() -> Option<u16> {
+    None
+}
+
+fn default_db_type() -> Option<String> {
+    None
 }
 
 pub fn load_config(path: &str) -> Result {
@@ -26,3 +78,85 @@ pub fn load_config(path: &str) -> Result {
     let config: ProxyConfig = serde_yaml::from_str(&content)?;
     Ok(config)
 }
+
+/// Returns `true` when a route should be handled by the TCP proxy.
+///
+/// A route qualifies when it is explicitly marked (`is_tcp` or `db_type`), when its
+/// upstream uses a well-known database port, or when its upstream starts with a
+/// database URL scheme. The scheme is checked even when a port is present, so
+/// `mysql://db:3307` is still recognised.
+pub fn is_likely_database(route: &RouteConfig) -> bool {
+    // Explicit configuration always wins
+    if route.is_tcp || route.db_type.is_some() {
+        return true;
+    }
+
+    // Basic heuristics for common database port detection
+    let known_port = match extract_port(&route.upstream) {
+        Some(port) => match port {
+            3306 | 33060 => true,          // MySQL
+            5432 => true,                  // PostgreSQL
+            27017 | 27018 | 27019 => true, // MongoDB
+            6379 => true,                  // Redis
+            1521 => true,                  // Oracle
+            1433 => true,                  // SQL Server
+            9042 => true,                  // Cassandra
+            5984 => true,                  // CouchDB
+            8086 => true,                  // InfluxDB
+            9200 | 9300 => true,           // Elasticsearch
+            _ => false,
+        },
+        None => false,
+    };
+    if known_port {
+        return true;
+    }
+
+    // Check for database prefixes in the upstream URL, whatever the port
+    let upstream = route.upstream.to_lowercase();
+    upstream.starts_with("mysql://")
+        || upstream.starts_with("postgresql://")
+        || upstream.starts_with("mongodb://")
+        || upstream.starts_with("redis://")
+        || upstream.starts_with("oracle://")
+        || upstream.starts_with("sqlserver://")
+        || upstream.starts_with("cassandra://")
+        || upstream.starts_with("couchdb://")
+        || upstream.starts_with("influxdb://")
+        || upstream.starts_with("elasticsearch://")
+}
+
+/// Extracts the port from an upstream such as `scheme://user:pw@host:port/path`
+/// or a bare `host:port`. Returns `None` when there is no numeric port.
+fn extract_port(url: &str) -> Option<u16> {
+    // Parse out protocol
+    let url_without_protocol = url.split("://").nth(1).unwrap_or(url);
+
+    // Extract the authority part, stopping at the path, query or fragment
+    let authority = url_without_protocol
+        .split(|c: char| c == '/' || c == '?' || c == '#')
+        .next()?;
+
+    // Drop `user:password@` so its colon is not mistaken for the port separator
+    let host_port = authority.rsplit('@').next()?;
+
+    // The port is whatever follows the last colon; no colon means no port
+    let mut pieces = host_port.rsplitn(2, ':');
+    let port_str = pieces.next()?;
+    pieces.next()?;
+    port_str.parse::<u16>().ok()
+}
+
+// `ProxyConfig::tcp_proxy` is `#[serde(default)]`, so an omitted `tcp_proxy` section
+// is built from `Default` rather than from the per-field serde defaults. Delegate to
+// the same helpers so both paths produce the documented defaults.
+impl Default for TcpProxyConfig {
+    fn default() -> Self {
+        Self {
+            enabled: default_tcp_enabled(),
+            listen_addr: default_tcp_listen_addr(),
+            connection_pooling: default_tcp_connection_pooling(),
+            max_idle_time_secs: default_tcp_max_idle_time_secs(),
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index c60cc01..d5c8c92 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,25 +5,46 @@ use tokio::sync::RwLock;
 mod config;
 mod metrics;
 mod proxy;
+mod tcp_proxy; // Add the new TCP proxy module
 
 #[tokio::main]
 async fn main() -> Result<()> {
     // Initialize logging
     tracing_subscriber::fmt()
-        .with_env_filter("info,rust_proxy=debug")
+        .with_env_filter("info,harbr_router=debug")
         .init();
 
     // Load configuration
-    let config = config::load_config("config.yml")?;
-    let config = Arc::new(RwLock::new(config));
+    let config_path = std::env::var("CONFIG_FILE").unwrap_or_else(|_| "config.yml".to_string());
+    let config = config::load_config(&config_path)?;
+    let config_arc = Arc::new(RwLock::new(config.clone()));
 
     // Initialize metrics
     metrics::init_metrics()?;
 
-    // Start the proxy server
-    proxy::run_server(config)
+    // Check for database routes that should be handled as TCP
+    let has_db_routes = config.routes.iter().any(|(_, route)| {
+        config::is_likely_database(route)
+    });
+
+    // Start TCP proxy if enabled or if database routes are detected
+    if config.tcp_proxy.enabled || has_db_routes {
+        tracing::info!("TCP proxy support enabled");
+        let tcp_config = config_arc.clone();
+
+        // Spawn TCP proxy server in a separate task
+        tokio::spawn(async move {
+            let tcp_proxy = tcp_proxy::TcpProxyServer::new(tcp_config).await;
+            if let Err(e) = tcp_proxy.run(&config.tcp_proxy.listen_addr).await {
+                tracing::error!("TCP proxy server error: {}", e);
+            }
+        });
+    }
+
+    // Start the HTTP proxy server
+    proxy::run_server(config_arc)
         .await
-        .map_err(|e| anyhow::anyhow!("Server error: {}", e))?;
+        .map_err(|e| anyhow::anyhow!("HTTP Server error: {}", e))?;
 
     Ok(())
-}
+}
\ No newline at end of file
diff --git a/src/tcp_proxy.rs b/src/tcp_proxy.rs
new file mode 100644
index 0000000..55a39da
--- /dev/null
+++ b/src/tcp_proxy.rs
@@ -0,0 +1,260 @@
+// src/tcp_proxy.rs
+use crate::config::ProxyConfig;
+use dashmap::DashMap;
+use metrics::{counter, histogram};
+use std::io;
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::net::{TcpListener, TcpStream};
+use tokio::sync::RwLock;
+use tokio::time::timeout;
+
+type SharedConfig = Arc<RwLock<ProxyConfig>>;
+type ConnectionCache = Arc<DashMap<String, Vec<(TcpStream, Instant)>>>;
+
+/// Raw TCP proxy that forwards each accepted connection to a configured upstream.
+pub struct TcpProxyServer {
+    config: SharedConfig,
+    connection_cache: ConnectionCache,
+    max_idle_time: Duration,
+    pooled_connections: bool,
+}
+
+impl TcpProxyServer {
+    /// Builds a server from the `tcp_proxy` section of the shared configuration.
+    pub async fn new(config: SharedConfig) -> Self {
+        // Copy both settings out so the read guard is dropped before `config` is moved
+        let (max_idle_time_secs, pooled_connections) = {
+            let cfg = config.read().await;
+            (cfg.tcp_proxy.max_idle_time_secs, cfg.tcp_proxy.connection_pooling)
+        };
+        Self {
+            config,
+            connection_cache: Arc::new(DashMap::new()),
+            max_idle_time: Duration::from_secs(max_idle_time_secs),
+            pooled_connections,
+        }
+    }
+
+    /// Binds `addr` and serves connections until the task is cancelled.
+    ///
+    /// Returns an error only when binding fails; accept errors are logged and retried.
+    pub async fn run(&self, addr: &str) -> io::Result<()> {
+        let listener = TcpListener::bind(addr).await?;
+        tracing::info!("TCP proxy listening on {}", addr);
+
+        // Spawn a task to periodically clean idle connections
+        let cache = self.connection_cache.clone();
+        let max_idle = self.max_idle_time;
+        tokio::spawn(async move {
+            loop {
+                TcpProxyServer::clean_idle_connections(cache.clone(), max_idle).await;
+                tokio::time::sleep(Duration::from_secs(30)).await;
+            }
+        });
+
+        loop {
+            let (client_stream, client_addr) = match listener.accept().await {
+                Ok(connection) => connection,
+                Err(e) => {
+                    tracing::error!("Failed to accept connection: {}", e);
+                    // Back off briefly: some accept errors (e.g. running out of file
+                    // descriptors) persist, and retrying at once would spin the CPU
+                    tokio::time::sleep(Duration::from_millis(100)).await;
+                    continue;
+                }
+            };
+
+            tracing::info!("Accepted connection from {}", client_addr);
+
+            // Clone required resources for the handler task
+            let config = self.config.clone();
+            let cache = self.connection_cache.clone();
+            let max_idle = self.max_idle_time;
+            let pooled = self.pooled_connections;
+
+            // Spawn a task to handle the connection
+            tokio::spawn(async move {
+                if let Err(e) = Self::handle_connection(client_stream, client_addr, config, cache, max_idle, pooled).await {
+                    tracing::error!("Connection error: {}", e);
+                }
+            });
+        }
+    }
+
+    /// Relays bytes between the client and the selected upstream until either side
+    /// closes, an I/O error occurs, or no data flows in either direction for the
+    /// route's `timeout_ms` (5000 ms when unset).
+    async fn handle_connection(
+        mut client_stream: TcpStream,
+        client_addr: SocketAddr,
+        config: SharedConfig,
+        connection_cache: ConnectionCache,
+        max_idle_time: Duration,
+        use_pooling: bool,
+    ) -> io::Result<()> {
+        // The shared listener cannot tell which service a client wants, so pick a route
+        // that is marked or detected as TCP. `routes` is a map, so "first" is arbitrary
+        // and could otherwise be an HTTP route; it is kept only as a fallback.
+        let route_config = {
+            let config_guard = config.read().await;
+            let selected = config_guard
+                .routes
+                .values()
+                .find(|route| crate::config::is_likely_database(route))
+                .or_else(|| config_guard.routes.values().next());
+
+            match selected {
+                Some(route) => route.clone(),
+                None => {
+                    return Err(io::Error::new(io::ErrorKind::NotFound, "No routes configured"));
+                }
+            }
+        };
+
+        // Extract host and port from upstream URL
+        let upstream_url = &route_config.upstream;
+        let parts: Vec<&str> = upstream_url.split("://").collect();
+        let host_port = if parts.len() > 1 {
+            parts[1].split('/').next().unwrap_or(parts[1])
+        } else {
+            upstream_url.split('/').next().unwrap_or(upstream_url)
+        };
+
+        // Start timing
+        let start = Instant::now();
+
+        // Try to get a cached connection or create a new one
+        let mut server_stream = if use_pooling {
+            match Self::get_or_create_connection(host_port, &connection_cache, max_idle_time).await {
+                Ok(stream) => stream,
+                Err(e) => {
+                    tracing::error!("Failed to connect to upstream {}: {}", host_port, e);
+                    return Err(e);
+                }
+            }
+        } else {
+            match TcpStream::connect(host_port).await {
+                Ok(stream) => stream,
+                Err(e) => {
+                    tracing::error!("Failed to connect to upstream {}: {}", host_port, e);
+                    return Err(e);
+                }
+            }
+        };
+
+        // Set up timeout based on route configuration
+        let timeout_ms = route_config.timeout_ms.unwrap_or(5000);
+        let timeout_duration = Duration::from_millis(timeout_ms);
+
+        let (mut client_read, mut client_write) = client_stream.split();
+        let (mut server_read, mut server_write) = server_stream.split();
+
+        // One buffer per direction so both reads can be pending at the same time
+        let mut client_buf = vec![0u8; 65536]; // 64K buffer
+        let mut server_buf = vec![0u8; 65536]; // 64K buffer
+
+        // Forward whichever side produces data first. Strictly alternating client and
+        // server reads stalls on server-speaks-first protocols: MySQL, for example,
+        // sends its handshake before the client writes anything.
+        loop {
+            let next = timeout(timeout_duration, async {
+                tokio::select! {
+                    res = client_read.read(&mut client_buf) => (true, res),
+                    res = server_read.read(&mut server_buf) => (false, res),
+                }
+            })
+            .await;
+
+            let (from_client, read_result) = match next {
+                Ok(event) => event,
+                Err(_) => {
+                    tracing::warn!("Connection from {} idle for too long", client_addr);
+                    counter!("tcp_proxy.timeout", 1);
+                    break;
+                }
+            };
+
+            let n = match read_result {
+                // Either peer closed the connection
+                Ok(0) => break,
+                Ok(n) => n,
+                Err(e) => {
+                    let side = if from_client { "client" } else { "server" };
+                    tracing::error!("Failed to read from {}: {}", side, e);
+                    break;
+                }
+            };
+
+            let write_result = if from_client {
+                server_write.write_all(&client_buf[..n]).await
+            } else {
+                client_write.write_all(&server_buf[..n]).await
+            };
+            if let Err(e) = write_result {
+                let side = if from_client { "server" } else { "client" };
+                tracing::error!("Failed to write to {}: {}", side, e);
+                break;
+            }
+        }
+
+        // Record metrics
+        let duration = start.elapsed();
+        histogram!("tcp_proxy.connection.duration_seconds", duration.as_secs_f64());
+        counter!("tcp_proxy.connection.completed", 1);
+
+        // If we're using connection pooling and the connection is still good, return it to the pool
+        // This would be a more complex check in a real implementation
+        if use_pooling {
+            // Recombine the split parts (we can't actually do this easily, so this is simplified)
+            // In a real implementation, you'd need a different approach to reuse the connection
+            tracing::info!("Connection completed, not returning to pool in this simplified example");
+        }
+
+        Ok(())
+    }
+
+    /// Pops a pooled connection to `target` that has been idle for less than
+    /// `max_idle_time`, discarding older ones, or opens a new connection.
+    async fn get_or_create_connection(
+        target: &str,
+        cache: &ConnectionCache,
+        max_idle_time: Duration
+    ) -> io::Result<TcpStream> {
+        // Try to get an existing connection from the pool
+        if let Some(mut connections) = cache.get_mut(target) {
+            while let Some((conn, timestamp)) = connections.pop() {
+                if timestamp.elapsed() < max_idle_time {
+                    tracing::debug!("Reusing cached connection to {}", target);
+                    return Ok(conn);
+                }
+                // Connection too old, discard it
+                tracing::debug!("Discarding expired connection to {}", target);
+            }
+        }
+
+        // No valid connection found, create a new one
+        tracing::debug!("Creating new connection to {}", target);
+        let stream = TcpStream::connect(target).await?;
+        counter!("tcp_proxy.connection.new", 1);
+        Ok(stream)
+    }
+
+    /// Drops every pooled connection that has been idle for `max_idle_time` or longer.
+    async fn clean_idle_connections(cache: ConnectionCache, max_idle_time: Duration) {
+        let now = Instant::now();
+
+        for mut entry in cache.iter_mut() {
+            let connections = entry.value_mut();
+
+            // Remove expired connections
+            connections.retain(|(_, timestamp)| {
+                now.duration_since(*timestamp) < max_idle_time
+            });
+
+            tracing::debug!("{} connections remaining for {}", connections.len(), entry.key());
+        }
+    }
+}
\ No newline at end of file