Skip to content

Commit 9482897

Browse files
committed
[hyperactor] mesh: HostMesh::process, test_bench
Pull Request resolved: #1661 HostMesh::process constructs a host mesh by forking a process for each host in the mesh. It can be used to bootstrap a large host mesh from a single binary on a single host. The intent is to use it for testing, benchmarking, tool development, etc. test_bench is meant as a template "test bench" for purposes of testing, development, and benchmarking. ghstack-source-id: 318984354 @exported-using-ghexport Differential Revision: [D85476028](https://our.internmc.facebook.com/intern/diff/D85476028/)
1 parent 59b5131 commit 9482897

File tree

5 files changed

+163
-11
lines changed

5 files changed

+163
-11
lines changed

hyperactor_mesh/examples/dining_philosophers.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use hyperactor::Named;
2323
use hyperactor::PortRef;
2424
use hyperactor::Unbind;
2525
use hyperactor::context;
26+
use hyperactor_mesh::bootstrap::BootstrapCommand;
2627
use hyperactor_mesh::comm::multicast::CastInfo;
2728
use hyperactor_mesh::extent;
2829
use hyperactor_mesh::proc_mesh::global_root_client;
@@ -233,6 +234,12 @@ impl Waiter {
233234
#[tokio::main]
234235
async fn main() -> Result<ExitCode> {
235236
hyperactor_telemetry::initialize_logging_for_test();
237+
238+
// Option: run as a local process mesh
239+
// let host_mesh = HostMesh::process(extent!(hosts = 1), BootstrapCommand::current().unwrap())
240+
// .await
241+
// .unwrap();
242+
236243
let host_mesh = HostMesh::local().await?;
237244

238245
let group_size = 5;
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
/// This program is meant as a test bed for exercising the various
10+
/// (v1) mesh APIs.
11+
///
12+
/// It can also be used as the basis for benchmarks, functionality testing,
13+
/// etc.
14+
use std::collections::HashSet;
15+
use std::time::Duration;
16+
17+
use async_trait::async_trait;
18+
use hyperactor::Actor;
19+
use hyperactor::Bind;
20+
use hyperactor::Context;
21+
use hyperactor::Handler;
22+
use hyperactor::Named;
23+
use hyperactor::PortRef;
24+
use hyperactor::Unbind;
25+
use hyperactor_mesh::bootstrap::BootstrapCommand;
26+
use hyperactor_mesh::comm::multicast::CastInfo;
27+
use hyperactor_mesh::proc_mesh::global_root_client;
28+
use hyperactor_mesh::v1::host_mesh::HostMesh;
29+
use ndslice::Point;
30+
use ndslice::ViewExt;
31+
use ndslice::extent;
32+
use serde::Deserialize;
33+
use serde::Serialize;
34+
use tokio::time::Instant;
35+
36+
#[derive(Actor, Default, Debug)]
37+
#[hyperactor::export(
38+
spawn = true,
39+
handlers = [
40+
TestMessage { cast = true },
41+
],
42+
)]
43+
struct TestActor {}
44+
45+
#[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
46+
enum TestMessage {
47+
Ping(#[binding(include)] PortRef<Point>),
48+
}
49+
50+
#[async_trait]
51+
impl Handler<TestMessage> for TestActor {
52+
async fn handle(
53+
&mut self,
54+
cx: &Context<Self>,
55+
message: TestMessage,
56+
) -> Result<(), anyhow::Error> {
57+
match message {
58+
TestMessage::Ping(reply) => reply.send(cx, cx.cast_point())?,
59+
}
60+
Ok(())
61+
}
62+
}
63+
64+
#[tokio::main]
65+
async fn main() {
66+
hyperactor_telemetry::initialize_logging_for_test();
67+
68+
let host_mesh = HostMesh::process(extent!(hosts = 8), BootstrapCommand::current().unwrap())
69+
.await
70+
.unwrap();
71+
72+
let instance = global_root_client();
73+
74+
let proc_mesh = host_mesh
75+
.spawn(instance, "test", extent!(procs = 2))
76+
.await
77+
.unwrap();
78+
79+
let actor_mesh = proc_mesh
80+
.spawn::<TestActor>(instance, "test", &())
81+
.await
82+
.unwrap();
83+
84+
loop {
85+
let mut received = HashSet::new();
86+
let (port, mut rx) = instance.open_port();
87+
let begin = Instant::now();
88+
actor_mesh
89+
.cast(instance, TestMessage::Ping(port.bind()))
90+
.unwrap();
91+
while received.len() < actor_mesh.extent().num_ranks() {
92+
received.insert(rx.recv().await.unwrap());
93+
}
94+
95+
eprintln!("ping {}ms", begin.elapsed().as_millis());
96+
tokio::time::sleep(Duration::from_secs(1)).await;
97+
}
98+
}

hyperactor_mesh/src/bootstrap.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1357,6 +1357,22 @@ impl BootstrapCommand {
13571357
})
13581358
}
13591359

1360+
/// Create a new `Command` reflecting this bootstrap command
1361+
/// configuration.
1362+
pub fn new(&self) -> Command {
1363+
let mut cmd = Command::new(&self.program);
1364+
if let Some(arg0) = &self.arg0 {
1365+
cmd.arg0(arg0);
1366+
}
1367+
for arg in &self.args {
1368+
cmd.arg(arg);
1369+
}
1370+
for (k, v) in &self.env {
1371+
cmd.env(k, v);
1372+
}
1373+
cmd
1374+
}
1375+
13601376
/// Bootstrap command used for testing, invoking the Buck-built
13611377
/// `monarch/hyperactor_mesh/bootstrap` binary.
13621378
///
@@ -1672,16 +1688,7 @@ impl ProcManager for BootstrapProcManager {
16721688
callback_addr,
16731689
config: Some(config.client_config_override),
16741690
};
1675-
let mut cmd = Command::new(&self.command.program);
1676-
if let Some(arg0) = &self.command.arg0 {
1677-
cmd.arg0(arg0);
1678-
}
1679-
for arg in &self.command.args {
1680-
cmd.arg(arg);
1681-
}
1682-
for (k, v) in &self.command.env {
1683-
cmd.env(k, v);
1684-
}
1691+
let mut cmd = self.command.new();
16851692
cmd.env(
16861693
"HYPERACTOR_MESH_BOOTSTRAP_MODE",
16871694
mode.to_env_safe_string()

hyperactor_mesh/src/v1/host_mesh.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ impl HostMesh {
225225
/// and to ensure that it is reached unconditionally.
226226
///
227227
/// This is intended for testing, development, examples.
228+
///
229+
/// TODO: fix up ownership
228230
pub async fn local() -> v1::Result<HostMesh> {
229231
if let Ok(Some(boot)) = Bootstrap::get_from_env() {
230232
let err = boot.bootstrap().await;
@@ -251,6 +253,44 @@ impl HostMesh {
251253
Ok(HostMesh::take(host_mesh_ref))
252254
}
253255

256+
/// Create a new process-based host mesh. Each host is represented by a local process,
257+
/// which manages its set of procs. This is not a true host mesh in the sense that each host
258+
/// is not independent. The intent of `process` is for testing, examples, and experimentation.
259+
///
260+
/// The bootstrap command is used to bootstrap both hosts and processes, thus it should be
261+
/// a command that reaches [`crate::bootstrap_or_die`]. `process` is itself a valid bootstrap
262+
/// entry point; thus using `BootstrapCommand::current` works correctly as long as `process`
263+
/// is called early in the lifecycle of the process and reached unconditionally.
264+
///
265+
/// TODO: thread through ownership
266+
pub async fn process(extent: Extent, command: BootstrapCommand) -> v1::Result<HostMesh> {
267+
if let Ok(Some(boot)) = Bootstrap::get_from_env() {
268+
let err = boot.bootstrap().await;
269+
tracing::error!("failed to bootstrap process host mesh process: {}", err);
270+
std::process::exit(1);
271+
}
272+
273+
let transport = config::global::get_cloned(DEFAULT_TRANSPORT);
274+
let mut hosts = Vec::with_capacity(extent.num_ranks());
275+
for _ in 0..extent.num_ranks() {
276+
// Note: this can be racy. Possibly we should have a callback channel.
277+
let addr = transport.any();
278+
let bootstrap = Bootstrap::Host {
279+
addr: addr.clone(),
280+
command: Some(command.clone()),
281+
config: Some(config::global::attrs()),
282+
};
283+
284+
let mut cmd = command.new();
285+
bootstrap.to_env(&mut cmd);
286+
cmd.spawn()?;
287+
hosts.push(HostRef(addr));
288+
}
289+
290+
let host_mesh_ref = HostMeshRef::new(Name::new("process"), extent.into(), hosts)?;
291+
Ok(HostMesh::take(host_mesh_ref))
292+
}
293+
254294
/// Allocate a host mesh from an [`Alloc`]. This creates a HostMesh with the same extent
255295
/// as the provided alloc. Allocs generate procs, and thus we define and run a Host for each
256296
/// proc allocated by it.

monarch_hyperactor/src/v1/actor_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,8 @@ fn actor_state_to_supervision_events(
366366
let events = match state.status {
367367
// If the actor was killed, it might not have a Failed status
368368
// or supervision events, and it can't tell us which rank
369-
// it was.
370369
resource::Status::NotExist | resource::Status::Stopped | resource::Status::Timeout(_) => {
370+
// it was.
371371
if !events.is_empty() {
372372
events
373373
} else {

0 commit comments

Comments
 (0)