Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 47 additions & 40 deletions crates/trigger-http/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
collections::HashMap,
future::Future,
io::{ErrorKind, IsTerminal},
net::SocketAddr,
sync::{Arc, OnceLock},
Expand Down Expand Up @@ -180,9 +179,10 @@ impl<F: RuntimeFactors> HttpServer<F> {
None | Some(HttpExecutorType::Http) => HandlerType::from_instance_pre(
pre,
HttpHandlerState {
trigger_app: trigger_app.clone(),
component_id: component_id.into(),
reuse_config,
server: Default::default(),
self_scheme: Default::default(),
},
)?,
Some(HttpExecutorType::Wagi(wagi_config)) => {
Expand Down Expand Up @@ -329,7 +329,7 @@ impl<F: RuntimeFactors> HttpServer<F> {
server_scheme: Scheme,
client_addr: SocketAddr,
) -> anyhow::Result<Response<Body>> {
set_req_uri(&mut req, server_scheme.clone())?;
set_req_uri(&mut req, server_scheme)?;
let app_id = self
.trigger_app
.app()
Expand All @@ -355,7 +355,6 @@ impl<F: RuntimeFactors> HttpServer<F> {
self.respond_wasm_component(
req,
route_match,
server_scheme,
client_addr,
component,
&trigger_config.executor,
Expand Down Expand Up @@ -383,28 +382,10 @@ impl<F: RuntimeFactors> HttpServer<F> {
self: &Arc<Self>,
req: Request<Body>,
route_match: RouteMatch<'_, '_>,
server_scheme: Scheme,
client_addr: SocketAddr,
component_id: &str,
executor: &Option<HttpExecutorType>,
) -> anyhow::Result<Response<Body>> {
let mut instance_builder = self.trigger_app.prepare(component_id)?;

// Set up outbound HTTP request origin and service chaining
// The outbound HTTP factor is required since both inbound and outbound wasi HTTP
// implementations assume they use the same underlying wasmtime resource storage.
// Eventually, we may be able to factor this out to a separate factor.
let outbound_http = instance_builder
.factor_builder::<OutboundHttpFactor>()
.context(
"The wasi HTTP trigger was configured without the required wasi outbound http support",
)?;

let self_addr = self.get_local_addr();
let origin = SelfRequestOrigin::create(server_scheme, &self_addr.to_string())?;
outbound_http.set_self_request_origin(origin);
outbound_http.set_request_interceptor(OutboundHttpInterceptor::new(self.clone()))?;

// Prepare HTTP executor
let handler_type = self
.component_handler_types
Expand All @@ -416,19 +397,19 @@ impl<F: RuntimeFactors> HttpServer<F> {
HttpExecutorType::Http => match handler_type {
HandlerType::Spin => {
SpinHttpExecutor
.execute(instance_builder, &route_match, req, client_addr)
.execute(self, &route_match, req, client_addr, component_id)
.await
}
HandlerType::Wasi0_3(_, handler) => {
Wasip3HttpExecutor(handler)
.execute(&route_match, req, client_addr)
.execute(self, &route_match, req, client_addr)
.await
}
HandlerType::Wasi0_2(_)
| HandlerType::Wasi2023_11_10(_)
| HandlerType::Wasi2023_10_18(_) => {
WasiHttpExecutor { handler_type }
.execute(instance_builder, &route_match, req, client_addr)
.execute(self, &route_match, req, client_addr, component_id)
.await
}
HandlerType::Wagi(_) => unreachable!(),
Expand All @@ -443,7 +424,7 @@ impl<F: RuntimeFactors> HttpServer<F> {
indices,
};
executor
.execute(instance_builder, &route_match, req, client_addr)
.execute(self, &route_match, req, client_addr, component_id)
.await
}
};
Expand All @@ -460,6 +441,31 @@ impl<F: RuntimeFactors> HttpServer<F> {
}
}

pub(crate) fn trigger_instance_builder(
self: &'_ Arc<Self>,
component_id: &str,
self_scheme: Option<&Scheme>,
) -> anyhow::Result<TriggerInstanceBuilder<'_, F>> {
let mut instance_builder = self.trigger_app.prepare(component_id)?;

// Set up outbound HTTP request origin and service chaining
// The outbound HTTP factor is required since both inbound and outbound wasi HTTP
// implementations assume they use the same underlying wasmtime resource storage.
// Eventually, we may be able to factor this out to a separate factor.
let outbound_http = instance_builder
.factor_builder::<OutboundHttpFactor>()
.context(
"The wasi HTTP trigger was configured without the required wasi outbound http support",
)?;

let self_scheme = self_scheme.cloned().unwrap_or(Scheme::HTTPS);
let self_addr = self.get_local_addr();
let origin = SelfRequestOrigin::create(self_scheme, &self_addr.to_string())?;
outbound_http.set_self_request_origin(origin);
outbound_http.set_request_interceptor(OutboundHttpInterceptor::new(self.clone()))?;
Ok(instance_builder)
}

fn respond_static_response(
sr: &spin_http::config::StaticResponse,
) -> anyhow::Result<Response<Body>> {
Expand Down Expand Up @@ -688,21 +694,20 @@ fn set_req_uri(req: &mut Request<Body>, scheme: Scheme) -> anyhow::Result<()> {
Ok(())
}

/// An HTTP executor.
pub(crate) trait HttpExecutor {
fn execute<F: RuntimeFactors>(
&self,
instance_builder: TriggerInstanceBuilder<F>,
route_match: &RouteMatch<'_, '_>,
req: Request<Body>,
client_addr: SocketAddr,
) -> impl Future<Output = anyhow::Result<Response<Body>>>;
}
Comment on lines -691 to -700

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive-by refactor removing this useless trait but it was difficult to extract from the rest of the work.


pub(crate) struct HttpHandlerState<F: RuntimeFactors> {
trigger_app: Arc<TriggerApp<F>>,
component_id: String,
reuse_config: InstanceReuseConfig,
server: OnceLock<Arc<HttpServer<F>>>,
self_scheme: OnceLock<Scheme>,
}

impl<F: RuntimeFactors> HttpHandlerState<F> {
pub(crate) fn init_once(&self, server: &Arc<HttpServer<F>>, first_uri: &Uri) {
self.server.get_or_init(|| server.clone());
if let Some(scheme) = first_uri.scheme() {
self.self_scheme.get_or_init(|| scheme.clone());
}
}
}

impl<F: RuntimeFactors> HandlerState for HttpHandlerState<F> {
Expand All @@ -711,8 +716,10 @@ impl<F: RuntimeFactors> HandlerState for HttpHandlerState<F> {
fn new_store(&self, _req_id: Option<u64>) -> wasmtime::Result<StoreBundle<Self::StoreData>> {
Ok(StoreBundle {
store: self
.trigger_app
.prepare(&self.component_id)
.server
.get()
.expect("server should have been set")
.trigger_instance_builder(&self.component_id, self.self_scheme.get())
Comment thread
lann marked this conversation as resolved.
.to_wasmtime_result()?
.instantiate_store(())
.to_wasmtime_result()?
Expand Down
22 changes: 10 additions & 12 deletions crates/trigger-http/src/spin.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::SocketAddr;
use std::{net::SocketAddr, sync::Arc};

use anyhow::Result;
use http_body_util::BodyExt;
Expand All @@ -10,32 +10,30 @@ use spin_world::v1::http_types;
use tracing::{Level, instrument};

use crate::{
Body, TriggerInstanceBuilder,
Body, HttpServer,
headers::{append_headers, prepare_request_headers},
server::HttpExecutor,
};

/// An [`HttpExecutor`] that uses the `fermyon:spin/inbound-http` interface.
#[derive(Clone)]
pub struct SpinHttpExecutor;

impl HttpExecutor for SpinHttpExecutor {
impl SpinHttpExecutor {
#[instrument(name = "spin_trigger_http.execute_wasm", skip_all, err(level = Level::INFO), fields(otel.name = format!("execute_wasm_component {}", route_match.lookup_key().to_string())))]
async fn execute<F: RuntimeFactors>(
pub async fn execute<F: RuntimeFactors>(
&self,
instance_builder: TriggerInstanceBuilder<'_, F>,
server: &Arc<HttpServer<F>>,
route_match: &RouteMatch<'_, '_>,
req: Request<Body>,
client_addr: SocketAddr,
component_id: &str,
) -> Result<Response<Body>> {
let spin_http::routes::TriggerLookupKey::Component(component_id) = route_match.lookup_key()
else {
unreachable!()
};

tracing::trace!("Executing request using the Spin executor for component {component_id}");

let (instance, mut store) = instance_builder.instantiate(()).await?;
let (instance, mut store) = server
.trigger_instance_builder(component_id, req.uri().scheme())?
.instantiate(())
.await?;

let headers = prepare_request_headers(&req, route_match, client_addr)?;
// Expects here are safe since we have already checked that this
Expand Down
25 changes: 10 additions & 15 deletions crates/trigger-http/src/wagi.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{io::Cursor, net::SocketAddr};
use std::{io::Cursor, net::SocketAddr, sync::Arc};

use anyhow::{Context, Result, ensure};
use http_body_util::BodyExt;
Expand All @@ -11,31 +11,24 @@ use wasmtime_wasi::p2::bindings::CommandIndices;
use wasmtime_wasi::p2::pipe::MemoryOutputPipe;
use wasmtime_wasi_http::p2::body::HyperIncomingBody as Body;

use crate::{TriggerInstanceBuilder, headers::compute_default_headers, server::HttpExecutor};
use crate::{HttpServer, headers::compute_default_headers};

pub struct WagiHttpExecutor<'a> {
pub wagi_config: &'a WagiTriggerConfig,
pub indices: &'a CommandIndices,
}

impl HttpExecutor for WagiHttpExecutor<'_> {
impl WagiHttpExecutor<'_> {
#[instrument(name = "spin_trigger_http.execute_wagi", skip_all, err(level = Level::INFO), fields(otel.name = format!("execute_wagi_component {}", route_match.lookup_key().to_string())))]
async fn execute<F: RuntimeFactors>(
pub async fn execute<F: RuntimeFactors>(
&self,
mut instance_builder: TriggerInstanceBuilder<'_, F>,
server: &Arc<HttpServer<F>>,
route_match: &RouteMatch<'_, '_>,
req: Request<Body>,
client_addr: SocketAddr,
component_id: &str,
) -> Result<Response<Body>> {
let spin_http::routes::TriggerLookupKey::Component(component) = route_match.lookup_key()
else {
unreachable!()
};

tracing::trace!(
"Executing request using the Wagi executor for component {}",
component
);
tracing::trace!("Executing request using the Wagi executor for component {component_id}");

let uri_path = req.uri().path();

Expand Down Expand Up @@ -79,6 +72,8 @@ impl HttpExecutor for WagiHttpExecutor<'_> {

let stdout = MemoryOutputPipe::new(usize::MAX);

let mut instance_builder =
server.trigger_instance_builder(component_id, parts.uri.scheme())?;
let wasi_builder = instance_builder
.factor_builder::<WasiFactor>()
.context("The wagi HTTP trigger was configured without the required wasi support")?;
Expand Down Expand Up @@ -110,7 +105,7 @@ impl HttpExecutor for WagiHttpExecutor<'_> {
let stdout = stdout.try_into_inner().unwrap();
ensure!(
!stdout.is_empty(),
"The {component:?} component is configured to use the WAGI executor \
"The {component_id:?} component is configured to use the WAGI executor \
but did not write to stdout. Check the `executor` in spin.toml."
);

Expand Down
16 changes: 11 additions & 5 deletions crates/trigger-http/src/wasi.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::io::IsTerminal;
use std::net::SocketAddr;
use std::sync::Arc;

use anyhow::{Context, Result, anyhow};
use futures::TryFutureExt;
Expand All @@ -16,7 +17,8 @@ use wasmtime_wasi_http::handler::HandlerState;
use wasmtime_wasi_http::p2::bindings::http::types::Scheme;
use wasmtime_wasi_http::p2::{bindings::Proxy, body::HyperIncomingBody as Body};

use crate::{TriggerInstanceBuilder, headers::prepare_request_headers, server::HttpExecutor};
use crate::HttpServer;
use crate::headers::prepare_request_headers;

pub(super) fn prepare_request(
route_match: &RouteMatch<'_, '_>,
Expand Down Expand Up @@ -51,18 +53,22 @@ pub struct WasiHttpExecutor<'a, S: HandlerState> {
pub handler_type: &'a HandlerType<S>,
}

impl<S: HandlerState> HttpExecutor for WasiHttpExecutor<'_, S> {
impl<S: HandlerState> WasiHttpExecutor<'_, S> {
#[instrument(name = "spin_trigger_http.execute_wasm", skip_all, err(level = Level::INFO), fields(otel.name = format!("execute_wasm_component {}", route_match.lookup_key().to_string())))]
async fn execute<F: RuntimeFactors>(
pub async fn execute<F: RuntimeFactors>(
&self,
instance_builder: TriggerInstanceBuilder<'_, F>,
server: &Arc<HttpServer<F>>,
route_match: &RouteMatch<'_, '_>,
mut req: Request<Body>,
client_addr: SocketAddr,
component_id: &str,
) -> Result<Response<Body>> {
prepare_request(route_match, &mut req, client_addr)?;

let (instance, mut store) = instance_builder.instantiate(()).await?;
let (instance, mut store) = server
.trigger_instance_builder(component_id, req.uri().scheme())?
.instantiate(())
.await?;

let mut wasi_http = spin_factor_outbound_http::OutboundHttpFactor::get_wasi_http_impl(
store.data_mut().factors_instance_state_mut(),
Expand Down
6 changes: 4 additions & 2 deletions crates/trigger-http/src/wasip3.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::server::HttpHandlerState;
use crate::{HttpServer, server::HttpHandlerState};
use anyhow::{Context as _, Result};
use futures::{FutureExt, channel::oneshot};
use http_body_util::BodyExt;
use spin_factor_outbound_http::{NotifyOnDropBody, p3_to_p2_error_code};
use spin_factors::RuntimeFactors;
use spin_factors_executor::InstanceState;
use spin_http::routes::RouteMatch;
use std::net::SocketAddr;
use std::{net::SocketAddr, sync::Arc};
use tracing::{Instrument, Level, instrument};
use wasmtime::component::Accessor;
use wasmtime_wasi_http::{
Expand All @@ -24,10 +24,12 @@ impl<F: RuntimeFactors> Wasip3HttpExecutor<'_, F> {
#[instrument(name = "spin_trigger_http.execute_wasm", skip_all, err(level = Level::INFO), fields(otel.name = format!("execute_wasm_component {}", route_match.lookup_key().to_string())))]
pub async fn execute(
&self,
server: &Arc<HttpServer<F>>,
route_match: &RouteMatch<'_, '_>,
mut req: http::Request<Body>,
client_addr: SocketAddr,
) -> Result<http::Response<Body>> {
self.0.state().init_once(server, req.uri());
super::wasi::prepare_request(route_match, &mut req, client_addr)?;

let getter = (|data| wasi_http::<F>(data).unwrap())
Expand Down
Loading