diff --git a/.github/workflows/nginx.yaml b/.github/workflows/nginx.yaml index 02fedf1a..1f10490f 100644 --- a/.github/workflows/nginx.yaml +++ b/.github/workflows/nginx.yaml @@ -50,6 +50,7 @@ env: NGX_TEST_FILES: examples/t NGX_TEST_GLOBALS_DYNAMIC: >- load_module ${{ github.workspace }}/nginx/objs/ngx_http_async_module.so; + load_module ${{ github.workspace }}/nginx/objs/ngx_http_tokio_module.so; load_module ${{ github.workspace }}/nginx/objs/ngx_http_awssigv4_module.so; load_module ${{ github.workspace }}/nginx/objs/ngx_http_curl_module.so; load_module ${{ github.workspace }}/nginx/objs/ngx_http_shared_dict_module.so; @@ -185,9 +186,8 @@ jobs: ~/.cargo/registry/index/ ~/.cargo/registry/cache/ ~/.cargo/git/db/ - nginx/objs/**/CACHEDIR.TAG - nginx/objs/**/ngx-debug - nginx/objs/**/ngx-release + # Windows nmake implementation doesn't support .PHONY. Don't cache + # anything make creates because it might not rebuild correctly key: ${{ runner.os }}-nginx-${{ hashFiles('**/Cargo.lock') }} restore-keys: ${{ runner.os }}-nginx- diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 9006aba7..c87586c2 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -25,6 +25,13 @@ idna_adapter = "=1.1.0" libc = "0.2.140" tokio = { version = "1.33.0", features = ["full"] } + +[[example]] +name = "async" +path = "async.rs" +crate-type = ["cdylib"] +required-features = [ "async" ] + [[example]] name = "curl" path = "curl.rs" @@ -47,8 +54,8 @@ path = "upstream.rs" crate-type = ["cdylib"] [[example]] -name = "async" -path = "async.rs" +name = "tokio" +path = "tokio.rs" crate-type = ["cdylib"] [[example]] @@ -65,3 +72,4 @@ default = ["export-modules", "ngx/vendored"] # See https://github.com/rust-lang/rust/issues/20267 export-modules = [] linux = [] +async = [ "ngx/async" ] diff --git a/examples/async.rs b/examples/async.rs index 47f5de8e..72d93ef3 100644 --- a/examples/async.rs +++ b/examples/async.rs @@ -1,26 +1,24 @@ use std::ffi::{c_char, c_void}; -use std::ptr::{addr_of, addr_of_mut}; -use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; -use std::sync::{Arc, OnceLock}; use std::time::Instant; +use ngx::async_::{sleep, spawn, Task}; use ngx::core; use ngx::ffi::{ - ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_handler_pt, - ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_int_t, ngx_module_t, - ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t, - NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE, NGX_LOG_EMERG, + ngx_array_push, ngx_buf_t, ngx_chain_t, ngx_command_t, ngx_conf_t, ngx_http_finalize_request, + ngx_http_handler_pt, ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, + ngx_http_read_client_request_body, ngx_http_request_t, ngx_int_t, ngx_module_t, ngx_str_t, + ngx_uint_t, NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE, + NGX_HTTP_SPECIAL_RESPONSE, NGX_LOG_EMERG, }; use ngx::http::{self, HttpModule, MergeConfigError}; use ngx::http::{HttpModuleLocationConf, HttpModuleMainConf, NgxHttpCoreModule}; use ngx::{http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_string}; -use tokio::runtime::Runtime; struct Module; impl http::HttpModule for Module { fn module() -> &'static ngx_module_t { - unsafe { &*::core::ptr::addr_of!(ngx_http_async_module) } + unsafe { &*std::ptr::addr_of!(ngx_http_async_module) } } unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t { @@ -96,47 +94,33 @@ impl http::Merge for ModuleConfig { } } -unsafe extern "C" fn check_async_work_done(event: *mut ngx_event_t) { - let ctx = ngx::ngx_container_of!(event, RequestCTX, event); - let c: *mut ngx_connection_t = (*event).data.cast(); - - if (*ctx).done.load(Ordering::Relaxed) { - // Triggering async_access_handler again - ngx_post_event((*c).write, addr_of_mut!(ngx_posted_events)); - } else { - // this doesn't have have good performance but works as a simple thread-safe example and - // doesn't causes segfault. The best method that provides both thread-safety and - // performance requires an nginx patch. - ngx_post_event(event, addr_of_mut!(ngx_posted_next_events)); - } -} - -struct RequestCTX { - done: Arc, - event: ngx_event_t, - task: Option>, -} +extern "C" fn ngx_http_async_commands_set_enable( + cf: *mut ngx_conf_t, + _cmd: *mut ngx_command_t, + conf: *mut c_void, +) -> *mut c_char { + unsafe { + let conf = &mut *(conf as *mut ModuleConfig); + let args: &[ngx_str_t] = (*(*cf).args).as_slice(); + let val = match args[1].to_str() { + Ok(s) => s, + Err(_) => { + ngx_conf_log_error!(NGX_LOG_EMERG, cf, "`async` argument is not utf-8 encoded"); + return ngx::core::NGX_CONF_ERROR; + } + }; -impl Default for RequestCTX { - fn default() -> Self { - Self { - done: AtomicBool::new(false).into(), - event: unsafe { std::mem::zeroed() }, - task: Default::default(), - } - } -} + // set default value optionally + conf.enable = false; -impl Drop for RequestCTX { - fn drop(&mut self) { - if let Some(handle) = self.task.take() { - handle.abort(); + if val.eq_ignore_ascii_case("on") { + conf.enable = true; + } else if val.eq_ignore_ascii_case("off") { + conf.enable = false; } + }; - if self.event.posted() != 0 { - unsafe { ngx::ffi::ngx_delete_posted_event(&mut self.event) }; - } - } + ngx::core::NGX_CONF_OK } http_request_handler!(async_access_handler, |request: &mut http::Request| { @@ -148,96 +132,117 @@ http_request_handler!(async_access_handler, |request: &mut http::Request| { return core::Status::NGX_DECLINED; } - if let Some(ctx) = - unsafe { request.get_module_ctx::(&*addr_of!(ngx_http_async_module)) } + if request + .get_module_ctx::>(unsafe { &*std::ptr::addr_of!(ngx_http_async_module) }) + .is_some() { - if !ctx.done.load(Ordering::Relaxed) { - return core::Status::NGX_AGAIN; - } - - return core::Status::NGX_OK; + return core::Status::NGX_DONE; } - let ctx = request.pool().allocate(RequestCTX::default()); - if ctx.is_null() { - return core::Status::NGX_ERROR; + let rc = + unsafe { ngx_http_read_client_request_body(request.into(), Some(content_event_handler)) }; + if rc as u32 >= NGX_HTTP_SPECIAL_RESPONSE { + return core::Status(rc); } - request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) }); - - let ctx = unsafe { &mut *ctx }; - ctx.event.handler = Some(check_async_work_done); - ctx.event.data = request.connection().cast(); - ctx.event.log = unsafe { (*request.connection()).log }; - unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) }; - // Request is no longer needed and can be converted to something movable to the async block - let req = AtomicPtr::new(request.into()); - let done_flag = ctx.done.clone(); + core::Status::NGX_DONE +}); - let rt = ngx_http_async_runtime(); - ctx.task = Some(rt.spawn(async move { +extern "C" fn content_event_handler(request: *mut ngx_http_request_t) { + let task = spawn(async move { let start = Instant::now(); - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) }; - // not really thread safe, we should apply all these operation in nginx thread - // but this is just an example. proper way would be storing these headers in the request ctx - // and apply them when we get back to the nginx thread. + sleep(std::time::Duration::from_secs(2)).await; + + let req = unsafe { http::Request::from_ngx_http_request(request) }; req.add_header_out( "X-Async-Time", start.elapsed().as_millis().to_string().as_str(), ); + req.set_status(http::HTTPStatus::OK); + req.send_header(); + let buf = req.pool().calloc(std::mem::size_of::()) as *mut ngx_buf_t; + unsafe { + (*buf).set_last_buf(if req.is_main() { 1 } else { 0 }); + (*buf).set_last_in_chain(1); + } + req.output_filter(&mut ngx_chain_t { + buf, + next: std::ptr::null_mut(), + }); + + unsafe { + ngx::ffi::ngx_post_event( + (*(*request).connection).write, + std::ptr::addr_of_mut!(ngx::ffi::ngx_posted_events), + ); + } + }); - done_flag.store(true, Ordering::Release); - // there is a small issue here. If traffic is low we may get stuck behind a 300ms timer - // in the nginx event loop. To workaround it we can notify the event loop using - // pthread_kill( nginx_thread, SIGIO ) to wake up the event loop. (or patch nginx - // and use the same trick as the thread pool) - })); + let req = unsafe { http::Request::from_ngx_http_request(request) }; - core::Status::NGX_AGAIN -}); + let ctx = req.pool().allocate::>(task); + if ctx.is_null() { + unsafe { ngx_http_finalize_request(request, core::Status::NGX_ERROR.into()) }; + return; + } + req.set_module_ctx(ctx.cast(), unsafe { + &*std::ptr::addr_of!(ngx_http_async_module) + }); + unsafe { (*request).write_event_handler = Some(write_event_handler) }; +} -extern "C" fn ngx_http_async_commands_set_enable( - cf: *mut ngx_conf_t, - _cmd: *mut ngx_command_t, - conf: *mut c_void, -) -> *mut c_char { - unsafe { - let conf = &mut *(conf as *mut ModuleConfig); - let args: &[ngx_str_t] = (*(*cf).args).as_slice(); - let val = match args[1].to_str() { - Ok(s) => s, - Err(_) => { - ngx_conf_log_error!(NGX_LOG_EMERG, cf, "`async` argument is not utf-8 encoded"); - return ngx::core::NGX_CONF_ERROR; - } - }; +extern "C" fn write_event_handler(request: *mut ngx_http_request_t) { + let req = unsafe { http::Request::from_ngx_http_request(request) }; + if let Some(task) = + req.get_module_ctx::>(unsafe { &*std::ptr::addr_of!(ngx_http_async_module) }) + { + if task.is_finished() { + unsafe { ngx_http_finalize_request(request, core::Status::NGX_OK.into()) }; + return; + } + } - // set default value optionally - conf.enable = false; + let write_event = + unsafe { (*(*request).connection).write.as_ref() }.expect("write event is not null"); + if write_event.timedout() != 0 { + unsafe { + ngx::ffi::ngx_connection_error( + (*request).connection, + ngx::ffi::NGX_ETIMEDOUT as i32, + c"client timed out".as_ptr() as *mut _, + ) + }; + return; + } - if val.eq_ignore_ascii_case("on") { - conf.enable = true; - } else if val.eq_ignore_ascii_case("off") { - conf.enable = false; - } - }; + if unsafe { ngx::ffi::ngx_http_output_filter(request, std::ptr::null_mut()) } + == ngx::ffi::NGX_ERROR as isize + { + // Client error + return; + } + let clcf = + NgxHttpCoreModule::location_conf(unsafe { request.as_ref().expect("request not null") }) + .expect("http core server conf"); - ngx::core::NGX_CONF_OK -} + if unsafe { + ngx::ffi::ngx_handle_write_event(std::ptr::from_ref(write_event) as *mut _, clcf.send_lowat) + } != ngx::ffi::NGX_OK as isize + { + // Client error + return; + } -fn ngx_http_async_runtime() -> &'static Runtime { - // Should not be called from the master process - assert_ne!( - unsafe { ngx::ffi::ngx_process }, - ngx::ffi::NGX_PROCESS_MASTER as _ - ); - - static RUNTIME: OnceLock = OnceLock::new(); - RUNTIME.get_or_init(|| { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .expect("tokio runtime init") - }) + if write_event.delayed() == 0 { + if (write_event.active() != 0) && (write_event.ready() == 0) { + unsafe { + ngx::ffi::ngx_add_timer( + std::ptr::from_ref(write_event) as *mut _, + clcf.send_timeout, + ) + } + } else if write_event.timer_set() != 0 { + unsafe { ngx::ffi::ngx_del_timer(std::ptr::from_ref(write_event) as *mut _) } + } + } } diff --git a/examples/config b/examples/config index 6b763652..45aa9a24 100644 --- a/examples/config +++ b/examples/config @@ -19,6 +19,15 @@ if [ $HTTP = YES ]; then ngx_module_name=ngx_http_async_module ngx_module_libs="-lm" ngx_rust_target_name=async + ngx_rust_target_features=async + + ngx_rust_module + fi + + if :; then + ngx_module_name=ngx_http_tokio_module + ngx_module_libs="-lm" + ngx_rust_target_name=tokio ngx_rust_module fi diff --git a/examples/t/tokio.t b/examples/t/tokio.t new file mode 100644 index 00000000..a8371831 --- /dev/null +++ b/examples/t/tokio.t @@ -0,0 +1,56 @@ +#!/usr/bin/perl + +# (C) Nginx, Inc + +# Tests for ngx-rust example modules. + +############################################################################### + +use warnings; +use strict; + +use Test::More; + +BEGIN { use FindBin; chdir($FindBin::Bin); } + +use lib 'lib'; +use Test::Nginx; + +############################################################################### + +select STDERR; $| = 1; +select STDOUT; $| = 1; + +my $t = Test::Nginx->new()->has(qw/http/)->plan(1) + ->write_file_expand('nginx.conf', <<'EOF'); + +%%TEST_GLOBALS%% + +daemon off; + +events { +} + +http { + %%TEST_GLOBALS_HTTP%% + + server { + listen 127.0.0.1:8080; + server_name localhost; + + location / { + tokio on; + } + } +} + +EOF + +$t->write_file('index.html', ''); +$t->run(); + +############################################################################### + +like(http_get('/index.html'), qr/X-Tokio-Time:/, 'tokio handler'); + +############################################################################### diff --git a/examples/tokio.conf b/examples/tokio.conf new file mode 100644 index 00000000..aea1e4a3 --- /dev/null +++ b/examples/tokio.conf @@ -0,0 +1,24 @@ +daemon off; +master_process off; +# worker_processes 1; + +load_module modules/libtokio.so; +error_log error.log debug; + +events { } + +http { + server { + listen *:8000; + server_name localhost; + location / { + root html; + index index.html index.htm; + tokio on; + } + error_page 500 502 503 504 /50x.html; + location = /50x.html { + root html; + } + } +} diff --git a/examples/tokio.rs b/examples/tokio.rs new file mode 100644 index 00000000..85d9f34a --- /dev/null +++ b/examples/tokio.rs @@ -0,0 +1,243 @@ +use std::ffi::{c_char, c_void}; +use std::ptr::{addr_of, addr_of_mut}; +use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; +use std::sync::{Arc, OnceLock}; +use std::time::Instant; + +use ngx::core; +use ngx::ffi::{ + ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_handler_pt, + ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_int_t, ngx_module_t, + ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t, + NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE, NGX_LOG_EMERG, +}; +use ngx::http::{self, HttpModule, MergeConfigError}; +use ngx::http::{HttpModuleLocationConf, HttpModuleMainConf, NgxHttpCoreModule}; +use ngx::{http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_string}; +use tokio::runtime::Runtime; + +struct Module; + +impl http::HttpModule for Module { + fn module() -> &'static ngx_module_t { + unsafe { &*::core::ptr::addr_of!(ngx_http_tokio_module) } + } + + unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t { + // SAFETY: this function is called with non-NULL cf always + let cf = &mut *cf; + let cmcf = NgxHttpCoreModule::main_conf_mut(cf).expect("http core main conf"); + + let h = ngx_array_push( + &mut cmcf.phases[ngx_http_phases_NGX_HTTP_ACCESS_PHASE as usize].handlers, + ) as *mut ngx_http_handler_pt; + if h.is_null() { + return core::Status::NGX_ERROR.into(); + } + // set an Access phase handler + *h = Some(tokio_access_handler); + core::Status::NGX_OK.into() + } +} + +#[derive(Debug, Default)] +struct ModuleConfig { + enable: bool, +} + +unsafe impl HttpModuleLocationConf for Module { + type LocationConf = ModuleConfig; +} + +static mut NGX_HTTP_TOKIO_COMMANDS: [ngx_command_t; 2] = [ + ngx_command_t { + name: ngx_string!("tokio"), + type_: (NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1) as ngx_uint_t, + set: Some(ngx_http_tokio_commands_set_enable), + conf: NGX_HTTP_LOC_CONF_OFFSET, + offset: 0, + post: std::ptr::null_mut(), + }, + ngx_command_t::empty(), +]; + +static NGX_HTTP_TOKIO_MODULE_CTX: ngx_http_module_t = ngx_http_module_t { + preconfiguration: Some(Module::preconfiguration), + postconfiguration: Some(Module::postconfiguration), + create_main_conf: None, + init_main_conf: None, + create_srv_conf: None, + merge_srv_conf: None, + create_loc_conf: Some(Module::create_loc_conf), + merge_loc_conf: Some(Module::merge_loc_conf), +}; + +// Generate the `ngx_modules` table with exported modules. +// This feature is required to build a 'cdylib' dynamic module outside of the NGINX buildsystem. +#[cfg(feature = "export-modules")] +ngx::ngx_modules!(ngx_http_tokio_module); + +#[used] +#[allow(non_upper_case_globals)] +#[cfg_attr(not(feature = "export-modules"), no_mangle)] +pub static mut ngx_http_tokio_module: ngx_module_t = ngx_module_t { + ctx: std::ptr::addr_of!(NGX_HTTP_TOKIO_MODULE_CTX) as _, + commands: unsafe { &NGX_HTTP_TOKIO_COMMANDS[0] as *const _ as *mut _ }, + type_: NGX_HTTP_MODULE as _, + ..ngx_module_t::default() +}; + +impl http::Merge for ModuleConfig { + fn merge(&mut self, prev: &ModuleConfig) -> Result<(), MergeConfigError> { + if prev.enable { + self.enable = true; + }; + Ok(()) + } +} + +unsafe extern "C" fn check_tokio_work_done(event: *mut ngx_event_t) { + let ctx = ngx::ngx_container_of!(event, RequestCTX, event); + let c: *mut ngx_connection_t = (*event).data.cast(); + + if (*ctx).done.load(Ordering::Relaxed) { + // Triggering tokio_access_handler again + ngx_post_event((*c).write, addr_of_mut!(ngx_posted_events)); + } else { + // this doesn't have have good performance but works as a simple thread-safe example and + // doesn't causes segfault. The best method that provides both thread-safety and + // performance requires an nginx patch. + ngx_post_event(event, addr_of_mut!(ngx_posted_next_events)); + } +} + +struct RequestCTX { + done: Arc, + event: ngx_event_t, + task: Option>, +} + +impl Default for RequestCTX { + fn default() -> Self { + Self { + done: AtomicBool::new(false).into(), + event: unsafe { std::mem::zeroed() }, + task: Default::default(), + } + } +} + +impl Drop for RequestCTX { + fn drop(&mut self) { + if let Some(handle) = self.task.take() { + handle.abort(); + } + + if self.event.posted() != 0 { + unsafe { ngx::ffi::ngx_delete_posted_event(&mut self.event) }; + } + } +} + +http_request_handler!(tokio_access_handler, |request: &mut http::Request| { + let co = Module::location_conf(request).expect("module config is none"); + + ngx_log_debug_http!(request, "tokio module enabled: {}", co.enable); + + if !co.enable { + return core::Status::NGX_DECLINED; + } + + if let Some(ctx) = + unsafe { request.get_module_ctx::(&*addr_of!(ngx_http_tokio_module)) } + { + if !ctx.done.load(Ordering::Relaxed) { + return core::Status::NGX_AGAIN; + } + + return core::Status::NGX_OK; + } + + let ctx = request.pool().allocate(RequestCTX::default()); + if ctx.is_null() { + return core::Status::NGX_ERROR; + } + request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_tokio_module) }); + + let ctx = unsafe { &mut *ctx }; + ctx.event.handler = Some(check_tokio_work_done); + ctx.event.data = request.connection().cast(); + ctx.event.log = unsafe { (*request.connection()).log }; + unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) }; + + // Request is no longer needed and can be converted to something movable to the async block + let req = AtomicPtr::new(request.into()); + let done_flag = ctx.done.clone(); + + let rt = ngx_http_tokio_runtime(); + ctx.task = Some(rt.spawn(async move { + let start = Instant::now(); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) }; + // not really thread safe, we should apply all these operation in nginx thread + // but this is just an example. proper way would be storing these headers in the request ctx + // and apply them when we get back to the nginx thread. + req.add_header_out( + "X-Tokio-Time", + start.elapsed().as_millis().to_string().as_str(), + ); + + done_flag.store(true, Ordering::Release); + // there is a small issue here. If traffic is low we may get stuck behind a 300ms timer + // in the nginx event loop. To workaround it we can notify the event loop using + // pthread_kill( nginx_thread, SIGIO ) to wake up the event loop. (or patch nginx + // and use the same trick as the thread pool) + })); + + core::Status::NGX_AGAIN +}); + +extern "C" fn ngx_http_tokio_commands_set_enable( + cf: *mut ngx_conf_t, + _cmd: *mut ngx_command_t, + conf: *mut c_void, +) -> *mut c_char { + unsafe { + let conf = &mut *(conf as *mut ModuleConfig); + let args: &[ngx_str_t] = (*(*cf).args).as_slice(); + let val = match args[1].to_str() { + Ok(s) => s, + Err(_) => { + ngx_conf_log_error!(NGX_LOG_EMERG, cf, "`tokio` argument is not utf-8 encoded"); + return ngx::core::NGX_CONF_ERROR; + } + }; + + // set default value optionally + conf.enable = false; + + if val.eq_ignore_ascii_case("on") { + conf.enable = true; + } else if val.eq_ignore_ascii_case("off") { + conf.enable = false; + } + }; + + ngx::core::NGX_CONF_OK +} + +fn ngx_http_tokio_runtime() -> &'static Runtime { + // Should not be called from the master process + assert_ne!( + unsafe { ngx::ffi::ngx_process }, + ngx::ffi::NGX_PROCESS_MASTER as _ + ); + + static RUNTIME: OnceLock = OnceLock::new(); + RUNTIME.get_or_init(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("tokio runtime init") + }) +}