feat: implement OpenAI-compatible HTTP API gateway on port 18790#45
feat: implement OpenAI-compatible HTTP API gateway on port 18790#45mugiwaraluffy56 wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
Pull request overview
This PR implements a fully functional OpenAI-compatible HTTP API gateway that binds to port 18790 (or CLI-specified port) when running mofaclaw gateway. It bridges HTTP requests to the existing agent loop via the message bus, using a oneshot-channel-based request correlation mechanism keyed by chat_id. Both streaming (SSE) and non-streaming response modes are supported.
Changes:
- Added
core/src/gateway/types.rswith OpenAI-compatible request/response data structures (ChatCompletionRequest,ChatCompletionResponse,ChatCompletionChunk,ModelList,ApiError) - Added
core/src/gateway/mod.rswith theGatewayServerthat runs an axum HTTP server with four routes (/,/health,/v1/models,/v1/chat/completions) and a background dispatch task for routing agent responses back to waiting HTTP handlers - Integrated the gateway into the CLI's
command_gatewayfunction as a third concurrent task alongside the agent loop and channel manager
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
core/src/gateway/types.rs |
New file with OpenAI-compatible API request/response types |
core/src/gateway/mod.rs |
New file with GatewayServer, HTTP routes, and oneshot-based request correlation |
core/Cargo.toml |
Added axum, tower, and tower-http workspace dependencies |
core/src/lib.rs |
Added pub mod gateway and re-exported GatewayServer |
cli/src/main.rs |
Imported GatewayServer, wired it into the tokio::select! block in command_gateway |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| pub use gateway::GatewayServer; | ||
| pub use error::*; |
There was a problem hiding this comment.
The re-exports in this file follow alphabetical order (agent, bus, channels, config, cron, error, heartbeat, …). The gateway::GatewayServer re-export should appear after error::* to maintain this ordering. Currently it's placed between cron and error, breaking the alphabetical convention.
| pub use gateway::GatewayServer; | |
| pub use error::*; | |
| pub use error::*; | |
| pub use gateway::GatewayServer; |
|
|
||
| # http server for REST API gateway | ||
| axum = { workspace = true } | ||
| tower = { workspace = true } |
There was a problem hiding this comment.
The tower crate is added as a dependency here but is never directly imported or used anywhere in core/src/. Only tower_http is used (for CorsLayer). The tower dependency can be removed to keep the dependency list clean.
| tower = { workspace = true } |
| .route("/health", get(handle_health)) | ||
| .route("/v1/models", get(handle_models)) | ||
| .route("/v1/chat/completions", post(handle_chat_completions)) | ||
| .layer(CorsLayer::permissive()) |
There was a problem hiding this comment.
CorsLayer::permissive() allows all origins, methods, and headers with no restrictions. While this is convenient for development and local usage, it should be documented as a known security trade-off. If this gateway is ever exposed to non-local networks, unrestricted CORS could allow any website to make requests to the API. Consider at least logging a warning when the server starts if it's binding to a non-loopback address, or making the CORS policy configurable.
| let model_role = model.clone(); | ||
| let id_role = id_for_stream.clone(); | ||
| let model_words = model.clone(); | ||
| let id_words = id_for_stream.clone(); | ||
| let model_stop = model.clone(); | ||
| let id_stop = id_for_stream.clone(); | ||
|
|
||
| // Build a Vec of SSE events: role → words → stop | ||
| let mut events: Vec<Result<Event, Infallible>> = Vec::new(); | ||
|
|
||
| // Role event | ||
| let role_chunk = ChatCompletionChunk::role_chunk(&id_role, &model_role); | ||
| if let Ok(data) = serde_json::to_string(&role_chunk) { | ||
| events.push(Ok(Event::default().data(data))); | ||
| } | ||
|
|
||
| // Word events | ||
| for word in words { | ||
| let chunk = ChatCompletionChunk::content_chunk(&id_words, &model_words, word); | ||
| if let Ok(data) = serde_json::to_string(&chunk) { | ||
| events.push(Ok(Event::default().data(data))); | ||
| } | ||
| } | ||
|
|
||
| // Stop event | ||
| let stop_chunk = ChatCompletionChunk::stop_chunk(&id_stop, &model_stop); |
There was a problem hiding this comment.
There are six unnecessary clones here. Since model and id_for_stream are not moved into a closure or spawned task within this block, you can use &model and &id_for_stream (or just &completion_id) directly in the role_chunk, content_chunk, and stop_chunk calls below—these methods already accept impl Into<String>. All of model_role, id_role, model_words, id_words, model_stop, and id_stop can be removed.
| let model_role = model.clone(); | |
| let id_role = id_for_stream.clone(); | |
| let model_words = model.clone(); | |
| let id_words = id_for_stream.clone(); | |
| let model_stop = model.clone(); | |
| let id_stop = id_for_stream.clone(); | |
| // Build a Vec of SSE events: role → words → stop | |
| let mut events: Vec<Result<Event, Infallible>> = Vec::new(); | |
| // Role event | |
| let role_chunk = ChatCompletionChunk::role_chunk(&id_role, &model_role); | |
| if let Ok(data) = serde_json::to_string(&role_chunk) { | |
| events.push(Ok(Event::default().data(data))); | |
| } | |
| // Word events | |
| for word in words { | |
| let chunk = ChatCompletionChunk::content_chunk(&id_words, &model_words, word); | |
| if let Ok(data) = serde_json::to_string(&chunk) { | |
| events.push(Ok(Event::default().data(data))); | |
| } | |
| } | |
| // Stop event | |
| let stop_chunk = ChatCompletionChunk::stop_chunk(&id_stop, &model_stop); | |
| // Build a Vec of SSE events: role → words → stop | |
| let mut events: Vec<Result<Event, Infallible>> = Vec::new(); | |
| // Role event | |
| let role_chunk = ChatCompletionChunk::role_chunk(&id_for_stream, &model); | |
| if let Ok(data) = serde_json::to_string(&role_chunk) { | |
| events.push(Ok(Event::default().data(data))); | |
| } | |
| // Word events | |
| for word in words { | |
| let chunk = | |
| ChatCompletionChunk::content_chunk(&id_for_stream, &model, word); | |
| if let Ok(data) = serde_json::to_string(&chunk) { | |
| events.push(Ok(Event::default().data(data))); | |
| } | |
| } | |
| // Stop event | |
| let stop_chunk = ChatCompletionChunk::stop_chunk(&id_for_stream, &model); |
| // Register pending request BEFORE publishing so the reply isn't missed | ||
| let (tx, rx) = oneshot::channel::<String>(); | ||
| { | ||
| let mut map = state.pending.lock().await; |
There was a problem hiding this comment.
When a conversation_id is provided, chat_id is set to that value. If two concurrent requests share the same conversation_id, the second map.insert(chat_id.clone(), tx) will silently overwrite the first sender in the PendingMap. This causes the first request's oneshot sender to be dropped, and the first request's receiver will get a RecvError, resulting in a 500 "Agent response channel closed unexpectedly" error.
To prevent this, either reject the second request if a pending entry already exists for the same chat_id, or append a unique suffix (e.g., format!("{}-{}", conversation_id, Uuid::new_v4())) to disambiguate the map key while still passing the conversation_id as the session key to the agent.
| let mut map = state.pending.lock().await; | |
| let mut map = state.pending.lock().await; | |
| if map.contains_key(&chat_id) { | |
| // There is already a pending request for this conversation; reject to avoid | |
| // overwriting the existing sender and breaking the in‑flight request. | |
| return ( | |
| StatusCode::CONFLICT, | |
| Json(ApiError::internal( | |
| "A request for this conversation is already in progress", | |
| )), | |
| ) | |
| .into_response(); | |
| } |
| if user_content.is_empty() { | ||
| let status = StatusCode::BAD_REQUEST; | ||
| let body = Json(ApiError::internal("No user message found in request")); | ||
| return (status, body).into_response(); |
There was a problem hiding this comment.
ApiError::internal(...) is used here for a 400 BAD_REQUEST response, but internal sets the error type to "internal_error". For OpenAI API compatibility, a missing user message is a client error and should use "invalid_request_error" as the type. Consider adding a bad_request constructor to ApiError (similar to the internal and timeout constructors) that sets r#type: "invalid_request_error".
| usage: Usage { | ||
| prompt_tokens: 0, | ||
| completion_tokens, | ||
| total_tokens: completion_tokens, | ||
| }, |
There was a problem hiding this comment.
The prompt_tokens field is hardcoded to 0, and completion_tokens uses a whitespace-based word count rather than an actual token count. While approximate token counts are acceptable for a proxy, reporting prompt_tokens: 0 can mislead clients that rely on usage data for cost tracking or rate limiting. Consider at least estimating prompt_tokens from the input messages (e.g., using the same split_whitespace().count() heuristic applied to the concatenated input) so the total_tokens field is more representative.
| )) | ||
| .into_response() | ||
| } | ||
| } |
There was a problem hiding this comment.
The gateway module has no tests, while nearly every other module in core/src/ has a #[cfg(test)] section (e.g., messages.rs, bus/queue.rs, channels/manager.rs, config.rs). At minimum, the types in types.rs should have unit tests for serialization/deserialization (e.g., verifying ChatCompletionResponse::new produces the expected JSON shape), and mod.rs should have tests verifying the pending-map dispatch logic (register a sender, simulate an outbound message, assert the receiver gets the content).
| } | |
| } | |
| #[cfg(test)] | |
| mod tests { | |
| use super::*; | |
| use axum::response::IntoResponse; | |
| use hyper::body::to_bytes; | |
| #[tokio::test] | |
| async fn handle_health_returns_ok_status() { | |
| let response = handle_health().into_response(); | |
| let body_bytes = to_bytes(response.into_body()).await.unwrap(); | |
| let value: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap(); | |
| assert_eq!(value, serde_json::json!({ "status": "ok" })); | |
| } | |
| } |
feat: implement OpenAI-compatible HTTP API gateway on port 18790
Summary
The
mofaclaw gatewaycommand exposed port 18790 in Docker and had aGatewayConfigstruct withhostandportfields, but nothing ever bound to that port. The axum, tower, and tower-http crates were already declared as workspace dependencies but unused. This PR closes that gap by implementing a fully functional OpenAI-compatible REST API server that starts alongside the existing agent loop and channel manager when you runmofaclaw gateway.What Changed
core/src/gateway/types.rs — New file containing all OpenAI-compatible data structures. This includes
ChatCompletionRequest(with an extra optionalconversation_idfield for multi-turn sessions),ChatCompletionResponse,ChatCompletionChunkfor streaming,ModelList, andApiError. All structs mirror the OpenAI wire format so existing clients work without modification.core/src/gateway/mod.rs — New file containing
GatewayServer. It starts an axum HTTP server and registers four routes. A background tokio task subscribes to the message bus outbound channel and routes replies back to the waiting HTTP handler using oneshot channels stored in aPendingMap. The streaming path word-tokenizes the full agent response and emits it as a sequence of SSE chunks so clients that setstream: trueget the familiar token-by-token experience.core/Cargo.toml — Added axum, tower, and tower-http as dependencies (all already declared at the workspace level, so no version pinning needed here).
core/src/lib.rs — Added
pub mod gatewayand re-exportedGatewayServer.cli/src/main.rs — Imported
GatewayServerand addedhttp_server.run()as a third arm of thetokio::select!block incommand_gateway, so the HTTP server runs concurrently with the agent loop and channel manager under a shared tokio runtime.API Reference
GET / info and endpoint list
GET /health returns {"status":"ok"}
GET /v1/models OpenAI model list response
POST /v1/chat/completions chat, supports stream true or false
How Request Correlation Works
Each HTTP request generates a UUID used as
chat_id. Before publishing to the message bus, the handler inserts aoneshot::Sender<String>into a sharedPendingMapkeyed by thatchat_id. The background dispatch task listens on the broadcast outbound channel and, whenever it sees a message withchannel == "api", pops the matching sender from the map and fires it. The HTTP handler then receives on the oneshot receiver with a 120-second timeout before formatting the OpenAI response. Multi-turn sessions work by passing a stableconversation_idin the request body, which reuses the same session key across calls.Example Usage
Any OpenAI-compatible client (Python openai SDK, LangChain, LlamaIndex, Continue, etc.) can point its
base_urlathttp://localhost:18790/v1and use mofaclaw as a drop-in model with full tool and memory capabilities.