Skip to content
This repository was archived by the owner on Aug 5, 2022. It is now read-only.

Commit 80f1595

Browse files
committed
Implement optimization of overlapping wait communication with forward; Support time measurement of staring and waiting communication
Change-Id: I6c5af8d85fbb8a81353142752a37dfce8cd1870d
1 parent 84b52ca commit 80f1595

File tree

8 files changed

+279
-37
lines changed

8 files changed

+279
-37
lines changed

Makefile

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,13 @@ ifeq ($(CAFFE_PER_LAYER_TIMINGS), 1)
7777
endif
7878

7979
ifeq ($(CAFFE_MLSL_SHUFFLE), 1)
80-
COMMON_FLAGS += -DCAFFE_MLSL_SHUFFLE
80+
COMMON_FLAGS += -DCAFFE_MLSL_SHUFFLE
8181
endif
8282

83+
ifneq ($(FW_OVERLAP_OPT), 0)
84+
COMMON_FLAGS += -DFW_OVERLAP_OPT
85+
endif
8386
endif
84-
8587
#################### MLSL ####################
8688

8789

cmake/Dependencies.cmake

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,17 @@ if(USE_MLSL)
115115
include_directories(SYSTEM "${MLSL_ROOT}/intel64/include")
116116
link_directories(SYSTEM "${MLSL_ROOT}/intel64/lib")
117117
list(APPEND Caffe_LINKER_LIBS mlsl)
118+
119+
if(CAFFE_PER_LAYER_TIMINGS)
120+
add_definitions("-DCAFFE_PER_LAYER_TIMINGS")
121+
endif()
122+
if(CAFFE_MLSL_SHUFFLE)
123+
add_definitions("-DCAFFE_MLSL_SHUFFLE")
124+
endif()
125+
if(FW_OVERLAP_OPT OR NOT DEFINED FW_OVERLAP_OPT)
126+
message(STATUS "Forward overlapping optimization is enabled!")
127+
add_definitions("-DFW_OVERLAP_OPT")
128+
endif()
118129
endif()
119130

120131
# ---[ BLAS

include/caffe/multinode/multi_solver.hpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ class MultiSolver {
6060
iter_size(root_solver_->param().iter_size()) {
6161
root_solver_->set_forward_backward(
6262
boost::bind(&MultiSolver<Dtype>::ForwardBackward, this));
63+
#ifdef FW_OVERLAP_OPT
64+
Net<Dtype>& net = *root_solver_->net();
65+
const std::vector<shared_ptr<Layer<Dtype>>> & layers{ net.layers() };
66+
layer_finished_flags_.resize(layers.size());
67+
std::fill(layer_finished_flags_.begin(), layer_finished_flags_.end(), false);
68+
#endif
6369
}
6470

6571

@@ -99,14 +105,23 @@ class MultiSolver {
99105
boost::shared_ptr<Solver<Dtype>> root_solver() {
100106
return root_solver_;
101107
}
102-
108+
#ifdef FW_OVERLAP_OPT
109+
void set_layer_finished_flag(int layer_id, bool flag) {
110+
layer_finished_flags_[layer_id] = flag;
111+
}
112+
#endif
103113
private:
104114
virtual Dtype ForwardBackwardImpl(bool first, bool last);
115+
bool IsSkipWaitGradient(int layer_id);
116+
void WaitAndUpdateGradient(int layer_id);
105117

106118
protected:
107119
boost::shared_ptr<Solver<Dtype>> root_solver_;
108120
int iter_size;
109121
vector<Callback*> callbacks_;
122+
#ifdef FW_OVERLAP_OPT
123+
vector<bool> layer_finished_flags_;
124+
#endif
110125
};
111126

112127
} // namespace caffe

include/caffe/multinode/multi_sync.hpp

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ namespace caffe {
7474
shared_ptr<Net<Dtype>> net;
7575
const vector<Blob<Dtype> *> &net_params;
7676
vector<vector<int>> layer_param_ids;
77+
#ifdef FW_OVERLAP_OPT
78+
vector<vector<bool>> param_ids_finished_flags;
79+
#endif
80+
7781
// layer_id -> blob_id -> cached blob to restore
7882
// statistics
7983
vector<vector<shared_ptr<Blob<Dtype>>>> cached_stats;
@@ -160,6 +164,12 @@ namespace caffe {
160164
<< " ENABLED"
161165
#else
162166
<< " DISABLED"
167+
#endif
168+
<< ", FORWARD OVERLAP OPTIMIZATION IS"
169+
#ifdef FW_OVERLAP_OPT
170+
<< " ENABLED"
171+
#else
172+
<< " DISABLED"
163173
#endif
164174
<< ", SINGLE DB SPLITTING IS"
165175
#ifdef CAFFE_MLSL_SHUFFLE
@@ -201,6 +211,12 @@ namespace caffe {
201211
return;
202212
}
203213

214+
#ifdef FW_OVERLAP_OPT
215+
std::fill(param_ids_finished_flags[layer_id].begin(),
216+
param_ids_finished_flags[layer_id].end(),
217+
false);
218+
#endif
219+
204220
std::vector<int> &param_ids = layer_param_ids[layer_id];
205221
for (int i = 0; i < param_ids.size(); ++i) {
206222
if (!layer->ParamNeedReduce(i)) continue;
@@ -215,15 +231,41 @@ namespace caffe {
215231
void on_delwt_wait(int layer_id) {
216232
boost::shared_ptr<Layer<Dtype>> &layer = layers[layer_id];
217233
if (layer->layerOp == nullptr) {
234+
#ifdef FW_OVERLAP_OPT
235+
solver->set_layer_finished_flag(layer_id, true);
236+
#endif
218237
return;
219238
}
220239

221240
std::vector<int> &param_ids = layer_param_ids[layer_id];
222241

242+
#ifdef FW_OVERLAP_OPT
243+
int finished_count = 0;
244+
#endif
245+
223246
for (int i=0; i<param_ids.size(); i++) {
224-
if (!layer->ParamNeedReduce(i)) continue;
247+
if (!layer->ParamNeedReduce(i)
248+
#ifdef FW_OVERLAP_OPT
249+
|| (param_ids_finished_flags[layer_id][i] == true)) {
250+
finished_count++;
251+
#else
252+
) {
253+
#endif
254+
continue;
255+
}
256+
257+
#ifdef FW_OVERLAP_OPT
258+
bool is_completed = false;
259+
Dtype *delwt_buf{(Dtype *) layer->layerOp->GetParameterSet(i)->TestGradientComm(&is_completed)};
260+
#else
225261
Dtype *delwt_buf{(Dtype *) layer->layerOp->GetParameterSet(i)->WaitGradientComm()};
262+
#endif
226263
if (delwt_buf) {
264+
#ifdef FW_OVERLAP_OPT
265+
assert(is_completed);
266+
param_ids_finished_flags[layer_id][i] = true;
267+
finished_count++;
268+
#endif
227269
if (CAN_USE_PRV(net_params[param_ids[i]])) {
228270
if (delwt_buf != net_params[param_ids[i]]->prv_diff())
229271
caffe_copy(net_params[param_ids[i]]->count(),
@@ -235,6 +277,12 @@ namespace caffe {
235277
net_params[param_ids[i]]->mutable_cpu_diff());
236278
}
237279
}
280+
281+
#ifdef FW_OVERLAP_OPT
282+
if (finished_count == param_ids.size()) {
283+
solver->set_layer_finished_flag(layer_id, true);
284+
}
285+
#endif
238286
}
239287

240288
void on_gradients_ready() {

include/caffe/solver.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,18 @@ class Solver {
165165
std::vector<double> forward_time_per_layer;
166166
std::vector<double> backward_time_per_layer;
167167
std::vector<double> update_time_per_layer;
168+
#ifdef USE_MLSL
169+
std::vector<double> startcomm_time_per_layer;
170+
std::vector<double> waitcomm_time_per_layer;
171+
#endif
168172

169173
std::vector<double> forward_time_per_layer_total;
170174
std::vector<double> backward_time_per_layer_total;
171175
std::vector<double> update_time_per_layer_total;
176+
#ifdef USE_MLSL
177+
std::vector<double> startcomm_time_per_layer_total;
178+
std::vector<double> waitcomm_time_per_layer_total;
179+
#endif
172180

173181
void InitTimers();
174182
void ResetTimers();

src/caffe/multinode/multi_solver.cpp

Lines changed: 120 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -46,77 +46,167 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
4646

4747
namespace caffe {
4848

49+
#define START_ITER 1
50+
51+
52+
#ifdef CAFFE_PER_LAYER_TIMINGS
53+
#define LAYER_TIMING_START() do { \
54+
timer.Start(); \
55+
}while(0)
56+
57+
#define LAYER_TIMING_STOP(name, index) do { \
58+
name##_time_per_layer[index] += timer.MicroSeconds(); \
59+
}while(0)
60+
#else
61+
#define LAYER_TIMING_START()
62+
63+
#define LAYER_TIMING_STOP(name,index)
64+
#endif
65+
66+
template <typename Dtype>
67+
inline bool MultiSolver<Dtype>::IsSkipWaitGradient(int layer_id) {
68+
Net<Dtype>& net = *root_solver_->net();
69+
const std::vector<shared_ptr<Layer<Dtype>>>& layers{ net.layers() };
70+
const std::vector<bool>& layer_need_backward{ net.layer_need_backward() };
71+
72+
if (!layer_need_backward[layer_id] || ((layers[layer_id]->layerOp != nullptr)
73+
&& !layers[layer_id]->layerOp->HasParameterSets())) {
74+
DLOG(INFO) << "ForwardBackwardImpl: no need for apply_updates for layer # "
75+
<< layer_id << ", skip on_delwt_wait, apply_updates, on_wtinc_ready";
76+
return true;
77+
}
78+
return false;
79+
}
80+
81+
template <typename Dtype>
82+
inline void MultiSolver<Dtype>::WaitAndUpdateGradient(int layer_id) {
83+
LAYER_TIMING_START();
84+
for (int j = 0; j < callbacks_.size(); ++j) {
85+
callbacks_[j]->on_delwt_wait(layer_id);
86+
}
87+
LAYER_TIMING_STOP(waitcomm, layer_id);
88+
89+
#ifdef FW_OVERLAP_OPT
90+
if (layer_finished_flags_[layer_id]) {
91+
#endif
92+
LAYER_TIMING_START();
93+
for (int j = 0; j < callbacks_.size(); ++j) {
94+
callbacks_[j]->apply_updates(layer_id);
95+
}
96+
LAYER_TIMING_STOP(update, layer_id);
97+
#ifdef FW_OVERLAP_OPT
98+
}
99+
#endif
100+
}
101+
49102
template <typename Dtype>
50103
Dtype MultiSolver<Dtype>::ForwardBackwardImpl(bool first, bool last) {
51104

52105
Dtype loss = 0;
53106
Net<Dtype>& net = *root_solver_->net();
54107
const std::vector<shared_ptr<Layer<Dtype>>>& layers{ net.layers() };
55108
const std::vector<bool>& layer_need_backward{ net.layer_need_backward() };
109+
#ifdef FW_OVERLAP_OPT
110+
int iter = root_solver_->iter();
111+
#endif
56112

57113
#ifdef CAFFE_PER_LAYER_TIMINGS
58114
Timer& timer = root_solver_->timer;
59115
std::vector<double>& forward_time_per_layer = root_solver_->forward_time_per_layer;
60116
std::vector<double>& backward_time_per_layer = root_solver_->backward_time_per_layer;
61117
std::vector<double>& update_time_per_layer = root_solver_->update_time_per_layer;
118+
std::vector<double>& startcomm_time_per_layer = root_solver_->startcomm_time_per_layer;
119+
std::vector<double>& waitcomm_time_per_layer = root_solver_->waitcomm_time_per_layer;
62120
#endif /* CAFFE_PER_LAYER_TIMINGS */
63121

122+
64123
for (int i = 0; i < layers.size(); ++i) {
65-
#ifdef CAFFE_PER_LAYER_TIMINGS
66-
timer.Start();
124+
#ifdef FW_OVERLAP_OPT
125+
if (first && iter >= START_ITER + 1) {
126+
while (layer_finished_flags_[i] == false) {
127+
if (IsSkipWaitGradient(i)) {
128+
break;
129+
}
130+
131+
WaitAndUpdateGradient(i);
132+
if (layer_finished_flags_[i]) {
133+
break;
134+
}
135+
136+
for (int k=i+1; k<layers.size(); k++) {
137+
if (layer_finished_flags_[k] || IsSkipWaitGradient(k)) {
138+
layer_finished_flags_[k] = true;
139+
continue;
140+
}
141+
142+
WaitAndUpdateGradient(k);
143+
if (layer_finished_flags_[k])
144+
break;
145+
}
146+
}
147+
layer_finished_flags_[i] = false;
148+
}
67149
#endif
68-
loss += net.ForwardFromTo(i, i);
69150

70-
#ifdef CAFFE_PER_LAYER_TIMINGS
71-
forward_time_per_layer[i] += timer.MicroSeconds();
72-
#endif
151+
LAYER_TIMING_START();
152+
loss += net.ForwardFromTo(i, i);
153+
LAYER_TIMING_STOP(forward, i);
73154
}
74155

75156
for (int i = layers.size() - 1; i >= 0; --i) {
76-
#ifdef CAFFE_PER_LAYER_TIMINGS
77-
timer.Start();
78-
#endif
79-
80157
if (!layer_need_backward[i]) {
81158
continue;
82159
}
160+
161+
LAYER_TIMING_START();
83162

84163
net.BackwardFromTo(i, i);
85164

165+
LAYER_TIMING_STOP(backward, i);
166+
86167
if (last && (layers[i]->layerOp != nullptr) && layers[i]->layerOp->HasParameterSets()) {
168+
LAYER_TIMING_START();
87169
for (int j = 0; j < callbacks_.size(); ++j) {
88-
callbacks_[j]->on_iter_finished(i);
170+
callbacks_[j]->on_iter_finished(i);
89171
}
172+
LAYER_TIMING_STOP(startcomm, i);
90173
}
91-
92-
#ifdef CAFFE_PER_LAYER_TIMINGS
93-
backward_time_per_layer[i] += timer.MicroSeconds();
94-
#endif
95174
}
96175

176+
#ifdef FW_OVERLAP_OPT
177+
int max_iter = root_solver_->param().max_iter();
178+
bool test = (root_solver_->param().test_interval()
179+
&& ((iter + 1) % root_solver_->param().test_interval() == 0));
180+
if (last && (test || (iter == max_iter - 1))) {
181+
int finished_count = 0;
182+
while (finished_count < layers.size()) {
183+
#else
97184
if (last) {
185+
#endif
98186

99187
for (int i = 0; i < layers.size(); ++i) {
100-
#ifdef CAFFE_PER_LAYER_TIMINGS
101-
timer.Start();
188+
#ifdef FW_OVERLAP_OPT
189+
if (layer_finished_flags_[i])
190+
continue;
102191
#endif
103-
if (!layer_need_backward[i] || ((layers[i]->layerOp != nullptr) && !layers[i]->layerOp->HasParameterSets())) {
104-
DLOG(INFO) << "ForwardBackwardImpl: no need for apply_updates for layer # " << i
105-
<< ", skip on_delwt_wait, apply_updates, on_wtinc_ready";
106-
continue;
107-
}
192+
if (IsSkipWaitGradient(i)) {
193+
#ifdef FW_OVERLAP_OPT
194+
finished_count++;
195+
layer_finished_flags_[i] = true;
196+
#endif
197+
continue;
198+
}
108199

109-
for (int j = 0; j < callbacks_.size(); ++j) {
110-
callbacks_[j]->on_delwt_wait(i);
111-
}
200+
WaitAndUpdateGradient(i);
112201

113-
for (int j = 0; j < callbacks_.size(); ++j) {
114-
callbacks_[j]->apply_updates(i);
115-
}
116-
#ifdef CAFFE_PER_LAYER_TIMINGS
117-
update_time_per_layer[i] += timer.MicroSeconds();
202+
#ifdef FW_OVERLAP_OPT
203+
if (layer_finished_flags_[i])
204+
finished_count++;
118205
#endif
206+
}
207+
#ifdef FW_OVERLAP_OPT
119208
}
209+
#endif
120210
}
121211

122212
DLOG(WARNING) << "iter " << root_solver_->iter() << ", loss " << loss;

0 commit comments

Comments
 (0)