diff --git a/AnnService/inc/Core/Common/BKTree.h b/AnnService/inc/Core/Common/BKTree.h index 1586cb846..ac969989a 100644 --- a/AnnService/inc/Core/Common/BKTree.h +++ b/AnnService/inc/Core/Common/BKTree.h @@ -8,6 +8,8 @@ #include #include #include +#include +#include #include #include "inc/Core/VectorIndex.h" @@ -231,9 +233,7 @@ namespace SPTAG std::vector mythreads; mythreads.reserve(args._TH); - for (int tid = 0; tid < args._TH; tid++) - { - mythreads.emplace_back([tid, first, last, updateCenters, lambda, subsize, &data, &indices, &args, &currDist]() { + auto kmeansWorker = [&](int tid) { SizeType istart = first + tid * subsize; SizeType iend = min(first + (tid + 1) * subsize, last); SizeType *inewCounts = args.newCounts + tid * args._K; @@ -295,13 +295,14 @@ namespace SPTAG } } COMMON::Utils::atomic_float_add(&currDist, idist); - }); - } - for (auto &t : mythreads) - { - t.join(); + }; + if (args._TH <= 1) { + kmeansWorker(0); // serial fast-path: avoid spawning/joining a thread per node + } else { + for (int tid = 0; tid < args._TH; tid++) mythreads.emplace_back(kmeansWorker, tid); + for (auto &t : mythreads) t.join(); + mythreads.clear(); } - mythreads.clear(); for (int i = 1; i < args._TH; i++) { for (int k = 0; k < args._DK; k++) { args.newCounts[k] += args.newCounts[i * args._K + k]; @@ -527,7 +528,7 @@ break; class BKTree { public: - BKTree(): m_iTreeNumber(1), m_iBKTKmeansK(32), m_iBKTLeafSize(8), m_iSamples(1000), m_fBalanceFactor(-1.0f), m_bfs(0), m_lock(new std::shared_timed_mutex), m_pQuantizer(nullptr) {} + BKTree(): m_iTreeNumber(1), m_iBKTKmeansK(32), m_iBKTLeafSize(8), m_iSamples(1000), m_fBalanceFactor(-1.0f), m_bfs(0), m_lock(new std::shared_timed_mutex), m_pQuantizer(nullptr), m_parallelBuild(false), m_treeLock(new std::mutex), m_sampleMapLock(new std::mutex) {} BKTree(const BKTree& other): m_iTreeNumber(other.m_iTreeNumber), m_iBKTKmeansK(other.m_iBKTKmeansK), @@ -536,7 +537,10 @@ break; m_fBalanceFactor(other.m_fBalanceFactor), m_lock(new 
std::shared_timed_mutex), m_pQuantizer(other.m_pQuantizer), - m_bfs(0) {} + m_bfs(0), + m_parallelBuild(other.m_parallelBuild), + m_treeLock(new std::mutex), + m_sampleMapLock(new std::mutex) {} ~BKTree() {} inline const BKTNode& operator[](SizeType index) const { return m_pTreeRoots[index]; } @@ -570,11 +574,161 @@ break; m_pSampleCenterMap.swap(newTrees.m_pSampleCenterMap); } + struct BKTJob { + SizeType index, first, last; + bool debug; + BKTJob(SizeType index_ = 0, SizeType first_ = 0, SizeType last_ = 0, bool debug_ = false) + : index(index_), first(first_), last(last_), debug(debug_) {} + }; + + // Cluster ONE node into children. The heavy clustering runs lock-free; only the + // structural commit to m_pTreeRoots / m_pSampleCenterMap is done under locks. + // Sibling nodes own disjoint [first,last) ranges of localindices (incl. the centerid + // slot at `last`), so concurrent calls on different nodes never overlap. + template + void ProcessOneNode(const Dataset& data, std::vector& localindices, + KmeansArgs& args, BKTJob item, std::vector* reverseIndices, + bool dynamicK, IAbortOperation* abort, std::vector& outChildren) + { + outChildren.clear(); + if (item.last - item.first <= m_iBKTLeafSize) { + std::lock_guard lk(*m_treeLock); + m_pTreeRoots[item.index].childStart = (SizeType)m_pTreeRoots.size(); + for (SizeType j = item.first; j < item.last; j++) { + SizeType cid = (reverseIndices == nullptr) ? 
localindices[j] : reverseIndices->at(localindices[j]); + m_pTreeRoots.emplace_back(cid); + } + m_pTreeRoots[item.index].childEnd = (SizeType)m_pTreeRoots.size(); + return; + } + + if (dynamicK) { + args._DK = std::min((item.last - item.first) / m_iBKTLeafSize + 1, m_iBKTKmeansK); + args._DK = std::max(args._DK, 2); + } + + int numClusters = KmeansClustering(data, localindices, item.first, item.last, args, + m_iSamples, m_fBalanceFactor, item.debug, abort); + + std::lock_guard lk(*m_treeLock); + m_pTreeRoots[item.index].childStart = (SizeType)m_pTreeRoots.size(); + if (numClusters <= 1) { + SizeType end = min(item.last + 1, (SizeType)localindices.size()); + std::sort(localindices.begin() + item.first, localindices.begin() + end); + SizeType center = (reverseIndices == nullptr) ? localindices[item.first] : reverseIndices->at(localindices[item.first]); + m_pTreeRoots[item.index].centerid = center; + m_pTreeRoots[item.index].childStart = -m_pTreeRoots[item.index].childStart; + std::lock_guard sm(*m_sampleMapLock); + for (SizeType j = item.first + 1; j < end; j++) { + SizeType cid = (reverseIndices == nullptr) ? localindices[j] : reverseIndices->at(localindices[j]); + m_pTreeRoots.emplace_back(cid); + m_pSampleCenterMap[cid] = center; + } + m_pSampleCenterMap[-1 - center] = item.index; + } + else { + SizeType maxCount = 0; + for (int k = 0; k < m_iBKTKmeansK; k++) if (args.counts[k] > maxCount) maxCount = args.counts[k]; + SizeType first = item.first; + for (int k = 0; k < m_iBKTKmeansK; k++) { + if (args.counts[k] == 0) continue; + SizeType cid = (reverseIndices == nullptr) ? 
localindices[first + args.counts[k] - 1] : reverseIndices->at(localindices[first + args.counts[k] - 1]); + SizeType childIdx = (SizeType)m_pTreeRoots.size(); + m_pTreeRoots.emplace_back(cid); + if (args.counts[k] > 1) + outChildren.emplace_back(childIdx, first, first + args.counts[k] - 1, item.debug && (args.counts[k] == maxCount)); + first += args.counts[k]; + } + } + m_pTreeRoots[item.index].childEnd = (SizeType)m_pTreeRoots.size(); + } + + // Two-phase parallel build: + // Phase 1 (top): serial node schedule, each node clustered with all threads + // (intra-node data parallelism handles the few large nodes well). + // Phase 2 (bottom): subtrees of size <= cutoff are built in parallel, one worker per + // subtree (per-worker args, _TH=1), sharing the single N-sized label. template - void BuildTrees(const Dataset& data, DistCalcMethod distMethod, int numOfThreads, - std::vector* indices = nullptr, std::vector* reverseIndices = nullptr, + void BuildTreesParallel(const Dataset& data, DistCalcMethod distMethod, int numOfThreads, + std::vector* indices, std::vector* reverseIndices, + bool dynamicK, IAbortOperation* abort) + { + std::vector localindices; + if (indices == nullptr) { + localindices.resize(data.R()); + for (SizeType i = 0; i < (SizeType)localindices.size(); i++) localindices[i] = i; + } + else { + localindices.assign(indices->begin(), indices->end()); + } + KmeansArgs args(m_iBKTKmeansK, data.C(), (SizeType)localindices.size(), numOfThreads, distMethod, m_pQuantizer); + if (m_fBalanceFactor < 0) m_fBalanceFactor = DynamicFactorSelect(data, localindices, 0, (SizeType)localindices.size(), args, m_iSamples); + + std::mt19937 rg; + m_pSampleCenterMap.clear(); + for (char t = 0; t < m_iTreeNumber; t++) { + std::shuffle(localindices.begin(), localindices.end(), rg); + m_pTreeStart.push_back((SizeType)m_pTreeRoots.size()); + m_pTreeRoots.emplace_back((SizeType)localindices.size()); + SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Start to build BKTree %d 
(parallel)\n", t + 1); + + const SizeType cutoff = std::max((SizeType)(m_iBKTLeafSize * 4), + (SizeType)(localindices.size() / ((size_t)numOfThreads * 16) + 1)); + + // ---- Phase 1: serial schedule + intra-node multithread (shared args) ---- + std::vector deferred; + std::stack ss; + ss.push(BKTJob(m_pTreeStart[t], 0, (SizeType)localindices.size(), true)); + while (!ss.empty()) { + if (abort && abort->ShouldAbort()) return; + BKTJob job = ss.top(); ss.pop(); + std::vector kids; + ProcessOneNode(data, localindices, args, job, reverseIndices, dynamicK, abort, kids); + for (size_t c = 0; c < kids.size(); c++) { + if (kids[c].last - kids[c].first <= cutoff) deferred.push_back(kids[c]); + else ss.push(kids[c]); + } + } + + // ---- Phase 2: node-level parallelism over small subtrees ---- + int* sharedLabel = args.label; // workers write disjoint [first,last) slices + std::atomic nextJob(0); + std::vector pool; + pool.reserve(numOfThreads); + for (int w = 0; w < numOfThreads; w++) { + pool.emplace_back([&]() { + KmeansArgs wargs(m_iBKTKmeansK, data.C(), 1, 1, distMethod, m_pQuantizer); + delete[] wargs.label; wargs.label = sharedLabel; // share N-sized label + std::stack wss; + for (size_t idx = nextJob.fetch_add(1); idx < deferred.size(); idx = nextJob.fetch_add(1)) { + if (abort && abort->ShouldAbort()) break; + wss.push(deferred[idx]); + while (!wss.empty()) { + BKTJob job = wss.top(); wss.pop(); + std::vector kids; + ProcessOneNode(data, localindices, wargs, job, reverseIndices, dynamicK, abort, kids); + for (size_t c = 0; c < kids.size(); c++) wss.push(kids[c]); + } + } + wargs.label = nullptr; // don't double-free the shared label in dtor + }); + } + for (size_t w = 0; w < pool.size(); w++) pool[w].join(); + + m_pTreeRoots.emplace_back(-1); + SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "%d BKTree built (parallel), %zu %zu\n", t + 1, m_pTreeRoots.size() - m_pTreeStart[t], localindices.size()); + } + } + + template + void BuildTrees(const Dataset& data, DistCalcMethod 
distMethod, int numOfThreads, + std::vector* indices = nullptr, std::vector* reverseIndices = nullptr, bool dynamicK = false, IAbortOperation* abort = nullptr) { + if (m_parallelBuild && numOfThreads > 1) { + BuildTreesParallel(data, distMethod, numOfThreads, indices, reverseIndices, dynamicK, abort); + return; + } struct BKTStackItem { SizeType index, first, last; bool debug; @@ -865,6 +1019,11 @@ break; int m_iTreeNumber, m_iBKTKmeansK, m_iBKTLeafSize, m_iSamples, m_bfs; float m_fBalanceFactor; std::shared_ptr m_pQuantizer; + + // Parallel BKT build (node-level parallelism for SelectHead). Default off. + bool m_parallelBuild; + std::unique_ptr m_treeLock; // guards m_pTreeRoots structural writes + std::unique_ptr m_sampleMapLock; // guards m_pSampleCenterMap writes }; } } diff --git a/AnnService/inc/Core/Common/CommonUtils.h b/AnnService/inc/Core/Common/CommonUtils.h index 23a59c30e..c2dc04f25 100644 --- a/AnnService/inc/Core/Common/CommonUtils.h +++ b/AnnService/inc/Core/Common/CommonUtils.h @@ -16,6 +16,7 @@ #include #include #include +#include #define PREFETCH @@ -33,7 +34,11 @@ namespace SPTAG public: static SizeType rand(SizeType high = MaxSize, SizeType low = 0) // Generates a random int value. { - return low + (SizeType)(float(high - low)*(std::rand() / (RAND_MAX + 1.0))); + // thread_local generator: thread-safe (the old std::rand() shares hidden global + // state and races under concurrent BKT building). Sequences differ from the legacy + // std::rand() version (equivalent quality, not bit-identical to old builds). 
+ static thread_local std::mt19937 g(std::random_device{}()); + return low + (SizeType)(float(high - low) * (g() / (g.max() + 1.0))); } static inline float atomic_float_add(volatile float* ptr, const float operand) diff --git a/AnnService/inc/Core/SPANN/Options.h b/AnnService/inc/Core/SPANN/Options.h index f49621230..387859b57 100644 --- a/AnnService/inc/Core/SPANN/Options.h +++ b/AnnService/inc/Core/SPANN/Options.h @@ -70,6 +70,7 @@ namespace SPTAG { bool m_recursiveCheckSmallCluster; bool m_printSizeCount; std::string m_selectType; + bool m_selectHeadParallel; // Section 3: for build head bool m_buildHead; diff --git a/AnnService/inc/Core/SPANN/ParameterDefinitionList.h b/AnnService/inc/Core/SPANN/ParameterDefinitionList.h index 0a88e3f1d..ee3b1de0a 100644 --- a/AnnService/inc/Core/SPANN/ParameterDefinitionList.h +++ b/AnnService/inc/Core/SPANN/ParameterDefinitionList.h @@ -62,6 +62,7 @@ DefineSelectHeadParameter(m_headVectorCount, int, 0, "Count") DefineSelectHeadParameter(m_recursiveCheckSmallCluster, bool, true, "RecursiveCheckSmallCluster") DefineSelectHeadParameter(m_printSizeCount, bool, true, "PrintSizeCount") DefineSelectHeadParameter(m_selectType, std::string, "BKT", "SelectHeadType") +DefineSelectHeadParameter(m_selectHeadParallel, bool, false, "ParallelBuildBKT") #endif #ifdef DefineBuildHeadParameter diff --git a/AnnService/inc/Helper/ConcurrentSet.h b/AnnService/inc/Helper/ConcurrentSet.h index 49210a29e..9c9f534f5 100644 --- a/AnnService/inc/Helper/ConcurrentSet.h +++ b/AnnService/inc/Helper/ConcurrentSet.h @@ -16,6 +16,7 @@ #include #include #include +#include #endif // TBB #else #include @@ -47,9 +48,10 @@ namespace SPTAG template class ConcurrentSet { + public: typedef typename std::unordered_set::iterator iterator; + typedef typename std::unordered_set::const_iterator const_iterator; - public: ConcurrentSet() { m_lock.reset(new std::shared_timed_mutex); } ~ConcurrentSet() {} @@ -72,6 +74,14 @@ namespace SPTAG return m_data.insert(key); } + // 
Unsafe iteration: caller must ensure no concurrent modification. + // Mirrors the semantics of tbb::concurrent_unordered_set::unsafe_begin/unsafe_end, + // which (unlike the locked accessors above) do not synchronize. + iterator begin() { return m_data.begin(); } + iterator end() { return m_data.end(); } + const_iterator begin() const { return m_data.begin(); } + const_iterator end() const { return m_data.end(); } + private: std::unique_ptr m_lock; std::unordered_set m_data; @@ -80,9 +90,10 @@ namespace SPTAG template class ConcurrentMap { + public: typedef typename std::unordered_map::iterator iterator; + typedef typename std::unordered_map::value_type value_type; - public: ConcurrentMap(int capacity = 8) { m_lock.reset(new std::shared_timed_mutex); m_data.reserve(capacity); } ~ConcurrentMap() {} @@ -127,6 +138,8 @@ namespace SPTAG class ConcurrentQueue { public: + typedef typename std::deque::iterator iterator; + typedef typename std::deque::const_iterator const_iterator; ConcurrentQueue() {} @@ -135,7 +148,7 @@ namespace SPTAG void push(const T& j) { std::lock_guard lock(m_lock); - m_queue.push(j); + m_queue.push_back(j); } bool try_pop(T& j) @@ -145,13 +158,36 @@ namespace SPTAG return false; } j = m_queue.front(); - m_queue.pop(); + m_queue.pop_front(); return true; } + // The TBB concurrent_queue exposes empty() and unsafe_size() as + // best-effort, lock-free queries. Here we take the lock so the + // snapshot is consistent; callers should still treat the result + // as advisory in concurrent contexts. + bool empty() const + { + std::lock_guard lock(m_lock); + return m_queue.empty(); + } + + size_t unsafe_size() const + { + std::lock_guard lock(m_lock); + return m_queue.size(); + } + + // Unsafe iteration: caller must ensure no concurrent modification, + // matching tbb::concurrent_queue::unsafe_begin/unsafe_end semantics. 
+ iterator unsafe_begin() { return m_queue.begin(); } + iterator unsafe_end() { return m_queue.end(); } + const_iterator unsafe_begin() const { return m_queue.begin(); } + const_iterator unsafe_end() const { return m_queue.end(); } + protected: - std::mutex m_lock; - std::queue m_queue; + mutable std::mutex m_lock; + std::deque m_queue; }; template @@ -161,7 +197,7 @@ namespace SPTAG ConcurrentPriorityQueue() {} ~ConcurrentPriorityQueue() {} - size_type size() const { + size_t size() const { std::lock_guard lock(m_lock); return m_queue.size(); } @@ -178,11 +214,11 @@ namespace SPTAG } value = m_queue.top(); m_queue.pop(); - return true; + return true; } private: - std::mutex m_lock; + mutable std::mutex m_lock; std::priority_queue m_queue; }; #endif // TBB diff --git a/AnnService/inc/Helper/Logging.h b/AnnService/inc/Helper/Logging.h index 221f4dfa5..d5e379481 100644 --- a/AnnService/inc/Helper/Logging.h +++ b/AnnService/inc/Helper/Logging.h @@ -37,7 +37,14 @@ namespace SPTAG class LoggerHolder { -#if ((defined(_MSVC_LANG) && _MSVC_LANG >= 202002L) || __cplusplus >= 202002L) + // The C++20 path uses `std::atomic<std::shared_ptr<Logger>>`, which the + // libc++ shipped with ClickHouse rejects (it requires the value type to + // be trivially copyable). The pre-C++20 path with `std::atomic_load` / + // `std::atomic_store` works under both standards, so force it + // unconditionally — same workaround SPTAG-cmake uses for `_sptag` via + // `-std=c++17`, but applied at the header so consumers compiled at + // higher standards (e.g. ClickHouse's `dbms` at C++23) are unaffected.
+#if 0 private: std::atomic> m_logger; public: diff --git a/AnnService/src/Core/SPANN/SPANNIndex.cpp b/AnnService/src/Core/SPANN/SPANNIndex.cpp index 0c34a17b8..7a81c4f98 100644 --- a/AnnService/src/Core/SPANN/SPANNIndex.cpp +++ b/AnnService/src/Core/SPANN/SPANNIndex.cpp @@ -939,6 +939,7 @@ bool Index::SelectHeadInternal(std::shared_ptr &p_re bkt->m_iTreeNumber = m_options.m_iTreeNumber; bkt->m_fBalanceFactor = m_options.m_fBalanceFactor; bkt->m_pQuantizer = m_pQuantizer; + bkt->m_parallelBuild = m_options.m_selectHeadParallel; // node-level parallel BKT build (default off) SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Start invoking BuildTrees.\n"); SPTAGLIB_LOG( Helper::LogLevel::LL_Info, diff --git a/AnnService/src/Core/VectorIndex.cpp b/AnnService/src/Core/VectorIndex.cpp index 2f8ebfd13..5b7bd5270 100644 --- a/AnnService/src/Core/VectorIndex.cpp +++ b/AnnService/src/Core/VectorIndex.cpp @@ -1185,7 +1185,7 @@ void VectorIndex::ApproximateRNG(std::shared_ptr &fullVectors, std::u default: SPTAGLIB_LOG( Helper::LogLevel::LL_Error, "Unable to get quantizer reconstruct type %s", - Helper::Convert::ConvertToString(m_pQuantizer->GetReconstructType())); + Helper::Convert::ConvertToString(m_pQuantizer->GetReconstructType()).c_str()); } } else diff --git a/AnnService/src/Helper/VectorSetReaders/TxtReader.cpp b/AnnService/src/Helper/VectorSetReaders/TxtReader.cpp index 8bf9fb55c..ef1439821 100644 --- a/AnnService/src/Helper/VectorSetReaders/TxtReader.cpp +++ b/AnnService/src/Helper/VectorSetReaders/TxtReader.cpp @@ -5,7 +5,9 @@ #include "inc/Core/VectorIndex.h" #include "inc/Helper/CommonHelper.h" #include "inc/Helper/StringConvert.h" -#include +// <omp.h> was included historically but no OpenMP primitives are actually used +// in this translation unit. Drop it — ClickHouse's contrib build does not +// configure an OpenMP runtime. using namespace SPTAG; using namespace SPTAG::Helper;