Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 66 additions & 12 deletions src/odemis/driver/actuator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import math
import numbers
import threading
import time
from concurrent.futures._base import CancelledError, FINISHED
from concurrent import futures
from typing import Dict, Union, Set
Expand Down Expand Up @@ -1377,9 +1378,14 @@ def __init__(self, name, role, dependencies, axis_name, positions, cycle=None,
if cycle is not None:
if ref_start is None:
# The reference switch is typically at 0, so a little after that is often a good
# starting point (assuming the referencing goes towards negative direction).
self._ref_start = self._cycle / len(self._positions)
# starting point. As we don't know which direction goes the referencing, we
# start with two candidates, and the faster one will be used.
self._ref_start_auto = True
self._ref_start_candidates = [cycle * 0.1, cycle * 0.9]
self._ref_candidate_times = [None, None]
self._ref_start = self._ref_start_candidates[0]
Comment thread
pieleric marked this conversation as resolved.
else:
self._ref_start_auto = False
self._ref_start = ref_start
if not all(0 <= p < cycle for p in positions.keys()):
raise ValueError("Positions must be between 0 and %s (non inclusive)" % (cycle,))
Expand Down Expand Up @@ -1492,20 +1498,14 @@ def _doMoveAbs(self, pos):
move = {self._caxis: req_pos}
self._dependency.moveAbs(move).result()
else:
cur_pos = self._dependency.position.value[self._caxis]

# After moving more than the equivalent of a full cycle, reference again to get rid
# After moving more than the equivalent of 2 full cycles, reference again to get rid
# of accumulated error.
if self._move_sum >= self._cycle:
if self._move_sum >= 2 * self._cycle:
logging.debug("Re-referencing axis %s (-> %s) after moving %s",
self._axis, self._caxis, self._move_sum)
# Move near the reference switch, using the shortest way, to save a bit of time
shift = util.rot_shortest_move(cur_pos, self._ref_start, self._cycle)
self._dependency.moveRel({self._caxis: shift}).result()
self._dependency.reference({self._caxis}).result()
self._move_sum = 0
cur_pos = self._dependency.position.value[self._caxis]
self._quick_reference()

cur_pos = self._dependency.position.value[self._caxis]
# Optimize by moving through the closest way
shift = util.rot_shortest_move(cur_pos, req_pos, self._cycle)
self._move_sum += abs(shift)
Expand All @@ -1517,6 +1517,60 @@ def _doMoveAbs(self, pos):
finally:
self._dependency.position.subscribe(self._update_dep_position, init=True)

def _quick_reference(self):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Check the file around line 1520
head -1530 src/odemis/driver/actuator.py | tail -20

Repository: delmic/odemis

Length of output: 1184


🏁 Script executed:

# Get more context around line 1520
sed -n '1510,1535p' src/odemis/driver/actuator.py

Repository: delmic/odemis

Length of output: 1426


🏁 Script executed:

# Check if _quick_reference method exists and look at its full definition
rg -n "_quick_reference" src/odemis/driver/actuator.py -A 10

Repository: delmic/odemis

Length of output: 1448


🏁 Script executed:

# Check the overall type hint patterns in the file (count methods with and without return types)
wc -l src/odemis/driver/actuator.py

Repository: delmic/odemis

Length of output: 91


🏁 Script executed:

# Get the complete _quick_reference method
sed -n '1520,1570p' src/odemis/driver/actuator.py

Repository: delmic/odemis

Length of output: 3101


🏁 Script executed:

# Check type hint patterns in the file - look at a few other methods
grep -n "def " src/odemis/driver/actuator.py | head -30

Repository: delmic/odemis

Length of output: 1380


🏁 Script executed:

# Sample some actual method signatures with return types
sed -n '100,150p' src/odemis/driver/actuator.py | grep -A 1 "def "

Repository: delmic/odemis

Length of output: 290


🏁 Script executed:

# Check several method signatures to see the return type annotation pattern
grep -n "def " src/odemis/driver/actuator.py | grep -E "->|None|int|str" | head -20

Repository: delmic/odemis

Length of output: 167


🏁 Script executed:

# Sample a few actual method definitions with their signatures
sed -n '187,195p' src/odemis/driver/actuator.py
sed -n '253,260p' src/odemis/driver/actuator.py
sed -n '290,300p' src/odemis/driver/actuator.py

Repository: delmic/odemis

Length of output: 1096


🏁 Script executed:

# Check if _quick_reference has any return statements
sed -n '1520,1600p' src/odemis/driver/actuator.py | grep -n "return"

Repository: delmic/odemis

Length of output: 126


🏁 Script executed:

# Check methods in the file that DO have return type annotations
rg "def.*->.*:" src/odemis/driver/actuator.py | head -15

Repository: delmic/odemis

Length of output: 132


🏁 Script executed:

# Check the end of _quick_reference method to see what it returns
sed -n '1520,1600p' src/odemis/driver/actuator.py | tail -20

Repository: delmic/odemis

Length of output: 720


🏁 Script executed:

# Get the complete _quick_reference method definition to see what it returns
sed -n '1520,1595p' src/odemis/driver/actuator.py

Repository: delmic/odemis

Length of output: 4112


Add the return annotation to _quick_reference.

The method is missing -> None. Per the coding guidelines, Python functions must include type hints for return types.

Minimal fix
-    def _quick_reference(self):
+    def _quick_reference(self) -> None:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _quick_reference(self):
def _quick_reference(self) -> None:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/odemis/driver/actuator.py` at line 1520, The _quick_reference method is
missing a return type annotation; update its definition (the def
_quick_reference(...) declaration) to include the return annotation "-> None" so
the signature becomes def _quick_reference(...) -> None: to comply with the
project's type-hinting guidelines.

"""
Do a quick reference by moving to a position near the reference switch and then referencing.
Assuming we know which direction uses the reference procedure, *on a cyclic axis*,
it's faster than potentially doing a full turn.
When ref_start was not provided at init, the method learns which of two candidate start
positions (10% or 90% of the cycle) yields a faster reference operation, and
permanently uses the faster one once both have been tried.
"""
cur_pos = self._dependency.position.value[self._caxis]
shift = util.rot_shortest_move(cur_pos, self._ref_start, self._cycle)

t_premove_start = time.monotonic()
self._dependency.moveRel({self._caxis: shift}).result()
t_premove = time.monotonic() - t_premove_start

t_ref_start = time.monotonic()
self._dependency.reference({self._caxis}).result()
t_ref = time.monotonic() - t_ref_start

if self._ref_start_auto:
idx = self._ref_start_candidates.index(self._ref_start)
if self._ref_candidate_times[idx] is None:
# First time measuring this candidate: record its duration and decide what to do next.
self._ref_candidate_times[idx] = t_ref
logging.debug("Candidate ref_start %g took %.3f s for reference (pre-move %.3f s)",
self._ref_start_candidates[idx], t_ref, t_premove)

other_idx = 1 - idx
if self._ref_candidate_times[other_idx] is None:
# Other candidate not tried yet; switch to it if this one was slow
is_long = t_ref > (2 * t_premove + 0.01)
if is_long:
logging.debug("Reference from %g was slow (%.3f s > 2 * %.3f s), "
"trying the other candidate (%g) next time",
self._ref_start_candidates[idx], t_ref, t_premove,
self._ref_start_candidates[other_idx])
self._ref_start = self._ref_start_candidates[other_idx]
else: # Fast enough => stick to it
logging.debug(
"Reference from %g was short enough (%.3f s < 2 * %.3f s), will keep using it.",
self._ref_start_candidates[idx], t_ref, t_premove)
self._ref_start_auto = False
Comment thread
pieleric marked this conversation as resolved.
else:
# Both candidates have been measured; pick the faster one permanently
best_idx = 0 if self._ref_candidate_times[0] <= self._ref_candidate_times[1] else 1
self._ref_start = self._ref_start_candidates[best_idx]
self._ref_start_auto = False
logging.info("Settled on ref_start %g (%.3f s) over %g (%.3f s)",
self._ref_start_candidates[best_idx], self._ref_candidate_times[best_idx],
self._ref_start_candidates[1 - best_idx], self._ref_candidate_times[1 - best_idx])

self._move_sum = 0

def _doReference(self, axes):
logging.debug("Referencing axis %s (-> %s)", self._axis, self._caxis)
f = self._dependency.reference({self._caxis})
Expand Down
105 changes: 105 additions & 0 deletions src/odemis/driver/test/actuator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import random
import time
import unittest
import unittest.mock

import odemis
import simulated_test
Expand Down Expand Up @@ -192,6 +193,110 @@ def tearDown(self):
super(FixedPositionsTest, self).tearDown()


class TestFixedPositionsActuatorAdaptiveRef(unittest.TestCase):
"""Tests the adaptive ref_start learning in _quick_reference()."""

def setUp(self):
self.dependency = simulated.Stage("sstage_adaptive", "test", {"a"})
self.dev = FixedPositionsActuator(
"stage_adaptive", "stage",
{"x": self.dependency}, "a",
{0: "pos0", 0.01: "pos1", 0.02: "pos2",
0.03: "pos3", 0.04: "pos4", 0.05: "pos5"},
cycle=0.06,
)
# Provide a mock reference() on the dependency since simulated.Stage doesn't have one
self.dependency.reference = unittest.mock.MagicMock(
return_value=model.InstantaneousFuture()
)

def tearDown(self):
self.dev.terminate()
self.dependency.terminate()

def _run_quick_reference_with_timing(self, t_premove: float, t_ref: float):
"""
Call _quick_reference() with mocked time.monotonic() so that the
reported pre-move duration is t_premove and the reference duration is t_ref.

:param t_premove: simulated pre-move duration in seconds
:param t_ref: simulated reference duration in seconds
"""
# time.monotonic() is called 4 times: start_premove, end_premove, start_ref, end_ref
side_effects = [0.0, t_premove, t_premove, t_premove + t_ref]
with unittest.mock.patch("odemis.driver.actuator.time.monotonic",
side_effect=side_effects):
self.dev._quick_reference()
Comment thread
pieleric marked this conversation as resolved.

def test_initial_state(self):
"""Without explicit ref_start, auto mode is enabled and the first candidate is cycle * 0.1."""
self.assertTrue(self.dev._ref_start_auto)
self.assertAlmostEqual(self.dev._ref_start, 0.06 * 0.1)
self.assertEqual(self.dev._ref_candidate_times, [None, None])
Comment thread
pieleric marked this conversation as resolved.
Comment thread
pieleric marked this conversation as resolved.

def test_explicit_ref_start_disables_adaptation(self):
"""With an explicit ref_start, adaptive learning should be disabled."""
dep2 = simulated.Stage("sstage_explicit", "test", {"a"})
dev2 = FixedPositionsActuator(
"stage_explicit", "stage",
{"x": dep2}, "a",
{0: "pos0", 0.01: "pos1", 0.02: "pos2",
0.03: "pos3", 0.04: "pos4", 0.05: "pos5"},
cycle=0.06,
ref_start=0.005,
)
try:
self.assertFalse(dev2._ref_start_auto)
self.assertAlmostEqual(dev2._ref_start, 0.005)
finally:
dev2.terminate()
dep2.terminate()

def test_slow_candidate_a_switches_to_candidate_b(self):
"""If candidate A is slow (t_ref > 2 * t_premove), switch to candidate B next time."""
# t_premove=0.1 s, t_ref=0.5 s → 0.5 > 2*0.1, so "long"
self._run_quick_reference_with_timing(t_premove=0.1, t_ref=0.5)

self.assertAlmostEqual(self.dev._ref_candidate_times[0], 0.5)
self.assertIsNone(self.dev._ref_candidate_times[1])
self.assertAlmostEqual(self.dev._ref_start, 0.06 * 0.9)
self.assertTrue(self.dev._ref_start_auto)

def test_fast_candidate_a_stays_on_candidate_a(self):
"""If candidate A is fast (t_ref <= 2 * t_premove), keep using candidate A."""
# t_premove=0.1 s, t_ref=0.15 s → 0.15 <= 2*0.1, so "fast"
self._run_quick_reference_with_timing(t_premove=0.1, t_ref=0.15)

self.assertAlmostEqual(self.dev._ref_candidate_times[0], 0.15)
self.assertIsNone(self.dev._ref_candidate_times[1])
self.assertAlmostEqual(self.dev._ref_start, 0.06 * 0.1)
self.assertFalse(self.dev._ref_start_auto)

def test_both_candidates_tried_picks_faster_b(self):
"""After both candidates are timed, the one with the smaller t_ref is used permanently."""
# First call: slow A → switches to B
self._run_quick_reference_with_timing(t_premove=0.1, t_ref=0.5)
# Second call: B is faster
self._run_quick_reference_with_timing(t_premove=0.1, t_ref=0.2)

self.assertAlmostEqual(self.dev._ref_candidate_times[0], 0.5)
self.assertAlmostEqual(self.dev._ref_candidate_times[1], 0.2)
self.assertAlmostEqual(self.dev._ref_start, 0.06 * 0.9) # B is faster
self.assertFalse(self.dev._ref_start_auto) # learning complete

def test_both_candidates_tried_picks_faster_a(self):
"""If A is faster than B after both are tried, A is selected permanently."""
# First call: slow A → switches to B
self._run_quick_reference_with_timing(t_premove=0.1, t_ref=0.5)
# Second call: B is even slower than A
self._run_quick_reference_with_timing(t_premove=0.1, t_ref=0.8)

self.assertAlmostEqual(self.dev._ref_candidate_times[0], 0.5)
self.assertAlmostEqual(self.dev._ref_candidate_times[1], 0.8)
self.assertAlmostEqual(self.dev._ref_start, 0.06 * 0.1) # A is faster
self.assertFalse(self.dev._ref_start_auto) # learning complete


class FixedPositionsTestAntibacklash(unittest.TestCase):

def setUp(self):
Expand Down
Loading