Add sensor bias estimation components and multi-sensor fusion bias example #1217
sdhiscocks wants to merge 15 commits into main from
Conversation
Codecov Report: ❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #1217 +/- ##
==========================================
+ Coverage 94.56% 94.65% +0.09%
==========================================
Files 233 236 +3
Lines 16756 16981 +225
Branches 2355 2383 +28
==========================================
+ Hits 15845 16074 +229
+ Misses 611 609 -2
+ Partials 300 298 -2
Flags with carried forward coverage won't be shown.
csherman-dstl left a comment
All looks good just needs some docstrings
class TimeBiasFeeder(_BiasFeeder):
    """Time Bias Feeder

    Apply bias to detection timestamp and overall timestamp yielded.
My understanding is that these feeders remove the bias from the detection, but the docstrings for all of them state that they add the bias
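To make the remove-the-bias interpretation concrete, here is a minimal numpy sketch; `remove_translation_bias` is a hypothetical helper, not the actual `_BiasFeeder` API:

```python
import numpy as np

def remove_translation_bias(detection_vector, bias):
    # Hypothetical helper: under this interpretation, a bias feeder
    # subtracts the estimated bias from the detection state vector
    return detection_vector - bias

# A detection offset by +2 in x is corrected back towards truth
biased = np.array([[12.0], [5.0]])
bias = np.array([[2.0], [0.0]])
corrected = remove_translation_bias(biased, bias)
```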
expected_rotated = build_rotation_matrix(-feeder.bias) \
    @ (det.state_vector - det.measurement_model.translation_offset)
Suggested change:
-expected_rotated = build_rotation_matrix(-feeder.bias) \
-    @ (det.state_vector - det.measurement_model.translation_offset)
+expected_rotated = build_rotation_matrix(-feeder.bias[:3]) \
+    @ (det.state_vector + feeder.bias[3:])
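For reference, a minimal numpy sketch of why negating the bias angles undoes an orientation bias; `rotation_matrix_z` is a hypothetical stand-in for one factor of Stone Soup's `build_rotation_matrix`:

```python
import numpy as np

def rotation_matrix_z(angle):
    # Hypothetical stand-in: rotation about the z-axis by `angle` radians
    c, s = np.cos(angle), np.sin(angle)
    return np.array([[c, -s, 0.0],
                     [s, c, 0.0],
                     [0.0, 0.0, 1.0]])

# A measurement rotated by a bias angle is recovered by applying the
# rotation built from the negated angle, as the negated bias does above
truth = np.array([[1.0], [0.0], [0.0]])
bias_angle = 0.1
biased = rotation_matrix_z(bias_angle) @ truth
recovered = rotation_matrix_z(-bias_angle) @ biased
```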
class BiasModelWrapper(MeasurementModel):
    """Abstract wrapper that applies bias values to an existing MeasurementModel.
Similarly to the comment on feeders, are these not taking an already biased sensor and removing the bias?
new_bias_state = self.bias_predictor.predict(self.bias_state, timestamp=pred_time)
self.bias_state.state_vector = new_bias_state.state_vector
self.bias_state.covar = new_bias_state.covar
self.bias_state.timestamp = new_bias_state.timestamp
This logic is presumably only required because the bias_feeder and bias_updater both use the bias_state but don't get it from each other.
Does it make more sense for the bias_feeder to take the updater as a property instead of the state, or to use a bias_track (like in the example) to just add the updated bias to
hpritchett-dstl left a comment
Looks good to me. Minor changes in wording, especially for the updater, as it's a bit more involved than a typical updater.
# Predict bias
pred_time = max(hypothesis.prediction.timestamp for hypothesis in hypotheses)
if self.bias_state.timestamp is None:
    self.bias_state.timestamp = pred_time
else:
    new_bias_state = self.bias_predictor.predict(self.bias_state, timestamp=pred_time)
    self.bias_state.state_vector = new_bias_state.state_vector
    self.bias_state.covar = new_bias_state.covar
    self.bias_state.timestamp = new_bias_state.timestamp

# Create joint state
states = [hypothesis.prediction for hypothesis in hypotheses]
applied_bias = next(
    (h.measurement.measurement_model.applied_bias
     for h in hypotheses
     if hasattr(h.measurement.measurement_model, 'applied_bias')),
    np.zeros((ndim_bias, 1)))
delta_bias = self.bias_state.state_vector - applied_bias
states.append(GaussianState(delta_bias, self.bias_state.covar))
combined_pred = GaussianState(
    np.vstack([state.state_vector for state in states]).view(StateVector),
    block_diag(*[state.covar for state in states]).view(CovarianceMatrix),
    timestamp=pred_time,
)

# Create joint measurement
offset = 0
models = []
for prediction, measurement in (
        (hypothesis.prediction, hypothesis.measurement) for hypothesis in hypotheses):
    models.append(self.bias_model_wrapper(
        ndim_state=combined_pred.state_vector.shape[0],
        measurement_model=measurement.measurement_model,
        state_mapping=[offset + n for n in range(prediction.ndim)],
        bias_mapping=list(range(-ndim_bias, 0))
    ))
    offset += prediction.ndim
combined_meas = Detection(
    np.vstack([hypothesis.measurement.state_vector for hypothesis in hypotheses]),
    timestamp=pred_time,
    measurement_model=CombinedReversibleGaussianMeasurementModel(models))

# Update bias
update = self.updater.update(SingleHypothesis(combined_pred, combined_meas), **kwargs)
rel_delta_bias = update.state_vector[-ndim_bias:, :] - delta_bias
self.bias_state.state_vector = self.bias_state.state_vector + rel_delta_bias
if self.max_bias is not None:
    self.bias_state.state_vector = \
        np.min([abs(self.bias_state.state_vector), self.max_bias], axis=0) \
        * np.sign(self.bias_state.state_vector)
self.bias_state.covar = update.covar[-ndim_bias:, -ndim_bias:]
Suggest using an intermediate predicted_bias_state variable to help with clarity when reading the code. Something like:
Suggested change:
# Predict bias
prediction_time = max(hypothesis.prediction.timestamp for hypothesis in hypotheses)
if self.bias_state.timestamp is not None:
    predicted_bias_state = self.bias_predictor.predict(self.bias_state, timestamp=prediction_time)
else:
    predicted_bias_state = self.bias_state
    predicted_bias_state.timestamp = prediction_time

# Create joint state
states = [hypothesis.prediction for hypothesis in hypotheses]
applied_bias = next(
    (h.measurement.measurement_model.applied_bias
     for h in hypotheses
     if hasattr(h.measurement.measurement_model, 'applied_bias')),
    np.zeros((ndim_bias, 1)))
delta_bias = predicted_bias_state.state_vector - applied_bias
states.append(GaussianState(delta_bias, predicted_bias_state.covar))
combined_pred = GaussianState(
    np.vstack([state.state_vector for state in states]).view(StateVector),
    block_diag(*[state.covar for state in states]).view(CovarianceMatrix),
    timestamp=prediction_time,
)

# Create joint measurement
offset = 0
models = []
for prediction, measurement in (
        (hypothesis.prediction, hypothesis.measurement) for hypothesis in hypotheses):
    models.append(self.bias_model_wrapper(
        ndim_state=combined_pred.state_vector.shape[0],
        measurement_model=measurement.measurement_model,
        state_mapping=[offset + n for n in range(prediction.ndim)],
        bias_mapping=list(range(-ndim_bias, 0))
    ))
    offset += prediction.ndim
combined_meas = Detection(
    np.vstack([hypothesis.measurement.state_vector for hypothesis in hypotheses]),
    timestamp=prediction_time,
    measurement_model=CombinedReversibleGaussianMeasurementModel(models))

# Update bias
update = self.updater.update(SingleHypothesis(combined_pred, combined_meas), **kwargs)
rel_delta_bias = update.state_vector[-ndim_bias:, :] - delta_bias
self.bias_state.state_vector = predicted_bias_state.state_vector + rel_delta_bias
if self.max_bias is not None:
    self.bias_state.state_vector = \
        np.min([abs(self.bias_state.state_vector), self.max_bias], axis=0) \
        * np.sign(self.bias_state.state_vector)
self.bias_state.covar = update.covar[-ndim_bias:, -ndim_bias:]
self.bias_state.timestamp = predicted_bias_state.timestamp
Can you confirm if this is accounted for in c09ac5a?
Mostly. I think it's still worth labelling self.bias_state as "prior_bias_state" on line 79, which then gets changed to predicted_bias_state after the prediction step on line 87; then we create an updated state which is appended to the track.
# Update bias
update = self.updater.update(SingleHypothesis(combined_pred, combined_meas), **kwargs)
rel_delta_bias = update.state_vector[-ndim_bias:, :] - delta_bias
What does rel stand for?
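As an aside, the sign-preserving max_bias clamp applied a few lines after this one can be sketched in isolation; `clamp_bias` is a hypothetical standalone version of the updater's logic, not part of the PR:

```python
import numpy as np

def clamp_bias(bias_vector, max_bias):
    # Element-wise clamp of the bias magnitude, preserving sign:
    # min(|bias|, max_bias) * sign(bias), per component
    return np.min([np.abs(bias_vector), max_bias], axis=0) * np.sign(bias_vector)

bias = np.array([[3.0], [-5.0], [0.5]])
max_bias = np.array([[2.0], [2.0], [2.0]])
clamped = clamp_bias(bias, max_bias)
```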
    bias i.e. all measurements from the same sensor.
    """
measurement_model = None
bias_state: GaussianState = Property(doc="Prior bias")
I feel like there should be a bit more explicit documentation somewhere (possibly not on this exact line) expanding on the fact that this object is manipulated within this class and is not returned, so you must point to the same object used in your feeder.
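To illustrate the point about shared mutable state, a minimal sketch with hypothetical classes (not Stone Soup's) of why the feeder and updater must reference the same bias state object:

```python
class BiasHolder:
    # Hypothetical stand-in for the GaussianState bias_state whose
    # attributes are mutated in place by the updater
    def __init__(self, state_vector):
        self.state_vector = state_vector

class Feeder:
    def __init__(self, bias_state):
        self.bias_state = bias_state  # holds a reference, not a copy

class Updater:
    def __init__(self, bias_state):
        self.bias_state = bias_state
    def update(self, delta):
        # In-place mutation: every holder of the same object sees the change
        self.bias_state.state_vector = [self.bias_state.state_vector[0] + delta]

shared = BiasHolder([0.0])
feeder = Feeder(shared)
updater = Updater(shared)
updater.update(1.5)
# feeder.bias_state now reflects the update, because both point at `shared`
```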
delta_orient = state_vector[self.bias_mapping[:3], :]
delta_trans = state_vector[self.bias_mapping[3:], :]
bias_model = copy.copy(self.measurement_model)
bias_model.rotation_offset = bias_model.rotation_offset - delta_orient
bias_model.translation_offset = bias_model.translation_offset - delta_trans
state_vectors.append(bias_model.function(
    State(state_vector[self.state_mapping, :]), noise=noise, **kwargs))
Suggested change:
delta_orientation = state_vector[self.bias_mapping[:3], :]
delta_translation = state_vector[self.bias_mapping[3:], :]
bias_model = copy.copy(self.measurement_model)
bias_model.rotation_offset = bias_model.rotation_offset - delta_orientation
bias_model.translation_offset = bias_model.translation_offset - delta_translation
state_vectors.append(bias_model.function(
    State(state_vector[self.state_mapping, :]), noise=noise, **kwargs))
delta_trans = state_vector[self.bias_mapping, :]
bias_model = copy.copy(self.measurement_model)
bias_model.translation_offset = bias_model.translation_offset - delta_trans
state_vectors.append(bias_model.function(
    State(state_vector[self.state_mapping, :]), noise=noise, **kwargs))
if len(state_vectors) == 1:
    return state_vectors[0]
else:
    return StateVectors(state_vectors)
Suggested change:
delta_translation = state_vector[self.bias_mapping, :]
bias_model = copy.copy(self.measurement_model)
bias_model.translation_offset = bias_model.translation_offset - delta_translation
state_vectors.append(bias_model.function(
    State(state_vector[self.state_mapping, :]), noise=noise, **kwargs))
if len(state_vectors) == 1:
    return state_vectors[0]
else:
    return StateVectors(state_vectors)
delta_orient = state_vector[self.bias_mapping, :]
bias_model = copy.copy(self.measurement_model)
bias_model.rotation_offset = bias_model.rotation_offset - delta_orient
Suggested change:
delta_orientation = state_vector[self.bias_mapping, :]
bias_model = copy.copy(self.measurement_model)
bias_model.rotation_offset = bias_model.rotation_offset - delta_orientation
delta_t = state_vector[self.bias_mapping[0], 0]
predicted_state_vectors.append(self.transition_model.function(
    State(state_vector[self.state_mapping, :]),
    time_interval=datetime.timedelta(seconds=-delta_t),
Suggested change:
delta_time = state_vector[self.bias_mapping[0], 0]
predicted_state_vectors.append(self.transition_model.function(
    State(state_vector[self.state_mapping, :]),
    time_interval=datetime.timedelta(seconds=-delta_time),
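As a minimal illustration of the negative time_interval here, a constant-velocity sketch; `propagate_cv` is a hypothetical stand-in for `transition_model.function`:

```python
import datetime
import numpy as np

def propagate_cv(state, seconds):
    # Hypothetical stand-in for transition_model.function: constant
    # velocity propagation of [position, velocity] over `seconds`
    transition = np.array([[1.0, seconds],
                           [0.0, 1.0]])
    return transition @ state

delta_time = 2.0  # estimated sensor clock bias in seconds
interval = datetime.timedelta(seconds=-delta_time)  # negative, to undo the bias
state = np.array([[10.0], [1.0]])  # position 10, velocity 1
corrected = propagate_cv(state, interval.total_seconds())
```

Propagating backwards by the estimated clock bias moves the position from 10 back to 8 for a velocity of 1.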
# %%
# This example demonstrates how to simulate and estimate a drifting bias in the position of a sensor platform.
# Specifically, the platform at index 0 (and its sensor) will have a time-varying bias applied to its position.
# We use Stone-Soup's bias wrappers and feeders to estimate this changing bias from sensor measurements.
Suggested change:
-# We use Stone-Soup's bias wrappers and feeders to estimate this changing bias from sensor measurements.
+# We use Stone-Soup's bias wrappers, feeders, and updater to estimate this changing bias from sensor measurements.
# sphinx_gallery_thumbnail_number = 3
plotter = Plotterly(dimension=1, axis_labels=['Bias', 'Time'])
Suggested change:
-plotter = Plotterly(dimension=1, axis_labels=['Bias', 'Time'])
+plotter = Plotterly(dimension=1, axis_labels=['Time', 'Bias'])
c09ac5a to 96d5420 (Compare)
    text=[self._format_state_text(state) for state in track],
    **scatter_kwargs)

if uncertainty:
Nice work on implementing the uncertainty in 1D, it's definitely a needed addition and looks good.
I just wanted to share an alternative approach I’ve used locally: plotting error bars. They give a slightly different visual cue for uncertainty.
Here’s a snippet in case you want to explore it:
if particle:
    raise NotImplementedError
# Create copy of scatter kwargs
track_scatter_kwargs = {**scatter_kwargs}
if uncertainty:
    mapping_0 = mapping[0]
    err_y = [np.sqrt(state.covar[mapping_0, mapping_0])
             for state in track.states]
    track_scatter_kwargs['error_y'] = dict(
        type='data',  # value of error bar given in data coordinates
        array=err_y,
        visible=True
    )
self.fig.add_scatter(
    x=[state.timestamp for state in track],
    y=[float(getattr(state, 'mean', state.state_vector)[mapping[0]])
       for state in track],
    text=[self._format_state_text(state) for state in track],
    **track_scatter_kwargs)
Your current implementation works well, but I thought I'd offer this as another option, as I'd already written the code but hadn't got around to a PR. Let me know what you think.
This moves the update logic from the feeder to the updater class, which is a better fit for the wider design. It also now includes a measurement prediction which factors in sensor bias, and as such better handles data association, etc. The example now includes a prior bias, making the bias more visible and demonstrating this functionality.
271f03d to 6c3e464 (Compare)
This adds components which can be used to estimate sensor bias in time, translation and/or orientation, using model wrappers and feeders.
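As a rough conceptual sketch only (not the PR's implementation, which uses Kalman-style recursion), a fixed translation bias of a sensor observing known reference positions can be recovered as the mean residual:

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical scenario: a sensor reports positions with a fixed
# translation bias plus noise; given reference (truth) positions,
# the bias estimate is simply the mean residual
true_bias = np.array([3.0, -1.0])
truths = rng.uniform(0.0, 100.0, size=(500, 2))
measurements = truths + true_bias + rng.normal(0.0, 0.1, size=truths.shape)

estimated_bias = (measurements - truths).mean(axis=0)
```

The PR's components extend this idea to time and orientation biases, and to the online case where truth is replaced by track predictions.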