diff --git a/Cargo.lock b/Cargo.lock index 3dca9d2466246..3a69c1335a67a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -328,6 +328,18 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.3.15" @@ -2214,6 +2226,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -9881,14 +9903,19 @@ name = "turbopack-node" version = "0.1.0" dependencies = [ "anyhow", + "async-channel", "async-stream", "async-trait", "base64 0.21.4", "const_format", + "dashmap 6.1.0", "either", "futures", "futures-retry", "indoc", + "napi", + "napi-build", + "napi-derive", "once_cell", "owo-colors", "parking_lot", @@ -9908,6 +9935,7 @@ dependencies = [ "turbopack-core", "turbopack-ecmascript", "turbopack-resolve", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index e685d439e41fe..ce489135bf852 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -423,6 +423,8 @@ napi = { version = "2", default-features = false, features = [ "napi5", "compat-mode", ] } +napi-derive = "2" +napi-build = "2" notify = "8.1.0" once_cell = "1.17.1" owo-colors = "4.2.2" diff --git a/crates/napi/Cargo.toml b/crates/napi/Cargo.toml index a0a3ddec1f039..76f049f083ac6 100644 --- a/crates/napi/Cargo.toml +++ b/crates/napi/Cargo.toml @@ -46,12 +46,12 @@ workspace = true [package.metadata.cargo-shear] ignored = [ - # we need to set features on these packages when building for WASM, but we don't directly use them - "getrandom", - "iana-time-zone", - # the plugins feature needs to set a feature on this transitively depended-on package, we never - # directly import it - "turbopack-ecmascript-plugins", + # we need to set features on these packages when building for WASM, but we don't directly use them + "getrandom", + "iana-time-zone", + # the plugins feature needs to set a feature on this transitively depended-on package, we never + # directly import it + "turbopack-ecmascript-plugins", ] [dependencies] @@ -63,7 +63,7 @@ flate2 = { workspace = true } futures-util = { workspace = true } owo-colors = { workspace = true } napi = { workspace = true } -napi-derive = "2" +napi-derive = { workspace = true } next-custom-transforms = { workspace = true } next-taskless = { workspace = true } rand = { workspace = true } @@ -115,7 +115,7 @@ next-core = { workspace = true } mdxjs = { workspace = true, features = ["serializable"] } turbo-tasks-malloc = { workspace = true, default-features = false, features = [ - "custom_allocator" + "custom_allocator", ] } turbopack-core = { workspace = true } @@ -143,8 +143,7 @@ tokio = { workspace = true, features = ["full"] } [build-dependencies] anyhow = { workspace = true } -napi-build = "2" +napi-build = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } vergen-gitcl = { workspace = true } - diff --git a/crates/next-api/src/project.rs b/crates/next-api/src/project.rs index 92caa1d190df6..9c4a2985cbdc9 100644 --- a/crates/next-api/src/project.rs +++ b/crates/next-api/src/project.rs @@ -1056,11 +1056,12 @@ impl Project { // At this point all modules have been computed and we can get rid of the node.js // process pools - if *self.is_watch_enabled().await? { - turbopack_node::evaluate::scale_down(); - } else { - turbopack_node::evaluate::scale_zero(); - } + // TODO: + // if *self.is_watch_enabled().await? { + // turbopack_node::evaluate::scale_down(); + // } else { + // turbopack_node::evaluate::scale_zero(); + // } Ok(module_graphs_vc) } diff --git a/packages/next/src/build/swc/generated-native.d.ts b/packages/next/src/build/swc/generated-native.d.ts index 08fed01ec5009..4d8b188f01036 100644 --- a/packages/next/src/build/swc/generated-native.d.ts +++ b/packages/next/src/build/swc/generated-native.d.ts @@ -34,6 +34,18 @@ export declare class ExternalObject { [K: symbol]: T } } +export declare function recvPoolRequest(): Promise +export declare function notifyPoolCreated(filename: string): Promise +export declare function recvWorkerRequest(poolId: string): Promise +export declare function notifyWorkerAck(poolId: string): Promise +export declare function recvEvaluation(poolId: string): Promise> +export declare function recvMessageInWorker( + workerId: number +): Promise> +export declare function sendTaskResponse( + taskId: string, + data: Array +): Promise export declare function lockfileTryAcquireSync( path: string ): { __napiType: 'Lockfile' } | null diff --git a/turbopack/crates/turbopack-node/Cargo.toml b/turbopack/crates/turbopack-node/Cargo.toml index 646d64b259793..e22ec55d63fba 100644 --- a/turbopack/crates/turbopack-node/Cargo.toml +++ b/turbopack/crates/turbopack-node/Cargo.toml @@ -10,8 +10,11 @@ autobenches = false bench = false [features] +default = ["worker_thread"] # enable "HMR" for embedded assets dynamic_embed_contents = ["turbo-tasks-fs/dynamic_embed_contents"] +child_process = ["tokio"] +worker_thread = ["napi", "napi-derive"] [lints] workspace = true @@ -20,6 +23,8 @@ workspace = true anyhow = { workspace = true } async-stream = "0.3.4" async-trait = { workspace = true } +async-channel = "2.5.0" +dashmap = { workspace = true } base64 = "0.21.0" const_format = { workspace = true } either = { workspace = true, features = ["serde"] } @@ -34,7 +39,10 @@ rustc-hash = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true, features = ["base64"] } -tokio = { workspace = true, features = ["full"] } +tokio = { workspace = true, optional = true, features = ["full"] } +uuid = { workspace = true, features = ["v4"] } +napi = { workspace = true, optional = true, features = ["anyhow"] } +napi-derive = { workspace = true, optional = true } tracing = { workspace = true } turbo-rcstr = { workspace = true } turbo-tasks = { workspace = true } @@ -46,3 +54,5 @@ turbopack-core = { workspace = true } turbopack-ecmascript = { workspace = true } turbopack-resolve = { workspace = true } +[build-dependencies] +napi-build = { workspace = true } diff --git a/turbopack/crates/turbopack-node/build.rs b/turbopack/crates/turbopack-node/build.rs new file mode 100644 index 0000000000000..8efb97e10c4e7 --- /dev/null +++ b/turbopack/crates/turbopack-node/build.rs @@ -0,0 +1,6 @@ +fn main() { + #[cfg(not(all(target_os = "macos", target_arch = "aarch64")))] + if std::env::var("CARGO_FEATURE_WORKER_THREAD").is_ok() { + napi_build::setup(); + } +} diff --git a/turbopack/crates/turbopack-node/src/evaluate.rs b/turbopack/crates/turbopack-node/src/evaluate.rs index 7d8aed4651ec1..9df17e85ceac1 100644 --- a/turbopack/crates/turbopack-node/src/evaluate.rs +++ b/turbopack/crates/turbopack-node/src/evaluate.rs @@ -10,7 +10,7 @@ use turbo_tasks::{ TryJoinIterExt, Vc, duration_span, fxindexmap, get_effects, trace::TraceRawVcs, }; use turbo_tasks_env::{EnvMap, ProcessEnv}; -use turbo_tasks_fs::{File, FileSystemPath, to_sys_path}; +use turbo_tasks_fs::{File, FileSystemPath, json::parse_json_with_source_context, to_sys_path}; use turbopack_core::{ asset::AssetContent, changed::content_changed, @@ -31,17 +31,18 @@ use turbopack_core::{ virtual_source::VirtualSource, }; +#[cfg(feature = "child_process")] +use crate::process_pool::ChildProcessPool as Pool; +#[cfg(feature = "worker_thread")] +use crate::worker_pool::WorkerThreadPool as Pool; use crate::{ - AssetsForSourceMapping, - embed_js::embed_file_path, - emit, emit_package_json, internal_assets_for_source_mapping, - pool::{FormattingMode, NodeJsOperation, NodeJsPool}, - source_map::StructuredError, + AssetsForSourceMapping, embed_js::embed_file_path, emit, emit_package_json, + format::FormattingMode, internal_assets_for_source_mapping, source_map::StructuredError, }; #[derive(Serialize)] #[serde(tag = "type", rename_all = "camelCase")] -enum EvalJavaScriptOutgoingMessage<'a> { +pub enum EvalJavaScriptOutgoingMessage<'a> { #[serde(rename_all = "camelCase")] Evaluate { args: Vec<&'a JsonValue> }, Result { @@ -53,13 +54,59 @@ enum EvalJavaScriptOutgoingMessage<'a> { #[derive(Deserialize, Debug)] #[serde(tag = "type", rename_all = "camelCase")] -enum EvalJavaScriptIncomingMessage { +pub enum EvalJavaScriptIncomingMessage { Info { data: JsonValue }, Request { id: u64, data: JsonValue }, End { data: Option }, Error(StructuredError), } +#[turbo_tasks::value(cell = "new", serialization = "none", eq = "manual", shared)] +pub struct EvaluatePool { + pub id: RcStr, + #[turbo_tasks(trace_ignore, debug_ignore)] + pool: Box, + pub assets_for_source_mapping: ResolvedVc, + pub assets_root: FileSystemPath, + pub project_dir: FileSystemPath, +} + +impl EvaluatePool { + pub async fn operation(&self) -> Result> { + self.pool.operation().await + } +} + +impl EvaluatePool { + pub(crate) fn new( + id: RcStr, + pool: Box, + assets_for_source_mapping: ResolvedVc, + assets_root: FileSystemPath, + project_dir: FileSystemPath, + ) -> Self { + Self { + id, + pool, + assets_for_source_mapping, + assets_root, + project_dir, + } + } +} + +#[async_trait::async_trait] +pub trait EvaluateOperation: Send + Sync { + async fn operation(&self) -> Result>; +} + +#[async_trait::async_trait] +pub trait Operation: Send { + async fn recv(&mut self) -> Result>; + + async fn send(&mut self, data: Vec) -> Result<()>; +} + #[turbo_tasks::value] struct EmittedEvaluatePoolAssets { bootstrap: ResolvedVc>, @@ -161,7 +208,7 @@ pub async fn get_evaluate_pool( additional_invalidation: ResolvedVc, debug: bool, env_var_tracking: EnvVarTracking, -) -> Result> { +) -> Result> { let operation = emit_evaluate_pool_assets_with_effects_operation(entries, chunking_context, module_graph); let EmittedEvaluatePoolAssetsWithEffects { assets, effects } = @@ -199,7 +246,7 @@ pub async fn get_evaluate_pool( env.read_all().untracked().await? } }; - let pool = NodeJsPool::new( + let pool = Pool::create( cwd, entrypoint, env.iter().map(|(k, v)| (k.clone(), v.clone())).collect(), @@ -256,7 +303,7 @@ pub trait EvaluateContext { type ResponseMessage: Serialize; type State: Default; - fn pool(&self) -> OperationVc; + fn pool(&self) -> OperationVc; fn keep_alive(&self) -> bool { false } @@ -265,24 +312,24 @@ pub trait EvaluateContext { fn emit_error( &self, error: StructuredError, - pool: &NodeJsPool, + pool: &EvaluatePool, ) -> impl Future> + Send; fn info( &self, state: &mut Self::State, data: Self::InfoMessage, - pool: &NodeJsPool, + pool: &EvaluatePool, ) -> impl Future> + Send; fn request( &self, state: &mut Self::State, data: Self::RequestMessage, - pool: &NodeJsPool, + pool: &EvaluatePool, ) -> impl Future> + Send; fn finish( &self, state: Self::State, - pool: &NodeJsPool, + pool: &EvaluatePool, ) -> impl Future> + Send; } @@ -297,7 +344,7 @@ pub async fn custom_evaluate(evaluate_context: impl EvaluateContext) -> Result Result Result( - operation: &mut NodeJsOperation, - pool: &NodeJsPool, + operation: &mut Box, + pool: &EvaluatePool, evaluate_context: &T, state: &mut T::State, ) -> Result> { let _guard = duration_span!("Node.js evaluation"); loop { - match operation.recv().await? { + let buf = operation.recv().await?; + let message = parse_json_with_source_context(std::str::from_utf8(&buf)?)?; + + match message { EvalJavaScriptIncomingMessage::Error(error) => { evaluate_context.emit_error(error, pool).await?; // Do not reuse the process in case of error - operation.disallow_reuse(); + // operation.disallow_reuse(); // Issue emitted, we want to break but don't want to return an error return Ok(None); } @@ -484,20 +536,24 @@ async fn pull_operation( { Ok(response) => { operation - .send(EvalJavaScriptOutgoingMessage::Result { - id, - error: None, - data: Some(serde_json::to_value(response)?), - }) + .send(serde_json::to_vec( + &EvalJavaScriptOutgoingMessage::Result { + id, + error: None, + data: Some(serde_json::to_value(response)?), + }, + )?) .await?; } Err(e) => { operation - .send(EvalJavaScriptOutgoingMessage::Result { - id, - error: Some(PrettyPrintError(&e).to_string()), - data: None, - }) + .send(serde_json::to_vec( + &EvalJavaScriptOutgoingMessage::Result { + id, + error: Some(PrettyPrintError(&e).to_string()), + data: None, + }, + )?) .await?; } } @@ -525,7 +581,7 @@ impl EvaluateContext for BasicEvaluateContext { type ResponseMessage = (); type State = (); - fn pool(&self) -> OperationVc { + fn pool(&self) -> OperationVc { get_evaluate_pool( self.entries, self.cwd.clone(), @@ -550,7 +606,7 @@ impl EvaluateContext for BasicEvaluateContext { !self.args.is_empty() } - async fn emit_error(&self, error: StructuredError, pool: &NodeJsPool) -> Result<()> { + async fn emit_error(&self, error: StructuredError, pool: &EvaluatePool) -> Result<()> { EvaluationIssue { error, source: IssueSource::from_source_only(self.context_source_for_issue), @@ -567,7 +623,7 @@ impl EvaluateContext for BasicEvaluateContext { &self, _state: &mut Self::State, _data: Self::InfoMessage, - _pool: &NodeJsPool, + _pool: &EvaluatePool, ) -> Result<()> { bail!("BasicEvaluateContext does not support info messages") } @@ -576,24 +632,16 @@ impl EvaluateContext for BasicEvaluateContext { &self, _state: &mut Self::State, _data: Self::RequestMessage, - _pool: &NodeJsPool, + _pool: &EvaluatePool, ) -> Result { bail!("BasicEvaluateContext does not support request messages") } - async fn finish(&self, _state: Self::State, _pool: &NodeJsPool) -> Result<()> { + async fn finish(&self, _state: Self::State, _pool: &EvaluatePool) -> Result<()> { Ok(()) } } -pub fn scale_zero() { - NodeJsPool::scale_zero(); -} - -pub fn scale_down() { - NodeJsPool::scale_down(); -} - /// An issue that occurred while evaluating node code. #[turbo_tasks::value(shared)] pub struct EvaluationIssue { diff --git a/turbopack/crates/turbopack-node/src/format.rs b/turbopack/crates/turbopack-node/src/format.rs new file mode 100644 index 0000000000000..27dd550c28119 --- /dev/null +++ b/turbopack/crates/turbopack-node/src/format.rs @@ -0,0 +1,36 @@ +use std::fmt::Display; + +use owo_colors::{OwoColorize, Style}; + +#[derive(Clone, Copy)] +pub enum FormattingMode { + /// No formatting, just print the output + Plain, + /// Use ansi colors to format the output + AnsiColors, +} + +impl FormattingMode { + pub fn magic_identifier<'a>(&self, content: impl Display + 'a) -> impl Display + 'a { + match self { + FormattingMode::Plain => format!("{{{content}}}"), + FormattingMode::AnsiColors => format!("{{{content}}}").italic().to_string(), + } + } + + pub fn lowlight<'a>(&self, content: impl Display + 'a) -> impl Display + 'a { + match self { + FormattingMode::Plain => Style::new(), + FormattingMode::AnsiColors => Style::new().dimmed(), + } + .style(content) + } + + pub fn highlight<'a>(&self, content: impl Display + 'a) -> impl Display + 'a { + match self { + FormattingMode::Plain => Style::new(), + FormattingMode::AnsiColors => Style::new().bold().underline(), + } + .style(content) + } +} diff --git a/turbopack/crates/turbopack-node/src/lib.rs b/turbopack/crates/turbopack-node/src/lib.rs index 315e3b9347e6a..0008c5475d493 100644 --- a/turbopack/crates/turbopack-node/src/lib.rs +++ b/turbopack/crates/turbopack-node/src/lib.rs @@ -2,35 +2,28 @@ #![feature(arbitrary_self_types)] #![feature(arbitrary_self_types_pointers)] -use std::thread::available_parallelism; - -use anyhow::{Result, bail}; +use anyhow::Result; use rustc_hash::FxHashMap; -use turbo_rcstr::{RcStr, rcstr}; use turbo_tasks::{ResolvedVc, TryFlatJoinIterExt, Vc}; -use turbo_tasks_env::ProcessEnv; -use turbo_tasks_fs::{File, FileSystemPath, to_sys_path}; +use turbo_tasks_fs::{File, FileSystemPath}; use turbopack_core::{ asset::{Asset, AssetContent}, - changed::content_changed, - chunk::{ChunkingContext, ChunkingContextExt, EvaluatableAsset, EvaluatableAssets}, - module::Module, - module_graph::{ModuleGraph, chunk_group_info::ChunkGroupEntry}, output::{ExpandOutputAssetsInput, OutputAsset, OutputAssets, expand_output_assets}, source_map::GenerateSourceMap, virtual_output::VirtualOutputAsset, }; -use self::pool::NodeJsPool; - pub mod debug; pub mod embed_js; pub mod evaluate; pub mod execution_context; -mod heap_queue; -mod pool; +mod format; +#[cfg(feature = "child_process")] +mod process_pool; pub mod source_map; pub mod transforms; +#[cfg(feature = "worker_thread")] +mod worker_pool; #[turbo_tasks::function] async fn emit( @@ -112,93 +105,3 @@ fn emit_package_json(dir: FileSystemPath) -> Result> { dir, )) } - -/// Creates a node.js renderer pool for an entrypoint. -#[turbo_tasks::function(operation)] -pub async fn get_renderer_pool_operation( - cwd: FileSystemPath, - env: ResolvedVc>, - intermediate_asset: ResolvedVc>, - intermediate_output_path: FileSystemPath, - output_root: FileSystemPath, - project_dir: FileSystemPath, - debug: bool, -) -> Result> { - emit_package_json(intermediate_output_path.clone())?.await?; - - emit(*intermediate_asset, output_root.clone()) - .as_side_effect() - .await?; - let assets_for_source_mapping = - internal_assets_for_source_mapping(*intermediate_asset, output_root.clone()); - - let entrypoint = intermediate_asset.path().owned().await?; - - let Some(cwd) = to_sys_path(cwd.clone()).await? else { - bail!( - "can only render from a disk filesystem, but `cwd = {}`", - cwd.value_to_string().await? - ); - }; - let Some(entrypoint) = to_sys_path(entrypoint.clone()).await? else { - bail!( - "can only render from a disk filesystem, but `entrypoint = {}`", - entrypoint.value_to_string().await? - ); - }; - // Invalidate pool when code content changes - content_changed(*ResolvedVc::upcast(intermediate_asset)).await?; - - Ok(NodeJsPool::new( - cwd, - entrypoint, - env.read_all() - .await? - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(), - assets_for_source_mapping.to_resolved().await?, - output_root, - project_dir, - available_parallelism().map_or(1, |v| v.get()), - debug, - ) - .cell()) -} - -/// Converts a module graph into node.js executable assets -#[turbo_tasks::function] -pub async fn get_intermediate_asset( - chunking_context: Vc>, - main_entry: ResolvedVc>, - other_entries: Vc, -) -> Result>> { - Ok(chunking_context.root_entry_chunk_group_asset( - chunking_context - .chunk_path(None, main_entry.ident(), None, rcstr!(".js")) - .owned() - .await?, - other_entries.with_entry(*main_entry), - ModuleGraph::from_modules( - Vc::cell(vec![ChunkGroupEntry::Entry( - other_entries - .await? - .into_iter() - .copied() - .chain(std::iter::once(main_entry)) - .map(ResolvedVc::upcast) - .collect(), - )]), - false, - ), - OutputAssets::empty(), - OutputAssets::empty(), - )) -} - -#[derive(Clone, Debug)] -#[turbo_tasks::value(shared)] -pub struct ResponseHeaders { - pub status: u16, - pub headers: Vec<(RcStr, RcStr)>, -} diff --git a/turbopack/crates/turbopack-node/src/heap_queue.rs b/turbopack/crates/turbopack-node/src/process_pool/heap_queue.rs similarity index 100% rename from turbopack/crates/turbopack-node/src/heap_queue.rs rename to turbopack/crates/turbopack-node/src/process_pool/heap_queue.rs diff --git a/turbopack/crates/turbopack-node/src/pool.rs b/turbopack/crates/turbopack-node/src/process_pool/mod.rs similarity index 87% rename from turbopack/crates/turbopack-node/src/pool.rs rename to turbopack/crates/turbopack-node/src/process_pool/mod.rs index 78fce6f7b58b9..2e852714cae06 100644 --- a/turbopack/crates/turbopack-node/src/pool.rs +++ b/turbopack/crates/turbopack-node/src/process_pool/mod.rs @@ -1,7 +1,6 @@ use std::{ - borrow::Cow, cmp::max, - fmt::{Debug, Display}, + fmt::Debug, future::Future, mem::take, path::{Path, PathBuf}, @@ -13,10 +12,9 @@ use std::{ use anyhow::{Context, Result, bail}; use futures::join; use once_cell::sync::Lazy; -use owo_colors::{OwoColorize, Style}; +use owo_colors::OwoColorize; use parking_lot::Mutex; use rustc_hash::FxHashMap; -use serde::{Serialize, de::DeserializeOwned}; use tokio::{ io::{ AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, Stderr, @@ -30,50 +28,22 @@ use tokio::{ }; use turbo_rcstr::RcStr; use turbo_tasks::{FxIndexSet, ResolvedVc, Vc, duration_span}; -use turbo_tasks_fs::{FileSystemPath, json::parse_json_with_source_context}; +use turbo_tasks_fs::FileSystemPath; use turbopack_ecmascript::magic_identifier::unmangle_identifiers; -use crate::{AssetsForSourceMapping, heap_queue::HeapQueue, source_map::apply_source_mapping}; - -#[derive(Clone, Copy)] -pub enum FormattingMode { - /// No formatting, just print the output - Plain, - /// Use ansi colors to format the output - AnsiColors, -} - -impl FormattingMode { - pub fn magic_identifier<'a>(&self, content: impl Display + 'a) -> impl Display + 'a { - match self { - FormattingMode::Plain => format!("{{{content}}}"), - FormattingMode::AnsiColors => format!("{{{content}}}").italic().to_string(), - } - } - - pub fn lowlight<'a>(&self, content: impl Display + 'a) -> impl Display + 'a { - match self { - FormattingMode::Plain => Style::new(), - FormattingMode::AnsiColors => Style::new().dimmed(), - } - .style(content) - } +use crate::{ + AssetsForSourceMapping, + evaluate::{EvaluateOperation, EvaluatePool, Operation}, + format::FormattingMode, + source_map::apply_source_mapping, +}; - pub fn highlight<'a>(&self, content: impl Display + 'a) -> impl Display + 'a { - match self { - FormattingMode::Plain => Style::new(), - FormattingMode::AnsiColors => Style::new().bold().underline(), - } - .style(content) - } -} +mod heap_queue; +use heap_queue::HeapQueue; struct NodeJsPoolProcess { child: Option, connection: TcpStream, - assets_for_source_mapping: ResolvedVc, - assets_root: FileSystemPath, - project_dir: FileSystemPath, stdout_handler: OutputStreamHandler, stderr_handler: OutputStreamHandler, debug: bool, @@ -107,39 +77,6 @@ impl PartialEq for NodeJsPoolProcess { } } -impl NodeJsPoolProcess { - pub async fn apply_source_mapping<'a>( - &self, - text: &'a str, - formatting_mode: FormattingMode, - ) -> Result> { - let text = unmangle_identifiers(text, |content| formatting_mode.magic_identifier(content)); - match text { - Cow::Borrowed(text) => { - apply_source_mapping( - text, - *self.assets_for_source_mapping, - self.assets_root.clone(), - self.project_dir.clone(), - formatting_mode, - ) - .await - } - Cow::Owned(ref text) => { - let cow = apply_source_mapping( - text, - *self.assets_for_source_mapping, - self.assets_root.clone(), - self.project_dir.clone(), - formatting_mode, - ) - .await?; - Ok(Cow::Owned(cow.into_owned())) - } - } - } -} - const CONNECT_TIMEOUT: Duration = Duration::from_secs(30); #[derive(Clone, PartialEq, Eq, Hash)] @@ -456,9 +393,6 @@ impl NodeJsPoolProcess { let mut process = Self { child: Some(child), connection, - assets_for_source_mapping, - assets_root: assets_root.clone(), - project_dir: project_dir.clone(), stdout_handler, stderr_handler, debug, @@ -726,7 +660,7 @@ static ACTIVE_POOLS: Lazy = Lazy::new(Default::default); /// The worker will *not* use the env of the parent process by default. All env /// vars need to be provided to make the execution as pure as possible. #[turbo_tasks::value(cell = "new", serialization = "none", eq = "manual", shared)] -pub struct NodeJsPool { +pub struct ChildProcessPool { cwd: PathBuf, entrypoint: PathBuf, env: FxHashMap, @@ -751,10 +685,10 @@ pub struct NodeJsPool { stats: Arc>, } -impl NodeJsPool { +impl ChildProcessPool { /// * debug: Whether to automatically enable Node's `--inspect-brk` when spawning it. Note: /// automatically overrides concurrency to 1. - pub(super) fn new( + pub fn create( cwd: PathBuf, entrypoint: PathBuf, env: FxHashMap, @@ -763,24 +697,53 @@ impl NodeJsPool { project_dir: FileSystemPath, concurrency: usize, debug: bool, - ) -> Self { - Self { - cwd, - entrypoint, - env, + ) -> EvaluatePool { + EvaluatePool::new( + entrypoint.to_string_lossy().to_string().into(), + Box::new(Self { + cwd, + entrypoint, + env, + assets_for_source_mapping, + assets_root: assets_root.clone(), + project_dir: project_dir.clone(), + concurrency_semaphore: Arc::new(Semaphore::new(if debug { + 1 + } else { + concurrency + })), + bootup_semaphore: Arc::new(Semaphore::new(1)), + idle_processes: Arc::new(HeapQueue::new()), + shared_stdout: Arc::new(Mutex::new(FxIndexSet::default())), + shared_stderr: Arc::new(Mutex::new(FxIndexSet::default())), + debug, + stats: Default::default(), + }), assets_for_source_mapping, assets_root, project_dir, - concurrency_semaphore: Arc::new(Semaphore::new(if debug { 1 } else { concurrency })), - bootup_semaphore: Arc::new(Semaphore::new(1)), - idle_processes: Arc::new(HeapQueue::new()), - shared_stdout: Arc::new(Mutex::new(FxIndexSet::default())), - shared_stderr: Arc::new(Mutex::new(FxIndexSet::default())), - debug, - stats: Default::default(), - } + ) } +} + +#[async_trait::async_trait] +impl EvaluateOperation for ChildProcessPool { + async fn operation(&self) -> Result> { + // Acquire a running process (handles concurrency limits, boots up the process) + let (process, permits) = self.acquire_process().await?; + Ok(Box::new(NodeJsOperation { + process: Some(process), + permits, + idle_processes: self.idle_processes.clone(), + start: Instant::now(), + stats: self.stats.clone(), + allow_process_reuse: true, + })) + } +} + +impl ChildProcessPool { async fn acquire_process(&self) -> Result<(NodeJsPoolProcess, AcquiredPermits)> { { self.stats.lock().add_queued_task(); @@ -837,20 +800,7 @@ impl NodeJsPool { Ok((process, start.elapsed())) } - pub async fn operation(&self) -> Result { - // Acquire a running process (handles concurrency limits, boots up the process) - let (process, permits) = self.acquire_process().await?; - - Ok(NodeJsOperation { - process: Some(process), - permits, - idle_processes: self.idle_processes.clone(), - start: Instant::now(), - stats: self.stats.clone(), - allow_process_reuse: true, - }) - } - + #[allow(dead_code)] pub fn scale_down() { let pools = ACTIVE_POOLS.lock().clone(); for pool in pools { @@ -858,6 +808,7 @@ impl NodeJsPool { } } + #[allow(dead_code)] pub fn scale_zero() { let pools = ACTIVE_POOLS.lock().clone(); for pool in pools { @@ -877,6 +828,27 @@ pub struct NodeJsOperation { allow_process_reuse: bool, } +#[async_trait::async_trait] +impl Operation for NodeJsOperation { + async fn recv(&mut self) -> Result> { + self.with_process(|process| async move { + process.recv().await.context("failed to receive message") + }) + .await + } + + async fn send(&mut self, data: Vec) -> Result<()> { + self.with_process(|process| async move { + timeout(Duration::from_secs(30), process.send(data)) + .await + .context("timeout while sending message")? + .context("failed to send message")?; + Ok(()) + }) + .await + } +} + impl NodeJsOperation { async fn with_process<'a, F: Future> + Send + 'a, T>( &'a mut self, @@ -899,34 +871,7 @@ impl NodeJsOperation { result } - pub async fn recv(&mut self) -> Result - where - M: DeserializeOwned, - { - let message = self - .with_process(|process| async move { - process.recv().await.context("failed to receive message") - }) - .await?; - let message = std::str::from_utf8(&message).context("message is not valid UTF-8")?; - parse_json_with_source_context(message).context("failed to deserialize message") - } - - pub async fn send(&mut self, message: M) -> Result<()> - where - M: Serialize, - { - let message = serde_json::to_vec(&message).context("failed to serialize message")?; - self.with_process(|process| async move { - timeout(Duration::from_secs(30), process.send(message)) - .await - .context("timeout while sending message")? - .context("failed to send message")?; - Ok(()) - }) - .await - } - + #[allow(dead_code)] pub async fn wait_or_kill(mut self) -> Result { let mut process = self .process @@ -952,24 +897,13 @@ impl NodeJsOperation { Ok(status) } + #[allow(dead_code)] pub fn disallow_reuse(&mut self) { if self.allow_process_reuse { self.stats.lock().remove_worker(); self.allow_process_reuse = false; } } - - pub async fn apply_source_mapping<'a>( - &self, - text: &'a str, - formatting_mode: FormattingMode, - ) -> Result> { - if let Some(process) = self.process.as_ref() { - process.apply_source_mapping(text, formatting_mode).await - } else { - Ok(Cow::Borrowed(text)) - } - } } impl Drop for NodeJsOperation { diff --git a/turbopack/crates/turbopack-node/src/source_map/mod.rs b/turbopack/crates/turbopack-node/src/source_map/mod.rs index 58b2584b095a8..13d486da523ef 100644 --- a/turbopack/crates/turbopack-node/src/source_map/mod.rs +++ b/turbopack/crates/turbopack-node/src/source_map/mod.rs @@ -9,7 +9,6 @@ use const_format::concatcp; use once_cell::sync::Lazy; use regex::Regex; pub use trace::{StackFrame, TraceResult, trace_source_map}; -use tracing::{Level, instrument}; use turbo_tasks::{ReadRef, Vc}; use turbo_tasks_fs::{ FileLinesContent, FileSystemPath, source_context::get_source_context, to_sys_path, @@ -17,12 +16,11 @@ use turbo_tasks_fs::{ use turbopack_cli_utils::source_context::format_source_context_lines; use turbopack_core::{ PROJECT_FILESYSTEM_NAME, SOURCE_URL_PROTOCOL, - output::OutputAsset, source_map::{GenerateSourceMap, SourceMap}, }; use turbopack_ecmascript::magic_identifier::unmangle_identifiers; -use crate::{AssetsForSourceMapping, internal_assets_for_source_mapping, pool::FormattingMode}; +use crate::{AssetsForSourceMapping, format::FormattingMode}; pub mod trace; @@ -332,38 +330,3 @@ impl StructuredError { Ok(message) } } - -pub async fn trace_stack( - error: StructuredError, - root_asset: Vc>, - output_path: FileSystemPath, - project_dir: FileSystemPath, -) -> Result { - let assets_for_source_mapping = - internal_assets_for_source_mapping(root_asset, output_path.clone()); - - trace_stack_with_source_mapping_assets( - error, - assets_for_source_mapping, - output_path, - project_dir, - ) - .await -} - -#[instrument(level = Level::TRACE, skip_all)] -pub async fn trace_stack_with_source_mapping_assets( - error: StructuredError, - assets_for_source_mapping: Vc, - output_path: FileSystemPath, - project_dir: FileSystemPath, -) -> Result { - error - .print( - assets_for_source_mapping, - output_path, - project_dir, - FormattingMode::Plain, - ) - .await -} diff --git a/turbopack/crates/turbopack-node/src/transforms/webpack.rs b/turbopack/crates/turbopack-node/src/transforms/webpack.rs index 899849fb15520..e013669230729 100644 --- a/turbopack/crates/turbopack-node/src/transforms/webpack.rs +++ b/turbopack/crates/turbopack-node/src/transforms/webpack.rs @@ -55,11 +55,11 @@ use crate::{ debug::should_debug, embed_js::embed_file_path, evaluate::{ - EnvVarTracking, EvaluateContext, EvaluateEntries, EvaluationIssue, custom_evaluate, - get_evaluate_entries, get_evaluate_pool, + EnvVarTracking, EvaluateContext, EvaluateEntries, EvaluatePool, EvaluationIssue, + custom_evaluate, get_evaluate_entries, get_evaluate_pool, }, execution_context::ExecutionContext, - pool::{FormattingMode, NodeJsPool}, + format::FormattingMode, source_map::{StackFrame, StructuredError}, }; @@ -433,7 +433,7 @@ impl EvaluateContext for WebpackLoaderContext { type ResponseMessage = ResponseMessage; type State = Vec; - fn pool(&self) -> OperationVc { + fn pool(&self) -> OperationVc { get_evaluate_pool( self.entries, self.cwd.clone(), @@ -461,7 +461,7 @@ impl EvaluateContext for WebpackLoaderContext { true } - async fn emit_error(&self, error: StructuredError, pool: &NodeJsPool) -> Result<()> { + async fn emit_error(&self, error: StructuredError, pool: &EvaluatePool) -> Result<()> { EvaluationIssue { error, source: IssueSource::from_source_only(self.context_source_for_issue), @@ -478,7 +478,7 @@ impl EvaluateContext for WebpackLoaderContext { &self, state: &mut Self::State, data: Self::InfoMessage, - pool: &NodeJsPool, + pool: &EvaluatePool, ) -> Result<()> { match data { InfoMessage::Dependencies { @@ -554,7 +554,7 @@ impl EvaluateContext for WebpackLoaderContext { &self, _state: &mut Self::State, data: Self::RequestMessage, - _pool: &NodeJsPool, + _pool: &EvaluatePool, ) -> Result { match data { RequestMessage::Resolve { @@ -608,7 +608,7 @@ impl EvaluateContext for WebpackLoaderContext { } } - async fn finish(&self, state: Self::State, pool: &NodeJsPool) -> Result<()> { + async fn finish(&self, state: Self::State, pool: &EvaluatePool) -> Result<()> { let has_errors = state.iter().any(|log| log.log_type == LogType::Error); let has_warnings = state.iter().any(|log| log.log_type == LogType::Warn); if has_errors || has_warnings { diff --git a/turbopack/crates/turbopack-node/src/worker_pool/mod.rs b/turbopack/crates/turbopack-node/src/worker_pool/mod.rs new file mode 100644 index 0000000000000..ad69187022a0d --- /dev/null +++ b/turbopack/crates/turbopack-node/src/worker_pool/mod.rs @@ -0,0 +1,79 @@ +use std::path::PathBuf; + +use anyhow::Result; +use rustc_hash::FxHashMap; +use turbo_rcstr::RcStr; +use turbo_tasks::ResolvedVc; +use turbo_tasks_fs::FileSystemPath; + +use crate::{ + AssetsForSourceMapping, + evaluate::{EvaluateOperation, EvaluatePool, Operation}, + worker_pool::operation::{WorkerOperation, connect_to_worker, create_pool}, +}; + +mod operation; +mod worker_thread; + +#[turbo_tasks::value(cell = "new", serialization = "none", eq = "manual", shared)] +pub struct WorkerThreadPool { + cwd: PathBuf, + entrypoint: PathBuf, + env: FxHashMap, + concurrency: usize, + pub assets_for_source_mapping: ResolvedVc, + pub assets_root: FileSystemPath, + pub project_dir: FileSystemPath, +} + +impl WorkerThreadPool { + pub fn create( + cwd: PathBuf, + entrypoint: PathBuf, + env: FxHashMap, + assets_for_source_mapping: ResolvedVc, + assets_root: FileSystemPath, + project_dir: FileSystemPath, + concurrency: usize, + _debug: bool, + ) -> EvaluatePool { + EvaluatePool::new( + entrypoint.to_string_lossy().to_string().into(), + Box::new(Self { + cwd, + entrypoint, + env, + concurrency, + assets_for_source_mapping, + assets_root: assets_root.clone(), + project_dir: project_dir.clone(), + }), + assets_for_source_mapping, + assets_root, + project_dir, + ) + } +} + +#[async_trait::async_trait] +impl EvaluateOperation for WorkerThreadPool { + async fn operation(&self) -> Result> { + create_pool( + self.entrypoint.to_string_lossy().to_string(), + self.concurrency, + ) + .await?; + + let task_id = uuid::Uuid::new_v4().to_string(); + + let worker_id = connect_to_worker( + self.entrypoint.to_string_lossy().to_string(), + task_id.clone(), + ) + .await?; + + let worker_operation = WorkerOperation { task_id, worker_id }; + + Ok(Box::new(worker_operation)) + } +} diff --git a/turbopack/crates/turbopack-node/src/worker_pool/operation.rs b/turbopack/crates/turbopack-node/src/worker_pool/operation.rs new file mode 100644 index 0000000000000..c6f1bbc4d6a10 --- /dev/null +++ b/turbopack/crates/turbopack-node/src/worker_pool/operation.rs @@ -0,0 +1,141 @@ +use std::sync::LazyLock; + +use anyhow::{Context, Result}; +use async_channel::{Receiver, Sender, bounded, unbounded}; +use dashmap::DashMap; + +use crate::evaluate::Operation; + +pub(crate) struct MessageChannel { + sender: Sender, + receiver: Receiver, +} + +impl MessageChannel { + pub(super) fn unbounded() -> Self { + let (sender, receiver) = unbounded::(); + Self { sender, receiver } + } + + pub(super) fn bounded(cap: usize) -> Self { + let (sender, receiver) = bounded::(cap); + Self { sender, receiver } + } +} + +impl MessageChannel { + pub(crate) async fn send(&self, data: T) -> Result<()> { + Ok(self.sender.send(data).await?) + } + + pub(crate) async fn recv(&self) -> Result { + Ok(self.receiver.recv().await?) + } +} + +pub(crate) static POOL_REQUEST_CHANNEL: LazyLock> = + LazyLock::new(MessageChannel::unbounded); + +pub(crate) static POOL_CREATION_CHANNEL: LazyLock>> = + LazyLock::new(DashMap::new); + +pub(crate) async fn create_pool(filename: String, concurrency: usize) -> anyhow::Result<()> { + POOL_REQUEST_CHANNEL + .send(filename.clone()) + .await + .context("failed to send pool request")?; + + let mut created_worker_count = 0; + + { + let channel = POOL_CREATION_CHANNEL + .entry(filename.clone()) + .or_insert_with(|| MessageChannel::bounded(concurrency)); + + while created_worker_count < concurrency { + channel + .recv() + .await + .context("failed to recv worker creation")?; + created_worker_count += 1; + } + }; + + POOL_CREATION_CHANNEL.remove(&filename); + + Ok(()) +} + +pub(crate) static EVALUATION_REQUEST_CHANNAL: LazyLock>>> = + LazyLock::new(DashMap::new); + +pub(crate) static WORKER_REQUEST_CHANNAL: LazyLock>> = + LazyLock::new(DashMap::new); + +pub(crate) static WORKER_ACK_CHANNAL: LazyLock>> = + LazyLock::new(DashMap::new); + +pub(crate) async fn connect_to_worker(pool_id: String, task_id: String) -> Result { + let channel = WORKER_REQUEST_CHANNAL + .entry(pool_id.clone()) + .or_insert_with(MessageChannel::unbounded); + channel + .send(()) + .await + .context("failed to send evaluation request")?; + let worker_id = async move { + let channel = WORKER_ACK_CHANNAL + .entry(task_id.clone()) + .or_insert_with(MessageChannel::unbounded); + channel + .recv() + .await + .context("failed to recv evaluation ack") + } + .await?; + Ok(worker_id) +} + +pub(crate) static WORKER_ROUTED_CHANNEL: LazyLock>>> = + LazyLock::new(DashMap::new); + +pub(crate) async fn send_message_to_worker(worker_id: u32, data: Vec) -> Result<()> { + let entry = WORKER_ROUTED_CHANNEL + .entry(worker_id) + .or_insert_with(MessageChannel::unbounded); + entry + .send(data) + .await + .with_context(|| format!("failed to send message to worker {worker_id}"))?; + Ok(()) +} + +pub(crate) static TASK_ROUTERD_CHANNEL: LazyLock>>> = + LazyLock::new(DashMap::new); + +pub async fn recv_task_response(task_id: String) -> Result> { + let channel = TASK_ROUTERD_CHANNEL + .entry(task_id.clone()) + .or_insert_with(MessageChannel::unbounded); + let data = channel + .recv() + .await + .with_context(|| format!("failed to send message to worker {task_id}"))?; + Ok(data) +} + +pub(crate) struct WorkerOperation { + pub(crate) task_id: String, + pub(crate) worker_id: u32, +} + +#[async_trait::async_trait] +impl Operation for WorkerOperation { + async fn recv(&mut self) -> Result> { + recv_task_response(self.task_id.clone()).await + } + + async fn send(&mut self, data: Vec) -> Result<()> { + send_message_to_worker(self.worker_id, data).await + } +} diff --git a/turbopack/crates/turbopack-node/src/worker_pool/worker_thread.rs b/turbopack/crates/turbopack-node/src/worker_pool/worker_thread.rs new file mode 100644 index 0000000000000..22be69b3a13f9 --- /dev/null +++ b/turbopack/crates/turbopack-node/src/worker_pool/worker_thread.rs @@ -0,0 +1,100 @@ +use anyhow::Context; +use napi_derive::napi; + +use crate::worker_pool::operation::{ + EVALUATION_REQUEST_CHANNAL, MessageChannel, POOL_CREATION_CHANNEL, POOL_REQUEST_CHANNEL, + TASK_ROUTERD_CHANNEL, WORKER_REQUEST_CHANNAL, WORKER_ROUTED_CHANNEL, +}; + +#[napi] +pub async fn recv_pool_request() -> napi::Result { + Ok(POOL_REQUEST_CHANNEL + .recv() + .await + .context("failed to recv pool request")?) +} + +#[napi] +pub async fn notify_pool_created(filename: String) -> napi::Result<()> { + let channel = if let Some(channel) = POOL_CREATION_CHANNEL.get(&filename) { + channel + } else { + return Err(napi::Error::from_reason(format!( + "pool creation channel for {filename} not found" + ))); + }; + Ok(channel + .send(()) + .await + .context("failed to notify pool created")?) +} + +#[napi] +pub async fn recv_worker_request(pool_id: String) -> napi::Result<()> { + let channel = if let Some(channel) = WORKER_REQUEST_CHANNAL.get(&pool_id) { + channel + } else { + return Err(napi::Error::from_reason(format!( + "worker request channel for {pool_id} not found" + ))); + }; + Ok(channel + .send(()) + .await + .context("failed to recv worker request")?) +} + +#[napi] +pub async fn notify_worker_ack(pool_id: String) -> napi::Result<()> { + let channel = if let Some(channel) = POOL_CREATION_CHANNEL.get(&pool_id) { + channel + } else { + return Err(napi::Error::from_reason(format!( + "evaluation ack channel for {pool_id} not found" + ))); + }; + Ok(channel + .send(()) + .await + .context("failed to notify evaluation ack")?) +} + +#[napi] +pub async fn recv_evaluation(pool_id: String) -> napi::Result> { + let channel = if let Some(channel) = EVALUATION_REQUEST_CHANNAL.get(&pool_id) { + channel + } else { + return Err(napi::Error::from_reason(format!( + "evaluation request channel for {pool_id} not found" + ))); + }; + + Ok(channel + .recv() + .await + .context("failed to recv evaluate request")?) +} + +#[napi] +pub async fn recv_message_in_worker(worker_id: u32) -> napi::Result> { + let channel = WORKER_ROUTED_CHANNEL + .entry(worker_id) + .or_insert_with(MessageChannel::unbounded); + let data = channel + .recv() + .await + .with_context(|| format!("failed to recv message in worker {worker_id}"))?; + Ok(data) +} + +#[napi] +pub async fn send_task_response(task_id: String, data: Vec) -> napi::Result<()> { + let channel = TASK_ROUTERD_CHANNEL + .entry(task_id.clone()) + .or_insert_with(MessageChannel::unbounded); + channel + .send(data) + .await + .with_context(|| format!("failed to recv message in worker {task_id}"))?; + Ok(()) +}