diff --git a/tools/ReferenceFileIO.py b/tools/ReferenceFileIO.py new file mode 100644 index 0000000..6342a63 --- /dev/null +++ b/tools/ReferenceFileIO.py @@ -0,0 +1,78 @@ +import gzip +import csv +import json +from json import JSONEncoder +import numpy as np + +regression_version = "1.0" + +class NumpyArrayEncoder(JSONEncoder): + def default(self, obj): + if isinstance(obj, np.ndarray): + return obj.tolist() + return JSONEncoder.default(self, obj) + + +# -------------------------------------------------- +# Helper: read CSV + metadata +# -------------------------------------------------- +def read_CSV_reference_file(file_path): + meta = {} + data_rows = [] + + with gzip.open(file_path, "rt") as f: + # Read metadata + while True: + pos = f.tell() + line = f.readline() + if not line: + break + + if line.startswith("#"): + if "=" in line: + k, v = line[1:].strip().split("=", 1) + meta[k.strip()] = v.strip() + else: + f.seek(pos) + break + + reader = csv.reader(f) + for row in reader: + if row: + data_rows.append(row) + + return meta, data_rows + + +def write_CSV_reference_file(file_path, dof_per_point, num_points, csv_rows): + with gzip.open(file_path, "wt", newline="") as f: + writer = csv.writer(f) + f.write(f"# format_version={regression_version}\n") + f.write(f"# dof_per_point={dof_per_point}\n") + f.write(f"# num_points={num_points}\n") + + if dof_per_point == 2: + f.write("# layout=time,X0,Y1,...,Xn,Yn\n") + elif dof_per_point == 3: + f.write("# layout=time,X0,Y1,Z1,...,Xn,Yn,Zn\n") + elif dof_per_point == 7: + f.write("# layout=time,X0,Y1,Z1,Qx1,Qy1,Qz1,Qw1,...,Xn,Yn,Zn,QxN,QyN,QzN1,QwN\n") + else: + f.write("# layout=unknown\n") + + writer.writerows(csv_rows) + + +def write_JSON_reference_file(file_path, numpy_data): + with gzip.open(file_path, 'wb') as write_file: + write_file.write(json.dumps(numpy_data, cls=NumpyArrayEncoder).encode('utf-8')) + +def read_JSON_reference_file(file_path): + with gzip.open(file_path, 'r') as zipfile: + decoded_array = json.loads(zipfile.read().decode('utf-8')) + + keyframes = [] + for key in decoded_array: + keyframes.append(float(key)) + + return decoded_array, keyframes diff --git a/tools/RegressionSceneData.py b/tools/RegressionSceneData.py index 5992519..d4a9ae6 100644 --- a/tools/RegressionSceneData.py +++ b/tools/RegressionSceneData.py @@ -1,10 +1,8 @@ from tqdm import tqdm -import json -from json import JSONEncoder import numpy as np -import gzip import pathlib +import tools.ReferenceFileIO as reference_io import Sofa def is_simulated(node): @@ -49,12 +47,6 @@ def onAnimateEndEvent(self, event): self.frame_step += 1 -class NumpyArrayEncoder(JSONEncoder): - def default(self, obj): - if isinstance(obj, np.ndarray): - return obj.tolist() - return JSONEncoder.default(self, obj) - def is_mapped(node): mapping = node.getMechanicalMapping() @@ -157,7 +149,7 @@ def add_write_state(self): counter = counter+1 - def load_scene(self): + def load_scene(self, format = "JSON"): self.root_node = Sofa.Simulation.load(self.file_scene_path) if not self.root_node: # error while loading print(f'Error while trying to load {self.file_scene_path}') @@ -169,120 +161,217 @@ def load_scene(self): self.parse_node(self.root_node, 0) counter = 0 for mecaObj in self.meca_objs: - _filename = self.file_ref_path + ".reference_mstate_" + str(counter) + "_" + mecaObj.name.value + ".json.gz" + if format == "CSV": + _filename = self.file_ref_path + ".reference_mstate_" + str(counter) + "_" + mecaObj.name.value + ".csv.gz" + elif format == "JSON": + _filename = self.file_ref_path + ".reference_mstate_" + str(counter) + "_" + mecaObj.name.value + ".json.gz" self.filenames.append(_filename) counter = counter+1 - - def write_references(self): + def write_references(self, format = "JSON"): pbar_simu = tqdm(total=self.steps, disable=self.disable_progress_bar) pbar_simu.set_description("Simulate: " + self.file_scene_path) - - nbr_meca = len(self.meca_objs) - numpy_data = [] # List - for meca_id in range(0, nbr_meca): - meca_dofs = {} - numpy_data.append(meca_dofs) - + # compute stepping parameters for the simulation counter_step = 0 modulo_step = self.steps / self.dump_number_step + dt = self.root_node.dt.value + # prepae per-mechanical-object data + nbr_meca = len(self.meca_objs) + if format == "CSV": + csv_rows = [[] for _ in range(nbr_meca)] + elif format == "JSON": + numpy_data = [] # List + for meca_id in range(0, nbr_meca): + meca_dofs = {} + numpy_data.append(meca_dofs) + else: + print(f"Unsupported format: {format}") + raise ValueError(f"Unsupported format: {format}") + for step in range(0, self.steps + 1): - # export rest position, final position + modulo steps: if step == 0 or counter_step >= modulo_step or step == self.steps: - for meca_id in range(0, nbr_meca): - numpy_data[meca_id][self.root_node.dt.value * step] = np.copy(self.meca_objs[meca_id].position.value) + t = dt * step + for meca_id in range(nbr_meca): + positions = np.asarray(self.meca_objs[meca_id].position.value) + + if format == "CSV": + row = [t] + row.extend(positions.reshape(-1).tolist()) # flatten vec3d + csv_rows[meca_id].append(row) + elif format == "JSON": + numpy_data[meca_id][t] = np.copy(positions) + counter_step = 0 - Sofa.Simulation.animate(self.root_node, self.root_node.dt.value) - counter_step = counter_step + 1 - + Sofa.Simulation.animate(self.root_node, dt) + counter_step += 1 pbar_simu.update(1) + pbar_simu.close() - for meca_id in range(0, nbr_meca): - # make sure the parent directory of the references exists + # write reference files + for meca_id in range(nbr_meca): output_file = pathlib.Path(self.filenames[meca_id]) output_file.parent.mkdir(exist_ok=True, parents=True) - with gzip.open(self.filenames[meca_id], 'wb') as write_file: - write_file.write(json.dumps(numpy_data[meca_id], cls=NumpyArrayEncoder).encode('utf-8')) + if format == "CSV": + dof_per_point = self.meca_objs[meca_id].position.value.shape[1] + n_points = self.meca_objs[meca_id].position.value.shape[0] + reference_io.write_CSV_reference_file(self.filenames[meca_id], dof_per_point, n_points, csv_rows[meca_id]) + elif format == "JSON": + reference_io.write_JSON_reference_file(self.filenames[meca_id], numpy_data[meca_id]) Sofa.Simulation.unload(self.root_node) - - def compare_references(self): + + def compare_references(self, format = "JSON"): pbar_simu = tqdm(total=float(self.steps), disable=self.disable_progress_bar) pbar_simu.set_description("compare_references: " + self.file_scene_path) - + nbr_meca = len(self.meca_objs) - numpy_data = [] # List - keyframes = [] + + # Reference data + keyframes = [] # shared timeline + if format == "CSV": + ref_values = [] # List[List[np.ndarray]] + elif format == "JSON": + numpy_data = [] # List + else: + print(f"Unsupported format: {format}") + raise ValueError(f"Unsupported format: {format}") + + # Outputs init self.total_error = [] self.error_by_dof = [] + self.nbr_tested_frame = 0 + self.regression_failed = False - try: - for meca_id in range(0, nbr_meca): - with gzip.open(self.filenames[meca_id], 'r') as zipfile: - decoded_array = json.loads(zipfile.read().decode('utf-8')) + # -------------------------------------------------- + # Load reference files + # -------------------------------------------------- + for meca_id in range(nbr_meca): + try: + if format == "CSV": + meta, rows = reference_io.read_CSV_reference_file(self.filenames[meca_id]) + + dof_per_point = int(meta["dof_per_point"]) + n_points = int(meta["num_points"]) + + times = [] + values = [] + + for row in rows: + t = float(row[0]) + flat = np.asarray(row[1:], dtype=float) + + expected_size = n_points * dof_per_point + if flat.size != expected_size: + print( + f"Reference size mismatch for file {self.file_scene_path}, " + f"MechanicalObject {meca_id}: " + f"expected {expected_size}, got {flat.size}" + ) + return False + + values.append(flat.reshape((n_points, dof_per_point))) + times.append(t) + + ref_values.append(values) + + # Keep timeline from first MechanicalObject + if meca_id == 0: + keyframes = times + else: + if len(times) != len(keyframes): + print( + f"Reference timeline mismatch for file {self.file_scene_path}, " + f"MechanicalObject {meca_id}" + ) + return False + + elif format == "JSON": + decoded_array, decoded_keyframes = reference_io.read_JSON_reference_file(self.filenames[meca_id]) numpy_data.append(decoded_array) + # Keep timeline from first MechanicalObject if meca_id == 0: - for key in decoded_array: - keyframes.append(float(key)) + keyframes = decoded_keyframes self.total_error.append(0.0) self.error_by_dof.append(0.0) + except FileNotFoundError as e: - print(f'Error while reading references: {str(e)}') + print(f"Error while reading references: {str(e)}") + return False + except KeyError as e: + print(f"Missing metadata in reference file: {str(e)}") return False - + # -------------------------------------------------- + # Simulation + comparison + # -------------------------------------------------- frame_step = 0 nbr_frames = len(keyframes) - self.nbr_tested_frame = 0 + dt = self.root_node.dt.value for step in range(0, self.steps + 1): - simu_time = self.root_node.dt.value * step + simu_time = dt * step - if simu_time == keyframes[frame_step]: - for meca_id in range(0, nbr_meca): + # Use tolerance for float comparison + if frame_step < nbr_frames and np.isclose(simu_time, keyframes[frame_step]): + for meca_id in range(nbr_meca): meca_dofs = np.copy(self.meca_objs[meca_id].position.value) - data_ref = np.asarray(numpy_data[meca_id][str(keyframes[frame_step])]) - if (meca_dofs.size != data_ref.size): - print(f'Error while reading reference for file {self.file_scene_path} at mechanicalObject id: {str(meca_id)}. Reference size: {data_ref.size} vs current size: {meca_dofs.size}') + + if format == "CSV": + data_ref = ref_values[meca_id][frame_step] + elif format == "JSON": + data_ref = np.asarray(numpy_data[meca_id][str(keyframes[frame_step])]) + + if meca_dofs.shape != data_ref.shape: + print( + f"Shape mismatch for file {self.file_scene_path}, " + f"MechanicalObject {meca_id}: " + f"reference {data_ref.shape} vs current {meca_dofs.shape}" + ) return False - + data_diff = data_ref - meca_dofs - + # Compute total distance between the 2 sets full_dist = np.linalg.norm(data_diff) error_by_dof = full_dist / float(data_diff.size) - + if self.verbose: - print (str(step) + "| " + self.meca_objs[meca_id].name.value + " | full_dist: " + str(full_dist) + " | error_by_dof: " + str(error_by_dof) + " | nbrDofs: " + str(data_ref.size)) + print( + f"{step} | {self.meca_objs[meca_id].name.value} | " + f"full_dist: {full_dist} | " + f"error_by_dof: {error_by_dof} | " + f"nbrDofs: {data_ref.size}" + ) - self.total_error[meca_id] = self.total_error[meca_id] + full_dist - self.error_by_dof[meca_id] = self.error_by_dof[meca_id] + error_by_dof + self.total_error[meca_id] += full_dist + self.error_by_dof[meca_id] += error_by_dof + + frame_step += 1 + self.nbr_tested_frame += 1 - frame_step = frame_step + 1 - self.nbr_tested_frame = self.nbr_tested_frame + 1 - # security exit if simulation steps exceed nbr_frames if frame_step == nbr_frames: break - - Sofa.Simulation.animate(self.root_node, self.root_node.dt.value) - + + Sofa.Simulation.animate(self.root_node, dt) + pbar_simu.update(1) pbar_simu.close() - - for meca_id in range(0, nbr_meca): + + # Final regression returns value + for meca_id in range(nbr_meca): if self.total_error[meca_id] > self.epsilon: self.regression_failed = True return False - + return True