Skip to content
Open
78 changes: 78 additions & 0 deletions tools/ReferenceFileIO.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import gzip
import csv
import json
from json import JSONEncoder
import numpy as np

regression_version = "1.0"

class NumpyArrayEncoder(JSONEncoder):
    """JSON encoder that serializes numpy arrays as plain nested lists.

    Anything that is not an ndarray is delegated to the base encoder,
    which raises TypeError for unsupported types.
    """

    def default(self, obj):
        # Delegate non-array objects to JSONEncoder (raises TypeError).
        if not isinstance(obj, np.ndarray):
            return super().default(obj)
        return obj.tolist()


# --------------------------------------------------
# Helper: read CSV + metadata
# --------------------------------------------------
def read_CSV_reference_file(file_path):
    """Read a gzip-compressed CSV reference file.

    The file starts with '#'-prefixed metadata lines of the form
    '# key=value' ('#' lines without '=' are ignored), followed by
    plain CSV data rows.

    Returns a tuple (metadata, rows) where metadata is a dict of
    stripped key/value strings and rows is a list of non-empty CSV
    rows (each a list of strings).
    """
    metadata = {}

    with gzip.open(file_path, "rt") as stream:
        # Consume the leading metadata block, remembering the stream
        # position before each line so we can rewind when the first
        # data row appears (csv.reader must see that row too).
        mark = stream.tell()
        while header := stream.readline():
            if not header.startswith("#"):
                stream.seek(mark)
                break
            if "=" in header:
                key, value = header[1:].strip().split("=", 1)
                metadata[key.strip()] = value.strip()
            mark = stream.tell()

        rows = [row for row in csv.reader(stream) if row]

    return metadata, rows


def write_CSV_reference_file(file_path, dof_per_point, num_points, csv_rows):
    """Write a gzip-compressed CSV reference file.

    Emits '#'-prefixed metadata lines (format version, DoFs per point,
    point count and a human-readable column layout) followed by the
    CSV data rows.

    Parameters:
        file_path: destination path (or writable binary file object).
        dof_per_point: degrees of freedom per point (2, 3 or 7 are known).
        num_points: number of points per row.
        csv_rows: iterable of rows, each a flat list [time, dof0, dof1, ...].
    """
    # Human-readable column layout per supported DoF count.
    # Fixed: indices previously started at Y1/Z1 instead of Y0/Z0, and the
    # 7-DoF line contained a stray 'QzN1'. The reader never parses this
    # field, so correcting it is safe for existing consumers.
    layouts = {
        2: "time,X0,Y0,...,Xn,Yn",
        3: "time,X0,Y0,Z0,...,Xn,Yn,Zn",
        7: "time,X0,Y0,Z0,Qx0,Qy0,Qz0,Qw0,...,Xn,Yn,Zn,Qxn,Qyn,Qzn,Qwn",
    }

    with gzip.open(file_path, "wt", newline="") as f:
        writer = csv.writer(f)
        f.write(f"# format_version={regression_version}\n")
        f.write(f"# dof_per_point={dof_per_point}\n")
        f.write(f"# num_points={num_points}\n")
        f.write(f"# layout={layouts.get(dof_per_point, 'unknown')}\n")
        writer.writerows(csv_rows)


def write_JSON_reference_file(file_path, numpy_data):
    """Serialize *numpy_data* to JSON (numpy arrays become nested lists)
    and store it gzip-compressed, encoded as UTF-8.
    """

    def _default(obj):
        # Same conversion NumpyArrayEncoder performs: ndarray -> list;
        # everything else is rejected exactly like JSONEncoder.default.
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        raise TypeError(
            f"Object of type {obj.__class__.__name__} is not JSON serializable"
        )

    payload = json.dumps(numpy_data, default=_default)
    with gzip.open(file_path, "wb") as out:
        out.write(payload.encode("utf-8"))

def read_JSON_reference_file(file_path):
    """Load a gzip-compressed JSON reference file.

    Returns (decoded, keyframes): the decoded JSON object and the list
    of its top-level keys converted to floats, in insertion order
    (these are the keyframe times).
    """
    with gzip.open(file_path, "r") as compressed:
        decoded = json.loads(compressed.read().decode("utf-8"))

    keyframes = [float(key) for key in decoded]
    return decoded, keyframes
219 changes: 154 additions & 65 deletions tools/RegressionSceneData.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from tqdm import tqdm
import json
from json import JSONEncoder
import numpy as np
import gzip
import pathlib

import tools.ReferenceFileIO as reference_io
import Sofa

def is_simulated(node):
Expand Down Expand Up @@ -49,12 +47,6 @@ def onAnimateEndEvent(self, event):
self.frame_step += 1


class NumpyArrayEncoder(JSONEncoder):
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist()
return JSONEncoder.default(self, obj)


def is_mapped(node):
mapping = node.getMechanicalMapping()
Expand Down Expand Up @@ -157,7 +149,7 @@ def add_write_state(self):
counter = counter+1


def load_scene(self):
def load_scene(self, format = "JSON"):
self.root_node = Sofa.Simulation.load(self.file_scene_path)
if not self.root_node: # error while loading
print(f'Error while trying to load {self.file_scene_path}')
Expand All @@ -169,120 +161,217 @@ def load_scene(self):
self.parse_node(self.root_node, 0)
counter = 0
for mecaObj in self.meca_objs:
_filename = self.file_ref_path + ".reference_mstate_" + str(counter) + "_" + mecaObj.name.value + ".json.gz"
if format == "CSV":
_filename = self.file_ref_path + ".reference_mstate_" + str(counter) + "_" + mecaObj.name.value + ".csv.gz"
elif format == "JSON":
_filename = self.file_ref_path + ".reference_mstate_" + str(counter) + "_" + mecaObj.name.value + ".json.gz"
self.filenames.append(_filename)
counter = counter+1



def write_references(self):
def write_references(self, format = "JSON"):
pbar_simu = tqdm(total=self.steps, disable=self.disable_progress_bar)
pbar_simu.set_description("Simulate: " + self.file_scene_path)

nbr_meca = len(self.meca_objs)
numpy_data = [] # List<map>
for meca_id in range(0, nbr_meca):
meca_dofs = {}
numpy_data.append(meca_dofs)


# compute stepping parameters for the simulation
counter_step = 0
modulo_step = self.steps / self.dump_number_step
dt = self.root_node.dt.value

# prepare per-mechanical-object data
nbr_meca = len(self.meca_objs)
if format == "CSV":
csv_rows = [[] for _ in range(nbr_meca)]
elif format == "JSON":
numpy_data = [] # List<map>
for meca_id in range(0, nbr_meca):
meca_dofs = {}
numpy_data.append(meca_dofs)
else:
print(f"Unsupported format: {format}")
raise ValueError(f"Unsupported format: {format}")

for step in range(0, self.steps + 1):
# export rest position, final position + modulo steps:
if step == 0 or counter_step >= modulo_step or step == self.steps:
for meca_id in range(0, nbr_meca):
numpy_data[meca_id][self.root_node.dt.value * step] = np.copy(self.meca_objs[meca_id].position.value)
t = dt * step
for meca_id in range(nbr_meca):
positions = np.asarray(self.meca_objs[meca_id].position.value)

if format == "CSV":
row = [t]
row.extend(positions.reshape(-1).tolist()) # flatten vec3d
csv_rows[meca_id].append(row)
elif format == "JSON":
numpy_data[meca_id][t] = np.copy(positions)

counter_step = 0

Sofa.Simulation.animate(self.root_node, self.root_node.dt.value)
counter_step = counter_step + 1

Sofa.Simulation.animate(self.root_node, dt)
counter_step += 1
pbar_simu.update(1)

pbar_simu.close()

for meca_id in range(0, nbr_meca):
# make sure the parent directory of the references exists
# write reference files
for meca_id in range(nbr_meca):
output_file = pathlib.Path(self.filenames[meca_id])
output_file.parent.mkdir(exist_ok=True, parents=True)

with gzip.open(self.filenames[meca_id], 'wb') as write_file:
write_file.write(json.dumps(numpy_data[meca_id], cls=NumpyArrayEncoder).encode('utf-8'))
if format == "CSV":
dof_per_point = self.meca_objs[meca_id].position.value.shape[1]
n_points = self.meca_objs[meca_id].position.value.shape[0]
reference_io.write_CSV_reference_file(self.filenames[meca_id], dof_per_point, n_points, csv_rows[meca_id])
elif format == "JSON":
reference_io.write_JSON_reference_file(self.filenames[meca_id], numpy_data[meca_id])

Sofa.Simulation.unload(self.root_node)


def compare_references(self):

def compare_references(self, format = "JSON"):
pbar_simu = tqdm(total=float(self.steps), disable=self.disable_progress_bar)
pbar_simu.set_description("compare_references: " + self.file_scene_path)

nbr_meca = len(self.meca_objs)
numpy_data = [] # List<map>
keyframes = []

# Reference data
keyframes = [] # shared timeline
if format == "CSV":
ref_values = [] # List[List[np.ndarray]]
elif format == "JSON":
numpy_data = [] # List<map>
else:
print(f"Unsupported format: {format}")
raise ValueError(f"Unsupported format: {format}")

# Outputs init
self.total_error = []
self.error_by_dof = []
self.nbr_tested_frame = 0
self.regression_failed = False

try:
for meca_id in range(0, nbr_meca):
with gzip.open(self.filenames[meca_id], 'r') as zipfile:
decoded_array = json.loads(zipfile.read().decode('utf-8'))
# --------------------------------------------------
# Load reference files
# --------------------------------------------------
for meca_id in range(nbr_meca):
try:
if format == "CSV":
meta, rows = reference_io.read_CSV_reference_file(self.filenames[meca_id])

dof_per_point = int(meta["dof_per_point"])
n_points = int(meta["num_points"])

times = []
values = []

for row in rows:
t = float(row[0])
flat = np.asarray(row[1:], dtype=float)

expected_size = n_points * dof_per_point
if flat.size != expected_size:
print(
f"Reference size mismatch for file {self.file_scene_path}, "
f"MechanicalObject {meca_id}: "
f"expected {expected_size}, got {flat.size}"
)
return False

values.append(flat.reshape((n_points, dof_per_point)))
times.append(t)

ref_values.append(values)

# Keep timeline from first MechanicalObject
if meca_id == 0:
keyframes = times
else:
if len(times) != len(keyframes):
print(
f"Reference timeline mismatch for file {self.file_scene_path}, "
f"MechanicalObject {meca_id}"
)
return False

elif format == "JSON":
decoded_array, decoded_keyframes = reference_io.read_JSON_reference_file(self.filenames[meca_id])
numpy_data.append(decoded_array)

# Keep timeline from first MechanicalObject
if meca_id == 0:
for key in decoded_array:
keyframes.append(float(key))
keyframes = decoded_keyframes

self.total_error.append(0.0)
self.error_by_dof.append(0.0)

except FileNotFoundError as e:
print(f'Error while reading references: {str(e)}')
print(f"Error while reading references: {str(e)}")
return False
except KeyError as e:
print(f"Missing metadata in reference file: {str(e)}")
return False


# --------------------------------------------------
# Simulation + comparison
# --------------------------------------------------
frame_step = 0
nbr_frames = len(keyframes)
self.nbr_tested_frame = 0
dt = self.root_node.dt.value
for step in range(0, self.steps + 1):
simu_time = self.root_node.dt.value * step
simu_time = dt * step

if simu_time == keyframes[frame_step]:
for meca_id in range(0, nbr_meca):
# Use tolerance for float comparison
if frame_step < nbr_frames and np.isclose(simu_time, keyframes[frame_step]):
for meca_id in range(nbr_meca):
meca_dofs = np.copy(self.meca_objs[meca_id].position.value)
data_ref = np.asarray(numpy_data[meca_id][str(keyframes[frame_step])])
if (meca_dofs.size != data_ref.size):
print(f'Error while reading reference for file {self.file_scene_path} at mechanicalObject id: {str(meca_id)}. Reference size: {data_ref.size} vs current size: {meca_dofs.size}')

if format == "CSV":
data_ref = ref_values[meca_id][frame_step]
elif format == "JSON":
data_ref = np.asarray(numpy_data[meca_id][str(keyframes[frame_step])])

if meca_dofs.shape != data_ref.shape:
print(
f"Shape mismatch for file {self.file_scene_path}, "
f"MechanicalObject {meca_id}: "
f"reference {data_ref.shape} vs current {meca_dofs.shape}"
)
return False

data_diff = data_ref - meca_dofs

# Compute total distance between the 2 sets
full_dist = np.linalg.norm(data_diff)
error_by_dof = full_dist / float(data_diff.size)

if self.verbose:
print (str(step) + "| " + self.meca_objs[meca_id].name.value + " | full_dist: " + str(full_dist) + " | error_by_dof: " + str(error_by_dof) + " | nbrDofs: " + str(data_ref.size))
print(
f"{step} | {self.meca_objs[meca_id].name.value} | "
f"full_dist: {full_dist} | "
f"error_by_dof: {error_by_dof} | "
f"nbrDofs: {data_ref.size}"
)

self.total_error[meca_id] = self.total_error[meca_id] + full_dist
self.error_by_dof[meca_id] = self.error_by_dof[meca_id] + error_by_dof
self.total_error[meca_id] += full_dist
self.error_by_dof[meca_id] += error_by_dof

frame_step += 1
self.nbr_tested_frame += 1

frame_step = frame_step + 1
self.nbr_tested_frame = self.nbr_tested_frame + 1

# security exit if simulation steps exceed nbr_frames
if frame_step == nbr_frames:
break
Sofa.Simulation.animate(self.root_node, self.root_node.dt.value)

Sofa.Simulation.animate(self.root_node, dt)

pbar_simu.update(1)
pbar_simu.close()

for meca_id in range(0, nbr_meca):

# Final regression return value
for meca_id in range(nbr_meca):
if self.total_error[meca_id] > self.epsilon:
self.regression_failed = True
return False

return True


Expand Down