diff --git a/crates/bevy_asset/src/lib.rs b/crates/bevy_asset/src/lib.rs index 81deeff1450f5..b0f08bacfa557 100644 --- a/crates/bevy_asset/src/lib.rs +++ b/crates/bevy_asset/src/lib.rs @@ -716,6 +716,9 @@ mod tests { AssetWatcher, Reader, }, loader::{AssetLoader, LoadContext}, + processor::{ + AssetProcessor, LogEntry, ProcessorTransactionLog, ProcessorTransactionLogFactory, + }, saver::AssetSaver, transformer::{AssetTransformer, TransformedAsset}, Asset, AssetApp, AssetEvent, AssetId, AssetLoadError, AssetLoadFailedEvent, AssetMode, @@ -741,6 +744,7 @@ mod tests { sync::Mutex, }; use bevy_reflect::TypePath; + use bevy_tasks::BoxedFuture; use core::{marker::PhantomData, time::Duration}; use crossbeam_channel::Sender; use futures_lite::AsyncWriteExt; @@ -2289,6 +2293,52 @@ mod tests { }, )); + /// A dummy transaction log factory that just creates [`FakeTransactionLog`]. + struct FakeTransactionLogFactory; + + impl ProcessorTransactionLogFactory for FakeTransactionLogFactory { + fn read(&self) -> BoxedFuture<'_, Result, BevyError>> { + Box::pin(async move { Ok(vec![]) }) + } + + fn create_new_log( + &self, + ) -> BoxedFuture<'_, Result, BevyError>> { + Box::pin(async move { Ok(Box::new(FakeTransactionLog) as _) }) + } + } + + /// A dummy transaction log that just drops every log. + // TODO: In the future it's possible for us to have a test of the transaction log, so making + // this more complex may be necessary. + struct FakeTransactionLog; + + impl ProcessorTransactionLog for FakeTransactionLog { + fn begin_processing<'a>( + &'a mut self, + asset: &'a AssetPath<'_>, + ) -> BoxedFuture<'a, Result<(), BevyError>> { + Box::pin(async move { Ok(()) }) + } + + fn end_processing<'a>( + &'a mut self, + asset: &'a AssetPath<'_>, + ) -> BoxedFuture<'a, Result<(), BevyError>> { + Box::pin(async move { Ok(()) }) + } + + fn unrecoverable(&mut self) -> BoxedFuture<'_, Result<(), BevyError>> { + Box::pin(async move { Ok(()) }) + } + } + + app.world() + .resource::() + .data() + .set_log_factory(Box::new(FakeTransactionLogFactory)) + .unwrap(); + AppWithProcessor { app, source_dir, @@ -2379,7 +2429,7 @@ mod tests { } #[cfg(feature = "multi_threaded")] - use crate::processor::{AssetProcessor, LoadTransformAndSave}; + use crate::processor::LoadTransformAndSave; // The asset processor currently requires multi_threaded. #[cfg(feature = "multi_threaded")] diff --git a/crates/bevy_asset/src/processor/log.rs b/crates/bevy_asset/src/processor/log.rs index f4a0f81862166..0c20a2d091883 100644 --- a/crates/bevy_asset/src/processor/log.rs +++ b/crates/bevy_asset/src/processor/log.rs @@ -1,11 +1,14 @@ use crate::AssetPath; use alloc::{ + boxed::Box, format, string::{String, ToString}, vec::Vec, }; use async_fs::File; +use bevy_ecs::error::BevyError; use bevy_platform::collections::HashSet; +use bevy_tasks::BoxedFuture; use futures_lite::{AsyncReadExt, AsyncWriteExt}; use std::path::PathBuf; use thiserror::Error; @@ -13,22 +16,224 @@ use tracing::error; /// An in-memory representation of a single [`ProcessorTransactionLog`] entry. #[derive(Debug)] -pub(crate) enum LogEntry { +pub enum LogEntry { BeginProcessing(AssetPath<'static>), EndProcessing(AssetPath<'static>), UnrecoverableError, } +/// A factory of [`ProcessorTransactionLog`] that handles the state before the log has been started. +/// +/// This trait also assists in recovering from partial processing by fetching the previous state of +/// the transaction log. +pub trait ProcessorTransactionLogFactory: Send + Sync + 'static { + /// Reads all entries in a previous transaction log if present. + /// + /// If there is no previous transaction log, this method should return an empty Vec of entries. + fn read(&self) -> BoxedFuture<'_, Result, BevyError>>; + + /// Creates a new transaction log to write to. + /// + /// This should remove any previous entries if they exist. + fn create_new_log( + &self, + ) -> BoxedFuture<'_, Result, BevyError>>; +} + +/// A "write ahead" logger that helps ensure asset importing is transactional. +/// +/// Prior to processing an asset, we write to the log to indicate it has started. After processing +/// an asset, we write to the log to indicate it has finished. On startup, the log can be read +/// through [`ProcessorTransactionLogFactory`] to determine if any transactions were incomplete. +pub trait ProcessorTransactionLog: Send + Sync + 'static { + /// Logs the start of an asset being processed. + /// + /// If this is not followed at some point in the log by a closing + /// [`ProcessorTransactionLog::end_processing`], in the next run of the processor the asset + /// processing will be considered "incomplete" and it will be reprocessed. + fn begin_processing<'a>( + &'a mut self, + asset: &'a AssetPath<'_>, + ) -> BoxedFuture<'a, Result<(), BevyError>>; + + /// Logs the end of an asset being successfully processed. See + /// [`ProcessorTransactionLog::begin_processing`]. + fn end_processing<'a>( + &'a mut self, + asset: &'a AssetPath<'_>, + ) -> BoxedFuture<'a, Result<(), BevyError>>; + + /// Logs an unrecoverable error. + /// + /// On the next run of the processor, all assets will be regenerated. This should only be used + /// as a last resort. Every call to this should be considered with scrutiny and ideally replaced + /// with something more granular. + fn unrecoverable(&mut self) -> BoxedFuture<'_, Result<(), BevyError>>; +} + +/// Validate the previous state of the transaction log and determine any assets that need to be +/// reprocessed. +pub(crate) async fn validate_transaction_log( + log_factory: &dyn ProcessorTransactionLogFactory, +) -> Result<(), ValidateLogError> { + let mut transactions: HashSet> = Default::default(); + let mut errors: Vec = Vec::new(); + let entries = log_factory + .read() + .await + .map_err(ValidateLogError::ReadLogError)?; + for entry in entries { + match entry { + LogEntry::BeginProcessing(path) => { + // There should never be duplicate "start transactions" in a log + // Every start should be followed by: + // * nothing (if there was an abrupt stop) + // * an End (if the transaction was completed) + if !transactions.insert(path.clone()) { + errors.push(LogEntryError::DuplicateTransaction(path)); + } + } + LogEntry::EndProcessing(path) => { + if !transactions.remove(&path) { + errors.push(LogEntryError::EndedMissingTransaction(path)); + } + } + LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError), + } + } + for transaction in transactions { + errors.push(LogEntryError::UnfinishedTransaction(transaction)); + } + if !errors.is_empty() { + return Err(ValidateLogError::EntryErrors(errors)); + } + Ok(()) +} + +/// A transaction log factory that uses a file as its storage. +pub struct FileTransactionLogFactory { + /// The file path that the transaction log should write to. + pub file_path: PathBuf, +} + +const LOG_PATH: &str = "imported_assets/log"; + +impl Default for FileTransactionLogFactory { + fn default() -> Self { + #[cfg(not(target_arch = "wasm32"))] + let base_path = crate::io::file::get_base_path(); + #[cfg(target_arch = "wasm32")] + let base_path = PathBuf::new(); + let file_path = base_path.join(LOG_PATH); + Self { file_path } + } +} + +impl ProcessorTransactionLogFactory for FileTransactionLogFactory { + fn read(&self) -> BoxedFuture<'_, Result, BevyError>> { + let path = self.file_path.clone(); + Box::pin(async move { + let mut log_lines = Vec::new(); + let mut file = match File::open(path).await { + Ok(file) => file, + Err(err) => { + if err.kind() == futures_io::ErrorKind::NotFound { + // if the log file doesn't exist, this is equivalent to an empty file + return Ok(log_lines); + } + return Err(err.into()); + } + }; + let mut string = String::new(); + file.read_to_string(&mut string).await?; + for line in string.lines() { + if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) { + log_lines.push(LogEntry::BeginProcessing( + AssetPath::parse(path_str).into_owned(), + )); + } else if let Some(path_str) = line.strip_prefix(ENTRY_END) { + log_lines.push(LogEntry::EndProcessing( + AssetPath::parse(path_str).into_owned(), + )); + } else if line.is_empty() { + continue; + } else { + return Err(ReadLogError::InvalidLine(line.to_string()).into()); + } + } + Ok(log_lines) + }) + } + + fn create_new_log( + &self, + ) -> BoxedFuture<'_, Result, BevyError>> { + let path = self.file_path.clone(); + Box::pin(async move { + match async_fs::remove_file(&path).await { + Ok(_) => { /* successfully removed file */ } + Err(err) => { + // if the log file is not found, we assume we are starting in a fresh (or good) state + if err.kind() != futures_io::ErrorKind::NotFound { + error!("Failed to remove previous log file {}", err); + } + } + } + + if let Some(parent_folder) = path.parent() { + async_fs::create_dir_all(parent_folder).await?; + } + + Ok(Box::new(FileProcessorTransactionLog { + log_file: File::create(path).await?, + }) as _) + }) + } +} + /// A "write ahead" logger that helps ensure asset importing is transactional. /// /// Prior to processing an asset, we write to the log to indicate it has started /// After processing an asset, we write to the log to indicate it has finished. /// On startup, the log can be read to determine if any transactions were incomplete. -// TODO: this should be a trait -pub struct ProcessorTransactionLog { +struct FileProcessorTransactionLog { + /// The file to write logs to. log_file: File, } +impl FileProcessorTransactionLog { + /// Write `line` to the file and flush it. + async fn write(&mut self, line: &str) -> Result<(), BevyError> { + self.log_file.write_all(line.as_bytes()).await?; + self.log_file.flush().await?; + Ok(()) + } +} + +const ENTRY_BEGIN: &str = "Begin "; +const ENTRY_END: &str = "End "; +const UNRECOVERABLE_ERROR: &str = "UnrecoverableError"; + +impl ProcessorTransactionLog for FileProcessorTransactionLog { + fn begin_processing<'a>( + &'a mut self, + asset: &'a AssetPath<'_>, + ) -> BoxedFuture<'a, Result<(), BevyError>> { + Box::pin(async move { self.write(&format!("{ENTRY_BEGIN}{asset}\n")).await }) + } + + fn end_processing<'a>( + &'a mut self, + asset: &'a AssetPath<'_>, + ) -> BoxedFuture<'a, Result<(), BevyError>> { + Box::pin(async move { self.write(&format!("{ENTRY_END}{asset}\n")).await }) + } + + fn unrecoverable(&mut self) -> BoxedFuture<'_, Result<(), BevyError>> { + Box::pin(async move { self.write(UNRECOVERABLE_ERROR).await }) + } +} + /// An error that occurs when reading from the [`ProcessorTransactionLog`] fails. #[derive(Error, Debug)] pub enum ReadLogError { @@ -45,9 +250,9 @@ pub enum ReadLogError { #[error( "Failed to write {log_entry:?} to the asset processor log. This is not recoverable. {error}" )] -pub struct WriteLogError { - log_entry: LogEntry, - error: futures_io::Error, +pub(crate) struct WriteLogError { + pub(crate) log_entry: LogEntry, + pub(crate) error: BevyError, } /// An error that occurs when validating the [`ProcessorTransactionLog`] fails. @@ -57,8 +262,8 @@ pub enum ValidateLogError { #[error("Encountered an unrecoverable error. All assets will be reprocessed.")] UnrecoverableError, /// A [`ReadLogError`]. - #[error(transparent)] - ReadLogError(#[from] ReadLogError), + #[error("Failed to read log entries: {0}")] + ReadLogError(BevyError), /// Duplicated process asset transactions occurred. #[error("Encountered a duplicate process asset transaction: {0:?}")] EntryErrors(Vec), @@ -77,147 +282,3 @@ pub enum LogEntryError { #[error("An asset started processing but never finished: {0}")] UnfinishedTransaction(AssetPath<'static>), } - -const LOG_PATH: &str = "imported_assets/log"; -const ENTRY_BEGIN: &str = "Begin "; -const ENTRY_END: &str = "End "; -const UNRECOVERABLE_ERROR: &str = "UnrecoverableError"; - -impl ProcessorTransactionLog { - fn full_log_path() -> PathBuf { - #[cfg(not(target_arch = "wasm32"))] - let base_path = crate::io::file::get_base_path(); - #[cfg(target_arch = "wasm32")] - let base_path = PathBuf::new(); - base_path.join(LOG_PATH) - } - /// Create a new, fresh log file. This will delete the previous log file if it exists. - pub(crate) async fn new() -> Result { - let path = Self::full_log_path(); - match async_fs::remove_file(&path).await { - Ok(_) => { /* successfully removed file */ } - Err(err) => { - // if the log file is not found, we assume we are starting in a fresh (or good) state - if err.kind() != futures_io::ErrorKind::NotFound { - error!("Failed to remove previous log file {}", err); - } - } - } - - if let Some(parent_folder) = path.parent() { - async_fs::create_dir_all(parent_folder).await?; - } - - Ok(Self { - log_file: File::create(path).await?, - }) - } - - pub(crate) async fn read() -> Result, ReadLogError> { - let mut log_lines = Vec::new(); - let mut file = match File::open(Self::full_log_path()).await { - Ok(file) => file, - Err(err) => { - if err.kind() == futures_io::ErrorKind::NotFound { - // if the log file doesn't exist, this is equivalent to an empty file - return Ok(log_lines); - } - return Err(err.into()); - } - }; - let mut string = String::new(); - file.read_to_string(&mut string).await?; - for line in string.lines() { - if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) { - log_lines.push(LogEntry::BeginProcessing( - AssetPath::parse(path_str).into_owned(), - )); - } else if let Some(path_str) = line.strip_prefix(ENTRY_END) { - log_lines.push(LogEntry::EndProcessing( - AssetPath::parse(path_str).into_owned(), - )); - } else if line.is_empty() { - continue; - } else { - return Err(ReadLogError::InvalidLine(line.to_string())); - } - } - Ok(log_lines) - } - - pub(crate) async fn validate() -> Result<(), ValidateLogError> { - let mut transactions: HashSet> = Default::default(); - let mut errors: Vec = Vec::new(); - let entries = Self::read().await?; - for entry in entries { - match entry { - LogEntry::BeginProcessing(path) => { - // There should never be duplicate "start transactions" in a log - // Every start should be followed by: - // * nothing (if there was an abrupt stop) - // * an End (if the transaction was completed) - if !transactions.insert(path.clone()) { - errors.push(LogEntryError::DuplicateTransaction(path)); - } - } - LogEntry::EndProcessing(path) => { - if !transactions.remove(&path) { - errors.push(LogEntryError::EndedMissingTransaction(path)); - } - } - LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError), - } - } - for transaction in transactions { - errors.push(LogEntryError::UnfinishedTransaction(transaction)); - } - if !errors.is_empty() { - return Err(ValidateLogError::EntryErrors(errors)); - } - Ok(()) - } - - /// Logs the start of an asset being processed. If this is not followed at some point in the log by a closing [`ProcessorTransactionLog::end_processing`], - /// in the next run of the processor the asset processing will be considered "incomplete" and it will be reprocessed. - pub(crate) async fn begin_processing( - &mut self, - path: &AssetPath<'_>, - ) -> Result<(), WriteLogError> { - self.write(&format!("{ENTRY_BEGIN}{path}\n")) - .await - .map_err(|e| WriteLogError { - log_entry: LogEntry::BeginProcessing(path.clone_owned()), - error: e, - }) - } - - /// Logs the end of an asset being successfully processed. See [`ProcessorTransactionLog::begin_processing`]. - pub(crate) async fn end_processing( - &mut self, - path: &AssetPath<'_>, - ) -> Result<(), WriteLogError> { - self.write(&format!("{ENTRY_END}{path}\n")) - .await - .map_err(|e| WriteLogError { - log_entry: LogEntry::EndProcessing(path.clone_owned()), - error: e, - }) - } - - /// Logs an unrecoverable error. On the next run of the processor, all assets will be regenerated. This should only be used as a last resort. - /// Every call to this should be considered with scrutiny and ideally replaced with something more granular. - pub(crate) async fn unrecoverable(&mut self) -> Result<(), WriteLogError> { - self.write(UNRECOVERABLE_ERROR) - .await - .map_err(|e| WriteLogError { - log_entry: LogEntry::UnrecoverableError, - error: e, - }) - } - - async fn write(&mut self, line: &str) -> Result<(), futures_io::Error> { - self.log_file.write_all(line.as_bytes()).await?; - self.log_file.flush().await?; - Ok(()) - } -} diff --git a/crates/bevy_asset/src/processor/mod.rs b/crates/bevy_asset/src/processor/mod.rs index f5f071cfb9561..79d7305da146f 100644 --- a/crates/bevy_asset/src/processor/mod.rs +++ b/crates/bevy_asset/src/processor/mod.rs @@ -65,7 +65,10 @@ use bevy_platform::{ use bevy_tasks::IoTaskPool; use futures_io::ErrorKind; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; -use std::path::{Path, PathBuf}; +use std::{ + path::{Path, PathBuf}, + sync::Mutex, +}; use thiserror::Error; use tracing::{debug, error, trace, warn}; @@ -100,7 +103,13 @@ pub struct AssetProcessor { /// Internal data stored inside an [`AssetProcessor`]. pub struct AssetProcessorData { pub(crate) asset_infos: async_lock::RwLock, - log: async_lock::RwLock>, + /// The factory that creates the transaction log. + /// + /// Note: we use a regular Mutex instead of an async mutex since we expect users to only set + /// this once, and before the asset processor starts - there is no reason to await (and it + /// avoids needing to use [`block_on`](bevy_tasks::block_on) to set the factory). + log_factory: Mutex>>, + log: async_lock::RwLock>>, processors: RwLock>>, /// Default processors for file extensions default_processors: RwLock, &'static str>>, @@ -175,7 +184,13 @@ impl AssetProcessor { async fn log_unrecoverable(&self) { let mut log = self.data.log.write().await; let log = log.as_mut().unwrap(); - log.unrecoverable().await.unwrap(); + log.unrecoverable() + .await + .map_err(|error| WriteLogError { + log_entry: LogEntry::UnrecoverableError, + error, + }) + .unwrap(); } /// Logs the start of an asset being processed. If this is not followed at some point in the log by a closing [`AssetProcessor::log_end_processing`], @@ -183,14 +198,26 @@ impl AssetProcessor { async fn log_begin_processing(&self, path: &AssetPath<'_>) { let mut log = self.data.log.write().await; let log = log.as_mut().unwrap(); - log.begin_processing(path).await.unwrap(); + log.begin_processing(path) + .await + .map_err(|error| WriteLogError { + log_entry: LogEntry::BeginProcessing(path.clone_owned()), + error, + }) + .unwrap(); } /// Logs the end of an asset being successfully processed. See [`AssetProcessor::log_begin_processing`]. async fn log_end_processing(&self, path: &AssetPath<'_>) { let mut log = self.data.log.write().await; let log = log.as_mut().unwrap(); - log.end_processing(path).await.unwrap(); + log.end_processing(path) + .await + .map_err(|error| WriteLogError { + log_entry: LogEntry::EndProcessing(path.clone_owned()), + error, + }) + .unwrap(); } /// Starts the processor in a background thread. @@ -996,7 +1023,16 @@ impl AssetProcessor { } async fn validate_transaction_log_and_recover(&self) { - if let Err(err) = ProcessorTransactionLog::validate().await { + let log_factory = self + .data + .log_factory + .lock() + .unwrap_or_else(PoisonError::into_inner) + // Take the log factory to indicate we've started and this should disable setting a new + // log factory. + .take() + .expect("the asset processor only starts once"); + if let Err(err) = validate_transaction_log(log_factory.as_ref()).await { let state_is_valid = match err { ValidateLogError::ReadLogError(err) => { error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}"); @@ -1074,7 +1110,7 @@ impl AssetProcessor { } } let mut log = self.data.log.write().await; - *log = match ProcessorTransactionLog::new().await { + *log = match log_factory.create_new_log().await { Ok(log) => Some(log), Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err), }; @@ -1098,6 +1134,7 @@ impl AssetProcessorData { initialized_sender, initialized_receiver, state: async_lock::RwLock::new(ProcessorState::Initializing), + log_factory: Mutex::new(Some(Box::new(FileTransactionLogFactory::default()))), log: Default::default(), processors: Default::default(), asset_infos: Default::default(), @@ -1105,6 +1142,28 @@ impl AssetProcessorData { } } + /// Sets the transaction log factory for the processor. + /// + /// If this is called after asset processing has begun (in the `Startup` schedule), it will + /// return an error. If not called, the default transaction log will be used. + pub fn set_log_factory( + &self, + factory: Box, + ) -> Result<(), SetTransactionLogFactoryError> { + let mut log_factory = self + .log_factory + .lock() + .unwrap_or_else(PoisonError::into_inner); + if log_factory.is_none() { + // This indicates the asset processor has already started, so setting the factory does + // nothing here. + return Err(SetTransactionLogFactoryError::AlreadyInUse); + } + + *log_factory = Some(factory); + Ok(()) + } + /// Returns a future that will not finish until the path has been processed. pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus { self.wait_until_initialized().await; @@ -1497,3 +1556,10 @@ pub enum InitializeError { #[error("Failed to validate asset log: {0}")] ValidateLogError(#[from] ValidateLogError), } + +/// An error when attempting to set the transaction log factory. +#[derive(Error, Debug)] +pub enum SetTransactionLogFactoryError { + #[error("Transaction log is already in use so setting the factory does nothing")] + AlreadyInUse, +}