Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/include/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ struct ncclProxyOp {
bool incWorkCounter;
int eActivationMask;
void* taskEventHandle;
uint64_t commHash;
int rank;
int peer;
pid_t pid;
Expand Down Expand Up @@ -164,6 +165,7 @@ struct ncclProxySubArgs {
struct ncclProxyArgs {
struct ncclProxySubArgs subs[NCCL_PROXY_MAX_SUBS];
proxyProgressFunc_t progress;
uint64_t commHash;
int nsubs;
int done;
int onePPN;
Expand Down
25 changes: 13 additions & 12 deletions src/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,27 +252,27 @@ ncclResult_t getOpIndex(struct ncclProxyArgs* op, struct ncclProxyProgressState*
}

ncclResult_t printProxyOp(struct ncclProxyArgs* op, int poolIndex, int opIndex) {
printf("[%d-%d|%ld| %s", poolIndex, opIndex, op->opCount, op->pattern == ncclPatternSend ? "Send" : op->pattern == ncclPatternRecv ? "Recv" : "Coll");
printf("[0x%lx|%d-%d|%ld| %s", op->commHash, poolIndex, opIndex, op->opCount, ncclFuncToString((ncclFunc_t)op->coll));
for (int s=0; s<op->nsubs; s++) {
struct ncclProxySubArgs* sub = op->subs+s;
printf(" | %s channel %s/%02d", sub->connection->send ? "send" : "recv", ncclTransports[sub->connection->transport]->name, sub->channelId);
if (op->state == ncclProxyOpProgress) {
char status = ' ';
if (op->pattern == ncclPatternRecv) {
if (sub->connection->send) {
if (sub->posted < sub->nsteps && sub->posted < sub->done + NCCL_STEPS) status = 'I'; // Init
else if (sub->transmitted < sub->posted) status = 'G'; // Waiting on GPU
else if (sub->done < sub->transmitted) status = 'S'; // Sending
else status = 'D'; // Done
printf(": %d -> %d / status %c (nsteps %d, posted %lu, transmitted %lu, done %lu)", sub->rank, sub->peer, status, sub->nsteps, sub->posted, sub->transmitted, sub->done);
} else {
if (sub->posted < sub->nsteps && sub->posted < sub->done + NCCL_STEPS) status = 'I'; // Init
else if (sub->received < sub->posted) status = 'R'; // Receiving
else if (sub->received < sub->transmitted) status = 'R'; // Receiving
else if (sub->transmitted < sub->received) status = 'F'; // Flushing
else if (sub->done < sub->transmitted) status = 'G'; // Waiting on GPU
else status = 'D'; // Done
} else if (op->pattern == ncclPatternSend) {
if (sub->posted < sub->nsteps && sub->posted < sub->done + NCCL_STEPS) status = 'I'; // Init
else if (sub->transmitted < sub->posted) status = 'G'; // Waiting on GPU
else if (sub->done < sub->transmitted) status = 'S'; // Sending
else status = 'D'; // Done
printf(": %d <- %d / status %c (nsteps %d, posted %lu, received %lu, transmitted %lu, done %lu)", sub->rank, sub->peer, status, sub->nsteps, sub->posted, sub->received, sub->transmitted, sub->done);
}
printf(" %d%c/%d", sub->peer, status, sub->channelId);
} else {
printf(" %d/%d", sub->peer, sub->channelId);
}
}
printf("]");
Expand Down Expand Up @@ -406,6 +406,7 @@ static ncclResult_t ncclProxyOpToArgs(struct ncclProxyOp* op, struct ncclProxyAr
}
//memset(&args->progress, 0, sizeof(struct ncclProxyArgs)-offsetof(struct ncclProxyArgs, progress));
args->done = 0;
args->commHash = op->commHash;
args->opCount = op->opCount;
args->sliceSteps = op->sliceSteps;
args->chunkSteps = op->chunkSteps;
Expand Down Expand Up @@ -564,7 +565,7 @@ static ncclResult_t SaveProxyProfiler(struct ncclComm* comm, struct ncclProxyOp*

static ncclResult_t SaveProxy(struct ncclComm* comm, struct ncclChannel* channel, int type, int peer, struct ncclProxyOp* op, int connIndex, bool* justInquire) {
if (peer < 0) return ncclSuccess;

op->commHash = comm->commHash;
struct ncclChannelPeer* peerComm = channel->peers[peer];
struct ncclConnector* connector = type == proxyRecv ? peerComm->recv+connIndex : peerComm->send+connIndex;
if (connector->transportComm == NULL) {
Expand Down Expand Up @@ -858,7 +859,7 @@ static ncclResult_t ncclProxyGetPostedOps(struct ncclProxyState* proxyState, int
}

#include <signal.h>
static ncclProxyProgressState* ncclLastProxyState;
static __thread ncclProxyProgressState* ncclLastProxyState;
// Dump the proxy progress state referenced by the file-scope pointer
// ncclLastProxyState by forwarding it to dumpProxyState().
//
// The `signal` argument is not used by this function. The void(int) signature
// matches what a signal handler requires; presumably this is installed as one,
// but the registration is not visible here -- confirm against the caller.
//
// NOTE(review): if this does run in signal context, confirm that
// dumpProxyState() is safe to call there (e.g. does not rely on
// non-async-signal-safe stdio), and that ncclLastProxyState is non-NULL in the
// thread that ends up executing the handler.
void ncclDumpProxyState(int signal) {
dumpProxyState(ncclLastProxyState);
}
Expand Down