Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 23 additions & 28 deletions pipit/readers/nsight_sqlite_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,11 @@ def read(self) -> pipit.trace.Trace:
value_name="Timestamp (ns)",
)

if "bytes" in trace_df.columns:
trace_df = trace_df.astype(
{
"bytes": "Int64",
}
)
# Convert to the pandas nullable dtypes
# This will help preserve e.g. streamId as an
# integer column with nulls instead of casting to
# float64
trace_df = trace_df.convert_dtypes()

# Cache mapping
trace_df["_matching_event"] = np.concatenate(
Expand All @@ -249,9 +248,24 @@ def read(self) -> pipit.trace.Trace:
trace_df["_matching_event"]
].to_numpy()

trace_df["_depth"] = 0
trace_df["_parent"] = None
trace_df["_children"] = None
# Cannot use ignore_index = True since that breaks the
# _matching_event col
trace_df = trace_df.sort_values(by="Timestamp (ns)")

if self.trace_types == ["gpu_trace"]:
parallelism_levels = ["gpuId", "streamId"]
elif self.trace_types == ["cuda_api"]:
Comment thread
jhdavis8 marked this conversation as resolved.
parallelism_levels = ["Process"]
else:
parallelism_levels = ["Process", "gpuId", "streamId"]

trace = pipit.trace.Trace(None, trace_df, parallelism_levels=parallelism_levels)
if self.create_cct:
trace.create_cct()

# Call _match_caller_callee to recreate the hierarchical
# relationship between annotations
trace._match_caller_callee()

# Associate CUDA API calls with memory operations or
# kernel launches
Expand Down Expand Up @@ -281,23 +295,4 @@ def read(self) -> pipit.trace.Trace:
calls_that_launch["index_x"].to_numpy()
)

# Follow _match_caller_callee
# _match_caller_callee also converts to a categorical of Int32
trace_df = trace_df.astype({"_depth": "Int32", "_parent": "Int32"})
trace_df = trace_df.astype({"_depth": "category", "_parent": "category"})

# Cannot use ignore_index = True since that breaks the
# _matching_event col
trace_df = trace_df.sort_values(by="Timestamp (ns)")

if self.trace_types == ["gpu_trace"]:
parallelism_levels = ["gpuId", "streamId"]
elif self.trace_types == ["cuda_api"]:
parallelism_levels = ["Process"]
else:
parallelism_levels = ["Process", "gpuId", "streamId"]

trace = pipit.trace.Trace(None, trace_df, parallelism_levels=parallelism_levels)
if self.create_cct:
trace.create_cct()
return trace
136 changes: 70 additions & 66 deletions pipit/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,36 +217,8 @@ def _match_caller_callee(self):
# ignore unmatched ones
self._match_events()

# only use enter and leave rows
# to determine calling relationships
enter_leave_df = self.events.loc[
(
self.events["Event Type"].isin(["Enter", "Leave"])
& (self.events["_matching_event"].notnull())
)
]

# list of processes and/or threads to iterate over
if "Thread" in self.events.columns:
exec_locations = set(zip(self.events["Process"], self.events["Thread"]))
has_thread = True
else:
exec_locations = set(self.events["Process"])
has_thread = False

for curr_loc in exec_locations:
# only filter by thread if the trace has a thread column
if has_thread:
curr_process, curr_thread = curr_loc
filtered_df = enter_leave_df.loc[
(enter_leave_df["Process"] == curr_process)
& (enter_leave_df["Thread"] == curr_thread)
]
else:
filtered_df = enter_leave_df.loc[
(enter_leave_df["Process"] == curr_loc)
]

def _match_caller_callee_by_level(filtered_df):
# Matches caller/callee for each parallelism level
children = np.array([None] * len(filtered_df))
depth, parent = [float("nan")] * len(filtered_df), [float("nan")] * len(
filtered_df
Expand Down Expand Up @@ -291,31 +263,30 @@ def _match_caller_callee(self):

curr_depth -= 1

curr_process = curr_loc
thread_mask = 1
if has_thread:
curr_process, curr_thread = curr_loc
thread_mask = self.events["Thread"] == curr_thread
mask = (
self.events["Event Type"].isin(["Enter", "Leave"])
& (self.events["_matching_event"].notnull())
& (self.events["Process"] == curr_process)
& thread_mask
)
(
self.events.loc[mask, "_depth"],
self.events.loc[mask, "_parent"],
self.events.loc[mask, "_children"],
) = (
depth,
parent,
children,
)
new_df = filtered_df.copy() # don't mutate in transform!
Comment thread
jhdavis8 marked this conversation as resolved.
new_df["_depth"] = depth
new_df["_parent"] = parent
new_df["_children"] = children
return new_df

self.events = self.events.astype({"_depth": "Int32", "_parent": "Int32"})
self.events = self.events.astype(
{"_depth": "category", "_parent": "category"}
# only use enter and leave rows
# to determine calling relationships
enter_leave_mask = self.events["Event Type"].isin(["Enter", "Leave"]) & (
self.events["_matching_event"].notnull()
)
enter_leave_df = self.events.loc[enter_leave_mask]

# add dummy values for depth/parent/children
# (otherwise loc won't insert the values)
self.events["_depth"] = 0
self.events["_parent"] = None
self.events["_children"] = None
self.events.loc[enter_leave_mask] = enter_leave_df.groupby(
self.parallelism_levels, group_keys=False, dropna=False
).apply(_match_caller_callee_by_level)

self.events = self.events.astype({"_depth": "Int32", "_parent": "Int32"})
self.events = self.events.astype({"_depth": "category", "_parent": "category"})

def calc_inc_metrics(self, columns=None):
# if no columns are specified by the user, then we calculate
Expand Down Expand Up @@ -377,15 +348,32 @@ def calc_exc_metrics(self, columns=None):

if metric_col_name not in self.events.columns:
# exc metric starts out as a copy of the inc metric values
exc_values = self.events[inc_col_name].to_list()
inc_values = self.events[inc_col_name].to_list()
exc_values = self.events[inc_col_name].copy()
inc_values = self.events[inc_col_name]

for i in range(len(filtered_df)):
curr_parent_idx, curr_children = parent_df_indices[i], children[i]
for child_idx in curr_children:
# subtract each child's inclusive metric from the total
# to calculate the exclusive metric for the parent
exc_values[curr_parent_idx] -= inc_values[child_idx]

# If the exclusive metric is time, we only want to subtract the
# portion of the child's time that overlaps with the parent.
# This matters for e.g. GPUs, where execution happens
# asynchronously relative to e.g. a kernel launch.
inc_metric = inc_values[child_idx]
if col == "Timestamp (ns)":
# the overlap runs from the start of the child event to the
# earlier of the parent's end and the child's end (never negative)
Comment thread
jhdavis8 marked this conversation as resolved.
end_time = min(
self.events.loc[curr_parent_idx, "_matching_timestamp"],
self.events.loc[child_idx, "_matching_timestamp"],
)
inc_metric = max(
end_time - self.events.loc[child_idx, "Timestamp (ns)"],
0,
)
exc_values[curr_parent_idx] -= inc_metric

self.events[metric_col_name] = exc_values
self.exc_metrics.append(metric_col_name)
Expand Down Expand Up @@ -629,17 +617,33 @@ def load_imbalance(self, metric="time.exc", num_processes=1):
return imbalance_df

def idle_time(self, idle_functions=["Idle"], mpi_events=False):
    """Compute the idle time of every execution location in the trace.

    Events are grouped by ``self.parallelism_levels``. For each group the
    idle time is the span between its earliest and latest timestamp, minus
    the summed exclusive time of its events, plus the summed inclusive time
    of the events whose name is one of the idle functions.

    Args:
        idle_functions: Names of functions whose inclusive time counts as
            idle. The list is only read, never modified.
        mpi_events: If True, ``MPI_Wait``, ``MPI_Waitall`` and ``MPI_Recv``
            are counted as idle functions as well.

    Returns:
        A pandas Series named ``"idle_time"``, indexed by the values of
        ``self.parallelism_levels``.
    """
    # Work on a copy: extending the argument in place would permanently grow
    # the shared default list (and the caller's list) on every call made
    # with mpi_events=True.
    idle_names = list(idle_functions)
    if mpi_events:
        idle_names += ["MPI_Wait", "MPI_Waitall", "MPI_Recv"]

    # Both metric columns are read below, so make sure they exist.
    if "time.inc" not in self.events.columns:
        self.calc_inc_metrics()
    if "time.exc" not in self.events.columns:
        # NOTE(review): expected to add "time.exc" when called without
        # arguments -- confirm against calc_exc_metrics.
        self.calc_exc_metrics()

    def calc_idle_time(events):
        # Idle time due to gaps between events: the total time covered by
        # this group minus the exclusive time spent in functions.
        timestamps = events["Timestamp (ns)"]
        total_time = timestamps.max() - timestamps.min()
        gap_time = total_time - events["time.exc"].sum()

        # Idle time explicitly spent inside the idle functions.
        in_idle_function = events["Name"].isin(idle_names)
        return gap_time + events.loc[in_idle_function, "time.inc"].sum()

    return (
        self.events.groupby(self.parallelism_levels, dropna=False)
        .apply(calc_idle_time)
        .rename("idle_time")
    )

def _calculate_idle_time_for_process(
self, process, idle_functions=["Idle"], mpi_events=False
Expand Down