Skip to content

Commit e535c79

Browse files
committed
feat(agent): implement exec detached mode
1 parent acf770f commit e535c79

File tree

4 files changed

+137
-74
lines changed

4 files changed

+137
-74
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

devolutions-session/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ win-api-wrappers = { path = "../crates/win-api-wrappers", optional = true }
4444

4545
[dependencies.now-proto-pdu]
4646
optional = true
47-
version = "0.3.2"
47+
git = "https://github.com/Devolutions/now-proto"
48+
branch = "feat/exec-detached"
4849
features = ["std"]
4950

5051
[target.'cfg(windows)'.build-dependencies]

devolutions-session/src/dvc/process.rs

Lines changed: 92 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,6 @@ pub enum ServerChannelEvent {
9898
pub struct WinApiProcessCtx {
9999
session_id: u32,
100100

101-
io_notification_tx: Sender<ServerChannelEvent>,
102-
103101
stdout_read_pipe: Option<Pipe>,
104102
stderr_read_pipe: Option<Pipe>,
105103
stdin_write_pipe: Option<Pipe>,
@@ -123,7 +121,7 @@ impl WinApiProcessCtx {
123121
Ok(())
124122
}
125123

126-
pub fn process_cancel(&mut self) -> Result<(), ExecError> {
124+
pub fn process_cancel(&mut self, io_notification_tx: &Sender<ServerChannelEvent>) -> Result<(), ExecError> {
127125
info!(
128126
session_id = self.session_id,
129127
"Cancelling process execution by user request"
@@ -135,15 +133,18 @@ impl WinApiProcessCtx {
135133

136134
// Acknowledge client that cancel request has been processed
137135
// successfully.
138-
self.io_notification_tx
139-
.blocking_send(ServerChannelEvent::SessionCancelSuccess {
140-
session_id: self.session_id,
141-
})?;
136+
io_notification_tx.blocking_send(ServerChannelEvent::SessionCancelSuccess {
137+
session_id: self.session_id,
138+
})?;
142139

143140
Ok(())
144141
}
145142

146-
pub fn wait(mut self, mut input_event_rx: WinapiSignaledReceiver<ProcessIoInputEvent>) -> Result<u32, ExecError> {
143+
pub fn wait(
144+
mut self,
145+
mut input_event_rx: WinapiSignaledReceiver<ProcessIoInputEvent>,
146+
io_notification_tx: Sender<ServerChannelEvent>,
147+
) -> Result<u32, ExecError> {
147148
let session_id = self.session_id;
148149

149150
info!(session_id, "Waiting for process to exit");
@@ -153,8 +154,7 @@ impl WinApiProcessCtx {
153154
const WAIT_OBJECT_INPUT_MESSAGE: WAIT_EVENT = WAIT_OBJECT_0;
154155
const WAIT_OBJECT_PROCESS_EXIT: WAIT_EVENT = WAIT_EVENT(WAIT_OBJECT_0.0 + 1);
155156

156-
self.io_notification_tx
157-
.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
157+
io_notification_tx.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
158158

159159
loop {
160160
// SAFETY: No preconditions.
@@ -179,7 +179,7 @@ impl WinApiProcessCtx {
179179
return Err(ExecError::Aborted);
180180
}
181181
ProcessIoInputEvent::CancelExecution => {
182-
self.process_cancel()?;
182+
self.process_cancel(&io_notification_tx)?;
183183

184184
// wait for process to exit
185185
continue;
@@ -209,6 +209,7 @@ impl WinApiProcessCtx {
209209
pub fn wait_with_io_redirection(
210210
mut self,
211211
mut input_event_rx: WinapiSignaledReceiver<ProcessIoInputEvent>,
212+
io_notification_tx: Sender<ServerChannelEvent>,
212213
) -> Result<u32, ExecError> {
213214
let session_id = self.session_id;
214215

@@ -277,8 +278,7 @@ impl WinApiProcessCtx {
277278

278279
// Signal client side about started execution
279280

280-
self.io_notification_tx
281-
.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
281+
io_notification_tx.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
282282

283283
info!(session_id, "Process IO is ready for async loop execution");
284284
loop {
@@ -304,7 +304,7 @@ impl WinApiProcessCtx {
304304
return Err(ExecError::Aborted);
305305
}
306306
ProcessIoInputEvent::CancelExecution => {
307-
self.process_cancel()?;
307+
self.process_cancel(&io_notification_tx)?;
308308

309309
// wait for process to exit
310310
continue;
@@ -369,26 +369,24 @@ impl WinApiProcessCtx {
369369
// EOF on stdout pipe, close it and send EOF message to message_tx
370370
self.stdout_read_pipe = None;
371371

372-
self.io_notification_tx
373-
.blocking_send(ServerChannelEvent::SessionDataOut {
374-
session_id,
375-
stream: NowExecDataStreamKind::Stdout,
376-
last: true,
377-
data: Vec::new(),
378-
})?;
372+
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
373+
session_id,
374+
stream: NowExecDataStreamKind::Stdout,
375+
last: true,
376+
data: Vec::new(),
377+
})?;
379378
}
380379
_code => return Err(err.into()),
381380
}
382381
continue;
383382
}
384383

385-
self.io_notification_tx
386-
.blocking_send(ServerChannelEvent::SessionDataOut {
387-
session_id,
388-
stream: NowExecDataStreamKind::Stdout,
389-
last: false,
390-
data: stdout_buffer[..bytes_read as usize].to_vec(),
391-
})?;
384+
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
385+
session_id,
386+
stream: NowExecDataStreamKind::Stdout,
387+
last: false,
388+
data: stdout_buffer[..bytes_read as usize].to_vec(),
389+
})?;
392390

393391
// Schedule next overlapped read
394392
// SAFETY: pipe is valid to read from, as long as it is not closed.
@@ -432,26 +430,24 @@ impl WinApiProcessCtx {
432430
ERROR_HANDLE_EOF | ERROR_BROKEN_PIPE => {
433431
// EOF on stderr pipe, close it and send EOF message to message_tx
434432
self.stderr_read_pipe = None;
435-
self.io_notification_tx
436-
.blocking_send(ServerChannelEvent::SessionDataOut {
437-
session_id,
438-
stream: NowExecDataStreamKind::Stderr,
439-
last: true,
440-
data: Vec::new(),
441-
})?;
433+
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
434+
session_id,
435+
stream: NowExecDataStreamKind::Stderr,
436+
last: true,
437+
data: Vec::new(),
438+
})?;
442439
}
443440
_code => return Err(err.into()),
444441
}
445442
continue;
446443
}
447444

448-
self.io_notification_tx
449-
.blocking_send(ServerChannelEvent::SessionDataOut {
450-
session_id,
451-
stream: NowExecDataStreamKind::Stderr,
452-
last: false,
453-
data: stderr_buffer[..bytes_read as usize].to_vec(),
454-
})?;
445+
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
446+
session_id,
447+
stream: NowExecDataStreamKind::Stderr,
448+
last: false,
449+
data: stderr_buffer[..bytes_read as usize].to_vec(),
450+
})?;
455451

456452
// Schedule next overlapped read
457453
// SAFETY: pipe is valid to read from, as long as it is not closed.
@@ -527,12 +523,13 @@ impl WinApiProcessBuilder {
527523
self
528524
}
529525

530-
/// Starts process execution and spawns IO thread to redirect stdio to/from dvc.
531-
pub fn run(
526+
/// Internal implementation for process execution.
527+
fn run_impl(
532528
mut self,
533529
session_id: u32,
534-
io_notification_tx: Sender<ServerChannelEvent>,
535-
) -> Result<WinApiProcess, ExecError> {
530+
io_notification_tx: Option<Sender<ServerChannelEvent>>,
531+
detached: bool,
532+
) -> Result<Option<WinApiProcess>, ExecError> {
536533
let command_line = format!("\"{}\" {}", self.executable, self.command_line)
537534
.trim_end()
538535
.to_owned();
@@ -557,31 +554,42 @@ impl WinApiProcessBuilder {
557554
let io_redirection = self.enable_io_redirection;
558555

559556
let process_ctx = if io_redirection {
560-
prepare_process_with_io_redirection(
561-
session_id,
562-
command_line,
563-
current_directory,
564-
self.env,
565-
io_notification_tx.clone(),
566-
)?
557+
prepare_process_with_io_redirection(session_id, command_line, current_directory, self.env)?
567558
} else {
568-
prepare_process(
569-
session_id,
570-
command_line,
571-
current_directory,
572-
self.env,
573-
io_notification_tx.clone(),
574-
)?
559+
prepare_process(session_id, command_line, current_directory, self.env)?
575560
};
576561

562+
if detached {
563+
// For detached mode, spawn a thread that waits for process exit and keeps temp files alive
564+
std::thread::spawn(move || {
565+
let _temp_files = temp_files; // Keep temp files alive
566+
567+
// Wait for process to exit (indefinitely)
568+
if let Err(error) = process_ctx.process.wait(None) {
569+
error!(%error, session_id, "Failed to wait for detached process");
570+
return;
571+
}
572+
573+
info!(session_id, "Detached process exited");
574+
575+
// Temp files will be cleaned up when this thread exits
576+
});
577+
578+
info!(session_id, "Detached process started successfully");
579+
return Ok(None);
580+
}
581+
577582
// Create channel for `task` -> `Process IO thread` communication
578583
let (input_event_tx, input_event_rx) = winapi_signaled_mpsc_channel()?;
579584

585+
let io_notification_tx =
586+
io_notification_tx.expect("BUG: io_notification_tx must be Some for non-detached mode");
587+
580588
let join_handle = std::thread::spawn(move || {
581589
let run_result = if io_redirection {
582-
process_ctx.wait_with_io_redirection(input_event_rx)
590+
process_ctx.wait_with_io_redirection(input_event_rx, io_notification_tx.clone())
583591
} else {
584-
process_ctx.wait(input_event_rx)
592+
process_ctx.wait(input_event_rx, io_notification_tx.clone())
585593
};
586594

587595
let notification = match run_result {
@@ -594,11 +602,30 @@ impl WinApiProcessBuilder {
594602
}
595603
});
596604

597-
Ok(WinApiProcess {
605+
Ok(Some(WinApiProcess {
598606
input_event_tx,
599607
join_handle,
600608
_temp_files: temp_files,
601-
})
609+
}))
610+
}
611+
612+
/// Starts process execution and spawns IO thread to redirect stdio to/from dvc.
613+
pub fn run(
614+
self,
615+
session_id: u32,
616+
io_notification_tx: Sender<ServerChannelEvent>,
617+
) -> Result<WinApiProcess, ExecError> {
618+
Ok(self
619+
.run_impl(session_id, Some(io_notification_tx), false)?
620+
.expect("BUG: run_impl should return Some when detached=false"))
621+
}
622+
623+
/// Starts process in detached mode (fire-and-forget).
624+
/// No IO redirection. Process exit is monitored in a background thread to manage temp file cleanup.
625+
/// Returns immediately after spawning.
626+
pub fn run_detached(self, session_id: u32) -> Result<(), ExecError> {
627+
self.run_impl(session_id, None, true)?;
628+
Ok(())
602629
}
603630
}
604631

@@ -607,7 +634,6 @@ fn prepare_process(
607634
mut command_line: WideString,
608635
current_directory: WideString,
609636
env: HashMap<String, String>,
610-
io_notification_tx: Sender<ServerChannelEvent>,
611637
) -> Result<WinApiProcessCtx, ExecError> {
612638
let mut process_information = PROCESS_INFORMATION::default();
613639

@@ -620,6 +646,7 @@ fn prepare_process(
620646
let environment_block = (!env.is_empty()).then(|| make_environment_block(env)).transpose()?;
621647

622648
let mut creation_flags = NORMAL_PRIORITY_CLASS | CREATE_NEW_PROCESS_GROUP | CREATE_NEW_CONSOLE;
649+
623650
if environment_block.is_some() {
624651
creation_flags |= CREATE_UNICODE_ENVIRONMENT;
625652
}
@@ -657,7 +684,6 @@ fn prepare_process(
657684

658685
Ok(WinApiProcessCtx {
659686
session_id,
660-
io_notification_tx,
661687
stdout_read_pipe: None,
662688
stderr_read_pipe: None,
663689
stdin_write_pipe: None,
@@ -671,7 +697,6 @@ fn prepare_process_with_io_redirection(
671697
mut command_line: WideString,
672698
current_directory: WideString,
673699
env: HashMap<String, String>,
674-
io_notification_tx: Sender<ServerChannelEvent>,
675700
) -> Result<WinApiProcessCtx, ExecError> {
676701
let mut process_information = PROCESS_INFORMATION::default();
677702

@@ -741,7 +766,6 @@ fn prepare_process_with_io_redirection(
741766

742767
let process_ctx = WinApiProcessCtx {
743768
session_id,
744-
io_notification_tx,
745769
stdout_read_pipe: Some(stdout_read_pipe),
746770
stderr_read_pipe: Some(stderr_read_pipe),
747771
stdin_write_pipe: Some(stdin_write_pipe),

0 commit comments

Comments
 (0)