-
Notifications
You must be signed in to change notification settings - Fork 185
virtio-pmem: migrate from VirtioQueueWorkerContext to direct VirtioQueue #2945
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,39 +7,37 @@ | |
| pub mod resolver; | ||
|
|
||
| use anyhow::Context; | ||
| use async_trait::async_trait; | ||
| use futures::StreamExt; | ||
| use guestmem::GuestMemory; | ||
| use inspect::InspectMut; | ||
| use pal_async::wait::PolledWait; | ||
| use std::fs; | ||
| use std::sync::Arc; | ||
| use std::task::Poll; | ||
| use std::task::ready; | ||
| use task_control::AsyncRun; | ||
| use task_control::Cancelled; | ||
| use task_control::InspectTaskMut; | ||
| use task_control::StopTask; | ||
| use task_control::TaskControl; | ||
| use virtio::DeviceTraits; | ||
| use virtio::DeviceTraitsSharedMemory; | ||
| use virtio::Resources; | ||
| use virtio::VirtioDevice; | ||
| use virtio::VirtioQueue; | ||
| use virtio::VirtioQueueCallbackWork; | ||
| use virtio::VirtioQueueState; | ||
| use virtio::VirtioQueueWorker; | ||
| use virtio::VirtioQueueWorkerContext; | ||
| use virtio::spec::VirtioDeviceFeatures; | ||
| use vmcore::vm_task::VmTaskDriver; | ||
| use vmcore::vm_task::VmTaskDriverSource; | ||
|
|
||
| #[derive(InspectMut)] | ||
| pub struct Device { | ||
| driver: VmTaskDriver, | ||
| file: Arc<fs::File>, | ||
| #[inspect(skip)] | ||
| mappable: sparse_mmap::Mappable, | ||
| len: u64, | ||
| writable: bool, | ||
| #[inspect(skip)] | ||
| worker: Option<TaskControl<VirtioQueueWorker, VirtioQueueState>>, | ||
| memory: GuestMemory, | ||
| #[inspect(skip)] | ||
| exit_event: event_listener::Event, | ||
| #[inspect(mut)] | ||
| worker: TaskControl<PmemWorker, PmemQueue>, | ||
| } | ||
|
|
||
| impl Device { | ||
|
|
@@ -55,13 +53,14 @@ impl Device { | |
| .context("failed to create file mapping")?; | ||
| Ok(Self { | ||
| driver: driver_source.simple(), | ||
| file: Arc::new(file), | ||
| worker: TaskControl::new(PmemWorker { | ||
| writable, | ||
| file, | ||
| mem: memory, | ||
| }), | ||
| mappable, | ||
| len, | ||
| writable, | ||
| worker: None, | ||
| memory, | ||
| exit_event: event_listener::Event::new(), | ||
| }) | ||
| } | ||
| } | ||
|
|
@@ -95,7 +94,6 @@ impl VirtioDevice for Device { | |
| fn write_registers_u32(&mut self, _offset: u16, _val: u32) {} | ||
|
|
||
| fn enable(&mut self, mut resources: Resources) -> anyhow::Result<()> { | ||
| assert!(self.worker.is_none()); | ||
| if !resources.queues[0].params.enable { | ||
| return Ok(()); | ||
| } | ||
|
|
@@ -109,76 +107,102 @@ impl VirtioDevice for Device { | |
| .map(0, &self.mappable, 0, self.len as usize, self.writable) | ||
| .context("failed to map shared memory region")?; | ||
|
|
||
| self.worker = { | ||
| let worker = PmemWorker { | ||
| writable: self.writable, | ||
| file: self.file.clone(), | ||
| mem: self.memory.clone(), | ||
| }; | ||
|
|
||
| let worker = VirtioQueueWorker::new(self.driver.clone(), Box::new(worker)); | ||
| Some(worker.into_running_task( | ||
| "virtio-pmem-queue".to_string(), | ||
| self.memory.clone(), | ||
| resources.features, | ||
| resources.queues.remove(0), | ||
| self.exit_event.listen(), | ||
| )) | ||
| }; | ||
| let qr = resources.queues.remove(0); | ||
| let queue_event = | ||
| PolledWait::new(&self.driver, qr.event).context("failed to create polled wait")?; | ||
| let queue = VirtioQueue::new( | ||
|
Comment on lines
+110
to
+113
|
||
| resources.features, | ||
| qr.params, | ||
| self.worker.task().mem.clone(), | ||
| qr.notify, | ||
| queue_event, | ||
| ) | ||
| .context("failed to create virtio queue")?; | ||
|
|
||
| self.worker.insert( | ||
| self.driver.clone(), | ||
| "virtio-pmem-queue", | ||
| PmemQueue { queue }, | ||
| ); | ||
| self.worker.start(); | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn poll_disable(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> { | ||
| self.exit_event.notify(usize::MAX); | ||
| if let Some(worker) = &mut self.worker { | ||
| ready!(worker.poll_stop(cx)); | ||
| ready!(self.worker.poll_stop(cx)); | ||
| if self.worker.has_state() { | ||
| self.worker.remove(); | ||
| } | ||
| self.worker = None; | ||
| Poll::Ready(()) | ||
| } | ||
| } | ||
|
|
||
| #[derive(InspectMut)] | ||
| struct PmemWorker { | ||
| writable: bool, | ||
| file: Arc<fs::File>, | ||
| file: fs::File, | ||
| mem: GuestMemory, | ||
| } | ||
|
|
||
| #[async_trait] | ||
| impl VirtioQueueWorkerContext for PmemWorker { | ||
| async fn process_work(&mut self, work: anyhow::Result<VirtioQueueCallbackWork>) -> bool { | ||
| if let Err(err) = work { | ||
| tracing::error!(err = err.as_ref() as &dyn std::error::Error, "queue error"); | ||
| return false; | ||
| } | ||
| impl InspectTaskMut<PmemQueue> for PmemWorker { | ||
| fn inspect_mut(&mut self, req: inspect::Request<'_>, state: Option<&mut PmemQueue>) { | ||
| req.respond().merge(self).merge(state); | ||
| } | ||
| } | ||
|
|
||
| let mut work = work.unwrap(); | ||
| let mut req = [0; 4]; | ||
| let err = match work.read(&self.mem, &mut req) { | ||
| Ok(_) => match u32::from_le_bytes(req) { | ||
| 0 if !self.writable => { | ||
| // Ignore the request for read-only devices. | ||
| 0 | ||
| #[derive(InspectMut)] | ||
| struct PmemQueue { | ||
| queue: VirtioQueue, | ||
| } | ||
|
|
||
| impl AsyncRun<PmemQueue> for PmemWorker { | ||
| async fn run( | ||
| &mut self, | ||
| stop: &mut StopTask<'_>, | ||
| state: &mut PmemQueue, | ||
| ) -> Result<(), Cancelled> { | ||
| loop { | ||
| let work = stop.until_stopped(state.queue.next()).await?; | ||
| let Some(work) = work else { break }; | ||
| match work { | ||
| Ok(work) => { | ||
| process_pmem_request(self, work); | ||
| } | ||
| Err(err) => { | ||
| tracing::error!(error = &err as &dyn std::error::Error, "queue error"); | ||
| break; | ||
| } | ||
| 0 => match self.file.sync_all() { | ||
| Ok(()) => 0, | ||
| Err(err) => { | ||
| tracing::error!(error = &err as &dyn std::error::Error, "flush error"); | ||
| 1 | ||
| } | ||
| }, | ||
| n => { | ||
| tracing::error!(n, "unsupported request"); | ||
| } | ||
| } | ||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| fn process_pmem_request(worker: &PmemWorker, mut work: VirtioQueueCallbackWork) { | ||
| let mut req = [0; 4]; | ||
| let err = match work.read(&worker.mem, &mut req) { | ||
| Ok(_) => match u32::from_le_bytes(req) { | ||
| 0 if !worker.writable => { | ||
| // Ignore the request for read-only devices. | ||
| 0 | ||
| } | ||
| 0 => match worker.file.sync_all() { | ||
| Ok(()) => 0, | ||
| Err(err) => { | ||
| tracing::error!(error = &err as &dyn std::error::Error, "flush error"); | ||
| 1 | ||
| } | ||
|
Comment on lines
+189
to
194
|
||
| }, | ||
| Err(err) => { | ||
| tracing::error!(error = &err as &dyn std::error::Error, "invalid descriptor"); | ||
| n => { | ||
| tracing::error!(n, "unsupported request"); | ||
| 1 | ||
| } | ||
| }; | ||
| let _ = work.write(&self.mem, &u32::to_le_bytes(err)); | ||
| work.complete(4); | ||
| true | ||
| } | ||
| }, | ||
| Err(err) => { | ||
| tracing::error!(error = &err as &dyn std::error::Error, "invalid descriptor"); | ||
| 1 | ||
| } | ||
| }; | ||
| let _ = work.write(&worker.mem, &u32::to_le_bytes(err)); | ||
| work.complete(4); | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
size_of::<PmemConfig>()is used later in this file (intraits()), butsize_ofis not in scope here (nouse core::mem::size_of/std::mem::size_ofand it’s not in the Rust prelude). This should fail to compile unless it’s fully qualified or imported.