
Multi-GPU Batched KMeans #2017

Open
viclafargue wants to merge 80 commits into rapidsai:main from viclafargue:mg-batched-kmeans

Conversation

@viclafargue
Contributor

Closes #1989.

Adds multi-GPU support to KMeans fit for host-resident data, with two modes:

  • OpenMP (cuVS SNMG): A single process drives all local GPUs via OMP threads and raw NCCL. Activated automatically when the handle is a device_resources_snmg.
  • RAFT comms (Ray / Dask / MPI): Each rank is a separate process that calls fit with its own data shard and an initialized RAFT communicator. Coordination uses the RAFT comms.

Both modes share the same core Lloyd's loop: batched streaming of host data, NCCL/comms allreduce of centroid sums and counts, and synchronized convergence checks. Both support sample weights, n_init best-of-N restarts, KMeansPlusPlus initialization, and float/double. fit falls back to single-GPU execution when neither multi-GPU resources nor an initialized communicator is present.
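
For orientation, here is a minimal sketch of the SNMG mode from the caller's side; the handle type is the only switch, and the host-data fit overload mirrors the RAFT comms demo posted below (parameter defaults here are illustrative):

#include <cuvs/cluster/kmeans.hpp>
#include <raft/core/device_mdarray.hpp>
#include <raft/core/device_resources_snmg.hpp>
#include <raft/core/host_mdspan.hpp>

#include <cstdint>
#include <optional>

void fit_snmg(const float* h_data, int64_t n_samples, int64_t n_features, int n_clusters)
{
  // A single process drives all local GPUs; passing this handle type is
  // what activates the OpenMP/NCCL multi-GPU path.
  raft::device_resources_snmg handle;

  cuvs::cluster::kmeans::params params;
  params.n_clusters = n_clusters;

  auto X         = raft::make_host_matrix_view<const float, int64_t>(h_data, n_samples, n_features);
  auto centroids = raft::make_device_matrix<float, int64_t>(handle, n_clusters, n_features);

  float   inertia = 0.0f;
  int64_t n_iter  = 0;
  cuvs::cluster::kmeans::fit(handle,
                             params,
                             X,
                             std::nullopt,  // no sample weights
                             centroids.view(),
                             raft::make_host_scalar_view(&inertia),
                             raft::make_host_scalar_view(&n_iter));
}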

@viclafargue viclafargue self-assigned this Apr 13, 2026
@viclafargue viclafargue requested review from a team as code owners April 13, 2026 14:34
@viclafargue viclafargue added the improvement (Improves an existing functionality) and non-breaking (Introduces a non-breaking change) labels Apr 13, 2026
@viclafargue
Contributor Author

Here are some instructions to test the Multi-GPU Batched KMeans API with RAFT comms (to be used with Ray or Dask):

RAFT comms (Ray/Dask) demo code
#include <cuvs/cluster/kmeans.hpp>

#include <raft/comms/std_comms.hpp>
#include <raft/core/device_mdarray.hpp>
#include <raft/core/host_mdspan.hpp>
#include <raft/core/resource/comms.hpp>
#include <raft/core/resource/cuda_stream.hpp>
#include <raft/core/resources.hpp>

#include <cuda_runtime.h>
#include <mpi.h>
#include <nccl.h>

#include <algorithm>
#include <cmath>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <numeric>
#include <random>
#include <vector>

#define CHECK_CUDA(call)                                                 \
  do {                                                                   \
    cudaError_t e = (call);                                              \
    if (e != cudaSuccess) {                                              \
      std::fprintf(stderr, "CUDA error %s @ %s:%d\n",                   \
                   cudaGetErrorString(e), __FILE__, __LINE__);           \
      MPI_Abort(MPI_COMM_WORLD, 1);                                      \
    }                                                                    \
  } while (0)

#define CHECK_NCCL(call)                                                 \
  do {                                                                   \
    ncclResult_t r = (call);                                             \
    if (r != ncclSuccess) {                                              \
      std::fprintf(stderr, "NCCL error %s @ %s:%d\n",                   \
                   ncclGetErrorString(r), __FILE__, __LINE__);           \
      MPI_Abort(MPI_COMM_WORLD, 1);                                      \
    }                                                                    \
  } while (0)

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);

  int rank, num_ranks;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &num_ranks);

  CHECK_CUDA(cudaSetDevice(rank));

  ncclUniqueId nccl_id;
  if (rank == 0) CHECK_NCCL(ncclGetUniqueId(&nccl_id));
  MPI_Bcast(&nccl_id, sizeof(nccl_id), MPI_BYTE, 0, MPI_COMM_WORLD);

  ncclComm_t nccl_comm;
  CHECK_NCCL(ncclCommInitRank(&nccl_comm, num_ranks, nccl_id, rank));

  raft::resources handle;
  raft::comms::build_comms_nccl_only(&handle, nccl_comm, num_ranks, rank);

  // --- Demo parameters ---
  constexpr int64_t n_samples       = 100'000;
  constexpr int64_t n_features      = 32;
  constexpr int     n_clusters      = 10;
  constexpr int64_t streaming_batch = 10'000;
  constexpr float   cluster_spread  = 1.0f;
  constexpr float   center_range    = 30.0f;

  if (rank == 0) {
    std::printf("=== Multi-GPU KMeans Demo (%d ranks) ===\n", num_ranks);
    std::printf("Samples: %ld | Features: %ld | k: %d | batch: %ld\n\n",
                long(n_samples), long(n_features), n_clusters, long(streaming_batch));
  }

  // Generate synthetic blobs with well-separated cluster centers
  std::vector<float> h_data(n_samples * n_features);
  std::vector<int>   h_true_labels(n_samples);
  std::vector<float> cluster_centers(n_clusters * n_features);
  {
    std::mt19937 gen(12345);
    std::uniform_real_distribution<float> center_dist(-center_range, center_range);
    std::normal_distribution<float> noise(0.0f, cluster_spread);

    for (int c = 0; c < n_clusters; ++c)
      for (int d = 0; d < n_features; ++d)
        cluster_centers[c * n_features + d] = center_dist(gen);

    for (int64_t i = 0; i < n_samples; ++i) {
      int label = static_cast<int>(i % n_clusters);
      h_true_labels[i] = label;
      for (int d = 0; d < n_features; ++d)
        h_data[i * n_features + d] = cluster_centers[label * n_features + d] + noise(gen);
    }

    // Shuffle so labels aren't just sequential runs
    std::vector<int64_t> perm(n_samples);
    std::iota(perm.begin(), perm.end(), 0);
    std::shuffle(perm.begin(), perm.end(), gen);

    std::vector<float> tmp_data(h_data);
    std::vector<int>   tmp_labels(h_true_labels);
    for (int64_t i = 0; i < n_samples; ++i) {
      std::memcpy(h_data.data() + i * n_features,
                  tmp_data.data() + perm[i] * n_features,
                  n_features * sizeof(float));
      h_true_labels[i] = tmp_labels[perm[i]];
    }
  }

  int64_t base    = n_samples / num_ranks;
  int64_t rem     = n_samples % num_ranks;
  int64_t offset  = rank * base + std::min<int64_t>(rank, rem);
  int64_t n_local = base + (rank < rem ? 1 : 0);

  std::printf("[rank %d / GPU %d]  rows [%ld .. %ld)  (%ld samples)\n",
              rank, rank, long(offset), long(offset + n_local), long(n_local));

  auto X_local = raft::make_host_matrix_view<const float, int64_t>(
    h_data.data() + offset * n_features, n_local, n_features);

  auto d_centroids = raft::make_device_matrix<float, int64_t>(handle, n_clusters, n_features);

  cuvs::cluster::kmeans::params params;
  params.n_clusters           = n_clusters;
  params.max_iter             = 50;
  params.tol                  = 1e-4;
  params.init                 = cuvs::cluster::kmeans::params::KMeansPlusPlus;
  params.rng_state.seed       = 42;
  params.inertia_check        = true;
  params.streaming_batch_size = streaming_batch;

  float   inertia = 0.0f;
  int64_t n_iter  = 0;

  cuvs::cluster::kmeans::fit(handle,
                             params,
                             X_local,
                             std::nullopt,
                             d_centroids.view(),
                             raft::make_host_scalar_view(&inertia),
                             raft::make_host_scalar_view(&n_iter));

  auto stream = raft::resource::get_cuda_stream(handle);
  CHECK_CUDA(cudaStreamSynchronize(stream));

  if (rank == 0) {
    // --- Predict labels on the full dataset (on rank 0) ---
    auto d_X = raft::make_device_matrix<float, int64_t>(handle, n_samples, n_features);
    CHECK_CUDA(cudaMemcpy(d_X.data_handle(), h_data.data(),
                          sizeof(float) * n_samples * n_features, cudaMemcpyHostToDevice));

    auto d_labels = raft::make_device_vector<int64_t, int64_t>(handle, n_samples);
    float predict_inertia = 0.0f;

    cuvs::cluster::kmeans::predict(
      handle, params,
      raft::make_device_matrix_view<const float, int64_t>(d_X.data_handle(), n_samples, n_features),
      std::nullopt,
      raft::make_device_matrix_view<const float, int64_t>(
        d_centroids.data_handle(), n_clusters, n_features),
      d_labels.view(),
      false,
      raft::make_host_scalar_view(&predict_inertia));
    CHECK_CUDA(cudaStreamSynchronize(stream));

    std::vector<int64_t> h_labels(n_samples);
    CHECK_CUDA(cudaMemcpy(h_labels.data(), d_labels.data_handle(),
                          sizeof(int64_t) * n_samples, cudaMemcpyDeviceToHost));

    // --- Quality: permutation-invariant accuracy via majority voting ---
    // For each predicted cluster, find which true label appears most often.
    std::vector<std::vector<int64_t>> confusion(n_clusters, std::vector<int64_t>(n_clusters, 0));
    for (int64_t i = 0; i < n_samples; ++i)
      confusion[h_labels[i]][h_true_labels[i]]++;

    // Greedy matching: assign each predicted cluster to its dominant true label
    std::vector<int> pred_to_true(n_clusters, -1);
    std::vector<bool> true_taken(n_clusters, false);
    for (int round = 0; round < n_clusters; ++round) {
      int64_t best_count = -1;
      int best_pred = -1, best_true = -1;
      for (int p = 0; p < n_clusters; ++p) {
        if (pred_to_true[p] >= 0) continue;
        for (int t = 0; t < n_clusters; ++t) {
          if (true_taken[t]) continue;
          if (confusion[p][t] > best_count) {
            best_count = confusion[p][t];
            best_pred = p;
            best_true = t;
          }
        }
      }
      pred_to_true[best_pred] = best_true;
      true_taken[best_true] = true;
    }

    int64_t correct = 0;
    std::vector<int64_t> cluster_sizes(n_clusters, 0);
    std::vector<int64_t> cluster_correct(n_clusters, 0);
    for (int64_t i = 0; i < n_samples; ++i) {
      int p = static_cast<int>(h_labels[i]);
      cluster_sizes[p]++;
      if (h_true_labels[i] == pred_to_true[p]) {
        ++correct;
        ++cluster_correct[p];
      }
    }
    double accuracy = 100.0 * correct / n_samples;

    // --- Compute centroid-to-true-center distances ---
    std::vector<float> h_centroids(n_clusters * n_features);
    CHECK_CUDA(cudaMemcpy(h_centroids.data(), d_centroids.data_handle(),
                          sizeof(float) * n_clusters * n_features, cudaMemcpyDeviceToHost));

    std::printf("\n============ Multi-GPU KMeans Results ============\n");
    std::printf("  Ranks             : %d\n", num_ranks);
    std::printf("  Total samples     : %ld\n", long(n_samples));
    std::printf("  Features          : %ld\n", long(n_features));
    std::printf("  Clusters (k)      : %d\n", n_clusters);
    std::printf("  Streaming batch   : %ld\n", long(streaming_batch));
    std::printf("  Lloyd iterations  : %ld\n", long(n_iter));
    std::printf("  Final inertia     : %.6f\n", double(inertia));
    std::printf("  Predict inertia   : %.6f\n", double(predict_inertia));
    std::printf("\n  --- Clustering Quality ---\n");
    std::printf("  Overall accuracy  : %.2f%% (%ld / %ld)\n",
                accuracy, long(correct), long(n_samples));

    std::printf("\n  Per-cluster breakdown:\n");
    std::printf("  %6s  %10s  %10s  %8s  %12s\n",
                "Pred", "TrueLabel", "Size", "Acc%", "CentroidErr");
    for (int p = 0; p < n_clusters; ++p) {
      int t = pred_to_true[p];
      double pct = cluster_sizes[p] > 0
                     ? 100.0 * cluster_correct[p] / cluster_sizes[p]
                     : 0.0;

      // L2 distance between learned centroid and ground truth center
      double dist2 = 0.0;
      for (int d = 0; d < n_features; ++d) {
        double diff = h_centroids[p * n_features + d] - cluster_centers[t * n_features + d];
        dist2 += diff * diff;
      }
      std::printf("  %6d  %10d  %10ld  %7.2f%%  %12.4f\n",
                  p, t, long(cluster_sizes[p]), pct, std::sqrt(dist2));
    }

    std::printf("\n  Expected accuracy for well-separated blobs: >99%%\n");
    if (accuracy >= 99.0)
      std::printf("  PASS: Clustering quality is high.\n");
    else if (accuracy >= 90.0)
      std::printf("  WARN: Clustering quality is acceptable but not ideal.\n");
    else
      std::printf("  FAIL: Clustering quality is poor!\n");

    std::printf("==================================================\n");
  }

  CHECK_NCCL(ncclCommDestroy(nccl_comm));
  MPI_Finalize();
  return 0;
}
Compilation command
nvcc -std=c++17 -x cu --extended-lambda -arch=native \
  -I$CONDA_PREFIX/include/rapids                     \
  -I$CONDA_PREFIX/include                            \
  demo_mg_kmeans_raft_comms.cu                       \
  -L$CONDA_PREFIX/lib -lcuvs -lnccl -lrmm -lmpi      \
  -lucxx -lucp -lucs                                 \
  -Xlinker=-rpath,$CONDA_PREFIX/lib                  \
  -o demo_mg_kmeans
Launch command

mpirun -np 2 ./demo_mg_kmeans
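
Note: the demo's cudaSetDevice(rank) assumes ranks map one-to-one to local GPU ordinals, which only holds on a single node. For multi-node launches, a local-rank mapping along these lines would be needed (a sketch using standard MPI-3 calls):

int local_rank = 0;
MPI_Comm local_comm;
// Group ranks by shared-memory domain (i.e. by node), then use the
// within-node rank as the device ordinal.
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &local_comm);
MPI_Comm_rank(local_comm, &local_rank);
CHECK_CUDA(cudaSetDevice(local_rank));
MPI_Comm_free(&local_comm);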

@viclafargue viclafargue requested a review from tarang-jain April 13, 2026 14:42
Comment thread cpp/src/cluster/detail/kmeans_mg_batched.cuh Outdated
Comment thread cpp/src/cluster/detail/kmeans_mg_batched.cuh
Comment thread cpp/src/cluster/detail/kmeans_mg_batched.cuh
Comment thread cpp/src/cluster/detail/kmeans_mg_batched.cuh Outdated
/*
* SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/
Contributor

Can we get rid of this file entirely and combine with regular mg kmeans (just as we are doing in PR #2015)? Is that possible?

Contributor

Also, MNMG should be able to reuse the snmg_fit function (for a single worker) as is, right? Except that the nccl reduce macro will be replaced by something like comms.allreduce()
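
For illustration, a minimal sketch of that substitution (not this PR's actual code; buffer names and lengths are made up):

#include <raft/core/comms.hpp>
#include <raft/core/resource/comms.hpp>
#include <raft/core/resource/cuda_stream.hpp>
#include <raft/core/resources.hpp>

#include <cstddef>

template <typename T>
void allreduce_cluster_state(raft::resources const& handle,
                             T* centroid_sums,   // device buffer, reduced in place
                             T* cluster_counts,  // device buffer, reduced in place
                             std::size_t sums_len,
                             std::size_t counts_len)
{
  auto const& comm = raft::resource::get_comms(handle);
  auto stream      = raft::resource::get_cuda_stream(handle);
  // Sum-allreduce across all ranks; with a NCCL-backed communicator this
  // lowers to ncclAllReduce under the hood.
  comm.allreduce(centroid_sums, centroid_sums, sums_len, raft::comms::op_t::SUM, stream);
  comm.allreduce(cluster_counts, cluster_counts, counts_len, raft::comms::op_t::SUM, stream);
}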

Contributor Author

I refactored the code to work seamlessly with the single-GPU refactor. The process_batch function is now being used, the weight normalization is improved (rel_tol and zero/invalid checks), and the inertia_check field is no longer used.

However, I do not feel confident implementing a massive unifying refactor in this PR, which is originally dedicated to introducing MG Batched KMeans. Would it be fine to leave things as is for now? We could come back to it in a dedicated follow-up PR.

Contributor

If we can get this and the refactor into 26.06, it's OK to break it down. We'll need to track the MG refactor in an issue, and that would be a high priority.

Comment thread cpp/src/cluster/detail/kmeans_common.cuh Outdated
Comment thread cpp/src/cluster/detail/kmeans_mg.cuh Outdated
Comment thread cpp/src/cluster/detail/kmeans_mg.cuh Outdated

// Separate workspace for compute_centroid_adjustments (distinct from the one
// used by minClusterAndDistanceCompute inside process_batch).
rmm::device_uvector<char> batch_workspace(0, stream);
Contributor

@tarang-jain tarang-jain May 6, 2026

I am tempted to put the entire batching logic itself in a helper, so that both kmeans and kmeans_mg can use it identically without duplicating code at all. But that will be a significant amount of work in a follow-up.

auto batch_data_view = raft::make_device_matrix_view<const T, IdxT>(
  data_batch.data(), current_batch_size, n_features);

cuvs::cluster::kmeans::detail::copy_and_scale_batch_weights<T, IdxT>(dev_res,
Contributor

This helper has been removed. We are now caching weights in pinned memory. Question: can we have a single CPU cache for the weights even in the multi-GPU case? If not, we can always recompute it for each batch.

Contributor Author

Yes, we can scale the weights once in pinned memory too. For the OpenMP case, each rank receives a cache and fills its own slice. For the multi-process case, each rank allocates and fills its own pinned memory cache.
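
A rough sketch of that scheme (assuming raft's pinned mdarray helpers, mirroring the make_pinned_scalar this PR already uses; the scaling rule and names are illustrative):

#include <raft/core/pinned_mdarray.hpp>
#include <raft/core/resources.hpp>

#include <cstdint>

template <typename T>
auto build_weight_cache(raft::resources const& res,
                        const T* h_weights,   // this rank's host-resident weights
                        std::int64_t n_local,
                        T scale)              // e.g. n_samples / sum(weights)
{
  auto cache = raft::make_pinned_vector<T, std::int64_t>(res, n_local);
  // Scale once into pinned memory; every batch then reads the cache
  // instead of recomputing the normalization.
  for (std::int64_t i = 0; i < n_local; ++i) {
    cache(i) = h_weights[i] * scale;
  }
  return cache;
}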

Comment thread cpp/src/cluster/detail/kmeans_mg_batched.cuh Outdated
Comment thread cpp/src/cluster/detail/kmeans_mg_batched.cuh Outdated
Comment thread cpp/src/cluster/detail/kmeans_mg_batched.cuh Outdated
@viclafargue viclafargue requested review from a team as code owners May 7, 2026 14:18
@copy-pr-bot

copy-pr-bot Bot commented May 7, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
python/cuvs/cuvs/cluster/kmeans/kmeans.pyx (1)

121-152: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep a deprecated inertia_check property for one release cycle.

In the previous release, KMeansParams.inertia_check was a public property. The current change removes it entirely while still accepting the parameter in __init__ (with a warning). This breaks existing code that reads params.inertia_check, violating the requirement for at least one release cycle of deprecation for public interface removals. Add back the property with a deprecation warning before removing it in a future release.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@python/cuvs/cuvs/cluster/kmeans/kmeans.pyx` around lines 121 - 152, Add a
deprecated inertia_check property back onto the KMeansParams type so callers
reading params.inertia_check continue to work: implement a `@property-like` getter
and setter on KMeansParams (the same type used as self.params in
kmeans.__init__) that returns a stored boolean (default False), and have both
the getter and setter emit a FutureWarning stating the property is deprecated
and ignored (inertia-based convergence always runs); keep internal behavior
unchanged (the parameter is ignored) and ensure __init__ still accepts
inertia_check and sets the stored field so legacy reads see the value.
🧹 Nitpick comments (2)
c/include/cuvs/cluster/kmeans.h (1)

42-43: ⚡ Quick win

Use the introduction release in these deprecation notes.

These notes point to 26.08, but the repo convention is to stamp deprecations with the current release where the replacement first appears. That makes it clear when *_v2 became available, instead of conflating introduction with planned removal.

Based on learnings, "CalVer deprecation notices should reference the current minor release version ... not a future release."

Also applies to: 208-232, 271-305, 337-369

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@c/include/cuvs/cluster/kmeans.h` around lines 42 - 43, Update the deprecation
comments to use the release where the replacement actually landed (the
"introduction" CalVer) instead of the future removal release; specifically
change the note referencing "26.08" to the CalVer when cuvsKMeansParams_v2 was
introduced and do the same for other deprecation blocks that mention the
inertia_check field and cuvsKMeansParams_v2 (and analogous symbols in the other
listed comment blocks). Ensure each comment clearly states the replacement
symbol (e.g., inertia_check -> cuvsKMeansParams_v2) and the actual release that
introduced the v2 type.
cpp/src/cluster/detail/kmeans_common.cuh (1)

654-720: 💤 Low value

Consider pre-allocating batch_cost outside the function.

Line 712 allocates a new device_scalar on each call to process_batch. For large datasets with many batches, this creates repeated small allocations. Consider passing batch_cost as a pre-allocated parameter to avoid per-batch allocation overhead.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cpp/src/cluster/detail/kmeans_common.cuh` around lines 654 - 720,
process_batch currently allocates a per-call device scalar via
raft::make_device_scalar<DataT> (batch_cost) which causes many small
allocations; change process_batch to accept a pre-allocated batch_cost (e.g.,
raft::device_scalar_view<DataT> or rmm::device_scalar<DataT>&) as an additional
parameter instead of creating it inside the function, remove the internal
raft::make_device_scalar<DataT> allocation and pass
batch_cost.view()/data_handle() into computeClusterCost and the subsequent
raft::linalg::add call, and update all callers of process_batch to allocate a
single reusable device scalar once (via raft::make_device_scalar) and pass it
into each batch invocation to avoid per-batch allocations.
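
A minimal sketch of the suggested shape (signatures and names here are illustrative, not the actual process_batch interface):

#include <raft/core/device_mdarray.hpp>
#include <raft/core/device_mdspan.hpp>
#include <raft/core/resources.hpp>

template <typename DataT>
void process_batch(raft::resources const& handle,
                   raft::device_scalar_view<DataT> batch_cost
                   /* ...the existing batch arguments... */)
{
  // ...accumulate the batch cost into batch_cost.data_handle() as before,
  // without allocating a new device scalar on every call...
}

template <typename DataT>
void run_all_batches(raft::resources const& handle, int n_batches)
{
  // One allocation for the whole run instead of one per batch.
  auto batch_cost = raft::make_device_scalar<DataT>(handle, DataT{0});
  for (int b = 0; b < n_batches; ++b) {
    process_batch<DataT>(handle, batch_cost.view());
  }
}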
ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: f65e2954-ca19-4fc0-ac29-fbf765ce155a

📥 Commits

Reviewing files that changed from the base of the PR and between 7055272 and add9db1.

📒 Files selected for processing (18)
  • c/include/cuvs/cluster/kmeans.h
  • c/src/cluster/kmeans.cpp
  • c/tests/CMakeLists.txt
  • c/tests/cluster/kmeans_c.cu
  • cpp/include/cuvs/cluster/kmeans.hpp
  • cpp/src/cluster/detail/kmeans.cuh
  • cpp/src/cluster/detail/kmeans_batched.cuh
  • cpp/src/cluster/detail/kmeans_common.cuh
  • cpp/src/cluster/detail/kmeans_mg.cuh
  • cpp/src/cluster/detail/kmeans_mg_batched.cuh
  • cpp/src/cluster/kmeans.cuh
  • cpp/src/cluster/kmeans_fit_double.cu
  • cpp/src/cluster/kmeans_fit_float.cu
  • cpp/src/cluster/kmeans_impl.cuh
  • cpp/tests/cluster/kmeans.cu
  • cpp/tests/cluster/kmeans_mg_batched.cu
  • python/cuvs/cuvs/cluster/kmeans/kmeans.pxd
  • python/cuvs/cuvs/cluster/kmeans/kmeans.pyx
💤 Files with no reviewable changes (1)
  • cpp/src/cluster/detail/kmeans_batched.cuh
✅ Files skipped from review due to trivial changes (1)
  • c/tests/CMakeLists.txt
🚧 Files skipped from review as they are similar to previous changes (4)
  • cpp/src/cluster/kmeans_fit_float.cu
  • cpp/src/cluster/kmeans.cuh
  • cpp/src/cluster/detail/kmeans_mg.cuh
  • cpp/src/cluster/kmeans_fit_double.cu

Comment on lines +106 to +123
raft::handle_t handle;
auto stream = raft::resource::get_cuda_stream(handle);

rmm::device_uvector<float> dataset_d(kNSamples * kNFeatures, stream);
rmm::device_uvector<float> centroids_d(kNClusters * kNFeatures, stream);
rmm::device_uvector<int32_t> labels_d(kNSamples, stream);

raft::copy(dataset_d.data(),
           reinterpret_cast<float const*>(kDataset),
           kNSamples * kNFeatures,
           stream);
raft::copy(centroids_d.data(),
           reinterpret_cast<float const*>(kInitCentroids),
           kNClusters * kNFeatures,
           stream);

cuvsResources_t res;
ASSERT_EQ(cuvsResourcesCreate(&res), CUVS_SUCCESS);

⚠️ Potential issue | 🟠 Major | ⚡ Quick win


Add stream synchronization before calling the C API in both test helpers.

These helpers enqueue input copies on handle's stream, then immediately call C API functions through a separate cuvsResources_t. Since each raft::resources object has its own CUDA stream, the C API can race the H2D setup without explicit synchronization. Add handle.sync_stream() before cuvsResourcesCreate() in test_fit_predict() and test_fit_host(), consistent with the pattern used in pca_c.cu.

Applies to: lines 106-123, 152-155, 180-190, 214-214

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@c/tests/cluster/kmeans_c.cu` around lines 106 - 123, The CUDA copies enqueued
on handle's stream must be synchronized before invoking the C API; add a call to
handle.sync_stream() immediately after the raft::copy calls and before
cuvsResourcesCreate() in both test helpers (test_fit_predict and test_fit_host)
so the H2D transfers complete prior to calling cuvsResourcesCreate()/other C API
functions that use a separate cuvsResources_t.
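
For concreteness, a minimal sketch of the suggested placement, reusing the snippet's own identifiers:

raft::copy(centroids_d.data(),
           reinterpret_cast<float const*>(kInitCentroids),
           kNClusters * kNFeatures,
           stream);
handle.sync_stream();  // drain the H2D copies before any cuvsResources_t stream runs

cuvsResources_t res;
ASSERT_EQ(cuvsResourcesCreate(&res), CUVS_SUCCESS);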

Comment on lines +72 to +83
if (params.init == cuvs::cluster::kmeans::params::InitMethod::Random) {
  raft::matrix::sample_rows(handle, rng, X, centroids);
} else if (params.init == cuvs::cluster::kmeans::params::InitMethod::KMeansPlusPlus) {
  IndexT default_init_size =
    std::min(static_cast<IndexT>(std::int64_t{3} * n_clusters), n_samples);
  IndexT init_sample_size = params.init_size > 0
                              ? std::min(static_cast<IndexT>(params.init_size), n_samples)
                              : default_init_size;
  if (streaming_batch_size > 0) {
    init_sample_size = std::min(init_sample_size, streaming_batch_size);
  }
  init_sample_size = std::max(init_sample_size, std::min(n_clusters, n_samples));

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Seed from the global host dataset and honor init_size as documented.

In the MG paths, Random / KMeansPlusPlus currently initialize from X_local on rank 0 only, then broadcast. That means partitioned runs seed from a single shard instead of the full host dataset, and the extra streaming_batch_size clamp silently overrides larger init_size requests. Both behaviors diverge from the documented host-data seeding semantics and can badly bias initialization when shards are uneven or rank 0 is empty.

As per coding guidelines, "verify KMeans++ initialization sampling count matches init_size host rules."

Also applies to: 359-363

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cpp/src/cluster/detail/kmeans_mg_batched.cuh` around lines 72 - 83, The MG
init path currently samples from only the local shard (X_local) on rank 0 and
reduces init_sample_size by streaming_batch_size, which biases
KMeans(Random/KMeansPlusPlus) and violates documented init_size semantics;
change the logic so sampling for both InitMethod::Random and
InitMethod::KMeansPlusPlus is drawn from the global host dataset (use X, not
X_local, or gather global samples on root before broadcast) and compute
init_sample_size strictly per params.init_size (respect params.init_size when >0
and only clamp to n_samples or n_clusters, not to streaming_batch_size), then
call raft::matrix::sample_rows(handle, rng, X, centroids) or equivalent on the
chosen root with the corrected init_sample_size and broadcast centroids; apply
the same fix to the second occurrence around symbols
init_sample_size/streaming_batch_size at the later block (lines ~359-363).
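
For illustration, dropping the streaming clamp reduces the logic shown above to (a sketch of the suggested rule, not the final code):

IndexT default_init_size =
  std::min(static_cast<IndexT>(std::int64_t{3} * n_clusters), n_samples);
IndexT init_sample_size = params.init_size > 0
                            ? std::min(static_cast<IndexT>(params.init_size), n_samples)
                            : default_init_size;
// Honor init_size up to n_samples, floored at the number of clusters;
// no streaming_batch_size clamp.
init_sample_size = std::max(init_sample_size, std::min(n_clusters, n_samples));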

Comment on lines +339 to +341
auto d_prior_cost = raft::make_device_scalar<T>(dev_res, T{0});
auto d_done_flag = raft::make_device_scalar<int64_t>(dev_res, 0);
auto h_done_flag = raft::make_pinned_scalar<int64_t>(dev_res, 0);

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reset the device done flag between n_init restarts.

Only the host shadow flag is cleared here. If a previous restart left d_done_flag == 1, the next restart can allreduce an already-set device flag and terminate almost immediately.

Suggested fix
     // Reset per-pass convergence state to avoid leaking it across n_init.
     raft::matrix::fill(dev_res, d_prior_cost.view(), T{0});
+    raft::matrix::fill(dev_res, d_done_flag.view(), int64_t{0});
     *h_done_flag.data_handle() = 0;

Also applies to: 376-379

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cpp/src/cluster/detail/kmeans_mg_batched.cuh` around lines 339 - 341, The
device-side done flag d_done_flag is not being reset between n_init restarts
(only the host shadow h_done_flag is), which can cause a previous run's
d_done_flag==1 to prematurely terminate the next restart; before each restart
iteration reset the device scalar d_done_flag back to 0 (in addition to clearing
h_done_flag) by writing a zero to the device scalar using the same device-scalar
API or a host-to-device copy/memset so the device-side reduction starts from 0
for each new initialization.

Comment on lines +471 to +487
cuvs::cluster::kmeans::detail::finalize_centroids<T, IdxT>(dev_res,
                                                           centroid_sums_const,
                                                           weight_per_cluster_const,
                                                           rank_centroids_const,
                                                           new_centroids.view());

// Phase 4: device-side convergence evaluation. Compute shift,
// run `check_convergence` via `map_offset`, allreduce the flag,
// shadow into pinned host. Consumed at top of next iteration.
cuvs::cluster::kmeans::detail::compute_centroid_shift<T, IdxT>(
  dev_res,
  raft::make_const_mdspan(rank_centroids.view()),
  raft::make_const_mdspan(new_centroids.view()),
  sqrd_norm_error_dev.view());

raft::copy(
  rank_centroids.data_handle(), new_centroids.data_handle(), n_clusters * n_features, stream);
@coderabbitai coderabbitai Bot May 7, 2026

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Re-normalize centroids on the cosine path.

This loop finalizes new_centroids and immediately reuses them in the next assignment step, but there is no cosine-specific normalization before they become the active centroids. For DistanceType::CosineExpanded, that means later distance computations run against non-unit centroids.

Based on learnings, "When adding or modifying k-means code paths that support the CosineExpanded metric (e.g., via minClusterAndDistanceCompute with params.metric == CosineExpanded), ensure the regular (non-balanced) k-means implementation L2-normalizes centroids before computing distances."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cpp/src/cluster/detail/kmeans_mg_batched.cuh` around lines 471 - 487, The
new_centroids produced by finalize_centroids are not L2-normalized for the
CosineExpanded path, so subsequent distance computations use non-unit vectors;
after cuvs::cluster::kmeans::detail::finalize_centroids (and before
compute_centroid_shift and raft::copy) call the centroid normalization routine
for the cosine metric (e.g., invoke the project/normalize kernel that
L2-normalizes the mdspan backing new_centroids or add a new
l2_normalize_centroids<T,IdxT>(new_centroids.view(), n_clusters, n_features,
stream) and execute it only when params.metric == DistanceType::CosineExpanded)
so that new_centroids are unit-length on-device prior to being used as active
centroids (ensure run on the same CUDA stream).

Contributor

Cosine is not supported for kmeans.


@tarang-jain, thanks for the clarification! My concern was based on a prior learning that referenced the CosineExpanded metric in k-means code paths, but since cosine distance is not supported for k-means in cuVS, the comment does not apply here. I'll record this to avoid flagging it in future reviews.

The prior learning (ID: 10993bc6-23ed-43c0-8159-6c82286234ac) that triggered this false positive may be misleading — would you like me to remove it?


✏️ Learnings added
Learnt from: tarang-jain
Repo: rapidsai/cuvs PR: 2017
File: cpp/src/cluster/detail/kmeans_mg_batched.cuh:471-487
Timestamp: 2026-05-07T15:00:43.816Z
Learning: Cosine distance (DistanceType::CosineExpanded) is not supported for k-means in cuVS (rapidsai/cuvs). Do not flag missing L2 centroid normalization for the cosine metric in any k-means code path, as the metric is unsupported and such checks are not applicable.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: tarang-jain
Repo: rapidsai/cuvs PR: 2001
File: cpp/src/cluster/detail/minClusterDistanceCompute.cu:44-50
Timestamp: 2026-04-22T22:30:56.187Z
Learning: When adding or modifying k-means code paths that support the CosineExpanded metric (e.g., via minClusterAndDistanceCompute with params.metric == CosineExpanded), ensure the regular (non-balanced) k-means implementation L2-normalizes centroids before computing distances. In particular, verify that the code mirrors the expectations implied by the CosineExpanded computation (e.g., using L2NormX with sqrt_op) and does not compute cosine-related distances against non-normalized centroids.

Contributor

But @viclafargue I do feel that we should be checking for the metric somewhere. So the metric needs to be checked in both kmeans_fit and predict. I think this was being done indirectly via minClusterDistanceCompute.cu earlier, but since we started using the fused path for cosine, cosine will pass through and a metric assertion might be missing.
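
Something like the following near the top of both fit and predict would cover it (a sketch; the exact set of metrics kmeans supports is an assumption here):

RAFT_EXPECTS(params.metric == cuvs::distance::DistanceType::L2Expanded ||
               params.metric == cuvs::distance::DistanceType::L2SqrtExpanded,
             "kmeans: unsupported distance metric");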


Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Comment on lines +601 to +602
auto d_wt_sum = raft::make_device_scalar<DataT>(handle, static_cast<DataT>(n_samples));
if (sample_weight.has_value()) { weightSum(handle, sample_weight.value(), d_wt_sum.view()); }

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Missing validation for all-zero device weights.

When sample weights are provided and reside on device, weightSum computes the sum asynchronously but does not validate that the result is positive. If all weights are zero, downstream normalization will divide by zero. The host path in weightSum has this check (RAFT_EXPECTS(wt_sum > DataT{0}, ...)), but the device path does not.

Consider adding a device-to-host copy of d_wt_sum after the weightSum call to validate positivity, or perform the check lazily inside the normalization kernels.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cpp/src/cluster/detail/kmeans.cuh` around lines 601 - 602, When
sample_weight.has_value() and you call weightSum(handle, sample_weight.value(),
d_wt_sum.view()), add a device-to-host copy and validate the sum is positive
before proceeding: after weightSum returns, copy d_wt_sum to a host scalar (or
use raft::device_scalar::copy_to_host) and assert RAFT_EXPECTS(wt_sum_host >
DataT{0}, "sample_weight sum must be > 0") (or throw the same error used in the
host path) so downstream normalization cannot divide by zero; alternatively
perform the positivity check inside the normalization kernel, but the simplest
fix is to copy d_wt_sum to host and perform the RAFT_EXPECTS check immediately
after the weightSum call.
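
A minimal sketch of the simpler option (copy to host, then validate; note this adds a synchronization point right after weightSum):

DataT wt_sum_host{};
raft::copy(&wt_sum_host, d_wt_sum.data_handle(), 1, raft::resource::get_cuda_stream(handle));
raft::resource::sync_stream(handle);
RAFT_EXPECTS(wt_sum_host > DataT{0}, "sample_weight sum must be > 0");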

Comment on lines +21 to +26
cuvs::cluster::kmeans::params p = params;
p.init = kmeans::params::InitMethod::Array;
auto sw = std::make_optional(
  raft::make_device_vector_view<const DataT, IndexT>(sample_weights.data_handle(), X.extent(0)));
cuvs::cluster::kmeans::detail::kmeans_fit(
  handle, p, X, sw, centroids, inertia, n_iter, std::ref(workspace));
@coderabbitai coderabbitai Bot May 7, 2026

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't rebuild sample_weights with X.extent(0).

This discards the caller-provided extent and fabricates a new view sized from X. If the two lengths ever diverge, that turns a validation bug into an out-of-bounds read. Reuse the incoming view directly and assert the extents match once.

Suggested fix
   cuvs::cluster::kmeans::params p = params;
   p.init                          = kmeans::params::InitMethod::Array;
-  auto sw                         = std::make_optional(
-    raft::make_device_vector_view<const DataT, IndexT>(sample_weights.data_handle(), X.extent(0)));
+  RAFT_EXPECTS(sample_weights.extent(0) == X.extent(0),
+               "sample_weights.extent(0) must equal X.extent(0)");
+  auto sw = std::make_optional(sample_weights);
   cuvs::cluster::kmeans::detail::kmeans_fit(
     handle, p, X, sw, centroids, inertia, n_iter, std::ref(workspace));
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cpp/src/cluster/kmeans_impl.cuh` around lines 21 - 26, The code rebuilds
sample_weights using raft::make_device_vector_view with X.extent(0), which can
create an incorrect size and out-of-bounds reads; instead pass the incoming
sample_weights view directly (e.g., use sw = std::make_optional(sample_weights))
and add an assertion that sample_weights.extent(0) == X.extent(0) before calling
cuvs::cluster::kmeans::detail::kmeans_fit; update the use of sw passed into
kmeans_fit and remove the fabricated raft::make_device_vector_view construction.

Contributor

@viclafargue I sort of agree with this. We should be comparing the extent of X and the sample weights if they are passed. Can we add an assertion for this if we don't already have one?


Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

@tarang-jain
Contributor

The KMeans++ init on only rank zero can be quite problematic.

  1. The shard might not be representative of the whole dataset.
  2. The shard's size might be smaller than the user-passed init_size parameter.

I think the rabbit comment makes sense here.

Comment thread cpp/src/cluster/detail/kmeans_mg_batched.cuh Outdated
Comment thread cpp/src/cluster/detail/kmeans_mg_batched.cuh Outdated
@zhilizju

zhilizju commented May 8, 2026

Hi @viclafargue @tarang-jain, thanks for the excellent work on this PR!

We have a large-scale clustering pipeline that requires multi-GPU batched KMeans: we're running on hundreds of nodes and need this capability to handle our dataset scale. We're eagerly waiting to adopt this feature once it lands.

Could you share a rough timeline for when this might be ready to merge? Do you think the remaining items (rebase on #2015, the KMeans++ init concern, etc.) could be wrapped up by this weekend, or would it need another week or so? We'd like to plan our integration work accordingly.

We'd be happy to be among the first adopters: once this is in a testable state, we can run it against our production workload and provide detailed feedback on correctness, scalability, and API ergonomics. Let us know if there's a branch or pre-release build we can start testing against.

Thanks again for pushing this forward!

@cjnolet
Member

cjnolet commented May 8, 2026

Hi @zhilizju,

This should be merged by early next week, but just know that we are in the process of running large-scale benchmarks, so those will likely not be complete before merging (your mileage may vary).

This will be part of our 26.06 release, which will be officially released in early June.

Will you be integrating this through c++ or another one of the language wrappers?

@cjnolet
Member

cjnolet commented May 8, 2026

The shard might not be representative of the whole dataset

@tarang-jain it's usually safe to assume the data is randomly scattered. KMeans++ is also expensive enough, especially with larger k, that I think doing the initialization on a single partition is going to be a good trade-off most of the time. I'd even go as far as to say that we could offer an option for this in the MNMG version, and default it to preferring a single shard. Think of it like taking a simple random sample of a simple random sample (i.e., a random sample from the entire training set).

@tarang-jain
Contributor

yeah I think that makes sense -- random sample of a random sample.

Comment thread cpp/tests/cluster/kmeans_mg_batched.cu Outdated
auto X = raft::make_device_matrix<T, int>(clique_, n_samples, n_features);
auto labels = raft::make_device_vector<int, int>(clique_, n_samples);

raft::random::make_blobs<T, int>(X.data_handle(),
Contributor

So I added a helper in kmeans.cu to prepare the blobs. Can we reuse it? I know that is not a header though, so including it in this TU might not be possible.

Contributor Author

Reusing prepareBlobInputs as a whole was not really possible, but I could create a kmeans_test_blobs.cuh file to make sure that we have some code re-use between the two tests.

d_labels_sg.data(), d_labels_snmg.data(), n_samples, sg_stream);

raft::resource::sync_stream(sg_handle, sg_stream);

Contributor

I like that you are testing both -- ARI in the random init and bit-matching centroids in the array init. That should give us a robust set of tests.

@zhilizju

zhilizju commented May 8, 2026

Hi @zhilizju,

This should be merged by early next week, but just know that we are in the process of doing large-scale benchmarks, so those will likely not be complete before merging (this mileage might vary).

This will be part of our 26.06 release, which will be officially released in early June.

Will you be integrating this through c++ or another one of the language wrappers?

Thanks @cjnolet! Great to hear it's targeting early next week.

We're already running the RAFT comms + Ray path (via cuML 25.10 _fit(multigpu=True) with NCCL across hundreds of nodes), so we're very familiar with this architecture. Looking forward to the cuVS native implementation for better performance with host-resident batched streaming.

We'll primarily use the Python wrapper. A couple of questions:

  1. Once merged, will the multi-GPU batched KMeans be accessible from Python with the RAFT comms path? We'd like to integrate it into our existing Ray + NCCL setup.
  2. Is there a nightly conda package that will pick this up automatically after merge, or would we need to build from source?

@viclafargue viclafargue force-pushed the mg-batched-kmeans branch from c142245 to 41c66b8 Compare May 8, 2026 09:14

@coderabbitai coderabbitai Bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
cpp/tests/cluster/kmeans.cu (2)

552-576: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

These regressions still bypass the new host/batched path.

fitKMeansPlusPlus() and runZeroCost() both call fit() with d_X->view(), so CI is only exercising the existing device-resident overload here. That leaves n_init > 1 and the zero-cost edge case unvalidated for the host-streaming implementation this PR adds.

Also applies to: 617-624

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cpp/tests/cluster/kmeans.cu` around lines 552 - 576, Tests are currently
calling the device-resident overload by passing
raft::make_const_mdspan(d_X->view()) from fitKMeansPlusPlus() (and similarly in
runZeroCost()), so CI never exercises the new host/batched fit path; change
those calls to pass a host-resident/batched view instead (e.g., create or reuse
a host matrix/mdspan for X and call cuvs::cluster::kmeans::fit with
raft::make_const_mdspan(h_X->view()) or the library’s host-mdspan helper) so the
host-streaming overload is invoked (apply same change for the similar call at
lines 617-624) and run with n_init>1 and the zero-cost scenario to validate the
new path.

524-549: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't assert that larger init_size must improve inertia.

For a single KMeans++ trial (n_init == 1), increasing the candidate set from the default subsample to n_samples is not a monotonic guarantee. ASSERT_LE(inertia_full, inertia_default * (1 + rel)) can fail even when the implementation is correct, so this introduces avoidable test flakiness.

Possible adjustment
-    // Full-dataset seeding has at least as much information as the subsample
-    // default, so the converged inertia should not be worse.
-    ASSERT_LE(inertia_full, inertia_default * (T(1) + rel));
+    // `init_size = 0` should resolve to the documented default.
+    // Avoid asserting against `inertia_full`: a larger candidate set is not a
+    // monotonic guarantee for a single KMeans++ run.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cpp/tests/cluster/kmeans.cu` around lines 524 - 549, The test
runInitSizeCompare asserts that inertia_full <= inertia_default*(1+rel), which
is flaky because a single KMeans++ trial (n_init==1) does not guarantee better
inertia with larger init_size; remove that ASSERT_LE(inertia_full,
inertia_default * (T(1) + rel)) or make it conditional on a deterministic
multi-init case (e.g., only check when n_init>1), keeping the other assertions
on finiteness and positivity of inertia_default, inertia_explicit, and
inertia_full unchanged; locate the check in runInitSizeCompare where
inertia_full and inertia_default are compared and remove or guard that
comparison.
🧹 Nitpick comments (1)
cpp/tests/cluster/kmeans.cu (1)

362-365: ⚡ Quick win

Weighted batched coverage only hits the all-ones case.

Every weighted batched test fills kmeans_weight_mode::uniform, so the suite still doesn't exercise zero-sum or sparse-weight inputs. Given the recent weight-handling changes, I'd add at least one host-data case with all-zero weights and one mixed zero/non-zero case to lock down that behavior.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cpp/tests/cluster/kmeans.cu` around lines 362 - 365, The weighted-batched
tests currently only use fill_kmeans_test_weights(...,
kmeans_weight_mode::uniform) so they never exercise zero-sum or sparse weights;
update the test generation in the block guarded by testparams.weighted to add at
least two additional weight scenarios: one where you create host-side all-zero
weights and one mixed zero/non-zero (sparse) weights, transfer them into
d_sample_weight (same device vector used now) and run the same batched KMeans
paths; use or extend fill_kmeans_test_weights (or a new helper) to produce
kmeans_weight_mode variants (e.g., zero/sparse) and ensure the test loop
iterates over these modes so d_sample_weight, the KMeans call, and any
assertions are exercised for zero-sum and mixed-weight cases.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: a5efc988-95ff-49dd-ab2d-5c184e918f55

📥 Commits

Reviewing files that changed from the base of the PR and between add9db1 and 41c66b8.

📒 Files selected for processing (4)
  • cpp/src/cluster/detail/kmeans_mg_batched.cuh
  • cpp/tests/cluster/kmeans.cu
  • cpp/tests/cluster/kmeans_mg_batched.cu
  • cpp/tests/cluster/kmeans_test_blobs.cuh
✅ Files skipped from review due to trivial changes (1)
  • cpp/tests/cluster/kmeans_test_blobs.cuh
🚧 Files skipped from review as they are similar to previous changes (2)
  • cpp/tests/cluster/kmeans_mg_batched.cu
  • cpp/src/cluster/detail/kmeans_mg_batched.cuh

@viclafargue
Contributor Author

Hi @zhilizju,

> Is there a nightly conda package that will pick this up automatically after merge, or would we need to build from source?

The cuVS nightly package will be available following the merge. However, it will only offer a C++ interface.

> Once merged, will the multi-GPU batched KMeans be accessible from Python with the RAFT comms path? We'd like to integrate it into our existing Ray + NCCL setup.

The plan so far is to place the Python wrapper in cuML, in a similar fashion to the regular MG KMeans.

@viclafargue
Contributor Author

/ok to test 41c66b8

@viclafargue
Contributor Author

/ok to test f664c2c

@viclafargue viclafargue requested a review from tarang-jain May 8, 2026 17:01
// shadow into pinned host. Consumed at top of next iteration.
cuvs::cluster::kmeans::detail::compute_centroid_shift<T, IdxT>(
dev_res,
raft::make_const_mdspan(rank_centroids.view()),
Contributor

Why not use rank_centroids_const here? The const view already exists.
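
In other words, the suggestion is simply (sketch; only the changed argument shown):

cuvs::cluster::kmeans::detail::compute_centroid_shift<T, IdxT>(
  dev_res,
  rank_centroids_const,  // instead of re-wrapping rank_centroids.view()
  /* remaining arguments unchanged */);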

Contributor

@jinsolp jinsolp left a comment

Thanks @viclafargue! No major concerns from my side.

RAFT_EXPECTS(centroids.extent(1) == n_features, "centroids.extent(1) must equal n_features");
RAFT_EXPECTS(num_ranks > 0, "num_ranks must be positive");

RAFT_LOG_DEBUG("SNMG KMeans fit: rank=%d/%d, n_local=%zu, n_features=%zu, n_clusters=%d",
Contributor

Maybe change the log messages to say "MNMG KMeans" to match the function name?

Comment on lines +370 to +372
if (rank == 0) {
cuvs::cluster::kmeans::detail::init_centroids_from_host_sample<T, IdxT>(
dev_res, iter_params, streaming_batch_size, X_local, rank_centroids.view(), workspace);
Contributor

Can we document that we require a well-shuffled dataset to run MG KMeans? Otherwise this will end up with a bad init, because it only uses rank 0's local data.

Member

We should definitely go one step further here and add an option to switch between "use kmeans++ globally" and "use kmeans++ locally". (A hypothetical shape for such an option is sketched below.)
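
For concreteness, such an option could take a shape like the following; the enum name, enumerators, and default are purely hypothetical and not part of the PR:

// Hypothetical knob controlling the scope of kmeans++ seeding in MG fit.
enum class kmeans_mg_init_scope {
  rank0_local,  // current behavior: kmeans++ seeded from rank 0's shard only
  global        // kmeans++ over all ranks' shards, e.g. allreducing
                // candidate costs each round
};

// hypothetical usage on the MG params:
// params.init_scope = kmeans_mg_init_scope::global;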

Comment on lines +604 to +605
raft::copy(
centroids.data_handle(), rank_centroids.data_handle(), n_clusters * n_features, stream);
Contributor

Should we sync before returning if the caller expects centroids to be ready?
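
A minimal sketch of the suggestion, assuming RAFT's resource API; whether the sync belongs here or in the caller is the open question:

// Sync the stream after the copy so `centroids` is guaranteed ready
// when fit() returns.
raft::copy(
  centroids.data_handle(), rank_centroids.data_handle(), n_clusters * n_features, stream);
raft::resource::sync_stream(dev_res);  // blocks until the async copy completes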



Development

Successfully merging this pull request may close these issues.

[FEA] Multi-node Multi-GPU Kmeans (C++) to support new out-of-core batching
