Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions hyperactor_mesh/examples/dining_philosophers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use hyperactor::Named;
use hyperactor::PortRef;
use hyperactor::Unbind;
use hyperactor::context;
use hyperactor_mesh::bootstrap::BootstrapCommand;
use hyperactor_mesh::comm::multicast::CastInfo;
use hyperactor_mesh::extent;
use hyperactor_mesh::proc_mesh::global_root_client;
Expand Down Expand Up @@ -233,6 +234,12 @@ impl Waiter {
#[tokio::main]
async fn main() -> Result<ExitCode> {
hyperactor_telemetry::initialize_logging_for_test();

// Option: run as a local process mesh
// let host_mesh = HostMesh::process(extent!(hosts = 1), BootstrapCommand::current().unwrap())
// .await
// .unwrap();

let host_mesh = HostMesh::local().await?;

let group_size = 5;
Expand Down
98 changes: 98 additions & 0 deletions hyperactor_mesh/examples/test_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

//! This program is meant as a test bed for exercising the various
//! (v1) mesh APIs.
//!
//! It can also be used as the basis for benchmarks, functionality testing,
//! etc.
use std::collections::HashSet;
use std::time::Duration;

use async_trait::async_trait;
use hyperactor::Actor;
use hyperactor::Bind;
use hyperactor::Context;
use hyperactor::Handler;
use hyperactor::Named;
use hyperactor::PortRef;
use hyperactor::Unbind;
use hyperactor_mesh::bootstrap::BootstrapCommand;
use hyperactor_mesh::comm::multicast::CastInfo;
use hyperactor_mesh::proc_mesh::global_root_client;
use hyperactor_mesh::v1::host_mesh::HostMesh;
use ndslice::Point;
use ndslice::ViewExt;
use ndslice::extent;
use serde::Deserialize;
use serde::Serialize;
use tokio::time::Instant;

/// Stateless actor used by this test bench.
///
/// The `hyperactor::export` attribute below enables `spawn` and registers
/// `TestMessage` as a handled message with `cast = true`; `main` relies on
/// both by spawning this actor on a proc mesh and casting `TestMessage::Ping`
/// to the resulting actor mesh.
#[derive(Actor, Default, Debug)]
#[hyperactor::export(
spawn = true,
handlers = [
TestMessage { cast = true },
],
)]
struct TestActor {}

/// Messages handled by `TestActor`.
#[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
enum TestMessage {
/// Ping carrying a reply port; the handler answers by sending the value of
/// `cx.cast_point()` to this port. The port is marked `#[binding(include)]`
/// for the derived `Bind`/`Unbind` implementations.
Ping(#[binding(include)] PortRef<Point>),
}

#[async_trait]
impl Handler<TestMessage> for TestActor {
    /// Answer a `Ping` by sending this actor's cast point (`cx.cast_point()`)
    /// to the reply port carried in the message.
    ///
    /// Returns an error if sending the reply fails.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        message: TestMessage,
    ) -> Result<(), anyhow::Error> {
        // `Ping` is the only variant, so this pattern is irrefutable; adding a
        // variant later turns this line into a compile error.
        let TestMessage::Ping(reply_port) = message;
        reply_port.send(cx, cx.cast_point())?;
        Ok(())
    }
}

/// Entry point of the test bench.
///
/// Creates a process-based host mesh with extent `hosts = 8`, spawns a proc
/// mesh with extent `procs = 2` on it, spawns a `TestActor` mesh on those
/// procs, and then loops forever: cast a `Ping` to the whole actor mesh, wait
/// until a distinct reply point has been received for every rank, print the
/// elapsed time to stderr, and sleep for one second.
///
/// Every setup step is unwrapped, so any failure panics.
#[tokio::main]
async fn main() {
    hyperactor_telemetry::initialize_logging_for_test();

    let hosts_extent = extent!(hosts = 8);
    let bootstrap_cmd = BootstrapCommand::current().unwrap();
    let host_mesh = HostMesh::process(hosts_extent, bootstrap_cmd)
        .await
        .unwrap();

    let client = global_root_client();

    let proc_mesh = host_mesh
        .spawn(client, "test", extent!(procs = 2))
        .await
        .unwrap();
    let actor_mesh = proc_mesh
        .spawn::<TestActor>(client, "test", &())
        .await
        .unwrap();

    loop {
        // Distinct points seen so far in this round.
        let mut seen = HashSet::new();
        // A fresh reply port per round.
        let (reply_port, mut replies) = client.open_port();
        let started = Instant::now();

        actor_mesh
            .cast(client, TestMessage::Ping(reply_port.bind()))
            .unwrap();

        // The round is complete once every rank has answered.
        while seen.len() < actor_mesh.extent().num_ranks() {
            let point = replies.recv().await.unwrap();
            seen.insert(point);
        }

        let elapsed_ms = started.elapsed().as_millis();
        eprintln!("ping {}ms", elapsed_ms);
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
27 changes: 17 additions & 10 deletions hyperactor_mesh/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,22 @@ impl BootstrapCommand {
})
}

/// Build a fresh `Command` that mirrors this bootstrap command
/// configuration: the program, the optional `arg0` override, every
/// argument, and every environment variable are copied onto it.
///
/// The returned command has not been spawned; callers may customize it
/// further (e.g. add more environment variables) before launching it.
pub fn new(&self) -> Command {
    let mut command = Command::new(&self.program);
    if let Some(arg0) = self.arg0.as_ref() {
        command.arg0(arg0);
    }
    // Arguments are appended in their stored order.
    command.args(&self.args);
    for (key, value) in &self.env {
        command.env(key, value);
    }
    command
}

/// Bootstrap command used for testing, invoking the Buck-built
/// `monarch/hyperactor_mesh/bootstrap` binary.
///
Expand Down Expand Up @@ -1674,16 +1690,7 @@ impl ProcManager for BootstrapProcManager {
callback_addr,
config: Some(config.client_config_override),
};
let mut cmd = Command::new(&self.command.program);
if let Some(arg0) = &self.command.arg0 {
cmd.arg0(arg0);
}
for arg in &self.command.args {
cmd.arg(arg);
}
for (k, v) in &self.command.env {
cmd.env(k, v);
}
let mut cmd = self.command.new();
cmd.env(
"HYPERACTOR_MESH_BOOTSTRAP_MODE",
mode.to_env_safe_string()
Expand Down
40 changes: 40 additions & 0 deletions hyperactor_mesh/src/v1/host_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ impl HostMesh {
/// and to ensure that it is reached unconditionally.
///
/// This is intended for testing, development, examples.
///
/// TODO: fix up ownership
pub async fn local() -> v1::Result<HostMesh> {
if let Ok(Some(boot)) = Bootstrap::get_from_env() {
let err = boot.bootstrap().await;
Expand All @@ -251,6 +253,44 @@ impl HostMesh {
Ok(HostMesh::take(host_mesh_ref))
}

/// Create a new process-based host mesh. Each host is represented by a local process,
/// which manages its set of procs. This is not a true host mesh the sense that each host
/// is not independent. The intent of `process` is for testing, examples, and experimentation.
///
/// The bootstrap command is used to bootstrap both hosts and processes, thus it should be
/// a command that reaches [`crate::bootstrap_or_die`]. `process` is itself a valid bootstrap
/// entry point; thus using `BootstrapCommand::current` works correctly as long as `process`
/// is called early in the lifecycle of the process and reached unconditionally.
///
/// TODO: thread through ownership
pub async fn process(extent: Extent, command: BootstrapCommand) -> v1::Result<HostMesh> {
if let Ok(Some(boot)) = Bootstrap::get_from_env() {
let err = boot.bootstrap().await;
tracing::error!("failed to bootstrap process host mesh process: {}", err);
std::process::exit(1);
}

let transport = config::global::get_cloned(DEFAULT_TRANSPORT);
let mut hosts = Vec::with_capacity(extent.num_ranks());
for _ in 0..extent.num_ranks() {
// Note: this can be racy. Possibly we should have a callback channel.
let addr = transport.any();
let bootstrap = Bootstrap::Host {
addr: addr.clone(),
command: Some(command.clone()),
config: Some(config::global::attrs()),
};

let mut cmd = command.new();
bootstrap.to_env(&mut cmd);
cmd.spawn()?;
hosts.push(HostRef(addr));
}

let host_mesh_ref = HostMeshRef::new(Name::new("process"), extent.into(), hosts)?;
Ok(HostMesh::take(host_mesh_ref))
}

/// Allocate a host mesh from an [`Alloc`]. This creates a HostMesh with the same extent
/// as the provided alloc. Allocs generate procs, and thus we define and run a Host for each
/// proc allocated by it.
Expand Down
2 changes: 1 addition & 1 deletion monarch_hyperactor/src/v1/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ fn actor_state_to_supervision_events(
let events = match state.status {
// If the actor was killed, it might not have a Failed status
// or supervision events, and it can't tell us which rank
// it was.
resource::Status::NotExist | resource::Status::Stopped | resource::Status::Timeout(_) => {
// it was.
if !events.is_empty() {
events
} else {
Expand Down