From 1f5e2db52c3089fe017e038420b87af6d1975147 Mon Sep 17 00:00:00 2001 From: gaopengf Date: Tue, 20 Jan 2026 09:26:40 +0000 Subject: [PATCH 1/7] support async mode of torch for shm allreduce --- gloo/allreduce_shm.cc | 61 +++++++++++++++++++++++++++++++------------ gloo/context.h | 2 ++ 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/gloo/allreduce_shm.cc b/gloo/allreduce_shm.cc index a81d376e5..e2953ac96 100644 --- a/gloo/allreduce_shm.cc +++ b/gloo/allreduce_shm.cc @@ -1,4 +1,5 @@ #include "gloo/allreduce_shm.h" +#include "gloo/types.h" #include #include @@ -444,11 +445,6 @@ AllreduceSharedMemoryData::~AllreduceSharedMemoryData() { void shm(const detail::AllreduceOptionsImpl& opts) { const auto& context = opts.context; - if (context->shmData == nullptr) { - context->shmData = std::make_shared( - context->rank, context->size); - context->shmData->initialize(); - } const size_t data_size = opts.elements * opts.elementSize; auto& in = opts.in; auto& out = opts.out; @@ -485,20 +481,51 @@ void shm(const detail::AllreduceOptionsImpl& opts) { } void* data = out[0].get()->ptr; + auto tag = opts.tag; + std::unique_ptr tagBuffer = + context->createUnboundBuffer(&tag, sizeof(tag)); + transport::UnboundBuffer* tag_ptr = tagBuffer.get(); + const auto slot = Slot::build(kAllreduceSlotPrefix, opts.tag); + + { + // Use mutex to make context->shmData thread safe. + std::unique_lock lock(context->shmDataMutex); + + if (context->shmData == nullptr) { + context->shmData = std::make_shared( + context->rank, context->size); + context->shmData->initialize(); + } - for (int offset = 0; offset < data_size; - offset += Allreduceworkspace::MAX_BUF_SIZE) { - auto data_ptr = ((char*)(data) + offset); - size_t chunk_size = data_size - offset > Allreduceworkspace::MAX_BUF_SIZE - ? Allreduceworkspace::MAX_BUF_SIZE - : data_size - offset; - size_t chunk_el = chunk_size / (data_size / opts.elements); - if (chunk_size < Allreduceworkspace::NAIVE_ALLREDUCE_THRESHOLD) { - symmetric_naive_all_reduce( - data_ptr, opts.elementSize, chunk_size, chunk_el, opts); + // In async mode there may be many allreduce ops executing simultaneously. + // However shmData is expected to occupied exclusively. We use unique tag to + // do synchronization among different ranks. + if (context->rank == 0) { + for (int i = 1; i < context->size; i++) { + tag_ptr->send(i, slot); + tag_ptr->waitSend(); + } } else { - distributed_naive_reduce( - data_ptr, opts.elementSize, chunk_size, chunk_el, opts); + lock.unlock(); + tag_ptr->recv(0, slot); + tag_ptr->waitRecv(); + lock.lock(); + } + + for (int offset = 0; offset < data_size; + offset += Allreduceworkspace::MAX_BUF_SIZE) { + auto data_ptr = ((char*)(data) + offset); + size_t chunk_size = data_size - offset > Allreduceworkspace::MAX_BUF_SIZE + ? Allreduceworkspace::MAX_BUF_SIZE + : data_size - offset; + size_t chunk_el = chunk_size / (data_size / opts.elements); + if (chunk_size < Allreduceworkspace::NAIVE_ALLREDUCE_THRESHOLD) { + symmetric_naive_all_reduce( + data_ptr, opts.elementSize, chunk_size, chunk_el, opts); + } else { + distributed_naive_reduce( + data_ptr, opts.elementSize, chunk_size, chunk_el, opts); + } } } diff --git a/gloo/context.h b/gloo/context.h index c114b8866..c7f8ae217 100644 --- a/gloo/context.h +++ b/gloo/context.h @@ -37,6 +37,8 @@ class Context { std::shared_ptr shmData; + std::mutex shmDataMutex; + std::shared_ptr& getDevice(); std::unique_ptr& getPair(int i); From 874795c64f7b6947aff1d45950070d1b6410742f Mon Sep 17 00:00:00 2001 From: gaopengf Date: Fri, 6 Feb 2026 15:55:39 +0000 Subject: [PATCH 2/7] generate unique shm buffer for each group --- gloo/allreduce_shm.cc | 44 ++++++++++++++++++++++++++++--------------- gloo/allreduce_shm.h | 10 +++++++++- 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/gloo/allreduce_shm.cc b/gloo/allreduce_shm.cc index e2953ac96..4c0668282 100644 --- a/gloo/allreduce_shm.cc +++ b/gloo/allreduce_shm.cc @@ -37,6 +37,12 @@ struct SharedData { size_t nbytes; }; +// SHM buffer identifier +struct BufferIdentifier { + pid_t pid; + uintptr_t contextAddress; +}; + void shared_open(SharedData* data, const char* name, size_t nbytes) { int d = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR); if (d != -1) { @@ -350,9 +356,10 @@ void AllreduceSharedMemoryData::initialize() { snprintf( shm_name_prefix, Allreduceworkspace::NAME_BUF_SIZE, - "%s_%d_%s_%s", + "%s_%d_%p_%s_%s", "shm_allreduce_buffer", - getsid(getpid()), + root_pid, + (void*)root_context_addr, addr_string.c_str(), port_string.c_str()); // create shared workspace for SHM based allreduce @@ -482,36 +489,43 @@ void shm(const detail::AllreduceOptionsImpl& opts) { void* data = out[0].get()->ptr; auto tag = opts.tag; - std::unique_ptr tagBuffer = - context->createUnboundBuffer(&tag, sizeof(tag)); - transport::UnboundBuffer* tag_ptr = tagBuffer.get(); + + BufferIdentifier bid; + std::unique_ptr tmpBuffer = + context->createUnboundBuffer(&bid, sizeof(bid)); + transport::UnboundBuffer* tmp_ptr = tmpBuffer.get(); const auto slot = Slot::build(kAllreduceSlotPrefix, opts.tag); { // Use mutex to make context->shmData thread safe. std::unique_lock lock(context->shmDataMutex); - if (context->shmData == nullptr) { - context->shmData = std::make_shared( - context->rank, context->size); - context->shmData->initialize(); - } - // In async mode there may be many allreduce ops executing simultaneously. // However shmData is expected to occupied exclusively. We use unique tag to // do synchronization among different ranks. + // There might be multiple process gloo groups, use (root_pid + + // context_addr) to generate name, which will be used by shm_open to create + // unique buffer for each group. if (context->rank == 0) { + bid.pid = getpid(); + bid.contextAddress = reinterpret_cast(context.get()); for (int i = 1; i < context->size; i++) { - tag_ptr->send(i, slot); - tag_ptr->waitSend(); + tmp_ptr->send(i, slot); + tmp_ptr->waitSend(opts.timeout); } } else { lock.unlock(); - tag_ptr->recv(0, slot); - tag_ptr->waitRecv(); + tmp_ptr->recv(0, slot); + tmp_ptr->waitRecv(opts.timeout); lock.lock(); } + if (context->shmData == nullptr) { + context->shmData = std::make_shared( + context->rank, context->size, bid.pid, bid.contextAddress); + context->shmData->initialize(); + } + for (int offset = 0; offset < data_size; offset += Allreduceworkspace::MAX_BUF_SIZE) { auto data_ptr = ((char*)(data) + offset); diff --git a/gloo/allreduce_shm.h b/gloo/allreduce_shm.h index a7b2f8615..76b78699e 100644 --- a/gloo/allreduce_shm.h +++ b/gloo/allreduce_shm.h @@ -36,9 +36,15 @@ struct AllreduceSharedMemoryData { char buffer[2 * NAIVE_ALLREDUCE_THRESHOLD + 2 * MAX_BUF_SIZE]; }; - AllreduceSharedMemoryData(int rank, int world_size) + AllreduceSharedMemoryData( + int rank, + int world_size, + pid_t root_pid, + uintptr_t root_context_addr) : rank(rank), world_size(world_size), + root_pid(root_pid), + root_context_addr(root_context_addr), current_buffer(0), state_idx(0), is_initialized(false) {} @@ -47,6 +53,8 @@ struct AllreduceSharedMemoryData { int rank; int world_size; + pid_t root_pid; + uintptr_t root_context_addr; int current_buffer; int state_idx; bool is_initialized; From f0c4a582c8dfcf6ffc8b4d841a657fefcf546a95 Mon Sep 17 00:00:00 2001 From: gaopengf Date: Tue, 10 Mar 2026 01:54:14 +0000 Subject: [PATCH 3/7] rewrite unspecific algorithm and use memcpy for small buffer --- gloo/allreduce.cc | 9 +++++---- gloo/allreduce.h | 3 +++ gloo/allreduce_shm.cc | 18 ++++++++++++++---- gloo/test/allreduce_test.cc | 13 +++++++++++++ 4 files changed, 35 insertions(+), 8 deletions(-) diff --git a/gloo/allreduce.cc b/gloo/allreduce.cc index 901d4302d..352522313 100644 --- a/gloo/allreduce.cc +++ b/gloo/allreduce.cc @@ -12,7 +12,7 @@ #include #include -#if !defined(_WIN32) && !defined(__aarch64__) && !defined(__arm__) +#if GLOO_SHM_ALLREDUCE_APPLICABLE #include "gloo/allreduce_shm.h" #endif #include "gloo/common/logging.h" @@ -137,8 +137,9 @@ void allreduce(const detail::AllreduceOptionsImpl& opts) { auto algorithm = opts.algorithm; -#if !defined(_WIN32) && !defined(__aarch64__) && !defined(__arm__) - if (context->isIntraNode() && !context->getDevice()->hasGPUDirect()) { +#if GLOO_SHM_ALLREDUCE_APPLICABLE + if (algorithm == detail::AllreduceOptionsImpl::UNSPECIFIED && + context->isIntraNode() && !context->getDevice()->hasGPUDirect()) { algorithm = detail::AllreduceOptionsImpl::SHM; } #endif @@ -151,7 +152,7 @@ void allreduce(const detail::AllreduceOptionsImpl& opts) { case detail::AllreduceOptionsImpl::BCUBE: bcube(opts, reduceInputs, broadcastOutputs); break; -#if !defined(_WIN32) && !defined(__aarch64__) && !defined(__arm__) +#if GLOO_SHM_ALLREDUCE_APPLICABLE case detail::AllreduceOptionsImpl::SHM: shm(opts); break; diff --git a/gloo/allreduce.h b/gloo/allreduce.h index 6f6037ede..37e5def37 100644 --- a/gloo/allreduce.h +++ b/gloo/allreduce.h @@ -19,6 +19,9 @@ namespace gloo { namespace detail { +#define GLOO_SHM_ALLREDUCE_APPLICABLE \ + !defined(_WIN32) && !defined(__aarch64__) && !defined(__arm__) + struct AllreduceOptionsImpl { // This type describes the function to use for element wise reduction. // diff --git a/gloo/allreduce_shm.cc b/gloo/allreduce_shm.cc index 4c0668282..e7cf6d54f 100644 --- a/gloo/allreduce_shm.cc +++ b/gloo/allreduce_shm.cc @@ -175,6 +175,17 @@ static void parallel_memcpy(void* to, void* from, size_t n_bytes) { } } +void copy_buffer(void* to, void* from, size_t n_bytes) { + // use memcpy when buffer size is less than 1 MiB + if (n_bytes < AllreduceSharedMemoryData::AllreduceWorkspace:: + NAIVE_ALLREDUCE_THRESHOLD) { + memcpy(to, from, n_bytes); + return; + } + + parallel_memcpy(to, from, n_bytes); +} + size_t slice_size(size_t chunk_el, int slice_idx, int world_size) { size_t slice_size = chunk_el / world_size; return slice_idx == world_size - 1 ? slice_size + (chunk_el % world_size) @@ -234,7 +245,7 @@ void symmetric_naive_all_reduce( } state_idx = (state_idx + 1) % 3; - parallel_memcpy(symmetric_buffer[current_buffer][rank], data_ptr, chunk_size); + copy_buffer(symmetric_buffer[current_buffer][rank], data_ptr, chunk_size); std::atomic_thread_fence(std::memory_order_release); workspace[rank]->states[state_group] = copy_current; @@ -298,8 +309,7 @@ void distributed_naive_reduce( state_idx = (state_idx + 1) % 2; int data_size = chunk_size / chunk_el; - parallel_memcpy( - distributed_buffer[current_buffer][rank], data_ptr, chunk_size); + copy_buffer(distributed_buffer[current_buffer][rank], data_ptr, chunk_size); std::atomic_thread_fence(std::memory_order_release); workspace[rank]->states[state_group] = copy_current; @@ -324,7 +334,7 @@ void distributed_naive_reduce( for (int i = 0; i < world_size; i++) { int rank = (i + rank) % world_size; - parallel_memcpy( + copy_buffer( slice_data(data_ptr, chunk_el, data_size, rank, world_size), slice_data( distributed_buffer[current_buffer][rank], diff --git a/gloo/test/allreduce_test.cc b/gloo/test/allreduce_test.cc index 7c25aee5f..4a3648736 100644 --- a/gloo/test/allreduce_test.cc +++ b/gloo/test/allreduce_test.cc @@ -377,6 +377,19 @@ INSTANTIATE_TEST_CASE_P( ::testing::Values(true, false), ::testing::Values(Algorithm::BCUBE))); +#if GLOO_SHM_ALLREDUCE_APPLICABLE +INSTANTIATE_TEST_CASE_P( + AllreduceNewShm, + AllreduceNewTest, + ::testing::Combine( + ::testing::ValuesIn(kTransportsForFunctionAlgorithms), + ::testing::Values(1, 2, 4, 7), + ::testing::Values(1, 2, 3), + ::testing::Values(0, 1, 10, 100, 1000, 10000), + ::testing::Values(true, false), + ::testing::Values(Algorithm::UNSPECIFIED))); +#endif + template AllreduceOptions::Func getFunction() { void (*func)(void*, const void*, const void*, size_t) = &::gloo::sum; From b3990aa108a8bb553daa2a2e12a43cfa01790d88 Mon Sep 17 00:00:00 2001 From: gaopengf Date: Tue, 10 Mar 2026 02:12:00 +0000 Subject: [PATCH 4/7] fix macro --- gloo/allreduce.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gloo/allreduce.h b/gloo/allreduce.h index 37e5def37..b61f63ca9 100644 --- a/gloo/allreduce.h +++ b/gloo/allreduce.h @@ -20,7 +20,7 @@ namespace gloo { namespace detail { #define GLOO_SHM_ALLREDUCE_APPLICABLE \ - !defined(_WIN32) && !defined(__aarch64__) && !defined(__arm__) + (!defined(_WIN32) && !defined(__aarch64__) && !defined(__arm__)) struct AllreduceOptionsImpl { // This type describes the function to use for element wise reduction. From e82036ecebf0331f7f480a0ac917acebdff91ab2 Mon Sep 17 00:00:00 2001 From: gaopengf Date: Tue, 10 Mar 2026 02:28:40 +0000 Subject: [PATCH 5/7] fix macro of win --- gloo/allreduce.h | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/gloo/allreduce.h b/gloo/allreduce.h index b61f63ca9..a4c8e4501 100644 --- a/gloo/allreduce.h +++ b/gloo/allreduce.h @@ -19,8 +19,11 @@ namespace gloo { namespace detail { -#define GLOO_SHM_ALLREDUCE_APPLICABLE \ - (!defined(_WIN32) && !defined(__aarch64__) && !defined(__arm__)) +#if !defined(_WIN32) && !defined(__aarch64__) && !defined(__arm__) +#define GLOO_SHM_ALLREDUCE_APPLICABLE 1 +#else +#define GLOO_SHM_ALLREDUCE_APPLICABLE 0 +#endif struct AllreduceOptionsImpl { // This type describes the function to use for element wise reduction. From 228bdb8a5eba46b7272844590b00680cde8ff835 Mon Sep 17 00:00:00 2001 From: gaopengf Date: Wed, 11 Mar 2026 01:55:18 +0000 Subject: [PATCH 6/7] wrap shm data declaration with macro --- gloo/allreduce.h | 6 ------ gloo/context.h | 14 ++++++++++++-- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/gloo/allreduce.h b/gloo/allreduce.h index a4c8e4501..6f6037ede 100644 --- a/gloo/allreduce.h +++ b/gloo/allreduce.h @@ -19,12 +19,6 @@ namespace gloo { namespace detail { -#if !defined(_WIN32) && !defined(__aarch64__) && !defined(__arm__) -#define GLOO_SHM_ALLREDUCE_APPLICABLE 1 -#else -#define GLOO_SHM_ALLREDUCE_APPLICABLE 0 -#endif - struct AllreduceOptionsImpl { // This type describes the function to use for element wise reduction. // diff --git a/gloo/context.h b/gloo/context.h index c7f8ae217..5e53b5457 100644 --- a/gloo/context.h +++ b/gloo/context.h @@ -17,6 +17,16 @@ namespace gloo { +#if !defined(_WIN32) && !defined(__aarch64__) && !defined(__arm__) +#define GLOO_SHM_ALLREDUCE_APPLICABLE 1 +#else +#define GLOO_SHM_ALLREDUCE_APPLICABLE 0 +#endif + +#if GLOO_SHM_ALLREDUCE_APPLICABLE +class AllreduceSharedMemoryData; +#endif + // There is no need to materialize all transport types here. namespace transport { class Context; @@ -24,8 +34,6 @@ class Device; class UnboundBuffer; } // namespace transport -class AllreduceSharedMemoryData; - class Context { public: Context(int rank, int size, int base = 2); @@ -35,9 +43,11 @@ class Context { const int size; int base; +#if GLOO_SHM_ALLREDUCE_APPLICABLE std::shared_ptr shmData; std::mutex shmDataMutex; +#endif std::shared_ptr& getDevice(); From a863f420fa610a79c152a18881cb1d98fd37700a Mon Sep 17 00:00:00 2001 From: gaopengf Date: Tue, 7 Apr 2026 01:28:18 +0000 Subject: [PATCH 7/7] retrigger CI