Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ on:
pull_request:
branches: [ "main" ]

permissions:
contents: read

env:
CARGO_TERM_COLOR: always

Expand Down Expand Up @@ -34,11 +37,28 @@ jobs:
- name: Redirect
run: echo '<!DOCTYPE HTML><html lang="en-US"><head><meta charset="UTF-8"><meta http-equiv="refresh" content="0; url=https://liberdus.com/liberdus-proxy/liberdus_proxy"><script type="text/javascript">window.location.href = "https://liberdus.com/liberdus-proxy/liberdus_proxy"</script><title>Page Redirection</title></head><body>If you are not redirected automatically, follow this <a href="https://liberdus.com/liberdus-proxy/liberdus_proxy">link to example</a>.</body></html>' > ./target/doc/index.html

- name: Upload docs
uses: actions/upload-artifact@v4
with:
name: docs
path: target/doc/

deploy-docs:
needs: build
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
permissions:
contents: write

steps:
- name: Download docs
uses: actions/download-artifact@v4
with:
name: docs
path: target/doc/

- name: Deploy to gh-pages
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
uses: peaceiris/actions-gh-pages@v3
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
publish_dir: ./target/doc/

2 changes: 1 addition & 1 deletion src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ where
return Err(Box::new(e));
}
}
},
}
Ok(Err(e)) => {
eprintln!("Error forwarding request to collector api server: {}", e);
http::respond_with_internal_error(client_stream).await?;
Expand Down
92 changes: 69 additions & 23 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn get_timestamp() -> String {
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();

// Format as human-readable timestamp (you can adjust format as needed)
format!("{}", now)
}
Expand Down Expand Up @@ -142,8 +142,12 @@ pub async fn handle_stream<StreamLike>(
where
StreamLike: AsyncWrite + AsyncRead + Unpin + Send,
{
println!("[{}] [HTTP_CONNECT] IP: {} | New connection established", get_timestamp(), client_addr);

println!(
"[{}] [HTTP_CONNECT] IP: {} | New connection established",
get_timestamp(),
client_addr
);

loop {
let mut req_buf = Vec::new();
match timeout(
Expand All @@ -158,35 +162,58 @@ where
// Client closed connection cleanly
break;
}

// Log that we received data
println!("[{}] [HTTP_DATA] IP: {} | Received {} bytes", get_timestamp(), client_addr, req_buf.len());

println!(
"[{}] [HTTP_DATA] IP: {} | Received {} bytes",
get_timestamp(),
client_addr,
req_buf.len()
);

// Log warning for unusually large requests
if req_buf.len() > 100_000 {
eprintln!("[{}] [HTTP_WARNING] IP: {} | Large request received: {} bytes", get_timestamp(), client_addr, req_buf.len());
eprintln!(
"[{}] [HTTP_WARNING] IP: {} | Large request received: {} bytes",
get_timestamp(),
client_addr,
req_buf.len()
);
}

let method_route = match get_route(&req_buf) {
Some(route_info) => route_info,
None => {
eprintln!("[{}] [HTTP_ERROR] IP: {} | Failed to extract route from request", get_timestamp(), client_addr);
eprintln!(
"[{}] [HTTP_ERROR] IP: {} | Failed to extract route from request",
get_timestamp(),
client_addr
);
// Log the first 200 bytes of the request for debugging
if let Ok(req_str) = std::str::from_utf8(&req_buf) {
let preview = if req_str.len() > 200 { &req_str[..200] } else { req_str };
eprintln!("[{}] [HTTP_DEBUG] IP: {} | Request preview: {:?}", get_timestamp(), client_addr, preview);
let preview = if req_str.len() > 200 {
&req_str[..200]
} else {
req_str
};
eprintln!(
"[{}] [HTTP_DEBUG] IP: {} | Request preview: {:?}",
get_timestamp(),
client_addr,
preview
);
}
break;
}
};

let (method, route) = method_route;

// Extract client information for logging
let user_agent = extract_user_agent(&req_buf);
let client_ip = extract_client_ip(&req_buf, &client_addr);
let (platform, app_version, device_id) = extract_client_info(&req_buf);

// Log successful request
println!(
"[{}] [REQUEST] IP: {} | Route: {} {} | UA: {} | Platform: {} | AppVer: {} | DeviceID: {}",
Expand Down Expand Up @@ -270,11 +297,13 @@ where
}
Ok(Err(e)) => {
let error_details = match e.kind() {
std::io::ErrorKind::UnexpectedEof => "Unexpected EOF - client disconnected abruptly",
std::io::ErrorKind::UnexpectedEof => {
"Unexpected EOF - client disconnected abruptly"
}
std::io::ErrorKind::ConnectionReset => "Connection reset by peer",
std::io::ErrorKind::ConnectionAborted => "Connection aborted",
std::io::ErrorKind::TimedOut => "Read operation timed out",
_ => "Unknown IO error"
_ => "Unknown IO error",
};
eprintln!(
"[{}] [STREAM_ERROR] IP: {} | Error attempting to read bytes out of client stream: {} (Kind: {:?}, Details: {})",
Expand All @@ -292,8 +321,12 @@ where
}
}

println!("[{}] [HTTP_DISCONNECT] IP: {} | Connection closed", get_timestamp(), client_addr);

println!(
"[{}] [HTTP_DISCONNECT] IP: {} | Connection closed",
get_timestamp(),
client_addr
);

match client_stream.shutdown().await {
Ok(_) => Ok(()),
Err(_e) => Ok(()),
Expand Down Expand Up @@ -487,7 +520,7 @@ pub fn extract_client_info(buffer: &[u8]) -> (String, String, String) {
let mut platform = "Unknown".to_string();
let mut app_version = "Unknown".to_string();
let mut device_id = "Unknown".to_string();

if let Ok(buffer_str) = std::str::from_utf8(buffer) {
for line in buffer_str.lines() {
let line_lower = line.to_lowercase();
Expand All @@ -500,7 +533,7 @@ pub fn extract_client_info(buffer: &[u8]) -> (String, String, String) {
}
}
}

(platform, app_version, device_id)
}
pub async fn listen(
Expand Down Expand Up @@ -558,8 +591,14 @@ pub async fn listen(
Ok(tls_stream) => {
let tls_stream = tokio_rustls::TlsStream::Server(tls_stream);
let client_addr = format!("{}", socket_addr);
let e =
handle_stream(tls_stream, liberdus, subscription_manager, config, client_addr).await;
let e = handle_stream(
tls_stream,
liberdus,
subscription_manager,
config,
client_addr,
)
.await;
if let Err(e) = e {
eprintln!("Handle Stream Error: {}", e);
}
Expand All @@ -574,7 +613,14 @@ pub async fn listen(
},
None => {
let client_addr = format!("{}", socket_addr);
let e = handle_stream(raw_stream, liberdus, subscription_manager, config, client_addr).await;
let e = handle_stream(
raw_stream,
liberdus,
subscription_manager,
config,
client_addr,
)
.await;
if let Err(e) = e {
eprintln!("Handle Stream Error: {}", e);
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ pub struct Stats {
pub mod archivers;
pub mod collector;
pub mod config;
pub mod observer_gateway;
pub mod crypto;
pub mod http;
pub mod liberdus;
pub mod notifier;
pub mod observer_gateway;
pub mod rpc;
pub mod shardus_monitor;
pub mod subscription;
Expand Down
28 changes: 21 additions & 7 deletions src/observer_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ where
};

let endpoint = match validate_observer_endpoint(&method, &route) {
ObserverEndpointValidation::AllowedNotifyBridgeout => {
ObserverEndpoint::NotifyBridgeout
}
ObserverEndpointValidation::AllowedNotifyBridgeout => ObserverEndpoint::NotifyBridgeout,
ObserverEndpointValidation::AllowedPreflight => ObserverEndpoint::Preflight,
ObserverEndpointValidation::AllowedTransactionGet(forward_route) => {
ObserverEndpoint::TransactionGet(forward_route)
Expand Down Expand Up @@ -106,7 +104,12 @@ where
// Validate request: Bridge UI sends JSON.stringify({ chainId }) — body must be {"chainId": number}
if body.is_empty() {
eprintln!("[notify-bridgeout] rejected: empty body");
respond_json(client_stream, 400, r#"{"Err":"Invalid or missing chainId"}"#).await?;
respond_json(
client_stream,
400,
r#"{"Err":"Invalid or missing chainId"}"#,
)
.await?;
return Ok(());
}
let chain_id_valid = serde_json::from_slice::<serde_json::Value>(&body)
Expand All @@ -118,7 +121,12 @@ where
.is_some();
if !chain_id_valid {
eprintln!("[notify-bridgeout] rejected: invalid or missing chainId in body");
respond_json(client_stream, 400, r#"{"Err":"Invalid or missing chainId"}"#).await?;
respond_json(
client_stream,
400,
r#"{"Err":"Invalid or missing chainId"}"#,
)
.await?;
return Ok(());
}

Expand Down Expand Up @@ -157,7 +165,10 @@ where
}
}
Err(e) => {
eprintln!("[notify-bridgeout] observer request failed: {} (url={})", e, url);
eprintln!(
"[notify-bridgeout] observer request failed: {} (url={})",
e, url
);
}
}
}
Expand Down Expand Up @@ -524,7 +535,10 @@ fn filter_transaction_response_by_timestamp(

if let Some(ok) = v.get_mut("Ok").and_then(|ok| ok.as_object_mut()) {
let len = filtered.len() as u64;
ok.insert("transactions".to_string(), serde_json::Value::Array(filtered));
ok.insert(
"transactions".to_string(),
serde_json::Value::Array(filtered),
);
ok.insert(
"totalTranactions".to_string(),
serde_json::Value::Number(serde_json::Number::from(len)),
Expand Down
Loading
Loading