from dataclasses import dataclass from typing import Generator, Optional, Protocol, Tuple, TypeAlias, TypedDict, Union import numpy as np from jaxtyping import Int, Float, Num, jaxtyped from scipy.optimize import linear_sum_assignment NDArray = np.ndarray @dataclass class LinearMotionNoInputModel: F: Num[NDArray, "n n"] Q: Num[NDArray, "n n"] @dataclass class LinearMeasurementModel: H: Num[NDArray, "m n"] R: Num[NDArray, "m m"] Measurement = Num[NDArray, "m"] @dataclass class GaussianState: x: Num[NDArray, "n"] P: Num[NDArray, "n n"] @dataclass(kw_only=True) class CvModelGaussianState(GaussianState): """ constant velocity model with no input. Note that the state vector is `[x, y, v_x, v_y]`. This class can only verify the shape of the state vector and covariance matrix, but not the content/order of the state vector. """ x: Num[NDArray, "4"] P: Num[NDArray, "4 4"] @staticmethod def from_gaussian(state: GaussianState) -> "CvModelGaussianState": assert state.x.shape == (4,), "state must have 4 elements" assert state.P.shape == (4, 4), "covariance must be 4x4" return CvModelGaussianState(x=state.x, P=state.P) @property def position(self) -> Num[NDArray, "2"]: return self.x[:2] @property def velocity(self) -> Num[NDArray, "2"]: return self.x[2:] def predict( state: GaussianState, motion_model: LinearMotionNoInputModel, ) -> GaussianState: x = state.x P = state.P F = motion_model.F Q = motion_model.Q assert x.shape[0] == F.shape[0], "state and transition model are not compatible" assert F.shape[0] == F.shape[1], "transition model is not square" assert ( F.shape[0] == Q.shape[0] ), "transition model and noise model are not compatible" x_priori = F @ x P_priori = F @ P @ F.T + Q return GaussianState(x=x_priori, P=P_priori) @dataclass class PosterioriResult: # updated state state: GaussianState innovation: NDArray r""" y. Innovation refers to the difference between the observed measurement and the predicted measurement. Also known as the residual. .. math:: y = z - H x_{\text{priori}} """ innovation_covariance: NDArray r""" S. Innovation covariance refers to the covariance of the innovation (or residual) vector. .. math:: S = H P H^T + R """ posteriori_measurement: NDArray r""" z_posteriori. The updated measurement prediction. .. math:: z_{\text{posteriori}} = H x_{\text{posteriori}} """ mahalanobis_distance: NDArray r""" The Mahalanobis distance is a measure of the distance between a point P and a distribution D, introduced by P. Mahalanobis in 1936. .. math:: \sqrt{y^T S^{-1} y} """ squared_mahalanobis_distance: NDArray """ If you are using the distance for statistical tests, such as identifying outliers, the squared Mahalanobis distance is often used because it corresponds to the chi-squared distribution when the underlying distribution is multivariate normal. """ def predict_measurement( state: GaussianState, measure_model: LinearMeasurementModel, ) -> Measurement: x = state.x H = measure_model.H return H @ x # type: ignore def update( measurement: Measurement, state: GaussianState, measure_model: LinearMeasurementModel, ) -> PosterioriResult: x = state.x P = state.P H = measure_model.H R = measure_model.R assert x.shape[0] == H.shape[1], "state and measurement model are not compatible" assert H.shape[0] == R.shape[0], "measurement model is not square" assert H.shape[0] == R.shape[1], "measurement model is not square" z = measurement inv = np.linalg.inv # innovation # the priori measurement residual y = z - H @ x # innovation covariance S = H @ P @ H.T + R # Kalman gain K = P @ H.T @ inv(S) # posteriori state x_posteriori = x + K @ y # dummy identity matrix I = np.eye(P.shape[0]) # posteriori covariance I_KH = I - K @ H P_posteriori = I_KH @ P @ I_KH.T + K @ R @ K.T posteriori_state = GaussianState(x=x_posteriori, P=P_posteriori) posteriori_measurement = H @ x_posteriori s_m = y.T @ inv(S) @ y return PosterioriResult( state=posteriori_state, innovation=y, innovation_covariance=S, posteriori_measurement=posteriori_measurement, mahalanobis_distance=np.sqrt(s_m), squared_mahalanobis_distance=s_m, ) def cv_model( v_x: float, v_y: float, dt: float, q: float, r: float, ) -> Tuple[ LinearMotionNoInputModel, LinearMeasurementModel, GaussianState, ]: """ Create a constant velocity model with no input Args: v_x: initial velocity in x direction v_y: initial velocity in y direction dt: time interval q: process noise r: measurement noise Returns: motion_model: motion model measure_model: measurement model state: initial state """ # yapf: disable F = np.array([[1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]]) H = np.array([[1, 0, 0, 0], [0, 1, 0, 0]]) # yapf: enable Q = q * np.eye(4) R = r * np.eye(2) P = np.eye(4) motion_model = LinearMotionNoInputModel(F=F, Q=Q) measure_model = LinearMeasurementModel(H=H, R=R) state = GaussianState(x=np.array([0, 0, v_x, v_y]), P=P) return motion_model, measure_model, state def outer_distance(x: NDArray, y: NDArray) -> NDArray: """ Here's equivalent python code: ```python res = jnp.empty((x.shape[0], y.shape[0])) for i in range(x.shape[0]): for j in range(y.shape[0]): # res[i, j] = jnp.linalg.norm(x[i] - y[j]) res = res.at[i, j].set(jnp.linalg.norm(x[i] - y[j])) return res ``` See Also -------- `outer product `_ """ x_expanded = x[:, None, :] y_expanded = y[None, :, :] diff = y_expanded - x_expanded return np.linalg.norm(diff, axis=-1) @dataclass class Tracking: id: int state: GaussianState survived_time_steps: int missed_time_steps: int @dataclass class TrackerParams: dt: float = 1.0 cov_threshold: float = 4.0 tentative_mahalanobis_threshold: float = 10.0 confirm_mahalanobis_threshold: float = 10.0 forming_tracks_euclidean_threshold: float = 25.0 survival_steps_threshold: int = 3 class Tracker: """ A simple GNN tracker """ _last_measurements: NDArray = np.empty((0, 2), dtype=np.float32) _tentative_tracks: list[Tracking] = [] _confirmed_tracks: list[Tracking] = [] _last_id: int = 0 def __init__(self): self._last_measurements = np.empty((0, 2), dtype=np.float32) self._tentative_tracks = [] self._confirmed_tracks = [] @staticmethod def _predict(tracks: list[Tracking], dt: float = 1.0): return [ Tracking( id=track.id, state=predict(track.state, Tracker.motion_model(dt=dt)), survived_time_steps=track.survived_time_steps, missed_time_steps=track.missed_time_steps, ) for track in tracks ] @staticmethod def _data_associate_and_update( measurements: NDArray, tracks: list[Tracking], distance_threshold: float = 3 ) -> NDArray: """ Match tracks with measurements and update the tracks Parameters ---------- [in] measurements: Float["a 2"] [in,out] tracks: Tracking["b"] Returns ---------- return Float["... 2"] the unmatched measurements Effect ---------- find the best match by minimum Mahalanobis distance, please note that I assume the state has been predicted """ if len(tracks) == 0: return measurements def _update(measurement: NDArray, tracking: Tracking): return update(measurement, tracking.state, Tracker.measurement_model()) def outer_posteriori( measurements: NDArray, tracks: list[Tracking] ) -> list[list[PosterioriResult]]: """ calculate the outer posteriori for each measurement and track Parameters ---------- [in] measurements: Float["a 2"] [in] tracks: Tracking["b"] Returns ---------- PosterioriResult["a b"] """ return [ [_update(measurement, tracking) for measurement in measurements] for tracking in tracks ] def posteriori_to_mahalanobis( posteriori: list[list[PosterioriResult]], ) -> NDArray: """ Parameters ---------- [in] posteriori: PosterioriResult["a b"] Returns ---------- Float["a b"] """ return np.array( [[r_m.mahalanobis_distance for r_m in p_t] for p_t in posteriori], dtype=np.float32, ) posteriors = outer_posteriori(measurements, tracks) distances = posteriori_to_mahalanobis(posteriors) row, col = linear_sum_assignment(np.array(distances)) row = np.array(row) col = np.array(col) def to_be_deleted() -> Generator[Tuple[int, int], None, None]: for i, j in zip(row, col): post: PosterioriResult = posteriors[i][j] if post.mahalanobis_distance > distance_threshold: yield i, j for i, j in to_be_deleted(): row = row[row != i] col = col[col != j] for i, j in zip(row, col): track: Tracking = tracks[i] post: PosterioriResult = posteriors[i][j] track.state = post.state track.survived_time_steps += 1 tracks[i] = track for i, track in enumerate(tracks): if i not in row: # reset the survived time steps once missed track.missed_time_steps += 1 tracks[i] = track # remove measurements that have been matched left_measurements = np.delete(measurements, col, axis=0) return left_measurements def _tracks_from_past_measurements( self, measurements: NDArray, dt: float = 1.0, distance_threshold: float = 3.0 ): """ consume the last measurements and create tentative tracks from them Note ---- mutate self._tentative_tracks and self._last_measurements """ if self._last_measurements.shape[0] == 0: self._last_measurements = measurements return distances = outer_distance(self._last_measurements, measurements) row, col = linear_sum_assignment(distances) row = np.array(row) col = np.array(col) def to_be_deleted() -> Generator[Tuple[int, int], None, None]: for i, j in zip(row, col): euclidean_distance = distances[i, j] if euclidean_distance > distance_threshold: yield i, j for i, j in to_be_deleted(): row = row[row != i] col = col[col != j] for i, j in zip(row, col): coord = measurements[j] vel = (coord - self._last_measurements[i]) / dt s = np.concatenate([coord, vel]) state = GaussianState(x=s, P=np.eye(4)) track = Tracking( id=self._last_id, state=state, survived_time_steps=0, missed_time_steps=0, ) self._last_id += 1 self._tentative_tracks.append(track) # update the last measurements with the unmatched measurements self._last_measurements = np.delete(measurements, col, axis=0) def _transfer_tentative_to_confirmed(self, survival_steps_threshold: int = 3): """ transfer tentative tracks to confirmed tracks Note ---- mutate self._tentative_tracks and self._confirmed_tracks in place """ for i, track in enumerate(self._tentative_tracks): if track.survived_time_steps > survival_steps_threshold: self._confirmed_tracks.append(track) self._tentative_tracks.pop(i) @staticmethod def _track_cov_deleter(tracks: list[Tracking], cov_threshold: float = 4.0): """ delete tracks with covariance trace greater than threshold Parameters ---------- [in,out] tracks: list[Tracking] cov_threshold: float the threshold of the covariance trace Note ---- mutate tracks in place """ for i, track in enumerate(tracks): # https://numpy.org/doc/stable/reference/generated/numpy.trace.html if np.trace(track.state.P) > cov_threshold: tracks.pop(i) def next_measurements(self, measurements: NDArray, params: TrackerParams): self._confirmed_tracks = self._predict(self._confirmed_tracks, params.dt) self._tentative_tracks = self._predict(self._tentative_tracks, params.dt) left_ = self._data_associate_and_update( measurements, self._confirmed_tracks, params.confirm_mahalanobis_threshold ) left = self._data_associate_and_update( left_, self._tentative_tracks, params.tentative_mahalanobis_threshold ) self._transfer_tentative_to_confirmed(params.survival_steps_threshold) self._tracks_from_past_measurements( left, params.dt, params.forming_tracks_euclidean_threshold ) self._track_cov_deleter(self._tentative_tracks, params.cov_threshold) self._track_cov_deleter(self._confirmed_tracks, params.cov_threshold) @property def confirmed_tracks(self): return self._confirmed_tracks @staticmethod def motion_model(dt: float = 1, q: float = 0.05) -> LinearMotionNoInputModel: """ a constant velocity motion model """ # yapf: disable F = np.array([[1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]]) # yapf: enable Q = q * np.eye(4) return LinearMotionNoInputModel(F=F, Q=Q) @staticmethod def measurement_model(r: float = 0.75) -> LinearMeasurementModel: # yapf: disable H = np.array([[1, 0, 0, 0], [0, 1, 0, 0]]) # yapf: enable R = r * np.eye(2) return LinearMeasurementModel(H=H, R=R)