diff --git a/cookbooks/cosmos3/generator/action/README.md b/cookbooks/cosmos3/generator/action/README.md index 6158764f..0b47defb 100644 --- a/cookbooks/cosmos3/generator/action/README.md +++ b/cookbooks/cosmos3/generator/action/README.md @@ -25,6 +25,7 @@ HF repository before running these examples. To disable the guardrail, set - [Run with Cosmos Framework](#run-with-cosmos-framework) - [Quickstart](#quickstart) - [Cosmos Framework Walkthrough](#cosmos-framework-walkthrough) +- [Topological Modeling](#topological-modeling) - [Run with vLLM-Omni](#run-with-vllm-omni) - [Quickstart](#quickstart-1) - [Notebook walkthrough](#notebook-walkthrough) @@ -48,6 +49,46 @@ fingers. Action data samples across different embodiments can be inspected interactively in the [Cosmos3 Action Viewer](https://huggingface.co/spaces/nvidia/Cosmos3-Action-Viewer) Hugging Face Space. +## Topological Modeling + +The optional [`topology_helpers.py`](./topology_helpers.py) module adds +topology-aware diagnostics for action forward-dynamics rollouts. It evaluates +binary or labeled masks from generated robotics videos and reports connected +components, hole proxies, Euler-characteristic-style summaries, chunk-boundary +drift, and rollout stability scores. The helper is side-effect-free and does not +change Cosmos3 inference behavior. + +The modeling note in [`topological_modeling.md`](./topological_modeling.md) +describes the contribution as one topology-aware layer for Cosmos action FD: +finite-difference/FDTD rollout structure, topology metrics, topological +inference extension points, and sparse swarm-routing extension points. + +Minimal mask-first usage: + +```python +from pathlib import Path +from topology_helpers import ( + RolloutSpec, + TopologyConfig, + evaluate_fd_rollout, + write_topology_csv, + write_topology_json, +) + +report = evaluate_fd_rollout( + masks=object_masks, # sequence of binary masks, or {"object": masks, "gripper": masks} + rollout=RolloutSpec( + video_id="robotics_action_cond_stitched", + domain_name="droid_lerobot", + fps=15, + action_chunk_size=16, + ), + config=TopologyConfig(min_component_area_px=16, generated_frame_start=1), +) +write_topology_json(report, Path("topology_metrics.json")) +write_topology_csv(report, Path("topology_metrics.csv")) +``` + ## Run with Cosmos Framework ### Quickstart diff --git a/cookbooks/cosmos3/generator/action/topological_modeling.md b/cookbooks/cosmos3/generator/action/topological_modeling.md new file mode 100644 index 00000000..dcfadf45 --- /dev/null +++ b/cookbooks/cosmos3/generator/action/topological_modeling.md @@ -0,0 +1,146 @@ +# Topological Modeling for Cosmos Action Forward Dynamics + +This note describes an optional modeling and evaluation layer for Cosmos3 action +forward dynamics. It does not change model inference. It adds a way to inspect +whether generated robotics rollouts preserve simple topology over time. + +## Modeling Rationale + +Forward dynamics predicts future observations from a start image and action +trajectory. For robotics rollouts, useful structure often lives in relationships: +object separation, contact regions, gripper/object coupling, holes, occlusions, +and chunk-boundary continuity. Pixel error alone does not expose these failures +cleanly. + +The contribution treats generated rollout masks or point-cloud proxies as +field-like state samples: + +- finite-difference and FDTD thinking motivate checking how state evolves across + adjacent frames and chunks; +- topology metrics summarize whether visible structure splits, merges, opens, or + collapses; +- topological inference concepts motivate stable manifold state, sparse + neighborhood reasoning, and convergence checks; +- swarm-routing concepts motivate future multi-agent robot/model specialists + selected by topology-aware state similarity. + +The implementation is fresh Cosmos-native code. No external research prototype +code is imported or vendored. + +## Implemented Core + +`topology_helpers.py` evaluates masks for generated FD rollouts and emits +deterministic JSON/CSV summaries. + +The mask-first API is deliberate. It keeps the core metric independent of any +specific segmentation model and makes the evaluator safe for CPU-only tests. +Notebook users can provide task-specific binary masks, labeled masks, or simple +luminance-threshold masks for quick inspection. + +Frame-level metrics include: + +- connected foreground components; +- enclosed background holes as a loop proxy; +- Euler-characteristic-style `components - holes`; +- foreground area, bounding box, and centroid; +- drift from the previous frame; +- drift from the first generated frame; +- optional persistent-homology summaries when `ripser` is installed and enabled. + +Sequence-level metrics include: + +- component and hole change counts; +- mean and maximum topology drift; +- chunk-boundary jump count; +- stable frame ratio; +- heuristic stability score in `[0, 1]`. + +Finite-difference and convergence metrics include: + +- `compute_fdtd_rollout_trace`, an FDTD-inspired trace over topology state + velocity, acceleration, and local change ratio; +- `TopologyConvergenceGate`, a theorem-style threshold bundle for stable + rollouts; +- `evaluate_topological_convergence`, a pass/fail certificate over stability, + Betti-stability, drift, boundary jumps, and finite-difference speed. + +## FD/FDTD Modeling + +Finite-difference modeling gives the evaluator its local-time bias. A rollout is +not inspected as disconnected images; it is inspected as a sequence whose visible +structure should evolve smoothly unless the action implies a topological event. + +This is especially relevant for autoregressive robotics chunks. A chunk boundary +that splits one object mask into two components, opens a previously closed +contact loop, or collapses a stable foreground component can be flagged even when +the generated video still looks plausible at a glance. + +The helper computes finite differences over compact topology state vectors. This +does not claim to solve a physical PDE. It gives the action FD cookbook a +deterministic analogue of velocity and acceleration over rollout topology, so +large local topology jumps become measurable. + +## Topology Metrics + +The first metric layer is intentionally simple: + +1. build connected components over foreground masks; +2. count enclosed background regions as hole proxies; +3. compare component and hole counts over time; +4. summarize stability per rollout. + +This gives a deterministic baseline that can be reviewed without GPU access, +model weights, or external topology packages. Persistent homology is treated as +an optional enrichment, not a hard dependency. + +## Topological Inference Extension + +The same report schema supports an inference-oriented state vector today: + +- `frame_to_state_vector` compresses a frame report into component, hole, + Euler, area, centroid, and drift features; +- `topology_state_distance` compares state vectors with a weighted L1 distance; +- `betti_stability_score` checks whether component/hole proxies stabilize over a + sliding window; +- `evaluate_topological_convergence` produces a small convergence certificate + from stability, Betti-stability, drift, and finite-difference speed checks. + +These helpers are not a new inference engine. They are small interfaces that can +support later inference-oriented analysis: + +- represent rollout frames as points on a manifold; +- use sparse neighborhood queries to compare the current state to prior states; +- monitor Betti-stability over a sliding window; +- detect drift when topology changes faster than action-conditioned dynamics + justify; +- gate stronger claims behind reproducible tests or formal checks. + +Those ideas remain extension points. The current helper exposes metrics and +schemas, not a new inference engine. + +## Swarm Robotics Extension + +For multi-agent robotics, topology summaries can become routing features. A +future swarm evaluator can build on the included `rank_topology_specialists` +helper, which applies a sparse top-k rule over: + +- topology-aware state similarity; +- worker reliability; +- estimated cost; +- utilization balance; +- local drift risk. + +This would let a multi-robot or multi-model system route hard local dynamics +problems to specialists without changing the base Cosmos3 generation interface. + +## Recommended Use + +1. Generate DROID or UMI action FD outputs using the existing notebooks. +2. Produce task-specific masks for objects, grippers, or contact regions. +3. Call `evaluate_fd_rollout` with a `RolloutSpec`. +4. Write `topology_metrics.json` and, when useful, `topology_metrics.csv` beside + the generated rollout output. +5. Inspect drift spikes at autoregressive chunk boundaries. + +The metric is diagnostic. It should not be reported as generation improvement +unless paired with a controlled validation protocol. diff --git a/cookbooks/cosmos3/generator/action/topology_helpers.py b/cookbooks/cosmos3/generator/action/topology_helpers.py new file mode 100644 index 00000000..8d649705 --- /dev/null +++ b/cookbooks/cosmos3/generator/action/topology_helpers.py @@ -0,0 +1,978 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: OpenMDW-1.1 +"""Topology helpers for Cosmos3 action forward-dynamics rollouts. + +The helpers are intentionally dependency-light. The core path accepts binary +or labeled masks represented as Python lists, NumPy arrays, or objects with a +``tolist()`` method. Optional persistent-homology enrichment is attempted only +when requested and when ``ripser`` is importable. +""" + +from __future__ import annotations + +import csv +import json +from collections import deque +from dataclasses import asdict, dataclass, field +from pathlib import Path +from typing import Any, Iterable, Literal, Mapping, Sequence + +Mask2D = Any +Connectivity = Literal[4, 8] +ReferenceMode = Literal["first_generated", "previous", "conditioning"] + +SCHEMA_VERSION = "topology_metrics.v1" + + +@dataclass(frozen=True) +class PersistentHomologyConfig: + enabled: bool = False + backend: Literal["auto", "ripser", "none"] = "none" + max_dim: int = 1 + sample_limit: int = 2000 + include_diagrams: bool = False + + +@dataclass(frozen=True) +class TopologyConfig: + foreground_connectivity: Connectivity = 8 + background_connectivity: Connectivity = 4 + min_component_area_px: int = 16 + frame_stride: int = 1 + generated_frame_start: int = 1 + reference: ReferenceMode = "first_generated" + stable_delta_threshold: int = 0 + persistent_homology: PersistentHomologyConfig = field(default_factory=PersistentHomologyConfig) + + +@dataclass(frozen=True) +class RolloutSpec: + video_id: str + domain_name: str | None = None + fps: float | None = None + action_chunk_size: int | None = None + chunk_count: int | None = None + conditioning_frames_per_chunk: int = 1 + + +@dataclass(frozen=True) +class ComponentStats: + components: int + area_px: int + area_ratio: float + bbox: tuple[int, int, int, int] | None + centroid: tuple[float, float] | None + + +@dataclass(frozen=True) +class PersistentHomologySummary: + status: Literal["disabled", "unavailable", "empty", "computed", "failed"] + betti_0: int | None = None + betti_1: int | None = None + h1_total_persistence: float | None = None + h1_max_persistence: float | None = None + sampled_points: int = 0 + warning: str | None = None + diagrams: list[list[list[float]]] | None = None + + +@dataclass +class FrameTopology: + video_id: str + domain_name: str | None + label: str + frame_index: int + generated_index: int + chunk_index: int | None + timestamp_s: float | None + width: int + height: int + area_px: int + area_ratio: float + components: int + holes: int + euler_characteristic: int + centroid_x: float | None + centroid_y: float | None + bbox_x0: int | None + bbox_y0: int | None + bbox_x1: int | None + bbox_y1: int | None + topology_delta_prev: int = 0 + topology_delta_ref: int = 0 + ph_status: str = "disabled" + ph_betti_0: int | None = None + ph_betti_1: int | None = None + ph_h1_total_persistence: float | None = None + ph_h1_max_persistence: float | None = None + warnings: list[str] = field(default_factory=list) + + +@dataclass(frozen=True) +class StabilitySummary: + frame_count: int + label_count: int + component_change_count: int + hole_change_count: int + mean_topology_delta_ref: float + max_topology_delta_ref: int + boundary_jump_count: int + stable_frame_ratio: float + stability_score: float + + +@dataclass(frozen=True) +class TopologyStateVector: + """Compact topology state used for inference/routing extensions.""" + + components: float + holes: float + euler_characteristic: float + area_ratio: float + centroid_x_norm: float + centroid_y_norm: float + topology_delta_prev: float + topology_delta_ref: float + + +@dataclass(frozen=True) +class FiniteDifferenceSample: + """Finite-difference topology dynamics for one frame transition.""" + + label: str + frame_index: int + generated_index: int + chunk_index: int | None + dt_s: float + topology_speed: float + topology_acceleration: float + component_velocity: float + hole_velocity: float + euler_velocity: float + area_velocity: float + centroid_velocity: float + local_change_ratio: float + + +@dataclass(frozen=True) +class TopologyConvergenceGate: + """Thresholds for theorem-style rollout convergence checks.""" + + min_stability_score: float = 0.80 + min_betti_stability: float = 0.75 + max_mean_delta_ref: float = 1.0 + max_boundary_jump_count: int = 0 + max_topology_speed: float = 2.0 + betti_window: int = 4 + + +@dataclass(frozen=True) +class TopologyConvergenceResult: + passed: bool + checks: dict[str, bool] + metrics: dict[str, float] + messages: tuple[str, ...] + + +@dataclass(frozen=True) +class SpecialistSignal: + """Candidate robot/model specialist signal for topology-aware routing.""" + + specialist_id: str + capability_tags: tuple[str, ...] + state: TopologyStateVector + reliability: float = 1.0 + cost: float = 0.0 + utilization: int = 0 + + +@dataclass(frozen=True) +class SpecialistRanking: + specialist_id: str + score: float + topology_distance: float + reliability: float + cost: float + utilization: int + matched_tags: tuple[str, ...] + + +@dataclass +class TopologyReport: + schema_version: str + run: RolloutSpec + config: TopologyConfig + frames: list[FrameTopology] + summary: StabilitySummary + warnings: list[str] = field(default_factory=list) + + +def _validate_connectivity(connectivity: int) -> None: + if connectivity not in (4, 8): + raise ValueError(f"connectivity must be 4 or 8, got {connectivity!r}") + + +def _neighbors(connectivity: Connectivity) -> tuple[tuple[int, int], ...]: + _validate_connectivity(connectivity) + offsets = [(-1, 0), (0, -1), (0, 1), (1, 0)] + if connectivity == 8: + offsets.extend([(-1, -1), (-1, 1), (1, -1), (1, 1)]) + return tuple(offsets) + + +def _mask_to_bool_grid(mask: Mask2D) -> list[list[bool]]: + if hasattr(mask, "tolist"): + mask = mask.tolist() + rows = list(mask) + if not rows: + raise ValueError("mask must have at least one row") + + grid: list[list[bool]] = [] + width: int | None = None + for row in rows: + row_values = list(row) + if width is None: + width = len(row_values) + if width == 0: + raise ValueError("mask rows must not be empty") + elif len(row_values) != width: + raise ValueError("mask rows must all have the same width") + grid.append([bool(value) for value in row_values]) + return grid + + +def _region_stats( + pixels: Sequence[tuple[int, int]], + width: int, + height: int, +) -> tuple[int, tuple[int, int, int, int], tuple[float, float]]: + area = len(pixels) + xs = [x for x, _y in pixels] + ys = [y for _x, y in pixels] + bbox = (min(xs), min(ys), max(xs), max(ys)) + centroid = (sum(xs) / area, sum(ys) / area) + return area, bbox, centroid + + +def _regions( + grid: list[list[bool]], + target: bool, + connectivity: Connectivity, +) -> list[list[tuple[int, int]]]: + height = len(grid) + width = len(grid[0]) + seen = [[False for _x in range(width)] for _y in range(height)] + regions: list[list[tuple[int, int]]] = [] + + for y in range(height): + for x in range(width): + if seen[y][x] or grid[y][x] is not target: + continue + + queue: deque[tuple[int, int]] = deque([(x, y)]) + seen[y][x] = True + pixels: list[tuple[int, int]] = [] + + while queue: + px, py = queue.popleft() + pixels.append((px, py)) + for dy, dx in _neighbors(connectivity): + nx = px + dx + ny = py + dy + if nx < 0 or ny < 0 or nx >= width or ny >= height: + continue + if seen[ny][nx] or grid[ny][nx] is not target: + continue + seen[ny][nx] = True + queue.append((nx, ny)) + + regions.append(pixels) + + return regions + + +def label_components(mask: Mask2D, config: TopologyConfig = TopologyConfig()) -> ComponentStats: + """Return foreground connected-component statistics for a binary mask.""" + + if config.min_component_area_px < 1: + raise ValueError("min_component_area_px must be >= 1") + grid = _mask_to_bool_grid(mask) + height = len(grid) + width = len(grid[0]) + + kept_regions = [ + region + for region in _regions(grid, True, config.foreground_connectivity) + if len(region) >= config.min_component_area_px + ] + + if not kept_regions: + return ComponentStats(components=0, area_px=0, area_ratio=0.0, bbox=None, centroid=None) + + pixels = [point for region in kept_regions for point in region] + area_px, bbox, centroid = _region_stats(pixels, width=width, height=height) + return ComponentStats( + components=len(kept_regions), + area_px=area_px, + area_ratio=area_px / float(width * height), + bbox=bbox, + centroid=centroid, + ) + + +def count_holes(mask: Mask2D, config: TopologyConfig = TopologyConfig()) -> int: + """Count enclosed background regions as a simple loop/hole proxy.""" + + grid = _mask_to_bool_grid(mask) + height = len(grid) + width = len(grid[0]) + holes = 0 + for region in _regions(grid, False, config.background_connectivity): + touches_border = any(x == 0 or y == 0 or x == width - 1 or y == height - 1 for x, y in region) + if not touches_border: + holes += 1 + return holes + + +def _foreground_points(mask: Mask2D) -> list[tuple[int, int]]: + grid = _mask_to_bool_grid(mask) + points: list[tuple[int, int]] = [] + for y, row in enumerate(grid): + for x, value in enumerate(row): + if value: + points.append((x, y)) + return points + + +def compute_persistent_homology( + mask: Mask2D, + config: TopologyConfig = TopologyConfig(), +) -> PersistentHomologySummary: + """Compute optional PH summary when requested and available.""" + + ph_config = config.persistent_homology + if not ph_config.enabled or ph_config.backend == "none": + return PersistentHomologySummary(status="disabled") + + points = _foreground_points(mask) + if not points: + return PersistentHomologySummary(status="empty", sampled_points=0) + + if len(points) > ph_config.sample_limit: + stride = max(1, len(points) // ph_config.sample_limit) + points = points[::stride][: ph_config.sample_limit] + + try: + from ripser import ripser + except ImportError: + return PersistentHomologySummary( + status="unavailable", + sampled_points=len(points), + warning="persistent homology requested but ripser is not installed", + ) + + try: + result = ripser(points, maxdim=ph_config.max_dim) + except Exception as exc: # pragma: no cover - defensive around optional dependency + return PersistentHomologySummary( + status="failed", + sampled_points=len(points), + warning=f"ripser failed: {exc}", + ) + + diagrams = result.get("dgms", []) + h0 = diagrams[0] if len(diagrams) > 0 else [] + h1 = diagrams[1] if len(diagrams) > 1 else [] + h1_lifetimes = [ + float(death - birth) + for birth, death in h1 + if death != float("inf") and death >= birth + ] + encoded_diagrams = None + if ph_config.include_diagrams: + encoded_diagrams = [ + [[float(birth), float(death)] for birth, death in diagram] + for diagram in diagrams + ] + + return PersistentHomologySummary( + status="computed", + betti_0=len(h0), + betti_1=len(h1), + h1_total_persistence=sum(h1_lifetimes), + h1_max_persistence=max(h1_lifetimes, default=0.0), + sampled_points=len(points), + diagrams=encoded_diagrams, + ) + + +def _chunk_index(generated_index: int, action_chunk_size: int | None) -> int | None: + if action_chunk_size is None or action_chunk_size <= 0: + return None + return generated_index // action_chunk_size + + +def compute_frame_topology( + mask: Mask2D, + *, + frame_index: int, + generated_index: int, + timestamp_s: float | None, + label: str, + rollout: RolloutSpec, + config: TopologyConfig = TopologyConfig(), +) -> FrameTopology: + """Compute topology metrics for one label/mask/frame.""" + + grid = _mask_to_bool_grid(mask) + height = len(grid) + width = len(grid[0]) + components = label_components(grid, config) + holes = count_holes(grid, config) + ph = compute_persistent_homology(grid, config) + warnings: list[str] = [] + if components.components == 0: + warnings.append("empty_foreground") + if ph.warning: + warnings.append(ph.warning) + + bbox = components.bbox + centroid = components.centroid + return FrameTopology( + video_id=rollout.video_id, + domain_name=rollout.domain_name, + label=label, + frame_index=frame_index, + generated_index=generated_index, + chunk_index=_chunk_index(generated_index, rollout.action_chunk_size), + timestamp_s=timestamp_s, + width=width, + height=height, + area_px=components.area_px, + area_ratio=components.area_ratio, + components=components.components, + holes=holes, + euler_characteristic=components.components - holes, + centroid_x=None if centroid is None else centroid[0], + centroid_y=None if centroid is None else centroid[1], + bbox_x0=None if bbox is None else bbox[0], + bbox_y0=None if bbox is None else bbox[1], + bbox_x1=None if bbox is None else bbox[2], + bbox_y1=None if bbox is None else bbox[3], + ph_status=ph.status, + ph_betti_0=ph.betti_0, + ph_betti_1=ph.betti_1, + ph_h1_total_persistence=ph.h1_total_persistence, + ph_h1_max_persistence=ph.h1_max_persistence, + warnings=warnings, + ) + + +def _normalise_labeled_masks( + masks: Mapping[str, Sequence[Mask2D]] | Sequence[Mask2D], +) -> dict[str, Sequence[Mask2D]]: + if isinstance(masks, Mapping): + if not masks: + raise ValueError("masks mapping must not be empty") + return dict(masks) + return {"foreground": masks} + + +def _validate_sequence_shapes(labeled_masks: Mapping[str, Sequence[Mask2D]]) -> None: + expected_shape: tuple[int, int] | None = None + expected_len: int | None = None + for label, sequence in labeled_masks.items(): + if not sequence: + raise ValueError(f"mask sequence for label {label!r} must not be empty") + if expected_len is None: + expected_len = len(sequence) + elif len(sequence) != expected_len: + raise ValueError("all labels must contain the same number of frames") + for mask in sequence: + grid = _mask_to_bool_grid(mask) + shape = (len(grid), len(grid[0])) + if expected_shape is None: + expected_shape = shape + elif shape != expected_shape: + raise ValueError("all masks must have the same height and width") + + +def compute_sequence_topology( + masks: Mapping[str, Sequence[Mask2D]] | Sequence[Mask2D], + rollout: RolloutSpec, + config: TopologyConfig = TopologyConfig(), +) -> TopologyReport: + """Compute topology drift and stability over a rollout mask sequence.""" + + if config.frame_stride < 1: + raise ValueError("frame_stride must be >= 1") + if config.generated_frame_start < 0: + raise ValueError("generated_frame_start must be >= 0") + + labeled_masks = _normalise_labeled_masks(masks) + _validate_sequence_shapes(labeled_masks) + + frames: list[FrameTopology] = [] + warnings: list[str] = [] + + for label, sequence in labeled_masks.items(): + label_frames: list[FrameTopology] = [] + for frame_index in range(config.generated_frame_start, len(sequence), config.frame_stride): + generated_index = frame_index - config.generated_frame_start + timestamp_s = None if rollout.fps in (None, 0) else frame_index / float(rollout.fps) + label_frames.append( + compute_frame_topology( + sequence[frame_index], + frame_index=frame_index, + generated_index=generated_index, + timestamp_s=timestamp_s, + label=label, + rollout=rollout, + config=config, + ) + ) + if not label_frames: + warnings.append(f"label {label!r} had no frames after generated_frame_start/stride filtering") + conditioning_frame = None + if config.reference == "conditioning" and config.generated_frame_start > 0: + conditioning_index = config.generated_frame_start - 1 + conditioning_ts = None if rollout.fps in (None, 0) else conditioning_index / float(rollout.fps) + conditioning_frame = compute_frame_topology( + sequence[conditioning_index], + frame_index=conditioning_index, + generated_index=-1, + timestamp_s=conditioning_ts, + label=label, + rollout=rollout, + config=config, + ) + _apply_drift(label_frames, config.reference, conditioning_frame) + frames.extend(label_frames) + + summary = summarize_stability(frames, config) + if frames and all(frame.components == 0 for frame in frames): + warnings.append("all evaluated frames have empty foreground masks") + + return TopologyReport( + schema_version=SCHEMA_VERSION, + run=rollout, + config=config, + frames=frames, + summary=summary, + warnings=warnings, + ) + + +def evaluate_fd_rollout( + masks: Mapping[str, Sequence[Mask2D]] | Sequence[Mask2D], + rollout: RolloutSpec, + config: TopologyConfig = TopologyConfig(), +) -> TopologyReport: + """Alias for notebook readability.""" + + return compute_sequence_topology(masks=masks, rollout=rollout, config=config) + + +def _topology_delta(current: FrameTopology, other: FrameTopology) -> int: + return abs(current.components - other.components) + abs(current.holes - other.holes) + + +def _apply_drift( + frames: list[FrameTopology], + reference: ReferenceMode, + conditioning_frame: FrameTopology | None = None, +) -> None: + if not frames: + return + + ref_frame = frames[0] + for index, frame in enumerate(frames): + prev_frame = frames[index - 1] if index > 0 else frame + frame.topology_delta_prev = _topology_delta(frame, prev_frame) + if reference == "previous": + compare_to = prev_frame + elif reference == "conditioning" and conditioning_frame is not None: + compare_to = conditioning_frame + else: + compare_to = ref_frame + frame.topology_delta_ref = _topology_delta(frame, compare_to) + + +def summarize_stability( + frames: Sequence[FrameTopology], + config: TopologyConfig = TopologyConfig(), +) -> StabilitySummary: + """Summarize topology drift across all labels and frames.""" + + if not frames: + return StabilitySummary( + frame_count=0, + label_count=0, + component_change_count=0, + hole_change_count=0, + mean_topology_delta_ref=0.0, + max_topology_delta_ref=0, + boundary_jump_count=0, + stable_frame_ratio=1.0, + stability_score=1.0, + ) + + label_count = len({frame.label for frame in frames}) + component_change_count = 0 + hole_change_count = 0 + boundary_jump_count = 0 + stable_count = 0 + + by_label: dict[str, list[FrameTopology]] = {} + for frame in frames: + by_label.setdefault(frame.label, []).append(frame) + if ( + frame.topology_delta_prev <= config.stable_delta_threshold + and frame.topology_delta_ref <= config.stable_delta_threshold + ): + stable_count += 1 + + for label_frames in by_label.values(): + sorted_frames = sorted(label_frames, key=lambda item: item.frame_index) + for prev, current in zip(sorted_frames, sorted_frames[1:]): + if current.components != prev.components: + component_change_count += 1 + if current.holes != prev.holes: + hole_change_count += 1 + if ( + current.chunk_index is not None + and prev.chunk_index is not None + and current.chunk_index != prev.chunk_index + and current.topology_delta_prev > config.stable_delta_threshold + ): + boundary_jump_count += 1 + + deltas = [frame.topology_delta_ref for frame in frames] + mean_delta = sum(deltas) / len(deltas) + max_delta = max(deltas) + stable_frame_ratio = stable_count / len(frames) + drift_penalty = min(1.0, mean_delta / 4.0) + boundary_penalty = min(0.25, boundary_jump_count / max(1, len(frames))) + stability_score = max(0.0, min(1.0, stable_frame_ratio * (1.0 - drift_penalty) - boundary_penalty)) + + return StabilitySummary( + frame_count=len(frames), + label_count=label_count, + component_change_count=component_change_count, + hole_change_count=hole_change_count, + mean_topology_delta_ref=mean_delta, + max_topology_delta_ref=max_delta, + boundary_jump_count=boundary_jump_count, + stable_frame_ratio=stable_frame_ratio, + stability_score=stability_score, + ) + + +def frame_to_state_vector(frame: FrameTopology) -> TopologyStateVector: + """Convert a frame report into a normalized state vector. + + This is the bridge from evaluation to topological inference. It is small on + purpose: downstream ranking and neighborhood checks should not need to know + the whole frame schema. + """ + + centroid_x_norm = 0.0 if frame.centroid_x is None else frame.centroid_x / max(1, frame.width - 1) + centroid_y_norm = 0.0 if frame.centroid_y is None else frame.centroid_y / max(1, frame.height - 1) + return TopologyStateVector( + components=float(frame.components), + holes=float(frame.holes), + euler_characteristic=float(frame.euler_characteristic), + area_ratio=float(frame.area_ratio), + centroid_x_norm=float(centroid_x_norm), + centroid_y_norm=float(centroid_y_norm), + topology_delta_prev=float(frame.topology_delta_prev), + topology_delta_ref=float(frame.topology_delta_ref), + ) + + +def topology_state_distance(left: TopologyStateVector, right: TopologyStateVector) -> float: + """Weighted L1 distance for sparse topology-neighborhood reasoning.""" + + return ( + 1.0 * abs(left.components - right.components) + + 1.0 * abs(left.holes - right.holes) + + 0.5 * abs(left.euler_characteristic - right.euler_characteristic) + + 0.5 * abs(left.area_ratio - right.area_ratio) + + 0.25 * abs(left.centroid_x_norm - right.centroid_x_norm) + + 0.25 * abs(left.centroid_y_norm - right.centroid_y_norm) + + 0.25 * abs(left.topology_delta_prev - right.topology_delta_prev) + + 0.25 * abs(left.topology_delta_ref - right.topology_delta_ref) + ) + + +def _state_subtract(left: TopologyStateVector, right: TopologyStateVector, scale: float) -> TopologyStateVector: + return TopologyStateVector( + components=(left.components - right.components) * scale, + holes=(left.holes - right.holes) * scale, + euler_characteristic=(left.euler_characteristic - right.euler_characteristic) * scale, + area_ratio=(left.area_ratio - right.area_ratio) * scale, + centroid_x_norm=(left.centroid_x_norm - right.centroid_x_norm) * scale, + centroid_y_norm=(left.centroid_y_norm - right.centroid_y_norm) * scale, + topology_delta_prev=(left.topology_delta_prev - right.topology_delta_prev) * scale, + topology_delta_ref=(left.topology_delta_ref - right.topology_delta_ref) * scale, + ) + + +def compute_fdtd_rollout_trace(frames: Sequence[FrameTopology]) -> list[FiniteDifferenceSample]: + """Compute finite-difference topology dynamics over a rollout. + + The trace is an FDTD-inspired diagnostic over topology state, not a physical + PDE solver. It exposes local topology velocity and acceleration so chunk + boundaries and unstable rollouts can be inspected with deterministic numbers. + """ + + by_label: dict[str, list[FrameTopology]] = {} + for frame in frames: + by_label.setdefault(frame.label, []).append(frame) + + samples: list[FiniteDifferenceSample] = [] + for label_frames in by_label.values(): + ordered = sorted(label_frames, key=lambda item: item.frame_index) + previous_velocity: TopologyStateVector | None = None + for prev, current in zip(ordered, ordered[1:]): + prev_state = frame_to_state_vector(prev) + current_state = frame_to_state_vector(current) + if prev.timestamp_s is not None and current.timestamp_s is not None: + dt = max(current.timestamp_s - prev.timestamp_s, 1e-9) + else: + dt = max(current.frame_index - prev.frame_index, 1) + + velocity = _state_subtract(current_state, prev_state, 1.0 / dt) + topology_speed = topology_state_distance(current_state, prev_state) / dt + + if previous_velocity is None: + acceleration = TopologyStateVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0) + else: + acceleration = _state_subtract(velocity, previous_velocity, 1.0 / dt) + topology_acceleration = topology_state_distance(velocity, previous_velocity or velocity) / dt + centroid_velocity = ( + abs(velocity.centroid_x_norm) ** 2 + abs(velocity.centroid_y_norm) ** 2 + ) ** 0.5 + + samples.append( + FiniteDifferenceSample( + label=current.label, + frame_index=current.frame_index, + generated_index=current.generated_index, + chunk_index=current.chunk_index, + dt_s=float(dt), + topology_speed=float(topology_speed), + topology_acceleration=float(topology_acceleration), + component_velocity=float(velocity.components), + hole_velocity=float(velocity.holes), + euler_velocity=float(velocity.euler_characteristic), + area_velocity=float(velocity.area_ratio), + centroid_velocity=float(centroid_velocity), + local_change_ratio=float(topology_speed * dt), + ) + ) + previous_velocity = velocity + + return samples + + +def betti_stability_score(frames: Sequence[FrameTopology], window: int = 4) -> float: + """Return how often component/hole proxies remain stable over a sliding window.""" + + if window < 2: + raise ValueError("window must be >= 2") + if len(frames) < 2: + return 1.0 + + sorted_frames = sorted(frames, key=lambda item: (item.label, item.frame_index)) + stable_windows = 0 + total_windows = 0 + by_label: dict[str, list[FrameTopology]] = {} + for frame in sorted_frames: + by_label.setdefault(frame.label, []).append(frame) + + for label_frames in by_label.values(): + if len(label_frames) < window: + continue + for start in range(0, len(label_frames) - window + 1): + segment = label_frames[start : start + window] + total_windows += 1 + beta_pairs = {(frame.components, frame.holes) for frame in segment} + if len(beta_pairs) == 1: + stable_windows += 1 + + if total_windows == 0: + return 1.0 + return stable_windows / total_windows + + +def evaluate_topological_convergence( + report: TopologyReport, + gate: TopologyConvergenceGate = TopologyConvergenceGate(), +) -> TopologyConvergenceResult: + """Evaluate theorem-style topology convergence gates for a rollout report.""" + + trace = compute_fdtd_rollout_trace(report.frames) + max_speed = max((sample.topology_speed for sample in trace), default=0.0) + betti_score = betti_stability_score(report.frames, window=gate.betti_window) + checks = { + "stability_score": report.summary.stability_score >= gate.min_stability_score, + "betti_stability": betti_score >= gate.min_betti_stability, + "mean_delta_ref": report.summary.mean_topology_delta_ref <= gate.max_mean_delta_ref, + "boundary_jump_count": report.summary.boundary_jump_count <= gate.max_boundary_jump_count, + "topology_speed": max_speed <= gate.max_topology_speed, + } + metrics = { + "stability_score": report.summary.stability_score, + "betti_stability": betti_score, + "mean_delta_ref": report.summary.mean_topology_delta_ref, + "boundary_jump_count": float(report.summary.boundary_jump_count), + "max_topology_speed": max_speed, + } + messages = tuple(name for name, ok in checks.items() if not ok) + return TopologyConvergenceResult( + passed=all(checks.values()), + checks=checks, + metrics=metrics, + messages=messages, + ) + + +def rank_topology_specialists( + query: TopologyStateVector, + specialists: Sequence[SpecialistSignal], + *, + task_tags: Sequence[str] = (), + top_k: int = 5, + similarity_weight: float = 1.0, + reliability_weight: float = 0.5, + cost_weight: float = 0.2, + freshness_weight: float = 0.1, +) -> list[SpecialistRanking]: + """Rank robot/model specialists by topology similarity and routing signals.""" + + if top_k < 1: + raise ValueError("top_k must be >= 1") + + requested = set(task_tags) + rankings: list[SpecialistRanking] = [] + for specialist in specialists: + distance = topology_state_distance(query, specialist.state) + matched = tuple(sorted(requested.intersection(specialist.capability_tags))) + tag_score = len(matched) / max(1, len(requested)) if requested else 1.0 + freshness = 1.0 / (1.0 + max(0, specialist.utilization)) + score = ( + similarity_weight * (tag_score - distance) + + reliability_weight * specialist.reliability + - cost_weight * specialist.cost + + freshness_weight * freshness + ) + rankings.append( + SpecialistRanking( + specialist_id=specialist.specialist_id, + score=score, + topology_distance=distance, + reliability=specialist.reliability, + cost=specialist.cost, + utilization=specialist.utilization, + matched_tags=matched, + ) + ) + + rankings.sort(key=lambda item: item.score, reverse=True) + return rankings[:top_k] + + +def report_to_dict(report: TopologyReport) -> dict[str, Any]: + return { + "schema_version": report.schema_version, + "run": asdict(report.run), + "config": { + **asdict(report.config), + "persistent_homology": asdict(report.config.persistent_homology), + }, + "frames": [asdict(frame) for frame in report.frames], + "summary": asdict(report.summary), + "warnings": list(report.warnings), + } + + +def write_topology_json(report: TopologyReport, path: Path | str) -> Path: + out = Path(path) + out.parent.mkdir(parents=True, exist_ok=True) + out.write_text(json.dumps(report_to_dict(report), indent=2, sort_keys=True) + "\n") + return out + + +CSV_FIELDS = ( + "video_id", + "domain_name", + "label", + "frame_index", + "generated_index", + "chunk_index", + "timestamp_s", + "width", + "height", + "area_px", + "area_ratio", + "components", + "holes", + "euler_characteristic", + "topology_delta_prev", + "topology_delta_ref", + "centroid_x", + "centroid_y", + "bbox_x0", + "bbox_y0", + "bbox_x1", + "bbox_y1", + "ph_status", + "ph_betti_0", + "ph_betti_1", + "ph_h1_total_persistence", + "ph_h1_max_persistence", + "warnings", +) + + +def write_topology_csv(report: TopologyReport, path: Path | str) -> Path: + out = Path(path) + out.parent.mkdir(parents=True, exist_ok=True) + with out.open("w", newline="") as csv_file: + writer = csv.DictWriter(csv_file, fieldnames=CSV_FIELDS) + writer.writeheader() + for frame in report.frames: + row = asdict(frame) + row["warnings"] = ";".join(frame.warnings) + writer.writerow({field_name: row.get(field_name) for field_name in CSV_FIELDS}) + return out + + +def threshold_frame_to_mask(frame: Any, threshold: float = 32.0) -> list[list[bool]]: + """Convert a grayscale/RGB frame-like object into a binary mask. + + This is a convenience adapter for notebooks. Production evaluations should + prefer task-specific masks, labels, or keypoint-derived point clouds. + """ + + if hasattr(frame, "tolist"): + frame = frame.tolist() + + rows = list(frame) + mask: list[list[bool]] = [] + for row in rows: + mask_row: list[bool] = [] + for value in row: + if isinstance(value, Sequence) and not isinstance(value, (str, bytes)): + channels = [float(channel) for channel in value[:3]] + luminance = sum(channels) / max(1, len(channels)) + else: + luminance = float(value) + mask_row.append(luminance >= threshold) + mask.append(mask_row) + return _mask_to_bool_grid(mask) + + +def frames_to_luminance_masks(frames: Iterable[Any], threshold: float = 32.0) -> list[list[list[bool]]]: + return [threshold_frame_to_mask(frame, threshold=threshold) for frame in frames] diff --git a/tests/cookbooks/cosmos3/generator/action/test_topology_helpers.py b/tests/cookbooks/cosmos3/generator/action/test_topology_helpers.py new file mode 100644 index 00000000..193ade77 --- /dev/null +++ b/tests/cookbooks/cosmos3/generator/action/test_topology_helpers.py @@ -0,0 +1,283 @@ +import csv +import importlib.util +import json +import sys +import tempfile +import unittest +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[5] +HELPER_PATH = ROOT / "cookbooks" / "cosmos3" / "generator" / "action" / "topology_helpers.py" +SPEC = importlib.util.spec_from_file_location("topology_helpers", HELPER_PATH) +topology_helpers = importlib.util.module_from_spec(SPEC) +assert SPEC.loader is not None +sys.modules["topology_helpers"] = topology_helpers +SPEC.loader.exec_module(topology_helpers) + + +def mask(rows): + return [[1 if char == "#" else 0 for char in row] for row in rows] + + +class TopologyHelpersTest(unittest.TestCase): + def test_empty_mask_has_no_components_or_holes(self): + cfg = topology_helpers.TopologyConfig(min_component_area_px=1) + m = mask(["....", "....", "...."]) + + stats = topology_helpers.label_components(m, cfg) + + self.assertEqual(stats.components, 0) + self.assertEqual(topology_helpers.count_holes(m, cfg), 0) + + def test_solid_rectangle_has_one_component_and_no_holes(self): + cfg = topology_helpers.TopologyConfig(min_component_area_px=1) + m = mask(["....", ".##.", ".##.", "...."]) + + stats = topology_helpers.label_components(m, cfg) + + self.assertEqual(stats.components, 1) + self.assertEqual(stats.area_px, 4) + self.assertEqual(stats.bbox, (1, 1, 2, 2)) + self.assertEqual(topology_helpers.count_holes(m, cfg), 0) + + def test_two_rectangles_are_two_components(self): + cfg = topology_helpers.TopologyConfig(min_component_area_px=1) + m = mask(["#..#", "#..#", "...."]) + + stats = topology_helpers.label_components(m, cfg) + + self.assertEqual(stats.components, 2) + self.assertEqual(stats.area_px, 4) + + def test_ring_counts_as_one_hole(self): + cfg = topology_helpers.TopologyConfig(min_component_area_px=1) + m = mask([".....", ".###.", ".#.#.", ".###.", "....."]) + + self.assertEqual(topology_helpers.label_components(m, cfg).components, 1) + self.assertEqual(topology_helpers.count_holes(m, cfg), 1) + + def test_ring_touching_border_does_not_count_as_hole(self): + cfg = topology_helpers.TopologyConfig(min_component_area_px=1) + m = mask(["###.", "#.#.", "###.", "...."]) + + self.assertEqual(topology_helpers.count_holes(m, cfg), 1) + + open_to_border = mask(["#.#.", "#.#.", "###.", "...."]) + self.assertEqual(topology_helpers.count_holes(open_to_border, cfg), 0) + + def test_noise_removed_by_area_threshold(self): + cfg = topology_helpers.TopologyConfig(min_component_area_px=3) + m = mask(["#...", "....", "..##", "...."]) + + stats = topology_helpers.label_components(m, cfg) + + self.assertEqual(stats.components, 0) + + def test_diagonal_connectivity_is_configurable(self): + m = mask(["#.", ".#"]) + cfg4 = topology_helpers.TopologyConfig(min_component_area_px=1, foreground_connectivity=4) + cfg8 = topology_helpers.TopologyConfig(min_component_area_px=1, foreground_connectivity=8) + + self.assertEqual(topology_helpers.label_components(m, cfg4).components, 2) + self.assertEqual(topology_helpers.label_components(m, cfg8).components, 1) + + def test_sequence_reports_drift_and_chunk_indices(self): + cfg = topology_helpers.TopologyConfig(min_component_area_px=1, generated_frame_start=1) + rollout = topology_helpers.RolloutSpec( + video_id="robotics_action_cond_stitched", + domain_name="droid_lerobot", + fps=15, + action_chunk_size=2, + ) + masks = [ + mask(["....", "....", "....", "...."]), + mask(["....", ".##.", ".##.", "...."]), + mask(["#..#", "#..#", "....", "...."]), + mask(["#..#", "#..#", "....", "...."]), + ] + + report = topology_helpers.evaluate_fd_rollout(masks, rollout, cfg) + + self.assertEqual(report.schema_version, "topology_metrics.v1") + self.assertEqual(len(report.frames), 3) + self.assertEqual(report.frames[0].frame_index, 1) + self.assertEqual(report.frames[0].generated_index, 0) + self.assertEqual(report.frames[0].chunk_index, 0) + self.assertEqual(report.frames[1].components, 2) + self.assertEqual(report.frames[1].topology_delta_prev, 1) + self.assertEqual(report.frames[2].chunk_index, 1) + self.assertEqual(report.summary.component_change_count, 1) + + def test_conditioning_reference_compares_to_conditioning_frame(self): + cfg = topology_helpers.TopologyConfig( + min_component_area_px=1, + generated_frame_start=1, + reference="conditioning", + ) + rollout = topology_helpers.RolloutSpec(video_id="robotics_action_cond") + masks = [ + mask(["....", "....", "....", "...."]), + mask(["....", ".##.", ".##.", "...."]), + ] + + report = topology_helpers.evaluate_fd_rollout(masks, rollout, cfg) + + self.assertEqual(report.frames[0].topology_delta_ref, 1) + + def test_labeled_sequences_validate_equal_lengths(self): + cfg = topology_helpers.TopologyConfig(min_component_area_px=1) + rollout = topology_helpers.RolloutSpec(video_id="x") + + with self.assertRaisesRegex(ValueError, "same number of frames"): + topology_helpers.evaluate_fd_rollout( + {"object": [mask(["#"])], "gripper": [mask(["#"]), mask(["."])]}, + rollout, + cfg, + ) + + def test_json_and_csv_outputs_are_stable(self): + cfg = topology_helpers.TopologyConfig(min_component_area_px=1) + rollout = topology_helpers.RolloutSpec(video_id="umi_action_cond_stitched", domain_name="umi") + report = topology_helpers.evaluate_fd_rollout( + [mask(["..", "##"]), mask(["..", "##"])], + rollout, + cfg, + ) + + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + json_path = topology_helpers.write_topology_json(report, tmp_path / "metrics.json") + csv_path = topology_helpers.write_topology_csv(report, tmp_path / "metrics.csv") + + loaded = json.loads(json_path.read_text()) + self.assertEqual(loaded["schema_version"], "topology_metrics.v1") + self.assertEqual(loaded["run"]["video_id"], "umi_action_cond_stitched") + + with csv_path.open(newline="") as csv_file: + rows = list(csv.DictReader(csv_file)) + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0]["video_id"], "umi_action_cond_stitched") + self.assertIn("topology_delta_ref", rows[0]) + + def test_threshold_frame_to_mask_accepts_rgb_rows(self): + frame = [ + [(0, 0, 0), (255, 255, 255)], + [(20, 20, 20), (60, 60, 60)], + ] + + result = topology_helpers.threshold_frame_to_mask(frame, threshold=32) + + self.assertEqual(result, [[False, True], [False, True]]) + + def test_state_vector_distance_and_betti_stability(self): + cfg = topology_helpers.TopologyConfig(min_component_area_px=1) + rollout = topology_helpers.RolloutSpec(video_id="stable") + report = topology_helpers.evaluate_fd_rollout( + [ + mask(["....", ".##.", ".##.", "...."]), + mask(["....", ".##.", ".##.", "...."]), + mask(["....", ".##.", ".##.", "...."]), + mask(["....", ".##.", ".##.", "...."]), + ], + rollout, + cfg, + ) + + first = topology_helpers.frame_to_state_vector(report.frames[0]) + second = topology_helpers.frame_to_state_vector(report.frames[1]) + + self.assertEqual(topology_helpers.topology_state_distance(first, second), 0.0) + self.assertEqual(topology_helpers.betti_stability_score(report.frames, window=3), 1.0) + + def test_fdtd_trace_and_convergence_gate_for_stable_rollout(self): + cfg = topology_helpers.TopologyConfig(min_component_area_px=1, generated_frame_start=0) + rollout = topology_helpers.RolloutSpec(video_id="stable", fps=10) + report = topology_helpers.evaluate_fd_rollout( + [ + mask(["....", ".##.", ".##.", "...."]), + mask(["....", ".##.", ".##.", "...."]), + mask(["....", ".##.", ".##.", "...."]), + mask(["....", ".##.", ".##.", "...."]), + ], + rollout, + cfg, + ) + + trace = topology_helpers.compute_fdtd_rollout_trace(report.frames) + result = topology_helpers.evaluate_topological_convergence(report) + + self.assertEqual(len(trace), 3) + self.assertTrue(all(sample.topology_speed == 0.0 for sample in trace)) + self.assertTrue(result.passed) + + def test_convergence_gate_fails_on_topology_jump(self): + cfg = topology_helpers.TopologyConfig(min_component_area_px=1, generated_frame_start=0) + rollout = topology_helpers.RolloutSpec(video_id="jump", fps=10) + report = topology_helpers.evaluate_fd_rollout( + [ + mask(["....", ".##.", ".##.", "...."]), + mask(["#..#", "#..#", "....", "...."]), + mask(["#..#", "#..#", "....", "...."]), + mask(["#..#", "#..#", "....", "...."]), + ], + rollout, + cfg, + ) + gate = topology_helpers.TopologyConvergenceGate(max_topology_speed=0.5) + + result = topology_helpers.evaluate_topological_convergence(report, gate) + + self.assertFalse(result.passed) + self.assertFalse(result.checks["topology_speed"]) + + def test_sparse_topology_specialist_ranking(self): + query = topology_helpers.TopologyStateVector( + components=1, + holes=0, + euler_characteristic=1, + area_ratio=0.25, + centroid_x_norm=0.5, + centroid_y_norm=0.5, + topology_delta_prev=0, + topology_delta_ref=0, + ) + close = topology_helpers.SpecialistSignal( + specialist_id="droid-contact", + capability_tags=("droid", "contact"), + state=query, + reliability=0.95, + cost=0.1, + utilization=0, + ) + far = topology_helpers.SpecialistSignal( + specialist_id="umi-open-space", + capability_tags=("umi",), + state=topology_helpers.TopologyStateVector( + components=4, + holes=2, + euler_characteristic=2, + area_ratio=0.9, + centroid_x_norm=0.0, + centroid_y_norm=0.0, + topology_delta_prev=3, + topology_delta_ref=3, + ), + reliability=1.0, + cost=0.0, + utilization=0, + ) + + ranked = topology_helpers.rank_topology_specialists( + query, + [far, close], + task_tags=("droid", "contact"), + top_k=1, + ) + + self.assertEqual(ranked[0].specialist_id, "droid-contact") + self.assertEqual(ranked[0].matched_tags, ("contact", "droid")) + + +if __name__ == "__main__": + unittest.main()