diff --git a/.gitignore b/.gitignore index 2a2c5a3dd2..00f01468f2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ /target +examples/**/target /.vscode +/**/.vscode .DS_Store *.asm *.img diff --git a/Cargo.lock b/Cargo.lock index 5dbabbab2e..1446ce4fc5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,6 +161,7 @@ dependencies = [ "axdisplay", "axdma", "axdriver", + "axembassy", "axerrno", "axfeat", "axfs", @@ -253,6 +254,20 @@ dependencies = [ "memory_addr", ] +[[package]] +name = "axasync" +version = "0.1.0" +dependencies = [ + "arceos_api", + "axfeat", + "cfg-if", + "embassy-executor", + "embassy-futures", + "embassy-sync", + "embassy-time", + "static_cell", +] + [[package]] name = "axconfig" version = "0.1.0" @@ -405,6 +420,26 @@ dependencies = [ "virtio-drivers", ] +[[package]] +name = "axembassy" +version = "0.1.0" +dependencies = [ + "axconfig", + "axhal", + "axsync", + "axtask", + "cfg-if", + "embassy-executor", + "embassy-futures", + "embassy-sync", + "embassy-time-driver", + "embassy-time-queue-utils", + "kspin", + "log", + "percpu", + "static_cell", +] + [[package]] name = "axerrno" version = "0.1.0" @@ -421,6 +456,7 @@ dependencies = [ "axalloc", "axdisplay", "axdriver", + "axembassy", "axfs", "axhal", "axlog", @@ -755,6 +791,7 @@ dependencies = [ "axconfig", "axdisplay", "axdriver", + "axembassy", "axfs", "axhal", "axlog", @@ -1080,6 +1117,41 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.104", +] + +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.104", +] + [[package]] name = "defmt" version = "0.3.100" @@ -1121,6 +1193,15 @@ dependencies = [ "thiserror", ] +[[package]] +name = "document-features" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95249b50c6c185bee49034bcb378a49dc2b5dff0be90ff6616d31d64febab05d" +dependencies = [ + "litrs", +] + [[package]] name = "dw_apb_uart" version = "0.1.0" @@ -1136,12 +1217,136 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "embassy-executor" +version = "0.7.0" +source = "git+https://github.com/embassy-rs/embassy?branch=main#504261a8d0bc58fcfa8b73245eaf859f88d62a94" +dependencies = [ + "critical-section", + "document-features", + "embassy-executor-macros", +] + +[[package]] +name = "embassy-executor-macros" +version = "0.6.2" +source = "git+https://github.com/embassy-rs/embassy?branch=main#504261a8d0bc58fcfa8b73245eaf859f88d62a94" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "embassy-futures" +version = "0.1.1" +source = "git+https://github.com/embassy-rs/embassy?branch=main#504261a8d0bc58fcfa8b73245eaf859f88d62a94" + +[[package]] +name = "embassy-preempt" +version = "0.0.0" +dependencies = [ + "axasync", + "axstd", + "embassy-executor", + "log", +] + +[[package]] +name = "embassy-single" +version = "0.0.0" +dependencies = [ + "axasync", + "axstd", + "embassy-executor", +] + +[[package]] +name = "embassy-sync" +version = "0.7.0" +source = "git+https://github.com/embassy-rs/embassy?branch=main#504261a8d0bc58fcfa8b73245eaf859f88d62a94" +dependencies = [ + "cfg-if", + "critical-section", + "embedded-io-async", + "futures-core", + "futures-sink", + "heapless 0.8.0", +] + +[[package]] +name = "embassy-time" +version = "0.4.0" +source = "git+https://github.com/embassy-rs/embassy?branch=main#504261a8d0bc58fcfa8b73245eaf859f88d62a94" +dependencies = [ + "cfg-if", + "critical-section", + "document-features", + "embassy-time-driver", + "embedded-hal 0.2.7", + "embedded-hal 1.0.0", + "embedded-hal-async", + "futures-core", +] + +[[package]] +name = "embassy-time-driver" +version = "0.2.0" +source = "git+https://github.com/embassy-rs/embassy?branch=main#504261a8d0bc58fcfa8b73245eaf859f88d62a94" +dependencies = [ + "document-features", +] + +[[package]] +name = "embassy-time-queue-utils" +version = "0.1.0" +source = "git+https://github.com/embassy-rs/embassy?branch=main#504261a8d0bc58fcfa8b73245eaf859f88d62a94" +dependencies = [ + "embassy-executor", + "heapless 0.8.0", +] + +[[package]] +name = "embedded-hal" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35949884794ad573cf46071e41c9b60efb0cb311e3ca01f7af807af1debc66ff" +dependencies = [ + "nb 0.1.3", + "void", +] + [[package]] name = "embedded-hal" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "361a90feb7004eca4019fb28352a9465666b24f840f5c3cddf0ff13920590b89" +[[package]] +name = "embedded-hal-async" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4c685bbef7fe13c3c6dd4da26841ed3980ef33e841cddfa15ce8a8fb3f1884" +dependencies = [ + "embedded-hal 1.0.0", +] + +[[package]] +name = "embedded-io" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d" + +[[package]] +name = "embedded-io-async" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff09972d4073aa8c299395be75161d582e7629cd663171d62af73c8d50dba3f" +dependencies = [ + "embedded-io", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -1166,6 +1371,24 @@ dependencies = [ "bitmaps", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + [[package]] name = "fxmac_rs" version = "0.2.0" @@ -1277,6 +1500,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "indexmap" version = "2.10.0" @@ -1406,6 +1635,12 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "litrs" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5" + [[package]] name = "lock_api" version = "0.4.13" @@ -1474,6 +1709,21 @@ dependencies = [ "paste", ] +[[package]] +name = "nb" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "801d31da0513b6ec5214e9bf433a77966320625a37860f910be265be6e18d06f" +dependencies = [ + "nb 1.1.0", +] + +[[package]] +name = "nb" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d5439c4ad607c3c23abf66de8c8bf57ba8adcd1f129e699851a6e43935d339d" + [[package]] name = "nom" version = "7.1.3" @@ -1567,6 +1817,12 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "portable-atomic" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1715,7 +1971,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f1671c79a01a149fe000af2429ce9ccc8e58cdecda72672355d50e5536b363c" dependencies = [ "critical-section", - "embedded-hal", + "embedded-hal 1.0.0", "paste", "riscv-macros", "riscv-pac", @@ -1875,6 +2131,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "static_cell" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89b0684884a883431282db1e4343f34afc2ff6996fe1f4a1664519b66e14c1e" +dependencies = [ + "portable-atomic", +] + [[package]] name = "strsim" version = "0.11.1" @@ -2018,6 +2283,12 @@ dependencies = [ "zerocopy 0.7.35", ] +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "volatile" version = "0.2.7" diff --git a/Cargo.toml b/Cargo.toml index 1931e1fa30..57f53e96cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "modules/axconfig", "modules/axdisplay", "modules/axdriver", + "modules/axembassy", "modules/axfs", "modules/axhal", "modules/axlog", @@ -21,6 +22,7 @@ members = [ "api/arceos_api", "api/arceos_posix_api", + "ulib/axasync", "ulib/axstd", "ulib/axlibc", @@ -30,6 +32,8 @@ members = [ "examples/httpserver", "examples/httpserver", "examples/shell", + "examples/embassy-single", + "examples/embassy-preempt", ] [workspace.package] @@ -44,6 +48,7 @@ keywords = ["arceos", "kernel"] categories = ["os", "no-std"] [workspace.dependencies] +axasync = { path = "ulib/axasync" } axstd = { path = "ulib/axstd" } axlibc = { path = "ulib/axlibc" } @@ -55,6 +60,7 @@ axalloc = { path = "modules/axalloc" } axconfig = { path = "modules/axconfig" } axdisplay = { path = "modules/axdisplay" } axdriver = { path = "modules/axdriver" } +axembassy = { path = "modules/axembassy" } axfs = { path = "modules/axfs" } axhal = { path = "modules/axhal" } axlog = { path = "modules/axlog" } diff --git a/api/arceos_api/Cargo.toml b/api/arceos_api/Cargo.toml index 9f679d830f..44da8282fc 100644 --- a/api/arceos_api/Cargo.toml +++ b/api/arceos_api/Cargo.toml @@ -23,6 +23,10 @@ display = ["dep:axdisplay", "dep:axdriver", "axfeat/display"] myfs = ["axfeat/myfs"] +async-preempt = ["async-thread", "axfeat/async-preempt"] +async-thread = ["dep:axembassy", "axfeat/async-thread"] +async-single = ["dep:axembassy", "axfeat/async-single"] + # Use dummy functions if the feature is not enabled dummy-if-not-enabled = [] @@ -43,3 +47,4 @@ axdriver = { workspace = true, optional = true } axfs = { workspace = true, optional = true } axnet = { workspace = true, optional = true } axdisplay = { workspace = true, optional = true } +axembassy = { workspace = true, optional = true } diff --git a/api/arceos_api/src/imp/embassy_async.rs b/api/arceos_api/src/imp/embassy_async.rs new file mode 100644 index 0000000000..0568cbb29b --- /dev/null +++ b/api/arceos_api/src/imp/embassy_async.rs @@ -0,0 +1,20 @@ +cfg_async! { + pub use axembassy::Executor as AxExecutor; + pub use axembassy::Spawner as AxSpawner; +} + +cfg_async_thread! { + pub use axembassy::SendSpawner as AxSendSpawner; + + pub fn ax_spawner() -> AxSendSpawner { + axembassy::spawner() + } + + pub fn ax_block_on(fut: F) -> F::Output { + axembassy::block_on(fut) + } +} + +cfg_async_preempt! { + pub use axembassy::PrioFuture as AxPrioFuture; +} diff --git a/api/arceos_api/src/imp/mod.rs b/api/arceos_api/src/imp/mod.rs index a8dbdfdb74..1ba15768d5 100644 --- a/api/arceos_api/src/imp/mod.rs +++ b/api/arceos_api/src/imp/mod.rs @@ -1,3 +1,4 @@ +mod embassy_async; mod mem; mod task; @@ -45,6 +46,7 @@ mod time { }; } +pub use self::embassy_async::*; pub use self::mem::*; pub use self::stdio::*; pub use self::task::*; diff --git a/api/arceos_api/src/imp/task.rs b/api/arceos_api/src/imp/task.rs index 2ad194cb7b..953ed1c946 100644 --- a/api/arceos_api/src/imp/task.rs +++ b/api/arceos_api/src/imp/task.rs @@ -26,6 +26,21 @@ pub fn ax_exit(_exit_code: i32) -> ! { cfg_task! { use core::time::Duration; + /// A handle to a futex. + pub type AxFutex = axtask::Futex; + + pub fn ax_futex_wake(futex: &AxFutex) { + axtask::futex_wake(futex); + } + + pub fn ax_futex_wake_all(futex: &AxFutex) { + axtask::futex_wake_all(futex); + } + + pub fn ax_futex_wait(futex: &AxFutex, expected: u32, timeout: Option) -> bool { + axtask::futex_wait(futex, expected, timeout) + } + /// A handle to a task. pub struct AxTaskHandle { inner: axtask::AxTaskRef, diff --git a/api/arceos_api/src/lib.rs b/api/arceos_api/src/lib.rs index 3e6d291870..babbc45fc9 100644 --- a/api/arceos_api/src/lib.rs +++ b/api/arceos_api/src/lib.rs @@ -122,6 +122,7 @@ pub mod task { pub type AxTaskHandle; pub type AxWaitQueueHandle; pub type AxCpuMask; + pub type AxFutex; } define_api! { @@ -175,6 +176,12 @@ pub mod task { /// The maximum number of tasks to wake up is specified by `count`. If /// `count` is `u32::MAX`, it will wake up all tasks in the wait queue. pub fn ax_wait_queue_wake(wq: &AxWaitQueueHandle, count: u32); + /// Wakes up a single task that is currently waiting on the given `Futex`. + pub fn ax_futex_wake(futex: &AxFutex); + /// Wakes up all tasks that are currently waiting on the given `Futex`. + pub fn ax_futex_wake_all(futex: &AxFutex); + /// Attempts to wait on a `Futex` until its value is no longer `expected`. + pub fn ax_futex_wait(futex: &AxFutex, expected: u32, timeout: Option) -> bool; } } @@ -381,6 +388,55 @@ pub mod io { } } +/// Embassy runtime. +pub mod embassy_async { + use core::future::Future; + + #[cfg(any(feature = "async-single", feature = "async-thread"))] + pub use crate::imp::AxExecutor; + #[cfg(all( + feature = "dummy-if-not-enabled", + not(any(feature = "async-single", feature = "async-thread")) + ))] + pub struct AxExecutor; + + #[cfg(any(feature = "async-single", feature = "async-thread"))] + pub use crate::imp::AxSpawner; + #[cfg(all( + feature = "dummy-if-not-enabled", + not(any(feature = "async-single", feature = "async-thread")) + ))] + pub struct AxSpawner; + + define_api_type! { + @cfg "async-thread"; + pub type AxSendSpawner; + } + + define_api! { + @cfg "async-thread"; + /// Returns a global spawner. + pub fn ax_spawner() -> AxSendSpawner; + } + + #[cfg(feature = "async-thread")] + /// Blocks the current task until the future completes. + pub fn ax_block_on(fut: F) -> F::Output { + crate::imp::ax_block_on(fut) + } + + #[allow(unused_variables)] + #[cfg(all(feature = "dummy-if-not-enabled", not(feature = "async-thread")))] + pub fn ax_block_on(fut: F) -> F::Output { + unimplemented!(stringify!(ax_block_on)) + } + + #[cfg(feature = "async-preempt")] + pub use crate::imp::AxPrioFuture; + #[cfg(all(feature = "dummy-if-not-enabled", not(feature = "async-thread")))] + pub struct AxPrioFuture; +} + /// Re-exports of ArceOS modules. /// /// You should prefer to use other APIs rather than these modules. The modules @@ -400,6 +456,12 @@ pub mod modules { pub use axdma; #[cfg(any(feature = "fs", feature = "net", feature = "display"))] pub use axdriver; + #[cfg(any( + feature = "async-thread", + feature = "async-single", + feature = "async-preempt" + ))] + pub use axembassy; #[cfg(feature = "fs")] pub use axfs; #[cfg(feature = "paging")] diff --git a/api/arceos_api/src/macros.rs b/api/arceos_api/src/macros.rs index ae96825967..a9451e717e 100644 --- a/api/arceos_api/src/macros.rs +++ b/api/arceos_api/src/macros.rs @@ -1,20 +1,19 @@ #![allow(unused_macros)] macro_rules! define_api_type { - ($( $(#[$attr:meta])* $vis:vis type $name:ident; )+) => { + ($( $(#[$attr:meta])* $vis:vis type $name:ident $(<$($generic:tt)*>)? ; )+) => { $( $vis use $crate::imp::$name; )+ }; - ( @cfg $feature:literal; $( $(#[$attr:meta])* $vis:vis type $name:ident; )+ ) => { + ( @cfg $feature:literal; $( $(#[$attr:meta])* $vis:vis type $name:ident $(<$($generic:tt)*>)? ; )+ ) => { $( #[cfg(feature = $feature)] $(#[$attr])* $vis use $crate::imp::$name; - #[cfg(all(feature = "dummy-if-not-enabled", not(feature = $feature)))] $(#[$attr])* - $vis struct $name; + $vis struct $name $(<$($generic)*>)? $(where $($generic)*: {})?; )+ }; } @@ -108,3 +107,24 @@ macro_rules! cfg_display { macro_rules! cfg_task { ($($item:item)*) => { _cfg_common!{ "multitask" $($item)* } } } + +macro_rules! cfg_async_preempt { + ($($item:item)*) => { _cfg_common!{ "async-preempt" $($item)* } } +} + +macro_rules! cfg_async_thread { + ($($item:item)*) => { _cfg_common!{ "async-thread" $($item)* } } +} + +macro_rules! cfg_async_single { + ($($item:item)*) => { _cfg_common!{ "async-single" $($item)* } } +} + +macro_rules! cfg_async { + ($($item:item)*) => { + $( + #[cfg(any(feature = "async-thread", feature = "async-single"))] + $item + )* + } +} diff --git a/api/axfeat/Cargo.toml b/api/axfeat/Cargo.toml index dedc478203..358222d340 100644 --- a/api/axfeat/Cargo.toml +++ b/api/axfeat/Cargo.toml @@ -30,8 +30,8 @@ alloc = ["axalloc", "axruntime/alloc"] alloc-tlsf = ["axalloc/tlsf"] alloc-slab = ["axalloc/slab"] alloc-buddy = ["axalloc/buddy"] -page-alloc-64g = ["axalloc/page-alloc-64g"] # up to 64G memory capacity -page-alloc-4g = ["axalloc/page-alloc-4g"] # up to 4G memory capacity +page-alloc-64g = ["axalloc/page-alloc-64g"] # up to 64G memory capacity +page-alloc-4g = ["axalloc/page-alloc-4g"] # up to 4G memory capacity paging = ["alloc", "axhal/paging", "axruntime/paging"] tls = ["alloc", "axhal/tls", "axruntime/tls", "axtask?/tls"] dma = ["alloc", "paging"] @@ -42,15 +42,46 @@ sched-fifo = ["axtask/sched-fifo"] sched-rr = ["axtask/sched-rr", "irq"] sched-cfs = ["axtask/sched-cfs", "irq"] +# Embassy asynchronous runtime with multithread support and preemption +async-preempt = ["async-thread", "axembassy/executor-preempt"] +# Embassy asynchronous runtime with multithread support +async-thread = [ + "axembassy/driver", + "axembassy/executor-thread", + "axruntime/embassy-timer", + "multitask", + "irq", +] +# Embassy asynchronous runtime depended on irq with single thread support +async-single = [ + "axembassy/driver", + "axembassy/executor-single", + "axruntime/embassy-timer", + "alloc", + "irq", +] + # File system -fs = ["alloc", "paging", "axdriver/virtio-blk", "dep:axfs", "axruntime/fs"] # TODO: try to remove "paging" +fs = [ + "alloc", + "paging", + "axdriver/virtio-blk", + "dep:axfs", + "axruntime/fs", +] # TODO: try to remove "paging" myfs = ["axfs?/myfs"] # Networking net = ["alloc", "paging", "axdriver/virtio-net", "dep:axnet", "axruntime/net"] # Display -display = ["alloc", "paging", "axdriver/virtio-gpu", "dep:axdisplay", "axruntime/display"] +display = [ + "alloc", + "paging", + "axdriver/virtio-gpu", + "dep:axdisplay", + "axruntime/display", +] # Real Time Clock (RTC) Driver. rtc = ["axhal/rtc", "axruntime/rtc"] @@ -60,7 +91,7 @@ bus-mmio = ["axdriver?/bus-mmio"] bus-pci = ["axdriver?/bus-pci"] driver-ramdisk = ["axdriver?/ramdisk", "axfs?/use-ramdisk"] driver-ixgbe = ["axdriver?/ixgbe"] -driver-fxmac = ["axdriver?/fxmac"] # fxmac ethernet driver for PhytiumPi +driver-fxmac = ["axdriver?/fxmac"] # fxmac ethernet driver for PhytiumPi driver-bcm2835-sdhci = ["axdriver?/bcm2835-sdhci"] # Logging @@ -82,4 +113,5 @@ axnet = { workspace = true, optional = true } axdisplay = { workspace = true, optional = true } axsync = { workspace = true, optional = true } axtask = { workspace = true, optional = true } +axembassy = { workspace = true, optional = true } kspin = { version = "0.1", optional = true } diff --git a/api/axfeat/src/lib.rs b/api/axfeat/src/lib.rs index c2f8d16c06..137ca01053 100644 --- a/api/axfeat/src/lib.rs +++ b/api/axfeat/src/lib.rs @@ -34,6 +34,9 @@ //! - `log-level-off`: Disable all logging. //! - `log-level-error`, `log-level-warn`, `log-level-info`, `log-level-debug`, //! `log-level-trace`: Keep logging only at the specified level or higher. +//! - Embassy runtime +//! - `async-thread`: Enable Embassy asynchronous runtime with multithread support. +//! - `async-single`: Enable Embassy asynchronous runtime depenended on irq with single thread. //! //! [ArceOS]: https://github.com/arceos-org/arceos diff --git a/examples/embassy-preempt/.axconfig.old.toml b/examples/embassy-preempt/.axconfig.old.toml new file mode 100644 index 0000000000..42465c83b4 --- /dev/null +++ b/examples/embassy-preempt/.axconfig.old.toml @@ -0,0 +1,81 @@ +# Architecture identifier. +arch = "riscv64" # str +# Platform identifier. +platform = "riscv64-qemu-virt" # str +# Number of CPUs +smp = 1 # uint +# Stack size of each task. +task-stack-size = 0x40000 # uint +# Number of timer ticks per second (Hz). A timer tick may contain several timer +# interrupts. +ticks-per-sec = 100 # uint + +# +# Device specifications +# +[devices] +# MMIO regions with format (`base_paddr`, `size`). +mmio-regions = [ + [0x0010_1000, 0x1000], + [0x0c00_0000, 0x21_0000], + [0x1000_0000, 0x1000], + [0x1000_1000, 0x8000], + [0x3000_0000, 0x1000_0000], + [0x4000_0000, 0x4000_0000] +] # [(uint, uint)] +# End PCI bus number (`bus-range` property in device tree). +pci-bus-end = 0xff # uint +# Base physical address of the PCIe ECAM space. +pci-ecam-base = 0x3000_0000 # uint +# PCI device memory ranges (`ranges` property in device tree). +pci-ranges = [ + [0x0300_0000, 0x1_0000], + [0x4000_0000, 0x4000_0000], + [0x4_0000_0000, 0x4_0000_0000] +] # [(uint, uint)] +# rtc@101000 { +# interrupts = <0x0b>; +# interrupt-parent = <0x03>; +# reg = <0x00 0x101000 0x00 0x1000>; +# compatible = "google,goldfish-rtc"; +# }; +# RTC (goldfish) Address +rtc-paddr = 0x10_1000 # uint +# Timer interrupt frequency in Hz. +timer-frequency = 10_000_000 # uint +# VirtIO MMIO regions with format (`base_paddr`, `size`). +virtio-mmio-regions = [ + [0x1000_1000, 0x1000], + [0x1000_2000, 0x1000], + [0x1000_3000, 0x1000], + [0x1000_4000, 0x1000], + [0x1000_5000, 0x1000], + [0x1000_6000, 0x1000], + [0x1000_7000, 0x1000], + [0x1000_8000, 0x1000] +] # [(uint, uint)] + +# +# Platform configs +# +[plat] +# Platform family. +family = "riscv64-qemu-virt" # str +# Kernel address space base. +kernel-aspace-base = "0xffff_ffc0_0000_0000" # uint +# Kernel address space size. +kernel-aspace-size = "0x0000_003f_ffff_f000" # uint +# Base physical address of the kernel image. +kernel-base-paddr = 0x8020_0000 # uint +# Base virtual address of the kernel image. +kernel-base-vaddr = "0xffff_ffc0_8020_0000" # uint +# Offset of bus address and phys address. some boards, the bus address is +# different from the physical address. +phys-bus-offset = 0 # uint +# Base address of the whole physical memory. +phys-memory-base = 0x8000_0000 # uint +# Size of the whole physical memory. (128M) +phys-memory-size = 0x800_0000 # uint +# Linear mapping offset, for quick conversions between physical and virtual +# addresses. +phys-virt-offset = "0xffff_ffc0_0000_0000" # uint diff --git a/examples/embassy-preempt/.axconfig.toml b/examples/embassy-preempt/.axconfig.toml new file mode 100644 index 0000000000..8e1487100f --- /dev/null +++ b/examples/embassy-preempt/.axconfig.toml @@ -0,0 +1,85 @@ +# Architecture identifier. +arch = "riscv64" # str +# Platform package. +package = "axplat-riscv64-qemu-virt" # str +# Platform identifier. +platform = "riscv64-qemu-virt" # str +# Stack size of each task. +task-stack-size = 0x40000 # uint +# Number of timer ticks per second (Hz). A timer tick may contain several timer +# interrupts. +ticks-per-sec = 100 # uint + +# +# Device specifications +# +[devices] +# MMIO ranges with format (`base_paddr`, `size`). +mmio-ranges = [ + [0x0010_1000, 0x1000], + [0x0c00_0000, 0x21_0000], + [0x1000_0000, 0x1000], + [0x1000_1000, 0x8000], + [0x3000_0000, 0x1000_0000], + [0x4000_0000, 0x4000_0000] +] # [(uint, uint)] +# End PCI bus number (`bus-range` property in device tree). +pci-bus-end = 0xff # uint +# Base physical address of the PCIe ECAM space. +pci-ecam-base = 0x3000_0000 # uint +# PCI device memory ranges (`ranges` property in device tree). +pci-ranges = [ + [0x0300_0000, 0x1_0000], + [0x4000_0000, 0x4000_0000], + [0x4_0000_0000, 0x4_0000_0000] +] # [(uint, uint)] +# rtc@101000 { +# interrupts = <0x0b>; +# interrupt-parent = <0x03>; +# reg = <0x00 0x101000 0x00 0x1000>; +# compatible = "google,goldfish-rtc"; +# }; +# RTC (goldfish) Address +rtc-paddr = 0x10_1000 # uint +# Timer interrupt frequency in Hz. +timer-frequency = 10_000_000 # uint +# Timer interrupt num. +timer-irq = "0x8000_0000_0000_0005" # uint +# VirtIO MMIO ranges with format (`base_paddr`, `size`). +virtio-mmio-ranges = [ + [0x1000_1000, 0x1000], + [0x1000_2000, 0x1000], + [0x1000_3000, 0x1000], + [0x1000_4000, 0x1000], + [0x1000_5000, 0x1000], + [0x1000_6000, 0x1000], + [0x1000_7000, 0x1000], + [0x1000_8000, 0x1000] +] # [(uint, uint)] + +# +# Platform configs +# +[plat] +# Stack size on bootstrapping. (256K) +boot-stack-size = 0x40000 # uint +# Number of CPUs. +cpu-num = 1 # uint +# Kernel address space base. +kernel-aspace-base = "0xffff_ffc0_0000_0000" # uint +# Kernel address space size. +kernel-aspace-size = "0x0000_003f_ffff_f000" # uint +# Base physical address of the kernel image. +kernel-base-paddr = 0x8020_0000 # uint +# Base virtual address of the kernel image. +kernel-base-vaddr = "0xffff_ffc0_8020_0000" # uint +# Offset of bus address and phys address. some boards, the bus address is +# different from the physical address. +phys-bus-offset = 0 # uint +# Base address of the whole physical memory. +phys-memory-base = 0x8000_0000 # uint +# Size of the whole physical memory. (128M) +phys-memory-size = 0x800_0000 # uint +# Linear mapping offset, for quick conversions between physical and virtual +# addresses. +phys-virt-offset = "0xffff_ffc0_0000_0000" # uint diff --git a/examples/embassy-preempt/Cargo.toml b/examples/embassy-preempt/Cargo.toml new file mode 100644 index 0000000000..1e84ecffa4 --- /dev/null +++ b/examples/embassy-preempt/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "embassy-preempt" +authors = ["nostalgia "] +edition.workspace = true + +[features] +default = [] + +async-test = [] +thread-test = [] + +atomic-sum = [] +iter-delay = [] + +[dependencies] +axstd = { workspace = true, features = [ + "multitask", + "sched-cfs", + "alloc", +], optional = true } +axasync = { workspace = true, features = ["preempt", "time"] } +embassy-executor = { git = "https://github.com/embassy-rs/embassy", branch = "main", default-features = false } +log = "=0.4.21" diff --git a/examples/embassy-preempt/src/main.rs b/examples/embassy-preempt/src/main.rs new file mode 100644 index 0000000000..dacb0ae8ae --- /dev/null +++ b/examples/embassy-preempt/src/main.rs @@ -0,0 +1,352 @@ +//! The embassy preemptive executor. +//! +//! A comparation between embassy and native threads. +//! +#![feature(impl_trait_in_assoc_type)] +#![feature(type_alias_impl_trait)] +#![cfg_attr(feature = "axstd", no_std)] +#![cfg_attr(feature = "axstd", no_main)] + +#[macro_use] +#[cfg(feature = "axstd")] +extern crate axstd as std; + +extern crate alloc; + +use alloc::sync::Arc; +use axasync::executor::{PrioFuture, spawner, yield_now}; +use axasync::time::Timer; +use core::hint::black_box; +use core::sync::atomic::AtomicUsize; +use std::thread::{self, sleep}; +use std::time::Duration; + +macro_rules! work_loop { + ( + task_type: $task_type:literal, + id: $id:expr, + expected: $millis:expr, + volume: $volume:expr, + busy: {$($busy_tt:tt)*}, + sleep: {$($sleep_tt:tt)*}, + $(output: {$($output_tt:tt)*},)? + ) => { + use std::time::{Instant,Duration}; + use log; + use core::hint::black_box; + + let mut cnt = 0; + let mut last_report = Instant::now(); + let expected = Duration::from_millis($millis); + loop { + let iter_start = Instant::now(); + cnt += 1; + + $($busy_tt)* + $($sleep_tt)* + $($($output_tt)*)? + + let iter_end = Instant::now(); + let iter_dur = iter_end - iter_start; + let full_dur = iter_end - last_report; + + log::info!( + "{} {}: volume {}, works {}, times {}/s, iters {}, expected {}/ns, actual {}/ns, full {}/ns", + $task_type, + $id, + $volume, + NUM_ITERS, + TEST_SECS, + cnt, + expected.as_nanos(), + iter_dur.as_nanos(), + full_dur.as_nanos(), + ); + + last_report = iter_end; + } + }; +} + +macro_rules! prio_task { + ( + fn_name: $fn_name:ident, + task_type: $task_type:literal, + prios: $prios:expr, + pool_size: $pool_size:expr, + atomic, + ) => { + #[axasync::executor::task(pool_size = $pool_size)] + async fn $fn_name(id: u64, millis: u64, busy_iters: u64, iters: Arc) { + work_loop! { + task_type: $task_type, + id: id, + expected: millis, + volume: $pool_size, + busy: { + if busy_iters > 0 { + let _res = black_box(PrioFuture::new(async_busy_work(busy_iters), $prios).await); + } + }, + sleep: { + PrioFuture::new(Timer::after_millis(millis),$prios).await; + }, + output: { + iters.fetch_add(1,core::sync::atomic::Ordering::SeqCst); + }, + } + } + }; + ( + fn_name: $fn_name:ident, + task_type: $task_type:literal, + prios: $prios:expr, + pool_size: $pool_size:expr, + ) => ( + #[axasync::executor::task(pool_size = $pool_size)] + async fn $fn_name(id: u64, millis: u64, busy_iters: u64) { + work_loop! { + task_type: $task_type, + id: id, + expected: millis, + volume: $pool_size, + busy: { + if busy_iters > 0 { + let _res = black_box(PrioFuture::new(async_busy_work(busy_iters), $prios).await); + } + }, + sleep: { + PrioFuture::new(Timer::after_millis(millis),$prios).await; + }, + } + } + ) +} + +const HIGH_PRIOS: u8 = 1; +const LOW_PRIOS: u8 = 3; + +#[cfg(feature = "async-test")] +prio_task! { + fn_name: prio_tick_high, + task_type: "ASYNC_TASK_REPORT_HIGH", + prios: HIGH_PRIOS, + pool_size: NUM_HIGH_TASKS as usize, +} + +#[cfg(feature = "async-test")] +prio_task! { + fn_name: prio_tick_low, + task_type: "ASYNC_TASK_REPORT_LOW", + prios: LOW_PRIOS, + pool_size: NUM_LOW_TASKS as usize, +} + +#[cfg(feature = "async-test")] +prio_task! { + fn_name: prio_add_high, + task_type: "ASYNC_TASK_REPORT_HIGH", + prios: HIGH_PRIOS, + pool_size: NUM_HIGH_TASKS as usize, + atomic, +} + +#[cfg(feature = "async-test")] +prio_task! { + fn_name: prio_add_low, + task_type: "ASYNC_TASK_REPORT_LOW", + prios: LOW_PRIOS, + pool_size: NUM_LOW_TASKS as usize, + atomic, +} + +#[cfg(feature = "thread-test")] +fn thread_tick(id: u64, millis: u64, busy_iters: u64) { + work_loop! { + task_type: "NATIVE_THREAD_REPORT", + id:id, + expected: millis, + volume: NUM_THREADS, + busy: { + if busy_iters > 0 { + let _res = black_box(busy_work(busy_iters)); + } + }, + sleep: { + sleep(Duration::from_millis(millis)); + }, + output: {}, + } +} + +#[cfg(feature = "thread-test")] +fn thread_add(id: u64, millis: u64, busy_iters: u64, iters: Arc) { + work_loop! { + task_type: "NATIVE_THREAD_ATOMIC_REPORT", + id:id, + expected: millis, + volume: NUM_THREADS, + busy: { + if busy_iters > 0 { + let _res = black_box(busy_work(busy_iters)); + } + }, + sleep: { + sleep(Duration::from_millis(millis)); + }, + output: { + iters.fetch_add(1, core::sync::atomic::Ordering::SeqCst); + }, + } +} + +fn busy_work(iters: u64) -> u64 { + let mut total = 0; + for _ in 0..iters { + total = black_box(total + 1); + thread::yield_now(); + } + black_box(total) +} + +async fn async_busy_work(iters: u64) -> u64 { + let mut total = 0; + for _ in 0..iters { + total = black_box(total + 1); + yield_now().await; + } + black_box(total) +} + +// const NUM_THREADS: u64 = 30; +const NUM_THREADS: u64 = 50; +// const NUM_THREADS: u64 = 100; +// const NUM_THREADS: u64 = 125; +// const NUM_THREADS: u64 = 150; +// const NUM_THREADS: u64 = 175; +// +const NUM_HIGH_TASKS: u64 = 15; +// const NUM_HIGH_TASKS: u64 = 25; +// const NUM_HIGH_TASKS: u64 = 30; +// const NUM_HIGH_TASKS: u64 = 50; +// const NUM_HIGH_TASKS: u64 = 75; +// const NUM_HIGH_TASKS: u64 = 100; +// const NUM_HIGH_TASKS: u64 = 200; +// +const NUM_LOW_TASKS: u64 = 15; +// const NUM_LOW_TASKS: u64 = 25; +// const NUM_LOW_TASKS: u64 = 30; +// const NUM_LOW_TASKS: u64 = 50; +// const NUM_LOW_TASKS: u64 = 75; +// const NUM_LOW_TASKS: u64 = 100; +// const NUM_LOW_TASKS: u64 = 200; +// + +// const NUM_ITERS: u64 = 100; +// The num iters for volume delay +// const NUM_ITERS: u64 = 1000; +// const NUM_ITERS: u64 = 1_000_0; +// const NUM_ITERS: u64 = 1_000_00; +const NUM_ITERS: u64 = 10; +// const NUM_ITERS: u64 = 1_000_000_0; +// const NUM_ITERS: u64 = 100; +const TEST_SECS: u64 = 15; + +#[cfg_attr(feature = "axstd", unsafe(no_mangle))] +fn main() { + log::info!("Starting Test"); + let th_iters = Arc::new(AtomicUsize::new(0)); + let high_prio_iters = Arc::new(AtomicUsize::new(0)); + let low_prio_iters = Arc::new(AtomicUsize::new(0)); + + #[cfg(feature = "thread-test")] + for i in 1..NUM_THREADS { + #[cfg(feature = "atomic-sum")] + let th_iters = th_iters.clone(); + + thread::spawn(move || { + #[cfg(feature = "atomic-sum")] + { + thread_add(i, i % 20 + 1, NUM_ITERS, th_iters); + } + #[cfg(feature = "iter-delay")] + { + thread_tick(i, (i % 20 + 1) * 1000, NUM_ITERS); + } + }); + } + + #[cfg(feature = "async-test")] + for i in 1..NUM_LOW_TASKS { + if i <= NUM_HIGH_TASKS { + #[cfg(feature = "atomic-sum")] + { + let high_prio_iters = high_prio_iters.clone(); + spawner().must_spawn(prio_add_high(i, i % 20 + 1, NUM_ITERS, high_prio_iters)); + } + #[cfg(feature = "iter-delay")] + { + spawner() + .spawn(prio_tick_high(i, (i % 20 + 1) * 1000, NUM_ITERS)) + .unwrap(); + } + } + #[cfg(feature = "atomic-sum")] + { + let low_prio_iters = low_prio_iters.clone(); + spawner().must_spawn(prio_add_low( + i + NUM_HIGH_TASKS, + i % 20 + 1, + NUM_ITERS, + low_prio_iters, + )); + } + #[cfg(feature = "iter-delay")] + { + spawner() + .spawn(prio_tick_low( + i + NUM_HIGH_TASKS, + (i % 20 + 1) * 1000, + NUM_ITERS, + )) + .unwrap(); + } + } + // Avoid shut down immediately + sleep(Duration::from_secs(TEST_SECS)); + + #[cfg(feature = "atomic-sum")] + { + let th_out = th_iters.load(core::sync::atomic::Ordering::Relaxed); + let high_out = high_prio_iters.load(core::sync::atomic::Ordering::Relaxed); + let low_out = low_prio_iters.load(core::sync::atomic::Ordering::Relaxed); + #[cfg(feature = "thread-test")] + log::info!( + "NATIVE_THREAD_REPORT: volume: {}, time: {}/s, works: {}, sum: {}, sum/s: {}", + NUM_THREADS, + TEST_SECS, + NUM_ITERS, + th_out, + th_out as u64 / TEST_SECS + ); + #[cfg(feature = "async-test")] + { + log::info!( + "ASYNC_TASK_REPORT_HIGH: volume: {}, time: {}/s, works: {}, sum: {}, sum/s: {}", + NUM_HIGH_TASKS, + TEST_SECS, + NUM_ITERS, + high_out, + high_out as u64 / TEST_SECS + ); + log::info!( + "ASYNC_TASK_REPORT_LOW: volume: {}, time: {}/s, works: {}, sum: {}, sum/s: {}", + NUM_LOW_TASKS, + TEST_SECS, + NUM_ITERS, + low_out, + low_out as u64 / TEST_SECS + ); + } + } +} diff --git a/examples/embassy-single/.axconfig.toml b/examples/embassy-single/.axconfig.toml new file mode 100644 index 0000000000..42465c83b4 --- /dev/null +++ b/examples/embassy-single/.axconfig.toml @@ -0,0 +1,81 @@ +# Architecture identifier. +arch = "riscv64" # str +# Platform identifier. +platform = "riscv64-qemu-virt" # str +# Number of CPUs +smp = 1 # uint +# Stack size of each task. +task-stack-size = 0x40000 # uint +# Number of timer ticks per second (Hz). A timer tick may contain several timer +# interrupts. +ticks-per-sec = 100 # uint + +# +# Device specifications +# +[devices] +# MMIO regions with format (`base_paddr`, `size`). +mmio-regions = [ + [0x0010_1000, 0x1000], + [0x0c00_0000, 0x21_0000], + [0x1000_0000, 0x1000], + [0x1000_1000, 0x8000], + [0x3000_0000, 0x1000_0000], + [0x4000_0000, 0x4000_0000] +] # [(uint, uint)] +# End PCI bus number (`bus-range` property in device tree). +pci-bus-end = 0xff # uint +# Base physical address of the PCIe ECAM space. +pci-ecam-base = 0x3000_0000 # uint +# PCI device memory ranges (`ranges` property in device tree). +pci-ranges = [ + [0x0300_0000, 0x1_0000], + [0x4000_0000, 0x4000_0000], + [0x4_0000_0000, 0x4_0000_0000] +] # [(uint, uint)] +# rtc@101000 { +# interrupts = <0x0b>; +# interrupt-parent = <0x03>; +# reg = <0x00 0x101000 0x00 0x1000>; +# compatible = "google,goldfish-rtc"; +# }; +# RTC (goldfish) Address +rtc-paddr = 0x10_1000 # uint +# Timer interrupt frequency in Hz. +timer-frequency = 10_000_000 # uint +# VirtIO MMIO regions with format (`base_paddr`, `size`). +virtio-mmio-regions = [ + [0x1000_1000, 0x1000], + [0x1000_2000, 0x1000], + [0x1000_3000, 0x1000], + [0x1000_4000, 0x1000], + [0x1000_5000, 0x1000], + [0x1000_6000, 0x1000], + [0x1000_7000, 0x1000], + [0x1000_8000, 0x1000] +] # [(uint, uint)] + +# +# Platform configs +# +[plat] +# Platform family. +family = "riscv64-qemu-virt" # str +# Kernel address space base. +kernel-aspace-base = "0xffff_ffc0_0000_0000" # uint +# Kernel address space size. +kernel-aspace-size = "0x0000_003f_ffff_f000" # uint +# Base physical address of the kernel image. +kernel-base-paddr = 0x8020_0000 # uint +# Base virtual address of the kernel image. +kernel-base-vaddr = "0xffff_ffc0_8020_0000" # uint +# Offset of bus address and phys address. some boards, the bus address is +# different from the physical address. +phys-bus-offset = 0 # uint +# Base address of the whole physical memory. +phys-memory-base = 0x8000_0000 # uint +# Size of the whole physical memory. (128M) +phys-memory-size = 0x800_0000 # uint +# Linear mapping offset, for quick conversions between physical and virtual +# addresses. +phys-virt-offset = "0xffff_ffc0_0000_0000" # uint diff --git a/examples/embassy-single/Cargo.toml b/examples/embassy-single/Cargo.toml new file mode 100644 index 0000000000..a1fb9362bf --- /dev/null +++ b/examples/embassy-single/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "embassy-single" +authors = ["nostalgia "] +edition.workspace = true + +[dependencies] +axstd = { workspace = true, optional = true } +axasync = { workspace = true, features = ["single", "time"] } +embassy-executor = { git = "https://github.com/embassy-rs/embassy", branch = "main", default-features = false } \ No newline at end of file diff --git a/examples/embassy-single/src/main.rs b/examples/embassy-single/src/main.rs new file mode 100644 index 0000000000..6159379239 --- /dev/null +++ b/examples/embassy-single/src/main.rs @@ -0,0 +1,48 @@ +//! The embassy single-thread executor. +#![feature(impl_trait_in_assoc_type)] +#![cfg_attr(feature = "axstd", no_std)] +#![cfg_attr(feature = "axstd", no_main)] + +#[macro_use] +#[cfg(feature = "axstd")] +extern crate axstd as std; + +use axasync::cell::StaticCell; +use axasync::executor::Executor; +use axasync::time::Timer; +use core::hint::black_box; + +fn busy_work(nano: u64) -> u64 { + let mut total = 0; + for _ in 0..nano { + let mut x = 0; + for _ in 0..nano { + x += 1; + } + total = black_box(total + x); + } + total +} + +#[axasync::executor::task(pool_size = 4)] +async fn tick(_sec: u64, busy_nano: u64) { + for i in 0..10 { + println!("embassy tick {}: {}/s, {}", _sec, _sec * i, i); + busy_work(busy_nano); + Timer::after_secs(_sec).await; + } + panic!("tick finished"); +} + +static EXECUTOR: StaticCell = StaticCell::new(); + +#[cfg_attr(feature = "axstd", unsafe(no_mangle))] +fn main() { + println!("Embassy Test"); + let exec = EXECUTOR.init(Executor::new()); + exec.run(|sp| { + for i in 1..4 { + sp.spawn(tick(i, 0)).unwrap(); + } + }) +} diff --git a/examples/helloworld/.axconfig.toml b/examples/helloworld/.axconfig.toml new file mode 100644 index 0000000000..42465c83b4 --- /dev/null +++ b/examples/helloworld/.axconfig.toml @@ -0,0 +1,81 @@ +# Architecture identifier. +arch = "riscv64" # str +# Platform identifier. +platform = "riscv64-qemu-virt" # str +# Number of CPUs +smp = 1 # uint +# Stack size of each task. +task-stack-size = 0x40000 # uint +# Number of timer ticks per second (Hz). A timer tick may contain several timer +# interrupts. +ticks-per-sec = 100 # uint + +# +# Device specifications +# +[devices] +# MMIO regions with format (`base_paddr`, `size`). +mmio-regions = [ + [0x0010_1000, 0x1000], + [0x0c00_0000, 0x21_0000], + [0x1000_0000, 0x1000], + [0x1000_1000, 0x8000], + [0x3000_0000, 0x1000_0000], + [0x4000_0000, 0x4000_0000] +] # [(uint, uint)] +# End PCI bus number (`bus-range` property in device tree). +pci-bus-end = 0xff # uint +# Base physical address of the PCIe ECAM space. +pci-ecam-base = 0x3000_0000 # uint +# PCI device memory ranges (`ranges` property in device tree). +pci-ranges = [ + [0x0300_0000, 0x1_0000], + [0x4000_0000, 0x4000_0000], + [0x4_0000_0000, 0x4_0000_0000] +] # [(uint, uint)] +# rtc@101000 { +# interrupts = <0x0b>; +# interrupt-parent = <0x03>; +# reg = <0x00 0x101000 0x00 0x1000>; +# compatible = "google,goldfish-rtc"; +# }; +# RTC (goldfish) Address +rtc-paddr = 0x10_1000 # uint +# Timer interrupt frequency in Hz. +timer-frequency = 10_000_000 # uint +# VirtIO MMIO regions with format (`base_paddr`, `size`). +virtio-mmio-regions = [ + [0x1000_1000, 0x1000], + [0x1000_2000, 0x1000], + [0x1000_3000, 0x1000], + [0x1000_4000, 0x1000], + [0x1000_5000, 0x1000], + [0x1000_6000, 0x1000], + [0x1000_7000, 0x1000], + [0x1000_8000, 0x1000] +] # [(uint, uint)] + +# +# Platform configs +# +[plat] +# Platform family. +family = "riscv64-qemu-virt" # str +# Kernel address space base. +kernel-aspace-base = "0xffff_ffc0_0000_0000" # uint +# Kernel address space size. +kernel-aspace-size = "0x0000_003f_ffff_f000" # uint +# Base physical address of the kernel image. +kernel-base-paddr = 0x8020_0000 # uint +# Base virtual address of the kernel image. +kernel-base-vaddr = "0xffff_ffc0_8020_0000" # uint +# Offset of bus address and phys address. some boards, the bus address is +# different from the physical address. +phys-bus-offset = 0 # uint +# Base address of the whole physical memory. +phys-memory-base = 0x8000_0000 # uint +# Size of the whole physical memory. (128M) +phys-memory-size = 0x800_0000 # uint +# Linear mapping offset, for quick conversions between physical and virtual +# addresses. +phys-virt-offset = "0xffff_ffc0_0000_0000" # uint diff --git a/examples/httpclient/.axconfig.toml b/examples/httpclient/.axconfig.toml new file mode 100644 index 0000000000..42465c83b4 --- /dev/null +++ b/examples/httpclient/.axconfig.toml @@ -0,0 +1,81 @@ +# Architecture identifier. +arch = "riscv64" # str +# Platform identifier. +platform = "riscv64-qemu-virt" # str +# Number of CPUs +smp = 1 # uint +# Stack size of each task. +task-stack-size = 0x40000 # uint +# Number of timer ticks per second (Hz). A timer tick may contain several timer +# interrupts. +ticks-per-sec = 100 # uint + +# +# Device specifications +# +[devices] +# MMIO regions with format (`base_paddr`, `size`). +mmio-regions = [ + [0x0010_1000, 0x1000], + [0x0c00_0000, 0x21_0000], + [0x1000_0000, 0x1000], + [0x1000_1000, 0x8000], + [0x3000_0000, 0x1000_0000], + [0x4000_0000, 0x4000_0000] +] # [(uint, uint)] +# End PCI bus number (`bus-range` property in device tree). +pci-bus-end = 0xff # uint +# Base physical address of the PCIe ECAM space. +pci-ecam-base = 0x3000_0000 # uint +# PCI device memory ranges (`ranges` property in device tree). +pci-ranges = [ + [0x0300_0000, 0x1_0000], + [0x4000_0000, 0x4000_0000], + [0x4_0000_0000, 0x4_0000_0000] +] # [(uint, uint)] +# rtc@101000 { +# interrupts = <0x0b>; +# interrupt-parent = <0x03>; +# reg = <0x00 0x101000 0x00 0x1000>; +# compatible = "google,goldfish-rtc"; +# }; +# RTC (goldfish) Address +rtc-paddr = 0x10_1000 # uint +# Timer interrupt frequency in Hz. +timer-frequency = 10_000_000 # uint +# VirtIO MMIO regions with format (`base_paddr`, `size`). +virtio-mmio-regions = [ + [0x1000_1000, 0x1000], + [0x1000_2000, 0x1000], + [0x1000_3000, 0x1000], + [0x1000_4000, 0x1000], + [0x1000_5000, 0x1000], + [0x1000_6000, 0x1000], + [0x1000_7000, 0x1000], + [0x1000_8000, 0x1000] +] # [(uint, uint)] + +# +# Platform configs +# +[plat] +# Platform family. +family = "riscv64-qemu-virt" # str +# Kernel address space base. +kernel-aspace-base = "0xffff_ffc0_0000_0000" # uint +# Kernel address space size. +kernel-aspace-size = "0x0000_003f_ffff_f000" # uint +# Base physical address of the kernel image. +kernel-base-paddr = 0x8020_0000 # uint +# Base virtual address of the kernel image. +kernel-base-vaddr = "0xffff_ffc0_8020_0000" # uint +# Offset of bus address and phys address. some boards, the bus address is +# different from the physical address. +phys-bus-offset = 0 # uint +# Base address of the whole physical memory. +phys-memory-base = 0x8000_0000 # uint +# Size of the whole physical memory. (128M) +phys-memory-size = 0x800_0000 # uint +# Linear mapping offset, for quick conversions between physical and virtual +# addresses. +phys-virt-offset = "0xffff_ffc0_0000_0000" # uint diff --git a/examples/httpserver/.axconfig.toml b/examples/httpserver/.axconfig.toml new file mode 100644 index 0000000000..42465c83b4 --- /dev/null +++ b/examples/httpserver/.axconfig.toml @@ -0,0 +1,81 @@ +# Architecture identifier. +arch = "riscv64" # str +# Platform identifier. +platform = "riscv64-qemu-virt" # str +# Number of CPUs +smp = 1 # uint +# Stack size of each task. +task-stack-size = 0x40000 # uint +# Number of timer ticks per second (Hz). A timer tick may contain several timer +# interrupts. +ticks-per-sec = 100 # uint + +# +# Device specifications +# +[devices] +# MMIO regions with format (`base_paddr`, `size`). +mmio-regions = [ + [0x0010_1000, 0x1000], + [0x0c00_0000, 0x21_0000], + [0x1000_0000, 0x1000], + [0x1000_1000, 0x8000], + [0x3000_0000, 0x1000_0000], + [0x4000_0000, 0x4000_0000] +] # [(uint, uint)] +# End PCI bus number (`bus-range` property in device tree). +pci-bus-end = 0xff # uint +# Base physical address of the PCIe ECAM space. +pci-ecam-base = 0x3000_0000 # uint +# PCI device memory ranges (`ranges` property in device tree). +pci-ranges = [ + [0x0300_0000, 0x1_0000], + [0x4000_0000, 0x4000_0000], + [0x4_0000_0000, 0x4_0000_0000] +] # [(uint, uint)] +# rtc@101000 { +# interrupts = <0x0b>; +# interrupt-parent = <0x03>; +# reg = <0x00 0x101000 0x00 0x1000>; +# compatible = "google,goldfish-rtc"; +# }; +# RTC (goldfish) Address +rtc-paddr = 0x10_1000 # uint +# Timer interrupt frequency in Hz. +timer-frequency = 10_000_000 # uint +# VirtIO MMIO regions with format (`base_paddr`, `size`). +virtio-mmio-regions = [ + [0x1000_1000, 0x1000], + [0x1000_2000, 0x1000], + [0x1000_3000, 0x1000], + [0x1000_4000, 0x1000], + [0x1000_5000, 0x1000], + [0x1000_6000, 0x1000], + [0x1000_7000, 0x1000], + [0x1000_8000, 0x1000] +] # [(uint, uint)] + +# +# Platform configs +# +[plat] +# Platform family. +family = "riscv64-qemu-virt" # str +# Kernel address space base. +kernel-aspace-base = "0xffff_ffc0_0000_0000" # uint +# Kernel address space size. +kernel-aspace-size = "0x0000_003f_ffff_f000" # uint +# Base physical address of the kernel image. +kernel-base-paddr = 0x8020_0000 # uint +# Base virtual address of the kernel image. +kernel-base-vaddr = "0xffff_ffc0_8020_0000" # uint +# Offset of bus address and phys address. some boards, the bus address is +# different from the physical address. +phys-bus-offset = 0 # uint +# Base address of the whole physical memory. +phys-memory-base = 0x8000_0000 # uint +# Size of the whole physical memory. (128M) +phys-memory-size = 0x800_0000 # uint +# Linear mapping offset, for quick conversions between physical and virtual +# addresses. +phys-virt-offset = "0xffff_ffc0_0000_0000" # uint diff --git a/modules/axembassy/.axconfig.toml b/modules/axembassy/.axconfig.toml new file mode 100644 index 0000000000..42465c83b4 --- /dev/null +++ b/modules/axembassy/.axconfig.toml @@ -0,0 +1,81 @@ +# Architecture identifier. +arch = "riscv64" # str +# Platform identifier. +platform = "riscv64-qemu-virt" # str +# Number of CPUs +smp = 1 # uint +# Stack size of each task. +task-stack-size = 0x40000 # uint +# Number of timer ticks per second (Hz). A timer tick may contain several timer +# interrupts. +ticks-per-sec = 100 # uint + +# +# Device specifications +# +[devices] +# MMIO regions with format (`base_paddr`, `size`). +mmio-regions = [ + [0x0010_1000, 0x1000], + [0x0c00_0000, 0x21_0000], + [0x1000_0000, 0x1000], + [0x1000_1000, 0x8000], + [0x3000_0000, 0x1000_0000], + [0x4000_0000, 0x4000_0000] +] # [(uint, uint)] +# End PCI bus number (`bus-range` property in device tree). +pci-bus-end = 0xff # uint +# Base physical address of the PCIe ECAM space. +pci-ecam-base = 0x3000_0000 # uint +# PCI device memory ranges (`ranges` property in device tree). +pci-ranges = [ + [0x0300_0000, 0x1_0000], + [0x4000_0000, 0x4000_0000], + [0x4_0000_0000, 0x4_0000_0000] +] # [(uint, uint)] +# rtc@101000 { +# interrupts = <0x0b>; +# interrupt-parent = <0x03>; +# reg = <0x00 0x101000 0x00 0x1000>; +# compatible = "google,goldfish-rtc"; +# }; +# RTC (goldfish) Address +rtc-paddr = 0x10_1000 # uint +# Timer interrupt frequency in Hz. +timer-frequency = 10_000_000 # uint +# VirtIO MMIO regions with format (`base_paddr`, `size`). +virtio-mmio-regions = [ + [0x1000_1000, 0x1000], + [0x1000_2000, 0x1000], + [0x1000_3000, 0x1000], + [0x1000_4000, 0x1000], + [0x1000_5000, 0x1000], + [0x1000_6000, 0x1000], + [0x1000_7000, 0x1000], + [0x1000_8000, 0x1000] +] # [(uint, uint)] + +# +# Platform configs +# +[plat] +# Platform family. +family = "riscv64-qemu-virt" # str +# Kernel address space base. +kernel-aspace-base = "0xffff_ffc0_0000_0000" # uint +# Kernel address space size. +kernel-aspace-size = "0x0000_003f_ffff_f000" # uint +# Base physical address of the kernel image. +kernel-base-paddr = 0x8020_0000 # uint +# Base virtual address of the kernel image. +kernel-base-vaddr = "0xffff_ffc0_8020_0000" # uint +# Offset of bus address and phys address. some boards, the bus address is +# different from the physical address. +phys-bus-offset = 0 # uint +# Base address of the whole physical memory. +phys-memory-base = 0x8000_0000 # uint +# Size of the whole physical memory. (128M) +phys-memory-size = 0x800_0000 # uint +# Linear mapping offset, for quick conversions between physical and virtual +# addresses. +phys-virt-offset = "0xffff_ffc0_0000_0000" # uint diff --git a/modules/axembassy/Cargo.toml b/modules/axembassy/Cargo.toml new file mode 100644 index 0000000000..1595baa973 --- /dev/null +++ b/modules/axembassy/Cargo.toml @@ -0,0 +1,50 @@ +[package] +name = "axembassy" +version.workspace = true +edition.workspace = true +authors = ["nostalgia "] +license.workspace = true +homepage.workspace = true + +[features] +driver = [ + "dep:embassy-time-driver", + "dep:embassy-time-queue-utils", + "dep:percpu", + "axhal/irq", +] +embassy-utils = [ + "dep:embassy-executor", + "dep:embassy-futures", + "dep:embassy-sync", +] + +# executor is in thread interaction +executor-preempt = ["executor-thread"] +executor-thread = ["embassy-utils", "axtask/multitask", "dep:axsync"] + +executor-single = ["embassy-utils", "axhal/irq"] + +default = ["driver"] + +[dependencies] +axconfig = { workspace = true } +axhal = { workspace = true, features = ["irq"] } +axsync = { workspace = true, features = ["multitask"], optional = true } +axtask = { workspace = true } + +embassy-time-driver = { git = "https://github.com/embassy-rs/embassy", branch = "main", optional = true, features = [ + # annotate explicitly + # wait to be improved + "tick-hz-100", +] } +embassy-time-queue-utils = { git = "https://github.com/embassy-rs/embassy", branch = "main", optional = true } +embassy-executor = { git = "https://github.com/embassy-rs/embassy", branch = "main", default-features = false, optional = true } +embassy-futures = { git = "https://github.com/embassy-rs/embassy", branch = "main", optional = true } +embassy-sync = { git = "https://github.com/embassy-rs/embassy", branch = "main", optional = true } + +percpu = { version = "0.2", optional = true } +cfg-if = "1.0" +log = "=0.4.21" +static_cell = "2.1.0" +kspin = "0.1" diff --git a/modules/axembassy/src/asynch.rs b/modules/axembassy/src/asynch.rs new file mode 100644 index 0000000000..4351a6f387 --- /dev/null +++ b/modules/axembassy/src/asynch.rs @@ -0,0 +1,87 @@ +use core::{ + cell::OnceCell, + pin::Pin, + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, +}; + +use axsync::Mutex; +use axtask::yield_now; + +use embassy_executor::{SendSpawner, Spawner}; + +/// Global spawner for multi-thread executor +pub(crate) static SPAWNER: Mutex> = Mutex::new(OnceCell::new()); + +fn init_spawn() { + use axtask::spawn_raw; + spawn_raw(init, "async".into(), axconfig::TASK_STACK_SIZE); +} + +fn init() { + use crate::executor::Executor; + use static_cell::StaticCell; + + static EXECUTOR: StaticCell = StaticCell::new(); + EXECUTOR + .init_with(Executor::new) + .run(|sp| sp.must_spawn(init_task())); +} + +#[embassy_executor::task] +async fn init_task() { + use crate::asynch; + + let spawner = asynch::Spawner::for_current_executor().await; + asynch::set_spawner(spawner.make_send()); + log::info!("Initialize spawner... "); +} + +/// # Panics +/// +/// Panics if the system executor is not initialized. +pub fn spawner() -> SendSpawner { + let sp = SPAWNER.lock(); + if let Some(inner) = sp.get() { + *inner + } else { + drop(sp); + init_spawn(); + yield_now(); + // initialize the spawner if not + let sp = SPAWNER.lock(); + *sp.get().expect("Reinitialize the spawner failed") + } +} + +/// Set the spawner for the system executor. +/// +/// May only be called once. +pub(crate) fn set_spawner(spawner: SendSpawner) { + let sp = SPAWNER.lock(); + let _ = sp.set(spawner); +} + +fn wake(_ctx: *const ()) { + // let id = ctx as u64; + // unpark_task(id, true); +} + +static VTABLE: RawWakerVTable = + RawWakerVTable::new(|ctx| RawWaker::new(ctx, &VTABLE), wake, wake, wake); + +/// Panics if not called in a thread task +pub fn block_on(mut fut: F) -> F::Output { + let mut fut = unsafe { Pin::new_unchecked(&mut fut) }; + + let id = axtask::current().id().as_u64(); + let raw_waker = RawWaker::new(id as *const (), &VTABLE); + let waker = unsafe { Waker::from_raw(raw_waker) }; + let mut ctx = Context::from_waker(&waker); + + loop { + if let Poll::Ready(res) = fut.as_mut().poll(&mut ctx) { + return res; + } + yield_now(); + } +} diff --git a/modules/axembassy/src/delegate.rs b/modules/axembassy/src/delegate.rs new file mode 100644 index 0000000000..5dfc7367d6 --- /dev/null +++ b/modules/axembassy/src/delegate.rs @@ -0,0 +1,259 @@ +use core::{ + marker::PhantomData, + ptr, + sync::atomic::{AtomicU8, Ordering}, +}; + +use embassy_executor::Spawner; +use embassy_sync::{blocking_mutex::raw::CriticalSectionRawMutex, signal::Signal}; + +/// A cell that can only be used on the executor that created it +pub struct SameExecutorCell { + /// The executor id + id: usize, + inner: T, +} + +impl SameExecutorCell { + /// Creates a new `SameExecutorCell` with the provided `inner` value and associates it + /// with the executor identified by the given `spawner`. + /// + /// # Arguments + /// + /// * `inner` - The value to be stored inside the cell. + /// * `spawner` - The spawner used to identify the executor, from which the cell can be accessed. + pub fn new(inner: T, spawner: Spawner) -> Self { + Self { + id: spawner.executor_id(), + inner, + } + } + + /// Creates a new `SameExecutorCell` with the provided `inner` value and associates it with the current executor. + pub async fn new_async(inner: T) -> Self { + let spawner = Spawner::for_current_executor().await; + SameExecutorCell::new(inner, spawner) + } + + /// Returns a reference to the inner value if the `spawner` matches the executor id associated with `self`. + pub fn get(&self, spawner: Spawner) -> Option<&T> { + if self.id == spawner.executor_id() { + Some(&self.inner) + } else { + None + } + } + + /// Returns a mutable reference to the inner value if the `spawner` matches the executor id + /// associated with `self`. + pub fn get_mut(&mut self, spawner: Spawner) -> Option<&mut T> { + if self.id == spawner.executor_id() { + Some(&mut self.inner) + } else { + None + } + } + + /// Returns a reference to the inner value by the spawner of current async closure + pub async fn get_async(&self) -> Option<&T> { + let spawner = Spawner::for_current_executor().await; + self.get(spawner) + } + + /// Returns a mutable reference to the inner value by the spawner of current async closure + pub async fn get_mut_async(&mut self) -> Option<&mut T> { + let spawner = Spawner::for_current_executor().await; + self.get_mut(spawner) + } + + /// Consumes the `SameExecutorCell`, returning the inner value if the provided `spawner` + /// matches the executor id associated with `self`. + /// + /// else returns `self` as recovery + pub fn into_inner(self, spawner: Spawner) -> Result { + if spawner.executor_id() == self.id { + Ok(self.inner) + } else { + Err(self) + } + } + + /// Consumes the `SameExecutorCell`, returning the inner value by the spawner of current async closure + /// + /// else returns `self` as recovery + pub async fn into_inner_async(self) -> Result { + let spawner = Spawner::for_current_executor().await; + self.into_inner(spawner) + } + + /// Returns the executor id + pub fn executor_id(&self) -> usize { + self.id + } +} + +impl Clone for SameExecutorCell { + fn clone(&self) -> Self { + Self { + id: self.id, + inner: self.inner.clone(), + } + } +} + +impl core::fmt::Debug for SameExecutorCell { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("SameExecutorCell") + .field("executor_id", &self.id) + .field("inner", &self.inner) // Only if T: Debug + .finish() + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum DelegateError { + LendInvalid, + WithInvalid, + ConsumedInvalid, +} + +#[repr(u8)] +pub enum DelegateState { + New = 0, + Lent = 1, + Consumed = 2, +} + +impl From for DelegateState { + fn from(value: u8) -> Self { + match value { + 0 => Self::New, + 1 => Self::Lent, + 2 => Self::Consumed, + _ => unreachable!(), + } + } +} + +type MutexSignal = Signal; + +/// A delegate that can only be used on the executor that created it +pub struct Delegate { + send: MutexSignal>, + reply: MutexSignal<()>, + state: AtomicU8, + _not_send: PhantomData<*const ()>, +} + +unsafe impl Sync for Delegate {} + +impl Delegate { + #[must_use] + /// Creates a new `Delegate` + /// + /// **NOT** thread safe and only executor safe + pub const fn new() -> Self { + Self { + send: Signal::new(), + reply: Signal::new(), + state: AtomicU8::new(DelegateState::New as u8), + _not_send: PhantomData, + } + } + + /// lend target `T` to other task in the same executor + /// with lifetime longer than delegate itself + pub async fn lend<'a, 'b: 'a>(&'a self, target: &'b mut T) -> Result<(), DelegateError> { + use DelegateError::*; + use DelegateState::*; + + match self.state.compare_exchange( + New as u8, + Lent as u8, + core::sync::atomic::Ordering::AcqRel, + core::sync::atomic::Ordering::Acquire, + ) { + Ok(_) => {} + Err(_) => return Err(LendInvalid), + } + let sp = Spawner::for_current_executor().await; + let ptr = ptr::from_mut(target); + self.send.signal(SameExecutorCell::new(ptr, sp)); + + self.reply.wait().await; + let final_state = self.state.load(Ordering::Acquire); + if final_state != Consumed as u8 { + return Err(ConsumedInvalid); + } + Ok(()) + } + + /// lend target `T` to other task in the same executor + /// with lifetime longer than delegate itself + /// + /// reset state afterwards for reuse + pub async fn lend_new<'a, 'b: 'a>(&'a self, target: &'b mut T) -> Result<(), DelegateError> { + match self.lend(target).await { + Ok(()) => { + self.reset(); + Ok(()) + } + Err(e) => Err(e), + } + } + + /// Returns a mutable reference to the inner value lent + /// by other task in the same executor + pub async fn with(&self, func: impl FnOnce(&mut T) -> U) -> Result { + use DelegateError::*; + use DelegateState::*; + + let data = self.send.wait().await; + let sp = Spawner::for_current_executor().await; + let res = { + let ptr = unsafe { data.get(sp).unwrap().as_mut().unwrap() }; + func(ptr) + }; + + match self.state.compare_exchange( + Lent as u8, + Consumed as u8, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => {} + Err(_) => return Err(WithInvalid), + } + + self.reply.signal(()); + Ok(res) + } + + /// Reset the delegate to reuse + pub fn reset(&self) { + use DelegateState::*; + + let cur_state = self.state.load(core::sync::atomic::Ordering::Acquire); + if cur_state == Lent as u8 { + panic!( + "Cannot reset Delegate while in LENT state: lend() called but with() has not completed." + ); + } + + // Case: + // 1. New: Refresh nothing. + // 2. Consumed: Refresh Send, Reset is `()`, refresh nothing. + if cur_state == New as u8 { + return; + } + + self.send.reset(); + self.state.store(New as u8, Ordering::Release); + } +} + +impl Default for Delegate { + fn default() -> Self { + Self::new() + } +} diff --git a/modules/axembassy/src/executor.rs b/modules/axembassy/src/executor.rs new file mode 100644 index 0000000000..2a3dd41af1 --- /dev/null +++ b/modules/axembassy/src/executor.rs @@ -0,0 +1,75 @@ +use core::marker::PhantomData; +use core::sync::atomic::{AtomicBool, Ordering}; +use embassy_executor::raw; + +#[cfg(feature = "executor-single")] +static SIGNAL_SINGLE: AtomicBool = AtomicBool::new(false); + +#[cfg(feature = "executor-thread")] +#[percpu::def_percpu] +static SINGAL_THREAD: AtomicBool = AtomicBool::new(false); + +#[unsafe(export_name = "__pender")] +fn __pender(_context: *mut ()) { + #[cfg(feature = "executor-single")] + SIGNAL_SINGLE.store(true, Ordering::SeqCst); + + #[cfg(feature = "executor-thread")] + SINGAL_THREAD.with_current(|m| { + m.store(true, Ordering::SeqCst); + }); +} + +/// An executor based on the [embassy_executor](https://docs.rs/embassy-executor/latest/embassy_executor/) crate +pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, +} + +impl Executor { + /// Create a new executor and initialize context with current task id + pub fn new() -> Self { + Self { + inner: raw::Executor::new(core::ptr::null_mut()), + not_send: PhantomData, + } + } + + /// Runs the executor. + /// + /// The `init` closure is called with a [`embassy_executor::Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + pub fn run(&'static mut self, init: impl FnOnce(embassy_executor::Spawner)) -> ! { + init(self.inner.spawner()); + + loop { + unsafe { + self.inner.poll(); + + #[cfg(feature = "executor-single")] + { + if SIGNAL_SINGLE.load(Ordering::SeqCst) { + SIGNAL_SINGLE.store(false, Ordering::SeqCst); + } else { + axhal::asm::wait_for_irqs(); + } + } + + #[cfg(feature = "executor-thread")] + { + let polled = SINGAL_THREAD.with_current(|m| m.load(Ordering::Acquire)); + if polled { + SINGAL_THREAD.with_current(|m| { + m.store(false, Ordering::Release); + }); + } else { + // park_current_task(); + axtask::yield_now(); + } + } + }; + } + } +} diff --git a/modules/axembassy/src/lib.rs b/modules/axembassy/src/lib.rs new file mode 100644 index 0000000000..13acf4dae8 --- /dev/null +++ b/modules/axembassy/src/lib.rs @@ -0,0 +1,69 @@ +//! [ArceOS](https://github.com/arceos-org/arceos) embassy integration +//! +//! This module provides embassy asynchronous runtime integration, including +//! time driver, and executor with single-thread, multi-thread, preemptive(partially) +//! which are configurable by cargo features. +//! +//! # Cargo Features +//! +//! - `driver`: Enable time driver support. If it's enabled, time driver is used. +//! Usually used by `axruntime` module in `irq` initiation. +//! - `executor-single`: Use the [single-thread executor][1]. It also enables the +//! related utils modules. +//! - `executor-thread`: Use the [multi-thread executor][2]. It also enables the +//! related utils modules and enables the `multitask` feature if it is enabled. +//! - `executor-preempt`: Use the [preemptive executor][3]. It also enables the +//! related utils modules and enables the `executor-thread` feature if it is +//! enabled. +//! +//! [1]: crate::executor::Executor +//! [2]: crate::asynch::spawner +//! [3]: crate::preempt::PrioFuture + +#![cfg_attr(not(test), no_std)] +#![feature(doc_cfg)] +#![feature(doc_auto_cfg)] + +cfg_if::cfg_if! { + if #[cfg(any(feature = "executor-thread", feature = "executor-single"))] { + extern crate alloc; + extern crate log; + + mod delegate; + #[cfg(feature= "executor-thread")] + mod asynch; + #[cfg(feature= "executor-preempt")] + mod preempt; + + mod executor; + mod executor_exports { + pub use crate::executor::Executor; + pub use crate::delegate::{Delegate, SameExecutorCell}; + pub use embassy_executor::Spawner; + + #[cfg(feature = "executor-thread")] + pub use crate::asynch::{spawner,block_on}; + #[cfg(feature = "executor-thread")] + pub use embassy_executor::SendSpawner; + + #[cfg(feature = "executor-preempt")] + pub use crate::preempt::PrioFuture; + } + + pub use executor_exports::*; + } +} + +#[cfg(feature = "driver")] +mod time_driver; + +#[cfg(feature = "driver")] +pub use crate::time_driver::AxDriverAPI; + +// #[cfg(all( +// any(feature = "executor-thread", feature = "executor-preempt"), +// feature = "executor-single" +// ))] +// compile_error!( +// "feature `executor-thread`/`executor-preempt` and `executor-single` are mutually exclusive" +// ); diff --git a/modules/axembassy/src/preempt.rs b/modules/axembassy/src/preempt.rs new file mode 100644 index 0000000000..e64b0b4711 --- /dev/null +++ b/modules/axembassy/src/preempt.rs @@ -0,0 +1,303 @@ +extern crate alloc; + +use alloc::collections::BTreeMap; +use axsync::Mutex; +use core::{ + pin::Pin, + sync::atomic::{AtomicU64, Ordering}, + task::{Context, Poll}, +}; + +/// Represents a user-defined priority, ranging from 0 to 255. +type UserPrio = u8; +const MIN_USER_PRIO: UserPrio = u8::MAX; +const MAX_USER_PRIO: UserPrio = u8::MIN; + +/// A fixed-point representation of priority to handle fractional values. +/// This type ensures consistent scaling and avoids magic numbers in calculations. +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)] +struct Prio(u64); + +impl From for Prio { + fn from(value: UserPrio) -> Self { + // Scale the user-priority to the internal fixed-point representation. + Self((value as u64).saturating_mul(Prio::SCALING_FACTOR)) + } +} + +impl From for u64 { + fn from(prio: Prio) -> u64 { + prio.0 + } +} + +impl Prio { + const MAX_PRIO: Prio = Prio::raw_new(MAX_USER_PRIO as u64 * Prio::SCALING_FACTOR); + const MIN_PRIO: Prio = Prio::raw_new(MIN_USER_PRIO as u64 * Prio::SCALING_FACTOR); + /// The scaling factor for converting `UserPrio` to `Prio`'s internal `u64` representation. + const SCALING_FACTOR: u64 = 100; + + /// Weights for factors in priority adjustment. + const PRIO_EFFECT: u64 = 70; + const ACTIVE_EFFECT: u64 = 30; + + /// Clamping range for normalized factors. + const CLAMP_MIN: u64 = 10; + const CLAMP_MAX: u64 = 100; + + /// Tolerance range for priority check, as a percentage of the priority. + const TOL: u64 = 10; + + const fn raw_new(prio: u64) -> Self { + Self(prio) + } + + /// Returns the raw `u64` value of the priority. + pub const fn as_u64(&self) -> u64 { + self.0 + } + + /// Converts Prio to `UserPrio` (u8). + /// + /// Returns `None` if the priority overflows `UserPrio`. + #[allow(dead_code)] + pub const fn as_user_prio(&self) -> Option { + let prio = self.0.div_euclid(Prio::SCALING_FACTOR); + if prio > MIN_USER_PRIO as u64 { + None + } else { + Some(prio as u8) + } + } + + /// Normalizes a current value within a given bound to a range [CLAMP_MIN_FACTOR, CLAMP_MAX_FACTOR]. + /// + /// # Returns + /// `(norm_pos, norm_neg)` where: + /// - `norm_pos` indicates closeness to the lower bound (higher for values closer to low). + /// - `norm_neg` indicates closeness to the upper bound (higher for values closer to high). + pub fn norm_factor, G: Into>(bound: (F, F), cur: G) -> (u64, u64) { + let (mut low, mut high) = (bound.0.into(), bound.1.into()); + if low > high { + core::mem::swap(&mut low, &mut high); + } + let cur = cur.into(); + + if high == low { + // Avoid division by zero if bounds are the same + return (Prio::CLAMP_MAX, Prio::CLAMP_MAX); + } + + let range = high - low; + let norm_pos = ((cur - low).saturating_mul(Prio::CLAMP_MAX) / range) + .clamp(Prio::CLAMP_MIN, Prio::CLAMP_MAX); + let norm_neg = ((high - cur).saturating_mul(Prio::CLAMP_MAX) / range) + .clamp(Prio::CLAMP_MIN, Prio::CLAMP_MAX); + (norm_pos, norm_neg) + } + + /// Applies a weight to a normalized factor. + pub fn weight, G: Into>(factor: F, weight: G) -> u64 { + factor + .into() + .saturating_mul(weight.into()) + .div_euclid(Prio::CLAMP_MAX) + } +} + +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)] +struct FutureId(u64); + +impl FutureId { + pub fn new() -> Self { + static ID_COUNTER: AtomicU64 = AtomicU64::new(1); + Self(ID_COUNTER.fetch_add(1, Ordering::Relaxed)) + } + /// Convert the task ID to a `u64`. + #[allow(dead_code)] + pub const fn as_u64(&self) -> u64 { + self.0 + } +} + +/// A future with a user priority. +pub struct PrioFuture { + inner: F, + id: FutureId, + prio: Prio, +} + +impl PrioFuture { + /// Creates a new `PrioFuture` with an initial user priority. + pub fn new(fut: F, prio: UserPrio) -> Self { + let id = FutureId::new(); + SCHEDULER.lock().insert(id, prio.into()); + Self { + inner: fut, + id, + prio: prio.into(), + } + } + + /// Sets a new user priority for the future. + pub fn set_prio(&mut self, prio: UserPrio) { + self.prio = prio.into(); + SCHEDULER.lock().insert(self.id, prio.into()); + } +} + +impl Future for PrioFuture { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = unsafe { self.get_unchecked_mut() }; + + let mut s = SCHEDULER.lock(); + let cur_prio = s.cur_prio; + let prio = this.prio; + let tol = Prio::weight(prio, Prio::TOL); + + s.adjust_cur_prio(prio); + + // If future prio > cur_prio + tolerance, park it + let threshold: u64 = prio.as_u64().saturating_sub(cur_prio); + if threshold > tol { + // info!("prio: {}, cur_prio: {}", this.prio, s.cur_prio); + s.park_future(this.id); + cx.waker().wake_by_ref(); + return Poll::Pending; + } else { + s.unpark_future(this.id); + } + + // SAFETY: Just projecting the pin + let inner = unsafe { Pin::new_unchecked(&mut this.inner) }; + match inner.poll(cx) { + Poll::Ready(output) => Poll::Ready(output), + Poll::Pending => Poll::Pending, + } + } +} + +impl Drop for PrioFuture { + fn drop(&mut self) { + SCHEDULER.lock().remove(self.id); + } +} + +static SCHEDULER: Mutex = Mutex::new(PrioScheduler::new()); + +type Active = usize; +type All = usize; +type PrioStatus = (Active, All); +type FutureInfo = (Prio, bool); // (pirority, is_active) +struct PrioScheduler { + pub cur_prio: u64, + tasks: BTreeMap, + status: BTreeMap, +} + +impl PrioScheduler { + pub const fn new() -> Self { + Self { + cur_prio: MIN_USER_PRIO as u64 * Prio::SCALING_FACTOR, + tasks: BTreeMap::new(), + status: BTreeMap::new(), + } + } + /// Parks a future, marking it as inactive. + pub fn park_future(&mut self, id: FutureId) { + if let Some((prio, true)) = self.tasks.get(&id) { + self.status.entry(*prio).and_modify(|(active, _)| { + *active = active.saturating_sub(1); + }); + self.tasks.insert(id, (*prio, false)); + } + } + /// Unparks a future, marking it as active. + pub fn unpark_future(&mut self, id: FutureId) { + if let Some((prio, false)) = self.tasks.get(&id) { + self.status.entry(*prio).and_modify(|(active, _)| { + *active += 1; + }); + self.tasks.insert(id, (*prio, true)); + } + } + + /// Return the highest and lowest priority in the active future + /// + /// # Output + /// + /// `(high, low)` + fn prio_range(&self) -> (Prio, Prio) { + let high = self + .status + .iter() + .filter(|(_, (active, _))| *active > 0) + .next() + .map(|(prio, _)| *prio) + .unwrap_or(Prio::MAX_PRIO); + let low = self + .status + .iter() + .filter(|(_, (active, _))| *active > 0) + .last() + .map(|(prio, _)| *prio) + .unwrap_or(Prio::MIN_PRIO); + (high, low) + } + + fn get_prio_status(&self, prio: Prio) -> PrioStatus { + self.status.get(&prio).cloned().unwrap_or((1, 1)) + } + pub fn adjust_cur_prio(&mut self, prio: Prio) { + let (active, all) = self.get_prio_status(prio); + let (_, norm_active) = Prio::norm_factor((all as u64, 0), active as u64); + + let (highest, lowest) = self.prio_range(); + let (norm_prio, _) = Prio::norm_factor((highest, lowest), prio); + + let prio: u64 = prio.into(); + let cur_prio = self.cur_prio.into(); + let factor = (Prio::weight(norm_prio, Prio::PRIO_EFFECT) + .saturating_add(Prio::weight(norm_active, Prio::ACTIVE_EFFECT))) + .clamp(Prio::CLAMP_MIN, Prio::CLAMP_MAX); + if prio > cur_prio { + self.cur_prio += Prio::weight(prio - cur_prio, factor); + } else { + self.cur_prio -= Prio::weight(cur_prio - prio, factor); + } + } + + pub fn insert(&mut self, id: FutureId, prio: Prio) { + self.status + .entry(prio) + .and_modify(|(active, all)| { + *active += 1; + *all += 1; + }) + .or_insert((1, 1)); + self.tasks + .entry(id) + .and_modify(|(old_prio, _)| { + self.status.entry(*old_prio).and_modify(|(_, cnt)| { + *cnt = cnt.saturating_sub(1); + }); + *old_prio = prio; + }) + .or_insert((prio, true)); + } + + pub fn remove(&mut self, id: FutureId) { + let Some((prio, activated)) = self.tasks.remove(&id) else { + return; + }; + self.status.entry(prio).and_modify(|(active, all)| { + if activated { + *active = active.saturating_sub(1); + } + *all = all.saturating_sub(1); + }); + self.status.retain(|_, (_, all)| *all > 0); + } +} diff --git a/modules/axembassy/src/time_driver.rs b/modules/axembassy/src/time_driver.rs new file mode 100644 index 0000000000..b2023f0b17 --- /dev/null +++ b/modules/axembassy/src/time_driver.rs @@ -0,0 +1,98 @@ +use core::cell::RefCell; +use core::sync::atomic::{AtomicU64, Ordering}; +use core::task; + +use axhal::time::{self, NANOS_PER_SEC, set_oneshot_timer}; +use kspin::SpinNoIrq; + +use embassy_time_driver::TICK_HZ; +use embassy_time_driver::{Driver, time_driver_impl}; +use embassy_time_queue_utils::Queue; + +/// Manipulation of Global `AxDriver` +pub struct AxDriverAPI; + +impl AxDriverAPI { + /// Dequeue expired timer and return nanos of next expiration + pub fn next_expiration(period: u64) -> u64 { + AX_DRIVER.next_expiration(period) + } +} + +fn ticks_to_nanos(ticks: u64) -> u64 { + (ticks as u128 * NANOS_PER_SEC as u128 / TICK_HZ as u128) as u64 +} + +fn nanos_to_ticks(nanos: u64) -> u64 { + (nanos as u128 * TICK_HZ as u128 / NANOS_PER_SEC as u128) as u64 +} + +struct AxDriver { + queue: SpinNoIrq>, + // static period interval + period_nanos: AtomicU64, +} + +time_driver_impl!(static AX_DRIVER: AxDriver = AxDriver::new()); + +impl AxDriver { + pub const fn new() -> Self { + AxDriver { + queue: SpinNoIrq::new(RefCell::new(Queue::new())), + period_nanos: AtomicU64::new(0), + } + } + + pub fn nanos_now() -> u64 { + time::monotonic_time_nanos() + } + + pub fn ticks_now() -> u64 { + let nanos_now = time::monotonic_time_nanos(); + nanos_to_ticks(nanos_now) + } + + /// schedule waker and set timer only if the next expiration interval is shorter than the periodic interval + pub fn schedule_wake(&self, at: u64, waker: &task::Waker) { + let queue_guard = self.queue.lock(); + let mut queue = queue_guard.borrow_mut(); + + if queue.schedule_wake(at, waker) { + let ticks_next_at = queue.next_expiration(self.now()); + let nanos_next_at = ticks_to_nanos(ticks_next_at); + let nanos_next_interval = nanos_next_at - Self::nanos_now(); + let nanos_period = self.period_nanos.load(Ordering::Relaxed); + if nanos_next_interval < nanos_period { + // only set timer if it is less than the periodic interval + set_oneshot_timer(nanos_next_at); + } + } + } + + /// Dequeue expired timer and return nanos of next expiration + pub fn next_expiration(&self, period: u64) -> u64 { + let queue_guard = self.queue.lock(); + let mut queue = queue_guard.borrow_mut(); + self.period_nanos.store(period, Ordering::Release); + + let ticks_now = self.now(); + + let ticks_next_expired = queue.next_expiration(ticks_now); + let nanos_next_expired = ticks_to_nanos(ticks_next_expired); + nanos_next_expired + } +} + +impl Driver for AxDriver { + // Returns the current time in **embassy ticks**. + fn now(&self) -> u64 { + Self::ticks_now() + } + + /// Schedule to wake up task by **embassy ticks** + /// + /// Set timer only if the newest expiration interval is before the period interval + fn schedule_wake(&self, at: u64, waker: &task::Waker) { + self.schedule_wake(at, waker); + } +} diff --git a/modules/axruntime/Cargo.toml b/modules/axruntime/Cargo.toml index 335ca4fb11..4d10e2cee7 100644 --- a/modules/axruntime/Cargo.toml +++ b/modules/axruntime/Cargo.toml @@ -24,6 +24,8 @@ net = ["axdriver", "axnet"] display = ["axdriver", "axdisplay"] rtc = [] +embassy-timer = ["axembassy/driver"] + [dependencies] axhal = { workspace = true } axlog = { workspace = true } @@ -35,6 +37,7 @@ axfs = { workspace = true, optional = true } axnet = { workspace = true, optional = true } axdisplay = { workspace = true, optional = true } axtask = { workspace = true, optional = true } +axembassy = { workspace = true, optional = true } axplat = "0.1" crate_interface = "0.1" diff --git a/modules/axruntime/src/lib.rs b/modules/axruntime/src/lib.rs index a1462cda15..0d1719c245 100644 --- a/modules/axruntime/src/lib.rs +++ b/modules/axruntime/src/lib.rs @@ -158,7 +158,9 @@ pub fn rust_main(cpu_id: usize, arg: usize) -> ! { axhal::init_later(cpu_id, arg); #[cfg(feature = "multitask")] - axtask::init_scheduler(); + { + axtask::init_scheduler(); + } #[cfg(any(feature = "fs", feature = "net", feature = "display"))] { @@ -252,9 +254,17 @@ fn init_interrupt() { let now_ns = axhal::time::monotonic_time_nanos(); // Safety: we have disabled preemption in IRQ handler. let mut deadline = unsafe { NEXT_DEADLINE.read_current_raw() }; - if now_ns >= deadline { + while now_ns >= deadline { deadline = now_ns + PERIODIC_INTERVAL_NANOS; } + #[cfg(feature = "embassy-timer")] + { + use axembassy::AxDriverAPI; + let next_expired = AxDriverAPI::next_expiration(PERIODIC_INTERVAL_NANOS); + if deadline >= next_expired { + deadline = next_expired; + } + } unsafe { NEXT_DEADLINE.write_current_raw(deadline + PERIODIC_INTERVAL_NANOS) }; axhal::time::set_oneshot_timer(deadline); } diff --git a/modules/axsync/Cargo.toml b/modules/axsync/Cargo.toml index dfa51f688a..bbbe639aaa 100644 --- a/modules/axsync/Cargo.toml +++ b/modules/axsync/Cargo.toml @@ -10,9 +10,10 @@ repository = "https://github.com/arceos-org/arceos/tree/main/modules/axsync" documentation = "https://arceos-org.github.io/arceos/axsync/index.html" [features] -multitask = ["axtask/multitask"] default = [] +multitask = ["axtask/multitask"] + [dependencies] kspin = "0.1" lock_api = { version = "0.4", default-features = false } diff --git a/modules/axsync/src/condvar.rs b/modules/axsync/src/condvar.rs new file mode 100644 index 0000000000..9b2f631b2f --- /dev/null +++ b/modules/axsync/src/condvar.rs @@ -0,0 +1,79 @@ +extern crate alloc; + +use crate::MutexGuard; +use axtask::{Futex, futex_wait, futex_wake, futex_wake_all}; + +use core::{sync::atomic::AtomicU32, time::Duration}; + +/// A condition variable used for synchronizing threads based on a shared condition. +pub struct Condvar { + futex: Futex, +} + +impl Condvar { + /// Creates a new [`Condvar`]. + #[inline(always)] + pub const fn new() -> Self { + Self { + futex: AtomicU32::new(0), + } + } + + /// Notifies one waiting thread. + /// + /// If there are multiple threads waiting on this condition variable, + /// only one of them will be woken up. The specific thread chosen is + /// up to the scheduler's policy for the underlying `WaitQueue`. + pub fn notify_one(&self) { + self.futex + .fetch_add(1, core::sync::atomic::Ordering::Relaxed); + futex_wake(&self.futex); + } + + /// Notifies all waiting threads. + /// + /// All threads currently waiting on this condition variable will be woken up. + pub fn notify_all(&self) { + self.futex + .fetch_add(1, core::sync::atomic::Ordering::Relaxed); + futex_wake_all(&self.futex); + } + + /// Atomically unlocks the provided mutex guard and waits for a notification + /// on this condition variable. + pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> { + // wait with no timeout should always return the guard + self.wait_optional_timeout(guard, None) + .expect("Condvar::wait with no timeout should not return None on timeout") + } + + /// Atomically unlocks the provided mutex guard and waits for a notification + /// on this condition variable, with a specified timeout. + pub fn wait_timeout<'a, T>( + &self, + guard: MutexGuard<'a, T>, + timeout: Duration, + ) -> Option> { + self.wait_optional_timeout(guard, Some(timeout)) + } + + #[inline] + fn wait_optional_timeout<'a, T>( + &self, + guard: MutexGuard<'a, T>, + timeout: Option, + ) -> Option> { + let expected = self.futex.load(core::sync::atomic::Ordering::Relaxed); + let mutex = lock_api::MutexGuard::mutex(&guard); + + let suc = futex_wait(&self.futex, expected, timeout); + + let new_guard = mutex.lock(); + + if !suc && timeout.is_some() { + None + } else { + Some(new_guard) + } + } +} diff --git a/modules/axsync/src/lib.rs b/modules/axsync/src/lib.rs index d1edd61dd1..e45960796b 100644 --- a/modules/axsync/src/lib.rs +++ b/modules/axsync/src/lib.rs @@ -16,9 +16,14 @@ pub use kspin as spin; +#[cfg(feature = "multitask")] +mod condvar; #[cfg(feature = "multitask")] mod mutex; +#[cfg(feature = "multitask")] +#[doc(cfg(feature = "multitask"))] +pub use self::condvar::Condvar; #[cfg(feature = "multitask")] #[doc(cfg(feature = "multitask"))] pub use self::mutex::{Mutex, MutexGuard, RawMutex}; diff --git a/modules/axsync/src/mutex.rs b/modules/axsync/src/mutex.rs index 4daa7ed753..4d5b2bcc82 100644 --- a/modules/axsync/src/mutex.rs +++ b/modules/axsync/src/mutex.rs @@ -147,4 +147,4 @@ mod tests { assert_eq!(*M.lock(), NUM_ITERS * NUM_TASKS * 3); println!("Mutex test OK"); } -} +} \ No newline at end of file diff --git a/modules/axtask/src/api.rs b/modules/axtask/src/api.rs index 14939564e7..82e7668864 100644 --- a/modules/axtask/src/api.rs +++ b/modules/axtask/src/api.rs @@ -12,6 +12,8 @@ pub use crate::task::{CurrentTask, TaskId, TaskInner}; pub use crate::task_ext::{TaskExtMut, TaskExtRef}; #[doc(cfg(feature = "multitask"))] pub use crate::wait_queue::WaitQueue; +#[doc(cfg(feature = "multitask"))] +pub use crate::wait_queues::{Futex, futex_wait, futex_wake, futex_wake_all}; /// The reference type of a task. pub type AxTaskRef = Arc; @@ -213,7 +215,7 @@ pub fn exit(exit_code: i32) -> ! { pub fn run_idle() -> ! { loop { yield_now(); - debug!("idle task: waiting for IRQs..."); + // debug!("idle task: waiting for IRQs..."); #[cfg(feature = "irq")] axhal::asm::wait_for_irqs(); } diff --git a/modules/axtask/src/lib.rs b/modules/axtask/src/lib.rs index b75c562f70..fcd90b1dae 100644 --- a/modules/axtask/src/lib.rs +++ b/modules/axtask/src/lib.rs @@ -45,6 +45,7 @@ cfg_if::cfg_if! { mod task_ext; mod api; mod wait_queue; + mod wait_queues; #[cfg(feature = "irq")] mod timers; diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs index 42a72bfe4d..06f9aaa9cf 100644 --- a/modules/axtask/src/run_queue.rs +++ b/modules/axtask/src/run_queue.rs @@ -259,7 +259,7 @@ impl AxRunQueueRef<'_, G> { let cpu_id = self.inner.cpu_id; debug!("task unblock: {} on run_queue {}", task_id_name, cpu_id); // Note: when the task is unblocked on another CPU's run queue, - // we just ingiore the `resched` flag. + // we just ingnore the `resched` flag. if resched && cpu_id == this_cpu_id() { #[cfg(feature = "preempt")] crate::current().set_preempt_pending(true); @@ -663,4 +663,4 @@ pub(crate) fn init_secondary() { unsafe { RUN_QUEUES[cpu_id].write(RUN_QUEUE.current_ref_mut_raw()); } -} +} \ No newline at end of file diff --git a/modules/axtask/src/wait_queues.rs b/modules/axtask/src/wait_queues.rs new file mode 100644 index 0000000000..4d9eeea945 --- /dev/null +++ b/modules/axtask/src/wait_queues.rs @@ -0,0 +1,72 @@ +use core::{sync::atomic::AtomicU32, time::Duration}; + +use alloc::collections::BTreeMap; + +use kspin::SpinNoIrq; + +use crate::WaitQueue; + +/// The address of a futex +pub type Futex = AtomicU32; + +/// A global map that associates the memory address of a `Futex` instance +/// with a `WaitQueue`. +/// +/// When tasks need to wait on a specific `Futex`, they register themselves +/// with the `WaitQueue` corresponding to that `Futex`'s memory address. +static FUTEX_WAIT_QUEUES: SpinNoIrq> = SpinNoIrq::new(BTreeMap::new()); + +/// Wakes up a single task that is currently waiting on the given `Futex`. +pub fn futex_wake(futex: &Futex) { + let futex_addr = futex as *const _ as usize; + let mut wait_queues = FUTEX_WAIT_QUEUES.lock(); + if let Some(queue) = wait_queues.get_mut(&futex_addr) { + // Wake up one task waiting on this queue. + // `notify_one(true)` means it will potentially yield the CPU + // if the woken task has higher priority. + queue.notify_one(true); + } +} + +/// Wakes up all tasks that are currently waiting on the given `Futex`. +pub fn futex_wake_all(futex: &Futex) { + let futex_addr = futex as *const _ as usize; + let mut wait_queues = FUTEX_WAIT_QUEUES.lock(); + if let Some(queue) = wait_queues.get_mut(&futex_addr) { + // Wake up all tasks waiting on this queue. + queue.notify_all(true); + } +} + +/// Attempts to wait on a `Futex` until its value is no longer `expected`. +pub fn futex_wait(futex: &Futex, expected: u32, timeout: Option) -> bool { + let futex_addr = futex as *const _ as usize; + + let current_val = futex.load(core::sync::atomic::Ordering::Relaxed); + if current_val != expected { + return false; + } + + let mut wait_queues = FUTEX_WAIT_QUEUES.lock(); + let queue = wait_queues + .entry(futex_addr) + .or_insert_with(|| WaitQueue::new()); + + let waited = match timeout { + Some(dur) => { + #[cfg(feature = "irq")] + { + !queue.wait_timeout(dur) + } + #[cfg(not(feature = "irq"))] + { + panic!("wait_timeout is not supported without irq feature"); + } + } + None => { + queue.wait(); + true + } + }; + waited +} diff --git a/ulib/axasync/Cargo.toml b/ulib/axasync/Cargo.toml new file mode 100644 index 0000000000..50ad4ee9f5 --- /dev/null +++ b/ulib/axasync/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "axasync" +version.workspace = true +edition.workspace = true +authors = [""] +description = "ArceOS user library with an interface similar to rust std" +license.workspace = true +homepage.workspace = true +repository = "https://github.com/arceos-org/arceos/tree/main/ulib/axasync" + +[features] +default = [] + +preempt = [ + "axfeat/async-preempt", + "arceos_api/async-preempt", + "thread", +] +thread = [ + "axfeat/async-thread", + "arceos_api/async-thread", + "dep:embassy-executor", +] +single = [ + "axfeat/async-single", + "arceos_api/async-single", + "dep:embassy-executor", +] + +time = ["dep:embassy-time"] +sync = ["dep:embassy-sync"] + +[dependencies] +axfeat = { workspace = true } +arceos_api = { workspace = true } + +embassy-executor = { git = "https://github.com/embassy-rs/embassy", branch = "main", default-features = false, optional = true } +embassy-futures = { git = "https://github.com/embassy-rs/embassy", branch = "main" } +embassy-sync = { git = "https://github.com/embassy-rs/embassy", branch = "main", optional = true } +embassy-time = { git = "https://github.com/embassy-rs/embassy", branch = "main", optional = true } + +cfg-if = "1.0.0" +static_cell = "2.1.0" diff --git a/ulib/axasync/src/lib.rs b/ulib/axasync/src/lib.rs new file mode 100644 index 0000000000..54e29a8284 --- /dev/null +++ b/ulib/axasync/src/lib.rs @@ -0,0 +1,56 @@ +//! # The ArceOS Asynchronous Library +//! +//! The [ArceOS] Asynchronous Library is a library for ArceOS to provide async +//! functionality other than standard library. Currently is mainly used for embassy runtime. +//! +//! ## Cargo Features +//! +//! - Utils +//! - `time`: Enable `Embassy` time related support. +//! - `sync`: Enable `Embassy` sync related support. +//! - Executor +//! - `single`: Use the single-thread executor. +//! - `thread`: Use the multi-thread executor. +//! - `preempt`: Use the preemptive executor. +//! +//! [ArceOS]: https://github.com/arceos-org/arceos +//! +#![cfg_attr(all(not(test), not(doc)), no_std)] +#![feature(doc_cfg)] +#![feature(doc_auto_cfg)] + +/// The embassy executor. +#[cfg(any(feature = "thread", feature = "preempt", feature = "single"))] +pub mod executor { + use arceos_api::embassy_async as api; + + pub use api::AxExecutor as Executor; + pub use embassy_executor::*; + pub use embassy_futures::*; + + #[cfg(feature = "preempt")] + pub use api::AxPrioFuture as PrioFuture; + + #[cfg(feature = "thread")] + pub use api::{ax_block_on as block_on, ax_spawner as spawner}; +} + +/// The embassy time related functionality. +#[cfg(feature = "time")] +pub mod time { + pub use embassy_time::*; +} + +/// The embassy sync related functionality. +#[cfg(feature = "sync")] +pub mod sync { + pub use embassy_sync::*; +} + +/// The static cell. +pub mod cell { + pub use static_cell::{ConstStaticCell, StaticCell}; +} + +// #[cfg(all(any(feature = "thread", feature = "preempt"), feature = "single"))] +// compile_error!(r#"feature "executor-thread" and "executor-single" are mutually exclusive"#); diff --git a/ulib/axstd/src/sync/condvar.rs b/ulib/axstd/src/sync/condvar.rs new file mode 100644 index 0000000000..ae17c94fd3 --- /dev/null +++ b/ulib/axstd/src/sync/condvar.rs @@ -0,0 +1,79 @@ +extern crate alloc; + +use arceos_api::task::{AxFutex, ax_futex_wait, ax_futex_wake, ax_futex_wake_all}; +use core::time::Duration; + +use crate::sync::MutexGuard; + +/// A condition variable used for synchronizing threads based on a shared condition. +pub struct Condvar { + futex: AxFutex, +} + +impl Condvar { + /// Creates a new [`Condvar`]. + #[inline(always)] + pub const fn new() -> Self { + Self { + futex: AxFutex::new(0), + } + } + + /// Notifies one waiting thread. + /// + /// If there are multiple threads waiting on this condition variable, + /// only one of them will be woken up. The specific thread chosen is + /// up to the scheduler's policy for the underlying `WaitQueue`. + pub fn notify_one(&self) { + self.futex + .fetch_add(1, core::sync::atomic::Ordering::Relaxed); + ax_futex_wake(&self.futex); + } + + /// Notifies all waiting threads. + /// + /// All threads currently waiting on this condition variable will be woken up. + pub fn notify_all(&self) { + self.futex + .fetch_add(1, core::sync::atomic::Ordering::Relaxed); + ax_futex_wake_all(&self.futex); + } + + /// Atomically unlocks the provided mutex guard and waits for a notification + /// on this condition variable. + pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> { + // wait with no timeout should always return the guard + self.wait_optional_timeout(guard, None) + .expect("Condvar::wait with no timeout should not return None on timeout") + } + + /// Atomically unlocks the provided mutex guard and waits for a notification + /// on this condition variable, with a specified timeout. + pub fn wait_timeout<'a, T>( + &self, + guard: MutexGuard<'a, T>, + timeout: Duration, + ) -> Option> { + self.wait_optional_timeout(guard, Some(timeout)) + } + + #[inline(always)] + fn wait_optional_timeout<'a, T>( + &self, + guard: MutexGuard<'a, T>, + timeout: Option, + ) -> Option> { + let expected = self.futex.load(core::sync::atomic::Ordering::Relaxed); + let mutex = lock_api::MutexGuard::mutex(&guard); + + let suc = ax_futex_wait(&self.futex, expected, timeout); + + let new_guard = mutex.lock(); + + if !suc && timeout.is_some() { + None + } else { + Some(new_guard) + } + } +} diff --git a/ulib/axstd/src/sync/mod.rs b/ulib/axstd/src/sync/mod.rs index 967f280956..a589fa8250 100644 --- a/ulib/axstd/src/sync/mod.rs +++ b/ulib/axstd/src/sync/mod.rs @@ -7,9 +7,14 @@ pub use core::sync::atomic; #[doc(no_inline)] pub use alloc::sync::{Arc, Weak}; +#[cfg(feature = "multitask")] +mod condvar; #[cfg(feature = "multitask")] mod mutex; +#[cfg(feature = "multitask")] +#[doc(cfg(feature = "multitask"))] +pub use self::condvar::Condvar; #[cfg(feature = "multitask")] #[doc(cfg(feature = "multitask"))] pub use self::mutex::{Mutex, MutexGuard, RawMutex};