Skip to content

Commit 59b5131

Browse files
committed
[hyperactor] HostMesh::fork; port 'dining philosophers' to v1
Pull Request resolved: #1660 HostMesh::fork provides a way to create a singleton host mesh from the current process. The process itself becomes the host manager, spawning procs using the current command, reaching HostMesh::fork itself. This allows us to easily create new host meshes for examples, testing, etc. This is then used to port the 'dining philosophers' example to the v1 API. ghstack-source-id: 318984351 @exported-using-ghexport Differential Revision: [D85476026](https://our.internmc.facebook.com/intern/diff/D85476026/)
1 parent 2a48809 commit 59b5131

File tree

3 files changed

+101
-51
lines changed

3 files changed

+101
-51
lines changed

hyperactor_mesh/examples/dining_philosophers.rs

Lines changed: 50 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
/// A naive implementation of the Dining Philosophers problem using Hyperactor.
1010
/// https://en.wikipedia.org/wiki/Dining_philosophers_problem
1111
use std::collections::HashMap;
12+
use std::ops::Deref;
1213
use std::process::ExitCode;
1314

1415
use anyhow::Result;
@@ -20,19 +21,14 @@ use hyperactor::Handler;
2021
use hyperactor::Instance;
2122
use hyperactor::Named;
2223
use hyperactor::PortRef;
23-
use hyperactor::Proc;
2424
use hyperactor::Unbind;
25-
use hyperactor::channel::ChannelTransport;
26-
use hyperactor_mesh::ProcMesh;
27-
use hyperactor_mesh::actor_mesh::ActorMesh;
28-
use hyperactor_mesh::alloc::AllocSpec;
29-
use hyperactor_mesh::alloc::Allocator;
30-
use hyperactor_mesh::alloc::LocalAllocator;
25+
use hyperactor::context;
3126
use hyperactor_mesh::comm::multicast::CastInfo;
3227
use hyperactor_mesh::extent;
33-
use hyperactor_mesh::selection::dsl::all;
34-
use hyperactor_mesh::selection::dsl::true_;
35-
use ndslice::selection::selection_from;
28+
use hyperactor_mesh::proc_mesh::global_root_client;
29+
use hyperactor_mesh::v1::ActorMeshRef;
30+
use hyperactor_mesh::v1::host_mesh::HostMesh;
31+
use ndslice::ViewExt;
3632
use serde::Deserialize;
3733
use serde::Serialize;
3834
use tokio::sync::OnceCell;
@@ -124,9 +120,11 @@ impl PhilosopherActor {
124120

125121
async fn release_chopsticks(&mut self, cx: &Instance<Self>) -> Result<()> {
126122
let (left, right) = self.chopstick_indices();
127-
eprintln!(
123+
tracing::debug!(
128124
"philosopher {} releasing chopsticks, {} and {}",
129-
self.rank, left, right
125+
self.rank,
126+
left,
127+
right
130128
);
131129
self.waiter
132130
.get()
@@ -145,14 +143,16 @@ impl Handler<PhilosopherMessage> for PhilosopherActor {
145143
message: PhilosopherMessage,
146144
) -> Result<(), anyhow::Error> {
147145
let point = cx.cast_point();
148-
self.rank = point.rank();
149146
match message {
150147
PhilosopherMessage::Start(waiter) => {
151148
self.waiter.set(waiter)?;
152149
self.request_chopsticks(cx).await?;
150+
// Start is always broadcasted to all philosophers; so this is
151+
// our global rank.
152+
self.rank = point.rank();
153153
}
154154
PhilosopherMessage::GrantChopstick(chopstick) => {
155-
eprintln!("philosopher {} granted chopstick {}", self.rank, chopstick);
155+
tracing::debug!("philosopher {} granted chopstick {}", self.rank, chopstick);
156156
let (left, right) = self.chopstick_indices();
157157
if left == chopstick {
158158
self.chopsticks = (ChopstickStatus::Granted, self.chopsticks.1.clone());
@@ -162,7 +162,7 @@ impl Handler<PhilosopherMessage> for PhilosopherActor {
162162
unreachable!("shouldn't be granted a chopstick that is not left or right");
163163
}
164164
if self.chopsticks == (ChopstickStatus::Granted, ChopstickStatus::Granted) {
165-
eprintln!("philosopher {} starts dining", self.rank);
165+
tracing::debug!("philosopher {} starts dining", self.rank);
166166
self.release_chopsticks(cx).await?;
167167
self.request_chopsticks(cx).await?;
168168
}
@@ -172,20 +172,17 @@ impl Handler<PhilosopherMessage> for PhilosopherActor {
172172
}
173173
}
174174

175-
struct Waiter<A> {
175+
struct Waiter {
176176
/// A map from chopstick to the rank of the philosopher who holds it.
177177
chopstick_assignments: HashMap<usize, usize>,
178178
/// A map from chopstick to the rank of the philosopher who requested it.
179179
chopstick_requests: HashMap<usize, usize>,
180180
/// ActorMesh of the philosophers.
181-
philosophers: A,
181+
philosophers: ActorMeshRef<PhilosopherActor>,
182182
}
183183

184-
impl<A> Waiter<A>
185-
where
186-
A: ActorMesh<Actor = PhilosopherActor>,
187-
{
188-
fn new(philosophers: A) -> Self {
184+
impl Waiter {
185+
fn new(philosophers: ActorMeshRef<PhilosopherActor>) -> Self {
189186
Self {
190187
chopstick_assignments: Default::default(),
191188
chopstick_requests: Default::default(),
@@ -202,68 +199,70 @@ where
202199
self.chopstick_assignments.insert(chopstick, rank);
203200
}
204201

205-
fn handle_request_chopstick(&mut self, rank: usize, chopstick: usize) -> Result<()> {
202+
fn handle_request_chopstick(
203+
&mut self,
204+
cx: &impl context::Actor,
205+
rank: usize,
206+
chopstick: usize,
207+
) -> Result<()> {
206208
if self.is_chopstick_available(chopstick) {
207209
self.grant_chopstick(chopstick, rank);
208-
self.philosophers.cast(
209-
self.philosophers.proc_mesh().client(),
210-
selection_from(self.philosophers.shape(), &[("replica", rank..rank + 1)])?,
211-
PhilosopherMessage::GrantChopstick(chopstick),
212-
)?
210+
self.philosophers
211+
.range("replica", rank)?
212+
.cast(cx, PhilosopherMessage::GrantChopstick(chopstick))?
213213
} else {
214214
self.chopstick_requests.insert(chopstick, rank);
215215
}
216216
Ok(())
217217
}
218218

219-
fn handle_release_chopstick(&mut self, chopstick: usize) -> Result<()> {
219+
fn handle_release_chopstick(
220+
&mut self,
221+
cx: &impl context::Actor,
222+
chopstick: usize,
223+
) -> Result<()> {
220224
self.chopstick_assignments.remove(&chopstick);
221225
if let Some(rank) = self.chopstick_requests.remove(&chopstick) {
222226
// now just handle the request again to grant the chopstick
223-
self.handle_request_chopstick(rank, chopstick)?;
227+
self.handle_request_chopstick(cx, rank, chopstick)?;
224228
}
225229
Ok(())
226230
}
227231
}
228232

229233
#[tokio::main]
230234
async fn main() -> Result<ExitCode> {
235+
hyperactor_telemetry::initialize_logging_for_test();
236+
let host_mesh = HostMesh::local().await?;
237+
231238
let group_size = 5;
232-
let alloc = LocalAllocator
233-
.allocate(AllocSpec {
234-
extent: extent! {replica = group_size},
235-
constraints: Default::default(),
236-
proc_name: None,
237-
transport: ChannelTransport::Local,
238-
})
239+
let instance = global_root_client();
240+
let proc_mesh = host_mesh
241+
.spawn(instance, "philosophers", extent!(replica = group_size))
239242
.await?;
240243

241-
let (instance, _) = Proc::local().instance("client").unwrap();
242-
243-
let proc_mesh = ProcMesh::allocate(alloc).await?;
244244
let params = PhilosopherActorParams { size: group_size };
245245
let actor_mesh = proc_mesh
246246
.spawn::<PhilosopherActor>(&instance, "philosopher", &params)
247247
.await?;
248-
let (dining_message_handle, mut dining_message_rx) = proc_mesh.client().open_port();
248+
let (dining_message_handle, mut dining_message_rx) = instance.open_port();
249249
actor_mesh
250250
.cast(
251-
proc_mesh.client(),
252-
all(true_()),
251+
instance,
253252
PhilosopherMessage::Start(dining_message_handle.bind()),
254253
)
255254
.unwrap();
256-
let mut waiter = Waiter::new(actor_mesh);
255+
let mut waiter = Waiter::new(actor_mesh.deref().clone());
257256
while let Ok(message) = dining_message_rx.recv().await {
258-
eprintln!("waiter received message: {:?}", &message);
257+
tracing::debug!("waiter received message: {:?}", &message);
259258
match message {
260259
WaiterMessage::RequestChopsticks((rank, left, right)) => {
261-
waiter.handle_request_chopstick(rank, left)?;
262-
waiter.handle_request_chopstick(rank, right)?;
260+
waiter.handle_request_chopstick(instance, rank, left)?;
261+
waiter.handle_request_chopstick(instance, rank, right)?;
263262
}
264263
WaiterMessage::ReleaseChopsticks((left, right)) => {
265-
waiter.handle_release_chopstick(left)?;
266-
waiter.handle_release_chopstick(right)?;
264+
waiter.handle_release_chopstick(instance, left)?;
265+
waiter.handle_release_chopstick(instance, right)?;
267266
}
268267
}
269268
let mut sorted_chopstick_assignments = waiter
@@ -272,7 +271,7 @@ async fn main() -> Result<ExitCode> {
272271
.map(|(k, v)| (*k, *v))
273272
.collect::<Vec<_>>();
274273
sorted_chopstick_assignments.sort();
275-
eprintln!(
274+
tracing::debug!(
276275
"assignments [(CHO, PHI)]: {:?}",
277276
sorted_chopstick_assignments
278277
);
@@ -282,7 +281,7 @@ async fn main() -> Result<ExitCode> {
282281
.map(|(k, v)| (*k, *v))
283282
.collect::<Vec<_>>();
284283
sorted_chopstick_requests.sort();
285-
eprintln!(
284+
tracing::debug!(
286285
"pending requests [(CHO, PHI)]:: {:?}",
287286
sorted_chopstick_requests
288287
);

hyperactor_mesh/src/v1.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub mod testactor;
1717
pub mod testing;
1818
pub mod value_mesh;
1919

20+
use std::io;
2021
use std::str::FromStr;
2122

2223
pub use actor_mesh::ActorMesh;
@@ -25,6 +26,7 @@ use enum_as_inner::EnumAsInner;
2526
pub use host_mesh::HostMeshRef;
2627
use hyperactor::ActorId;
2728
use hyperactor::ActorRef;
29+
use hyperactor::host::HostError;
2830
use hyperactor::mailbox::MailboxSenderError;
2931
use ndslice::view;
3032
pub use proc_mesh::ProcMesh;
@@ -138,8 +140,17 @@ pub enum Error {
138140
)]
139141
ActorStopError { statuses: RankedValues<Status> },
140142

143+
#[error("error spawning actor: {0}")]
144+
SingletonActorSpawnError(anyhow::Error),
145+
141146
#[error("error: {0} does not exist")]
142147
NotExist(Name),
148+
149+
#[error(transparent)]
150+
Io(#[from] io::Error),
151+
152+
#[error(transparent)]
153+
Host(#[from] HostError),
143154
}
144155

145156
/// Errors that occur during serialization and deserialization.

hyperactor_mesh/src/v1/host_mesh.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use hyperactor::config;
1414
use hyperactor::config::CONFIG;
1515
use hyperactor::config::ConfigAttr;
1616
use hyperactor::declare_attrs;
17+
use hyperactor::host::Host;
1718
use ndslice::view::CollectMeshExt;
1819

1920
pub mod mesh_agent;
@@ -39,8 +40,11 @@ use ndslice::view::RegionParseError;
3940
use serde::Deserialize;
4041
use serde::Serialize;
4142

43+
use crate::Bootstrap;
4244
use crate::alloc::Alloc;
4345
use crate::bootstrap::BootstrapCommand;
46+
use crate::bootstrap::BootstrapProcManager;
47+
use crate::proc_mesh::DEFAULT_TRANSPORT;
4448
use crate::resource;
4549
use crate::resource::CreateOrUpdateClient;
4650
use crate::resource::GetRankStatus;
@@ -55,6 +59,7 @@ use crate::v1::ProcMesh;
5559
use crate::v1::ProcMeshRef;
5660
use crate::v1::StatusMesh;
5761
use crate::v1::ValueMesh;
62+
use crate::v1::host_mesh::mesh_agent::HostAgentMode;
5863
pub use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
5964
use crate::v1::host_mesh::mesh_agent::HostMeshAgentProcMeshTrampoline;
6065
use crate::v1::host_mesh::mesh_agent::ProcState;
@@ -211,6 +216,41 @@ enum HostMeshAllocation {
211216
}
212217

213218
impl HostMesh {
219+
/// Fork a new `HostMesh` from this process, returning the new `HostMesh`
220+
/// to the parent (owning) process, while running forever in child processes
221+
/// (i.e., individual procs).
222+
///
223+
/// All of the code preceding the call to `local` will run in each child proc;
224+
/// thus it is important to call `local` early in the lifetime of the program,
225+
/// and to ensure that it is reached unconditionally.
226+
///
227+
/// This is intended for testing, development, examples.
228+
pub async fn local() -> v1::Result<HostMesh> {
229+
if let Ok(Some(boot)) = Bootstrap::get_from_env() {
230+
let err = boot.bootstrap().await;
231+
tracing::error!("failed to bootstrap local host mesh process: {}", err);
232+
std::process::exit(1);
233+
}
234+
235+
let addr = config::global::get_cloned(DEFAULT_TRANSPORT).any();
236+
237+
let manager = BootstrapProcManager::new(BootstrapCommand::current()?);
238+
let (host, _handle) = Host::serve(manager, addr).await?;
239+
let addr = host.addr().clone();
240+
let host_mesh_agent = host
241+
.system_proc()
242+
.clone()
243+
.spawn::<HostMeshAgent>("agent", HostAgentMode::Process(host))
244+
.await
245+
.map_err(v1::Error::SingletonActorSpawnError)?;
246+
host_mesh_agent.bind::<HostMeshAgent>();
247+
248+
let host = HostRef(addr);
249+
let host_mesh_ref =
250+
HostMeshRef::new(Name::new("local"), extent!(hosts = 1).into(), vec![host])?;
251+
Ok(HostMesh::take(host_mesh_ref))
252+
}
253+
214254
/// Allocate a host mesh from an [`Alloc`]. This creates a HostMesh with the same extent
215255
/// as the provided alloc. Allocs generate procs, and thus we define and run a Host for each
216256
/// proc allocated by it.

0 commit comments

Comments
 (0)