
Commit ed1a608

fix: fix requested changes and add sync in profiler
Parent: 0053fa8

10 files changed: 70 additions, 41 deletions

infini_train/include/nn/parallel/distributed_data_parallel.h

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@ class DistributedDataParallel : public nn::Module {
     std::vector<std::shared_ptr<Tensor>> Forward(const std::vector<std::shared_ptr<Tensor>> &input_tensors) override;

 private:
-    std::shared_ptr<Reducer> reducer_;
+    std::shared_ptr<Reducer> reducer_ = nullptr;
 };

 } // namespace infini_train::nn::parallel
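Note: a default-constructed std::shared_ptr is already null, so the explicit `= nullptr` documents intent rather than changing behavior: with this commit, `reducer_` legitimately stays empty when gradient bucketing is disabled (see distributed_data_parallel.cc below).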

infini_train/include/nn/parallel/process_group.h

Lines changed: 4 additions & 2 deletions
@@ -11,14 +11,16 @@
 #endif

 #include "infini_train/include/nn/parallel/reduce_op_type.h"
-#include "infini_train/include/nn/parallel/work.h"

 namespace infini_train {
 class Tensor;
 class Device;
 namespace nn {
 class Module;
-}
+namespace parallel {
+class Work;
+} // namespace parallel
+} // namespace nn

 } // namespace infini_train

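Note: forward-declaring `Work` here instead of including work.h trims the header's dependencies; translation units that actually use `Work` now include work.h themselves, as process_group.cc does later in this commit.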

infini_train/include/nn/parallel/reducer.h

Lines changed: 23 additions & 6 deletions
@@ -1,21 +1,34 @@
 #pragma once

+#include <atomic>
 #include <memory>
 #include <mutex>
 #include <vector>

-#include "infini_train/include/autograd/function_hook.h"
+#include "infini_train/include/datatype.h"
 #include "infini_train/include/nn/parallel/parallel_functional.h"
-#include "infini_train/include/tensor.h"
+
+namespace infini_train {
+class Tensor;
+class Device;
+namespace autograd {
+class PostAccumulateGradHook;
+} // namespace autograd
+} // namespace infini_train

 namespace infini_train::nn::parallel {
+namespace {
+constexpr int kFirstBucketCapMB = 25;
+constexpr int kNormalBucketCapMB = 25;
+constexpr size_t kBytesPerMB = 1024ULL * 1024ULL;
+} // namespace

 // GradBucket passes bucket contents tensor to DDP communication hook.
 // ref: https://github.com/pytorch/pytorch/blob/main/torch/csrc/distributed/c10d/comm.hpp
 class GradBucket {
 public:
     explicit GradBucket(const std::vector<std::shared_ptr<Tensor>> &tensors) : tensors_(tensors) {}
-    const std::vector<std::shared_ptr<Tensor>> &getTensors() const { return tensors_; }
+    const std::vector<std::shared_ptr<Tensor>> &tensors() const { return tensors_; }

 private:
     std::vector<std::shared_ptr<Tensor>> tensors_;
@@ -34,11 +47,15 @@ struct ReducerOptions {
     // Ref: https://docs.pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html

     // Max capacity for each bucket(in MB)
-    size_t first_bucket_cap_mb = 128;
-    size_t normal_bucket_cap_mb = 512;
+    size_t first_bucket_cap_mb = kFirstBucketCapMB;
+    size_t normal_bucket_cap_mb = kNormalBucketCapMB;

     // When set true, map param.grad directly to the slice of bucket.flat(same address in memory) instead of memcpy
     bool gradient_as_bucket_view = true;
+
+    // Whether to enable gradient bucketing
+    // FIXME(zbl): should enable gradient bucketing by default
+    bool gradient_bucketing_enabled = true;
 };

 // DDP Reducer that handles gradient bucketing in backward
@@ -60,7 +77,7 @@ class Reducer : public std::enable_shared_from_this<Reducer> {
     // Prepare bucket info for next step
     void PrepareForBackward();

-    // For custom DDP hook to overwrite the default AllReduce. T
+    // For custom DDP hook to overwrite the default AllReduce.
     // This can be used for algorithms like Gradient Compression/GossipGrad.
     // Hook is registered using `Reducer::RegisterCommHook()`.
     // TODO(zbl): Leave the placeholder for the moment
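For context, a minimal sketch of how a caller might set the new ReducerOptions fields (the setup code, `model`, and `device_id` here are hypothetical, not part of this commit):

    #include "infini_train/include/nn/parallel/distributed_data_parallel.h"

    using infini_train::nn::parallel::DistributedDataParallel;
    using infini_train::nn::parallel::ReducerOptions;

    // Hypothetical setup: `model` is a std::shared_ptr<nn::Module>,
    // `device_id` identifies the device all parameters live on.
    ReducerOptions opts;
    opts.first_bucket_cap_mb = 25;          // default is now kFirstBucketCapMB (25 MB)
    opts.normal_bucket_cap_mb = 25;         // default is now kNormalBucketCapMB (25 MB)
    opts.gradient_as_bucket_view = true;    // grads alias slices of the flat bucket buffer
    opts.gradient_bucketing_enabled = true; // false falls back to per-parameter AllReduce hooks

    auto ddp = std::make_shared<DistributedDataParallel>(model, device_id, opts);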

infini_train/include/tensor.h

Lines changed: 1 addition & 1 deletion
@@ -196,7 +196,7 @@ class Tensor : public std::enable_shared_from_this<Tensor> {
     std::shared_ptr<Tensor> RequiresGrad();

     std::shared_ptr<Tensor> grad() const;
-    void set_grad(std::shared_ptr<Tensor> grad);
+    void set_grad(std::shared_ptr<Tensor> &grad);

     bool requires_grad() const;
     void set_requires_grad(bool requires_grad);

infini_train/src/autograd/accumulate.cc

Lines changed: 2 additions & 2 deletions
@@ -36,9 +36,9 @@ AccumulateGrad::Backward(const std::vector<std::shared_ptr<Tensor>> &grad_output
             kernel.Call<void>(grad_output, learning_rate_, grad);
         }
     } else {
-        // NOTE(zbl): check whether need to do copying instead of slicing
+        // FIXME(zbl): check whether need to do copying instead of slicing
         auto new_grad = std::make_shared<Tensor>(*grad_output.get(), 0, grad_output->Dims());
-        tensor_->set_grad(std::move(new_grad));
+        tensor_->set_grad(new_grad);
     }
     auto hook = tensor_->post_accumulate_grad_hook();
     if (hook != nullptr) {

infini_train/src/nn/parallel/distributed_data_parallel.cc

Lines changed: 19 additions & 9 deletions
@@ -20,22 +20,32 @@ constexpr char kModuleName[] = "module";
 DistributedDataParallel::DistributedDataParallel(std::shared_ptr<nn::Module> module, int device_id,
                                                  const ReducerOptions &opts) {
     for (auto &param : module->Parameters()) {
-        CHECK_EQ(param->GetDevice()->Index(), device_id) << "All parameters must be on the same device as the module";
+        auto device = param->GetDevice();
+        CHECK_EQ(device->Index(), device_id) << "All parameters must be on the same device as the module";
+        if (!opts.gradient_bucketing_enabled) {
+            auto ddp_pg
+                = ProcessGroupFactory::Instance()->Get(GetDataParallelProcessGroupName(device->rank().thread_rank()));
+            auto hook = std::make_unique<infini_train::autograd::AllReducePostAccumulateHook>(
+                function::ReduceOpType::kAvg, ddp_pg);
+            param->RegisterPostAccumulateGradHook(std::move(hook));
+        }
     }
     for (auto &buffer : module->Buffers()) {
         CHECK_EQ(buffer->GetDevice()->Index(), device_id) << "All buffers must be on the same device as the module";
     }
     modules_[kModuleName] = std::move(module);

-    // Bucket Assignment
-    auto params = modules_[kModuleName]->Parameters();
-    const size_t first_cap_bytes = opts.first_bucket_cap_mb * 1024ULL * 1024ULL;
-    const size_t normal_cap_bytes = opts.normal_bucket_cap_mb * 1024ULL * 1024ULL;
-    std::vector<size_t> bucket_size_limits = {first_cap_bytes, normal_cap_bytes};
-    auto bucket_indices = ComputeBucketAssignmentBySize(params, bucket_size_limits);
+    if (opts.gradient_bucketing_enabled) {
+        // Bucket Assignment
+        auto params = modules_[kModuleName]->Parameters();
+        const size_t first_cap_bytes = opts.first_bucket_cap_mb * kBytesPerMB;
+        const size_t normal_cap_bytes = opts.normal_bucket_cap_mb * kBytesPerMB;
+        std::vector<size_t> bucket_size_limits = {first_cap_bytes, normal_cap_bytes};
+        auto bucket_indices = ComputeBucketAssignmentBySize(params, bucket_size_limits);

-    reducer_ = std::make_shared<Reducer>(params, bucket_indices, opts);
-    reducer_->AttachHooksToParameters();
+        reducer_ = std::make_shared<Reducer>(params, bucket_indices, opts);
+        reducer_->AttachHooksToParameters();
+    }
 }

 std::vector<std::shared_ptr<Tensor>>
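Note: this gives DDP two communication paths. With gradient_bucketing_enabled set to true, the Reducer owns the hooks and all-reduces size-capped buckets of gradients; with it set to false, every parameter gets its own AllReducePostAccumulateHook, so each gradient is averaged (ReduceOpType::kAvg) individually as soon as it is accumulated.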

infini_train/src/nn/parallel/process_group.cc

Lines changed: 3 additions & 3 deletions
@@ -1,6 +1,5 @@
 #include "infini_train/include/nn/parallel/process_group.h"

-#include <algorithm>
 #include <numeric>
 #include <vector>

@@ -14,6 +13,7 @@
 #include "infini_train/include/datatype.h"
 #include "infini_train/include/device.h"
 #include "infini_train/include/nn/parallel/global.h"
+#include "infini_train/include/nn/parallel/work.h"
 #include "infini_train/include/tensor.h"

 namespace infini_train {
@@ -57,8 +57,8 @@ ProcessGroup::ProcessGroup(const std::vector<int> &device_indices) : comm_size_(

         device->SetDevice();
         int low, high;
-        cudaDeviceGetStreamPriorityRange(&low, &high);
-        cudaStreamCreateWithPriority(&comm_streams_[i], cudaStreamNonBlocking, high);
+        CUDA_CHECK(cudaDeviceGetStreamPriorityRange(&low, &high));
+        CUDA_CHECK(cudaStreamCreateWithPriority(&comm_streams_[i], cudaStreamNonBlocking, high));
         device_stream_map_[device] = comm_streams_[i];
     }

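Note: wrapping the two stream-setup calls in CUDA_CHECK means a failure now aborts loudly instead of silently yielding an invalid communication stream. The project's actual macro lives in common_cuda.h; a typical definition looks roughly like this (a sketch, not necessarily the project's exact code):

    #include <cstdio>
    #include <cstdlib>
    #include <cuda_runtime.h>

    // Evaluate a CUDA runtime call and abort with file/line context on error.
    #define CUDA_CHECK(call)                                                    \
        do {                                                                    \
            cudaError_t err_ = (call);                                          \
            if (err_ != cudaSuccess) {                                          \
                std::fprintf(stderr, "CUDA error '%s' at %s:%d\n",              \
                             cudaGetErrorString(err_), __FILE__, __LINE__);     \
                std::abort();                                                   \
            }                                                                   \
        } while (0)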

infini_train/src/nn/parallel/reducer.cc

Lines changed: 10 additions & 15 deletions
@@ -14,9 +14,9 @@

 #include "infini_train/include/autograd/function_hook.h"
 #include "infini_train/include/common/cuda/common_cuda.h"
-#include "infini_train/include/device.h"
 #include "infini_train/include/nn/parallel/utils.h"
 #include "infini_train/include/nn/parallel/work.h"
+#include "infini_train/include/tensor.h"

 namespace infini_train::nn::parallel {
 namespace {
@@ -106,7 +106,6 @@ std::vector<std::vector<size_t>> ComputeBucketAssignmentBySize(const std::vector
             return (std::hash<int>()(k.dev) << 1) ^ std::hash<int>()(static_cast<int>(k.dtype));
         }
     };
-    auto key_of = [&](size_t i) -> Key { return Key{tensors[i]->GetDevice()->Index(), tensors[i]->Dtype()}; };

     // Maintain the current state of each bucket
     struct State {
@@ -117,8 +116,6 @@ std::vector<std::vector<size_t>> ComputeBucketAssignmentBySize(const std::vector

     std::unordered_map<Key, State, KeyHash> states;
     std::vector<Key> key_order;
-    // NOTE(zbl): Assume combinations of (device, dtype) <= 8
-    states.reserve(8);

     std::vector<std::vector<size_t>> buckets_all;
     buckets_all.reserve(tensors.size());
@@ -130,9 +127,7 @@ std::vector<std::vector<size_t>> ComputeBucketAssignmentBySize(const std::vector
         }
     };

-    auto current_cap = [&](const State &s) -> size_t { return bucket_size_limits[s.limit_idx]; };
-
-    auto flush_current_bucket = [&](State &s) {
+    auto flushCurrentBucket = [&](State &s) {
         if (!s.current_tensors.empty()) {
             buckets_all.push_back(std::move(s.current_tensors));
             s.current_tensors.clear();
@@ -146,7 +141,7 @@ std::vector<std::vector<size_t>> ComputeBucketAssignmentBySize(const std::vector
         const auto &tensor = tensors[idx_in_order];
         CHECK(tensor);

-        const Key k = key_of(idx_in_order);
+        const Key k = Key{tensors[idx_in_order]->GetDevice()->Index(), tensors[idx_in_order]->Dtype()};
         auto it = states.find(k);
         if (it == states.end()) {
             it = states.emplace(k, State{}).first;
@@ -156,20 +151,20 @@ std::vector<std::vector<size_t>> ComputeBucketAssignmentBySize(const std::vector

         const size_t element_size_in_bytes = kDataTypeToSize.at(tensor->Dtype());
         const size_t bytes = tensor->NumElements() * element_size_in_bytes;
-        const size_t cap = current_cap(state);
+        const size_t cap = bucket_size_limits[state.limit_idx];

         // Assign current tensor to current bucket first
         state.current_tensors.push_back(idx_in_order);
         state.current_bytes += bytes;

         // If current bucket is out of capacity, then flush and move on to the next bucket
         if (state.current_bytes >= cap) {
-            flush_current_bucket(state);
+            flushCurrentBucket(state);
         }
     }

     // Flush the last bucket of each group manually
-    for (auto &key : key_order) { flush_current_bucket(states[key]); }
+    for (auto &key : key_order) { flushCurrentBucket(states[key]); }

     return buckets_all;
 }
@@ -215,6 +210,7 @@ void Reducer::BuildBuckets(const std::vector<std::vector<size_t>> &bucket_indice
         CHECK(!bucket_indices[bucket_idx].empty());
         const auto &first_param = params_[bucket_indices[bucket_idx][0]];
         bucket.dtype = first_param->Dtype();
+        // FIXME(zbl): use global_rank() in multi-node settings
         bucket.device_rank = first_param->GetDevice()->rank().thread_rank();

         size_t total_elems = 0;
@@ -274,8 +270,8 @@ void Reducer::RebuildBuckets() {
         tensors_in_order.push_back(params_[global_idx]);
     }

-    const size_t first_cap_bytes = opts_.first_bucket_cap_mb * 1024ULL * 1024ULL;
-    const size_t normal_cap_bytes = opts_.normal_bucket_cap_mb * 1024ULL * 1024ULL;
+    const size_t first_cap_bytes = opts_.first_bucket_cap_mb * kBytesPerMB;
+    const size_t normal_cap_bytes = opts_.normal_bucket_cap_mb * kBytesPerMB;
     std::vector<size_t> bucket_size_limits = {first_cap_bytes, normal_cap_bytes};
     auto new_bucket_indices = ComputeBucketAssignmentBySize(tensors_in_order, bucket_size_limits, full_order);

@@ -364,8 +360,7 @@ void Reducer::MarkVariableReadyDense(size_t variable_index) {
     auto &bucket = buckets_.at(loc.bucket_index);

     // Record real order of bucket being ready
-    if (!has_rebuilt_bucket_ && variable_index < ready_seen_this_iter_.size()
-        && !ready_seen_this_iter_[variable_index]) {
+    if (!has_rebuilt_bucket_ && !ready_seen_this_iter_[variable_index]) {
         grad_ready_order_indices_.push_back(variable_index);
         ready_seen_this_iter_[variable_index] = 1;
     }
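For readers new to this file, the greedy policy that ComputeBucketAssignmentBySize applies within each (device, dtype) group can be sketched in isolation like this (a simplified standalone sketch with hypothetical names; the assumption that the last cap repeats for all later buckets mirrors PyTorch's behavior and is not shown in this diff):

    #include <cstddef>
    #include <vector>

    // Greedily pack tensors (given their byte sizes, in traversal order)
    // into buckets: append to the current bucket until its byte total
    // reaches the active cap, then flush and advance to the next cap.
    std::vector<std::vector<size_t>> AssignBuckets(const std::vector<size_t> &tensor_bytes,
                                                   const std::vector<size_t> &limits) {
        std::vector<std::vector<size_t>> buckets;
        std::vector<size_t> current;
        size_t current_bytes = 0;
        size_t limit_idx = 0;
        for (size_t i = 0; i < tensor_bytes.size(); ++i) {
            current.push_back(i);
            current_bytes += tensor_bytes[i];
            if (current_bytes >= limits[limit_idx]) {
                buckets.push_back(std::move(current));
                current.clear();
                current_bytes = 0;
                if (limit_idx + 1 < limits.size()) {
                    ++limit_idx; // move from first-bucket cap to normal cap
                }
            }
        }
        if (!current.empty()) {
            buckets.push_back(std::move(current)); // flush the trailing partial bucket
        }
        return buckets;
    }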

infini_train/src/profiler.cc

Lines changed: 5 additions & 0 deletions
@@ -84,6 +84,11 @@ void Profiler::StartRecord(const std::string &name, DeviceType device) {
         cudaStream_t stream = GetCudaStream();
         CUDA_CHECK(cudaEventCreate(&start));
         CUDA_CHECK(cudaEventCreate(&stop));
+
+        // Make sure the compute stream has done waiting, and ready for the execution of next op
+        CUDA_CHECK(cudaStreamSynchronize(stream));
+        // Start record after waiting
+        cpu_timing_map_[name] = std::chrono::high_resolution_clock::now();
         CUDA_CHECK(cudaEventRecord(start, stream));
         cuda_timing_map_[name] = {reinterpret_cast<void *>(start), reinterpret_cast<void *>(stop)};
         break;
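Note: the added cudaStreamSynchronize drains any work still queued on the compute stream, so neither the host timestamp nor the `start` event includes a previous op's backlog. The overall event-pair timing pattern, shown in isolation (a minimal sketch with error checking omitted; `LaunchOp` is a hypothetical placeholder for the op being profiled):

    #include <cuda_runtime.h>

    void LaunchOp(cudaStream_t stream); // hypothetical op launch

    float TimeOpOnStream(cudaStream_t stream) {
        cudaEvent_t start, stop;
        cudaEventCreate(&start);
        cudaEventCreate(&stop);

        cudaStreamSynchronize(stream); // drain pending work so only our op is measured
        cudaEventRecord(start, stream);
        LaunchOp(stream);
        cudaEventRecord(stop, stream);

        cudaEventSynchronize(stop);    // block until the op has finished
        float ms = 0.0f;
        cudaEventElapsedTime(&ms, start, stop);
        cudaEventDestroy(start);
        cudaEventDestroy(stop);
        return ms;
    }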

infini_train/src/tensor.cc

Lines changed: 2 additions & 2 deletions
@@ -544,12 +544,12 @@ std::shared_ptr<Tensor> Tensor::RequiresGrad() {
 }

 std::shared_ptr<Tensor> Tensor::grad() const { return grad_; };
-void Tensor::set_grad(std::shared_ptr<Tensor> grad) {
+void Tensor::set_grad(std::shared_ptr<Tensor> &grad) {
     if (grad) {
         CHECK(grad->GetDevice() == GetDevice());
         CHECK(grad->Dtype() == Dtype());
         CHECK(grad->Dims() == Dims());
-        grad_ = std::move(grad);
+        grad_ = grad;
     } else {
         grad_.reset();
     }
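Note: taking the shared_ptr by non-const lvalue reference and copying (rather than moving) means the caller's handle stays valid after the call, which is why accumulate.cc above drops its std::move. A side effect of the new signature is that temporaries can no longer be passed, e.g. `t->set_grad(std::make_shared<Tensor>(...))` will not compile; callers must bind the gradient to a named variable first.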
