From 096689d6f51851e1a59df4b11ab1bf0217d147b6 Mon Sep 17 00:00:00 2001 From: crosstyan Date: Thu, 27 Feb 2025 16:00:13 +0800 Subject: [PATCH] init --- .gitignore | 167 ++++++++ .vscode/settings.json | 4 + README.md | 1 + app/tracker/__init__.py | 500 ++++++++++++++++++++++++ app/tracker/bboxes_tracker.py | 554 +++++++++++++++++++++++++++ app/tracker/single_object_tracker.py | 164 ++++++++ app/typing/__init__.py | 92 +++++ app/typing/constant.py | 1 + requirements.txt | 6 + 9 files changed, 1489 insertions(+) create mode 100644 .gitignore create mode 100644 .vscode/settings.json create mode 100644 README.md create mode 100644 app/tracker/__init__.py create mode 100644 app/tracker/bboxes_tracker.py create mode 100644 app/tracker/single_object_tracker.py create mode 100644 app/typing/__init__.py create mode 100644 app/typing/constant.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ce9e10e --- /dev/null +++ b/.gitignore @@ -0,0 +1,167 @@ +output +dataset +checkpoint +sample +video +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..a59dbf5 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "python.analysis.typeCheckingMode": "basic", + "python.analysis.diagnosticMode": "workspace" +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..8533024 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +Simple GNN tracker \ No newline at end of file diff --git a/app/tracker/__init__.py b/app/tracker/__init__.py new file mode 100644 index 0000000..49aefa1 --- /dev/null +++ b/app/tracker/__init__.py @@ -0,0 +1,500 @@ +from dataclasses import dataclass +from typing import Generator, Optional, Protocol, Tuple, TypeAlias, TypedDict, Union + +import numpy as np +from jaxtyping import Int, Float, Num, jaxtyped +from scipy.optimize import linear_sum_assignment + +NDArray = np.ndarray + + +@dataclass +class LinearMotionNoInputModel: + F: Num[NDArray, "n n"] + Q: Num[NDArray, "n n"] + + +@dataclass +class LinearMeasurementModel: + H: Num[NDArray, "m n"] + R: 
def predict(
    state: GaussianState,
    motion_model: LinearMotionNoInputModel,
) -> GaussianState:
    """
    Kalman time update for a linear motion model without control input.

    Parameters
    ----------
    state
        the current estimate (mean ``x`` and covariance ``P``)
    motion_model
        the transition matrix ``F`` and the process noise covariance ``Q``

    Returns
    ----------
    GaussianState
        the priori estimate ``x = F x`` and ``P = F P F^T + Q``

    Raises
    ----------
    AssertionError
        if the shapes of ``x``, ``F`` and ``Q`` do not fit together
    """
    mean, covariance = state.x, state.P
    transition, process_noise = motion_model.F, motion_model.Q

    state_dim = transition.shape[0]
    assert mean.shape[0] == state_dim, "state and transition model are not compatible"
    assert state_dim == transition.shape[1], "transition model is not square"
    assert (
        state_dim == process_noise.shape[0]
    ), "transition model and noise model are not compatible"

    return GaussianState(
        x=transition @ mean,
        P=transition @ covariance @ transition.T + process_noise,
    )
def update(
    measurement: Measurement,
    state: GaussianState,
    measure_model: LinearMeasurementModel,
) -> PosterioriResult:
    """
    Kalman measurement update for a linear measurement model.

    Parameters
    ----------
    measurement
        the observed measurement ``z``
    state
        the priori estimate (mean ``x`` and covariance ``P``)
    measure_model
        the measurement matrix ``H`` and the measurement noise covariance ``R``

    Returns
    ----------
    PosterioriResult
        the posteriori state together with the innovation, its covariance,
        the posteriori measurement and the (squared) Mahalanobis distance of
        the innovation

    Raises
    ----------
    AssertionError
        if the shapes of ``x``, ``H`` and ``R`` do not fit together
    numpy.linalg.LinAlgError
        if the innovation covariance ``S`` is singular
    """
    x = state.x
    P = state.P
    H = measure_model.H
    R = measure_model.R
    assert x.shape[0] == H.shape[1], "state and measurement model are not compatible"
    assert (
        H.shape[0] == R.shape[0]
    ), "measurement model and measurement noise are not compatible"
    assert H.shape[0] == R.shape[1], "measurement noise is not square"
    z = measurement
    # innovation: the priori measurement residual
    y = z - H @ x
    # innovation covariance
    S = H @ P @ H.T + R
    # S is needed for both the gain and the Mahalanobis distance; invert it once
    S_inv = np.linalg.inv(S)
    # Kalman gain
    K = P @ H.T @ S_inv
    # posteriori state
    x_posteriori = x + K @ y
    # posteriori covariance in Joseph form: (I - KH) P (I - KH)^T + K R K^T
    I_KH = np.eye(P.shape[0]) - K @ H
    P_posteriori = I_KH @ P @ I_KH.T + K @ R @ K.T
    posteriori_state = GaussianState(x=x_posteriori, P=P_posteriori)
    posteriori_measurement = H @ x_posteriori
    # squared Mahalanobis distance of the innovation
    s_m = y.T @ S_inv @ y
    return PosterioriResult(
        state=posteriori_state,
        innovation=y,
        innovation_covariance=S,
        posteriori_measurement=posteriori_measurement,
        mahalanobis_distance=np.sqrt(s_m),
        squared_mahalanobis_distance=s_m,
    )
class Tracker:
    """
    A simple GNN (global nearest neighbour) tracker for 2D point measurements.

    Tracks live in two lists. Tentative tracks are created from pairs of
    nearby measurements seen in two consecutive calls; a tentative track is
    moved to the confirmed list once it has been matched often enough. Every
    track carries a constant velocity state ``[x, y, v_x, v_y]``.
    """

    # Annotations only: the values are created per instance in ``__init__`` so
    # that no mutable object is shared between trackers through the class.
    _last_measurements: NDArray
    _tentative_tracks: list[Tracking]
    _confirmed_tracks: list[Tracking]
    _last_id: int

    def __init__(self):
        self._last_measurements = np.empty((0, 2), dtype=np.float32)
        self._tentative_tracks = []
        self._confirmed_tracks = []
        self._last_id = 0

    @staticmethod
    def _predict(tracks: list[Tracking], dt: float = 1.0):
        """
        Run the Kalman prediction step on every track.

        Returns
        ----------
        list[Tracking]
            new ``Tracking`` objects with the predicted state; the counters
            are copied over unchanged
        """
        # the model only depends on dt, build it once for all tracks
        motion_model = Tracker.motion_model(dt=dt)
        return [
            Tracking(
                id=track.id,
                state=predict(track.state, motion_model),
                survived_time_steps=track.survived_time_steps,
                missed_time_steps=track.missed_time_steps,
            )
            for track in tracks
        ]

    @staticmethod
    def _data_associate_and_update(
        measurements: NDArray, tracks: list[Tracking], distance_threshold: float = 3
    ) -> NDArray:
        """
        Match tracks with measurements and update the tracks

        Parameters
        ----------
        [in] measurements: Float["a 2"]
        [in,out] tracks: Tracking["b"]
        distance_threshold: float
            an assignment whose Mahalanobis distance is greater than this
            value is discarded

        Returns
        ----------
        return
            Float["... 2"] the unmatched measurements

        Effect
        ----------
        find the best match by minimum Mahalanobis distance, please note that I assume the state has been predicted.
        Matched tracks get the posteriori state and `survived_time_steps + 1`,
        every other track gets `missed_time_steps + 1`.
        """
        if len(tracks) == 0:
            return measurements

        measurement_model = Tracker.measurement_model()
        # posteriors[i][j] is track i updated with measurement j, i.e. the
        # table is PosterioriResult["b a"] (tracks are the rows)
        posteriors: list[list[PosterioriResult]] = [
            [
                update(measurement, track.state, measurement_model)
                for measurement in measurements
            ]
            for track in tracks
        ]
        distances = np.array(
            [[post.mahalanobis_distance for post in per_track] for per_track in posteriors],
            dtype=np.float32,
        )
        row, col = linear_sum_assignment(distances)

        # gating: drop the assigned pairs that are farther away than the threshold
        keep = np.array(
            [
                not (posteriors[i][j].mahalanobis_distance > distance_threshold)
                for i, j in zip(row, col)
            ],
            dtype=bool,
        )
        row, col = row[keep], col[keep]

        for i, j in zip(row, col):
            track = tracks[i]
            track.state = posteriors[i][j].state
            track.survived_time_steps += 1

        matched_tracks = set(row.tolist())
        for i, track in enumerate(tracks):
            if i not in matched_tracks:
                # only counted; nothing is reset when a track is missed
                track.missed_time_steps += 1

        # remove measurements that have been matched
        return np.delete(measurements, col, axis=0)

    def _tracks_from_past_measurements(
        self, measurements: NDArray, dt: float = 1.0, distance_threshold: float = 3.0
    ):
        """
        consume the last measurements and create tentative tracks from them

        A pair (last measurement, current measurement) that is assigned to each
        other and is not farther apart than `distance_threshold` (Euclidean)
        becomes a tentative track. Its position is the current measurement and
        its velocity is the displacement divided by `dt`.

        Note
        ----
        mutate self._tentative_tracks and self._last_measurements
        """
        if self._last_measurements.shape[0] == 0:
            # nothing to pair with yet, remember the measurements for the next call
            self._last_measurements = measurements
            return
        distances = outer_distance(self._last_measurements, measurements)
        row, col = linear_sum_assignment(distances)
        keep = ~(distances[row, col] > distance_threshold)
        row, col = row[keep], col[keep]

        for i, j in zip(row, col):
            coord = measurements[j]
            vel = (coord - self._last_measurements[i]) / dt
            state = GaussianState(x=np.concatenate([coord, vel]), P=np.eye(4))
            self._tentative_tracks.append(
                Tracking(
                    id=self._last_id,
                    state=state,
                    survived_time_steps=0,
                    missed_time_steps=0,
                )
            )
            self._last_id += 1
        # update the last measurements with the unmatched measurements
        self._last_measurements = np.delete(measurements, col, axis=0)

    def _transfer_tentative_to_confirmed(self, survival_steps_threshold: int = 3):
        """
        transfer tentative tracks to confirmed tracks

        A track is transferred when its `survived_time_steps` is greater than
        `survival_steps_threshold`.

        Note
        ----
        mutate self._tentative_tracks and self._confirmed_tracks in place
        """
        # Build the remaining list instead of popping while iterating: popping
        # shifts the list and makes the loop skip the element after each removal.
        still_tentative: list[Tracking] = []
        for track in self._tentative_tracks:
            if track.survived_time_steps > survival_steps_threshold:
                self._confirmed_tracks.append(track)
            else:
                still_tentative.append(track)
        self._tentative_tracks[:] = still_tentative

    @staticmethod
    def _track_cov_deleter(tracks: list[Tracking], cov_threshold: float = 4.0):
        """
        delete tracks with covariance trace greater than threshold

        Parameters
        ----------
        [in,out] tracks: list[Tracking]
        cov_threshold: float
            the threshold of the covariance trace

        Note
        ----
        mutate tracks in place
        """
        # https://numpy.org/doc/stable/reference/generated/numpy.trace.html
        # Slice assignment keeps the caller's list object and, unlike popping
        # inside the loop, looks at every track exactly once.
        tracks[:] = [
            track for track in tracks if not (np.trace(track.state.P) > cov_threshold)
        ]

    def next_measurements(self, measurements: NDArray, params: TrackerParams):
        """
        Feed the measurements of the next time step into the tracker.

        The steps are: predict all tracks, associate the measurements with the
        confirmed tracks first and the leftovers with the tentative tracks,
        promote tentative tracks, form new tentative tracks from what is still
        unmatched, and finally drop the tracks whose covariance trace is
        greater than `params.cov_threshold`.
        """
        self._confirmed_tracks = self._predict(self._confirmed_tracks, params.dt)
        self._tentative_tracks = self._predict(self._tentative_tracks, params.dt)
        left_ = self._data_associate_and_update(
            measurements, self._confirmed_tracks, params.confirm_mahalanobis_threshold
        )
        left = self._data_associate_and_update(
            left_, self._tentative_tracks, params.tentative_mahalanobis_threshold
        )
        self._transfer_tentative_to_confirmed(params.survival_steps_threshold)
        self._tracks_from_past_measurements(
            left, params.dt, params.forming_tracks_euclidean_threshold
        )
        self._track_cov_deleter(self._tentative_tracks, params.cov_threshold)
        self._track_cov_deleter(self._confirmed_tracks, params.cov_threshold)

    @property
    def confirmed_tracks(self):
        """The list of confirmed tracks (the internal list, not a copy)."""
        return self._confirmed_tracks

    @staticmethod
    def motion_model(dt: float = 1, q: float = 0.05) -> LinearMotionNoInputModel:
        """
        a constant velocity motion model

        `dt` is the time step used in the transition matrix and `q` scales the
        identity matrix used as process noise covariance.
        """
        # yapf: disable
        F = np.array([[1, 0, dt, 0],
                      [0, 1, 0, dt],
                      [0, 0, 1, 0],
                      [0, 0, 0, 1]])
        # yapf: enable
        Q = q * np.eye(4)
        return LinearMotionNoInputModel(F=F, Q=Q)

    @staticmethod
    def measurement_model(r: float = 0.75) -> LinearMeasurementModel:
        """
        a measurement model that observes the position only

        `r` scales the identity matrix used as measurement noise covariance.
        """
        # yapf: disable
        H = np.array([[1, 0, 0, 0],
                      [0, 1, 0, 0]])
        # yapf: enable
        R = r * np.eye(2)
        return LinearMeasurementModel(H=H, R=R)
def bounding_boxes_to_center(
    bounding_boxes: Num[NDArray, "N 4"], format: BoundingBoxFormat
) -> Num[NDArray, "N 2"]:
    """
    Compute the centre point of every bounding box.

    Parameters
    ----------
    bounding_boxes
        one box per row
    format
        "xyxy" (two opposite corners) or "xywh" (first corner followed by
        width and height)

    Returns
    ----------
    one centre ``(x, y)`` per row

    Raises
    ----------
    ValueError
        if `format` is neither "xyxy" nor "xywh"
    """
    if format not in ("xyxy", "xywh"):
        raise ValueError(f"Unsupported bounding box format: {format}")

    leading = bounding_boxes[:, :2]
    trailing = bounding_boxes[:, 2:]
    if format == "xyxy":
        # midpoint of the two corners
        return (leading + trailing) / 2
    # xywh: corner plus half of the size
    return leading + (trailing / 2)
class BoxTracker:
    """
    A simple GNN tracker, but for tracking targets with bounding boxes

    The Kalman state of a track is the centre of its bounding box and the
    velocity of that centre (``[x, y, v_x, v_y]``). The boxes themselves are
    kept next to the state in a sliding window.

    TODO: use score to help data association
    """

    # Annotations only: the values are created per instance in ``__init__`` so
    # that no mutable object is shared between trackers through the class.
    _last_measurements: NDArray
    _tentative_tracks: list[BoxTracking]
    _confirmed_tracks: list[BoxTracking]
    _last_id: int
    _params: BoxTrackerConfig
    _bounding_boxes_format: BoundingBoxFormat

    def __init__(
        self,
        params: BoxTrackerConfig,
        bounding_boxes_format: BoundingBoxFormat,
    ):
        self._last_measurements = np.empty((0, 2), dtype=np.float32)
        self._tentative_tracks = []
        self._confirmed_tracks = []
        self._last_id = 0
        self._params = params
        self._bounding_boxes_format = bounding_boxes_format

    def reset(self):
        """Forget all tracks and pending measurements and restart the ids at 0."""
        self._last_id = 0
        self._last_measurements = np.empty((0, 2), dtype=np.float32)
        self._tentative_tracks = []
        self._confirmed_tracks = []

    def _push_new_bounding_box(
        self, old_bbs: Num[NDArray, "N 4"], new_bb: Num[NDArray, "4"]
    ) -> Num[NDArray, "N 4"]:
        """
        Append `new_bb` to the history and keep only the latest
        `max_preserved_history_bounding_boxes` rows.
        """
        bbs = np.append(old_bbs, np.expand_dims(new_bb, axis=0), axis=0)
        if bbs.shape[0] > self._params.max_preserved_history_bounding_boxes:
            bbs = bbs[-self._params.max_preserved_history_bounding_boxes :]
        return bbs

    def _predict(self, tracks: list[BoxTracking], dt: float = 1.0):
        """
        Run the Kalman prediction step on every track.

        The last bounding box of the track is translated by the displacement of
        the centre (its size is kept) and pushed to the history as the
        predicted box.

        Returns
        ----------
        list[BoxTracking]
            new ``BoxTracking`` objects with the predicted state and history
        """
        # the model only depends on dt, build it once for all tracks
        motion_model = BoxTracker.motion_model(dt=dt)

        def _predict_one(track: BoxTracking):
            new_st = predict(track.state, motion_model)
            o_cx, o_cy = bounding_box_to_center(
                track.last_bounding_box, self._bounding_boxes_format
            )
            n_cx, n_cy, _v_x, _v_y = new_st.x

            delta_x, delta_y = n_cx - o_cx, n_cy - o_cy
            if self._bounding_boxes_format == "xyxy":
                x_0, y_0, x_1, y_1 = track.last_bounding_box
                new_bb = np.array(
                    [x_0 + delta_x, y_0 + delta_y, x_1 + delta_x, y_1 + delta_y]
                )
            elif self._bounding_boxes_format == "xywh":
                x_0, y_0, w, h = track.last_bounding_box
                # (x_0, y_0) is the corner of the box (the centre is computed as
                # corner + size / 2), so a pure translation only adds the delta
                new_bb = np.array([x_0 + delta_x, y_0 + delta_y, w, h])
            else:
                raise ValueError(
                    f"Unsupported bounding box format: {self._bounding_boxes_format}"
                )
            new_bbs = self._push_new_bounding_box(track.last_n_bounding_boxes, new_bb)
            return BoxTracking(
                id=track.id,
                state=CvModelGaussianState.from_gaussian(new_st),
                survived_time_steps=track.survived_time_steps,
                missed_time_steps=track.missed_time_steps,
                last_n_bounding_boxes=new_bbs,
            )

        return [_predict_one(track) for track in tracks]

    @jaxtyped(typechecker=typechecked)
    def _data_associate_and_update(
        self,
        select_array: TrackingState,
        measurements: Num[NDArray, "N 2"],
        bounding_boxes: Num[NDArray, "N 4"],
    ) -> Tuple[list[MatchedTrackingEvent], Num[NDArray, "M 2"], Num[NDArray, "M 4"]]:
        """
        Match tracks with measurements and update the tracks

        Parameters
        ----------
        [in] select_array: TrackingState
            selects the tracking list (tentative or confirmed) to be updated
            (mutated in place) and the Mahalanobis threshold that goes with it
        [in] measurements: Float["a 2"]
            the centres of `bounding_boxes`, row by row
        [in] bounding_boxes: Float["a 4"]

        Returns
        ----------
        return
            the matched events, Float["... 2"] the unmatched measurements and
            Float["... 4"] the unmatched bounding boxes

        Effect
        ----------
        find the best match by minimum Mahalanobis distance, please note that I assume the state has been predicted.
        Matched tracks get the posteriori state, `survived_time_steps + 1` and
        the matched bounding box pushed to their history.
        """
        evs: list[MatchedTrackingEvent] = []
        assert measurements.ndim == 2
        assert measurements.shape[1] == 2

        assert bounding_boxes.ndim == 2
        assert bounding_boxes.shape[1] == 4

        assert bounding_boxes.shape[0] == measurements.shape[0]

        if select_array == TrackingState.Tentative:
            tracks = self._tentative_tracks
            distance_threshold = self._params.tentative_mahalanobis_threshold
        elif select_array == TrackingState.Confirmed:
            tracks = self._confirmed_tracks
            distance_threshold = self._params.confirm_mahalanobis_threshold
        else:
            raise ValueError("Unexpected tracking state {}".format(select_array))

        if len(tracks) == 0:
            return evs, measurements, bounding_boxes

        measurement_model = BoxTracker.measurement_model()
        # posteriors[i][j] is track i updated with measurement j, i.e. the
        # table is PosterioriResult["b a"] (tracks are the rows)
        posteriors: list[list[PosterioriResult]] = [
            [
                update(measurement, track.state, measurement_model)
                for measurement in measurements
            ]
            for track in tracks
        ]
        distances = np.array(
            [[post.mahalanobis_distance for post in per_track] for per_track in posteriors],
            dtype=np.float32,
        )
        row, col = linear_sum_assignment(distances)

        # gating: drop the assigned pairs that are farther away than the threshold
        keep = np.array(
            [
                not (posteriors[i][j].mahalanobis_distance > distance_threshold)
                for i, j in zip(row, col)
            ],
            dtype=bool,
        )
        row, col = row[keep], col[keep]

        # update matched tracks
        for i, j in zip(row, col):
            track = tracks[i]
            track.state = CvModelGaussianState.from_gaussian(posteriors[i][j].state)
            track.survived_time_steps += 1
            track.last_n_bounding_boxes = self._push_new_bounding_box(
                track.last_n_bounding_boxes, bounding_boxes[j]
            )
            evs.append(
                MatchedTrackingEvent(
                    group=select_array,
                    id=track.id,
                    matched_bounding_box=bounding_boxes[j],
                )
            )

        # missed tracks
        # note that it just for statistical purpose
        # the tracking should be removed by the covariance threshold
        matched_tracks = set(row.tolist())
        for i, track in enumerate(tracks):
            if i not in matched_tracks:
                track.missed_time_steps += 1

        # remove measurements that have been matched
        left_measurements = np.delete(measurements, col, axis=0)
        left_bounding_boxes = np.delete(bounding_boxes, col, axis=0)
        return evs, left_measurements, left_bounding_boxes

    @jaxtyped(typechecker=typechecked)
    def _tracks_from_past_measurements(
        self,
        measurements: Num[NDArray, "N 2"],
        bounding_boxes: Num[NDArray, "N 4"],
        dt: float = 1.0,
        distance_threshold: float = 3.0,
    ):
        """
        consume the last measurements and create tentative tracks from them

        A pair (last measurement, current measurement) that is assigned to each
        other and is not farther apart than `distance_threshold` (Euclidean)
        becomes a tentative track. Its position is the current measurement, its
        velocity is the displacement divided by `dt` and its history starts
        with the current bounding box.

        Returns
        ----------
        list[CreateTrackingEvent]
            one event per created tentative track

        Note
        ----
        mutate self._tentative_tracks and self._last_measurements
        """
        evs: list[CreateTrackingEvent] = []
        if self._last_measurements.shape[0] == 0:
            # nothing to pair with yet, remember the measurements for the next call
            self._last_measurements = measurements
            return evs
        distances = outer_distance(self._last_measurements, measurements)
        row, col = linear_sum_assignment(distances)
        keep = ~(distances[row, col] > distance_threshold)
        row, col = row[keep], col[keep]

        for i, j in zip(row, col):
            coord = measurements[j]
            vel = (coord - self._last_measurements[i]) / dt
            state = GaussianState(x=np.concatenate([coord, vel]), P=np.eye(4))
            track = BoxTracking(
                id=self._last_id,
                state=CvModelGaussianState.from_gaussian(state),
                survived_time_steps=0,
                missed_time_steps=0,
                last_n_bounding_boxes=np.expand_dims(bounding_boxes[j], axis=0),
            )
            self._last_id += 1
            self._tentative_tracks.append(track)
            evs.append(
                CreateTrackingEvent(
                    group=TrackingState.Tentative, id=track.id, tracking=track
                )
            )
        # update the last measurements with the unmatched measurements
        self._last_measurements = np.delete(measurements, col, axis=0)
        return evs

    def _transfer_tentative_to_confirmed(self, survival_steps_threshold: int = 3):
        """
        transfer tentative tracks to confirmed tracks

        A track is transferred when its `survived_time_steps` is greater than
        `survival_steps_threshold`.

        Returns
        ----------
        list[CreateTrackingEvent]
            one event (group Confirmed) per transferred track

        Note
        ----
        mutate self._tentative_tracks and self._confirmed_tracks in place
        """
        evs: list[CreateTrackingEvent] = []
        # Build the remaining list instead of popping while iterating: popping
        # shifts the list and makes the loop skip the element after each removal.
        still_tentative: list[BoxTracking] = []
        for track in self._tentative_tracks:
            if track.survived_time_steps > survival_steps_threshold:
                self._confirmed_tracks.append(track)
                evs.append(
                    CreateTrackingEvent(
                        group=TrackingState.Confirmed, id=track.id, tracking=track
                    )
                )
            else:
                still_tentative.append(track)
        self._tentative_tracks[:] = still_tentative
        return evs

    def _track_cov_deleter(
        self, track_to_use: TrackingState, cov_threshold: float = 4.0
    ):
        """
        delete tracks with covariance trace greater than threshold

        Parameters
        ----------
        track_to_use: TrackingState
            selects the tracking list (tentative or confirmed) to clean up
        cov_threshold: float
            the threshold of the covariance trace

        Returns
        ----------
        list[RemoveTrackingEvent]
            one event per deleted track

        Note
        ----
        mutate tracks in place
        """
        if track_to_use == TrackingState.Tentative:
            tracks = self._tentative_tracks
        elif track_to_use == TrackingState.Confirmed:
            tracks = self._confirmed_tracks
        else:
            raise ValueError("Unexpected tracking state {}".format(track_to_use))
        ret: list[RemoveTrackingEvent] = []
        survivors: list[BoxTracking] = []
        for track in tracks:
            # https://numpy.org/doc/stable/reference/generated/numpy.trace.html
            if np.trace(track.state.P) > cov_threshold:
                ret.append(
                    RemoveTrackingEvent(group=track_to_use, id=track.id, tracking=track)
                )
            else:
                survivors.append(track)
        # Slice assignment keeps the list object and, unlike popping inside
        # the loop, looks at every track exactly once.
        tracks[:] = survivors
        return ret

    def next_measurements(
        self,
        bounding_boxes: Num[NDArray, "N 4"],
    ):
        """
        Feed the bounding boxes of the next time step into the tracker.

        Boxes whose area is not greater than `AREA_FILTER_THRESHOLD` are
        dropped first. Then: predict all tracks, associate the box centres with
        the confirmed tracks first and the leftovers with the tentative tracks,
        promote tentative tracks, form new tentative tracks from what is still
        unmatched, and finally drop the tracks whose covariance trace is
        greater than the configured threshold.

        Returns
        ----------
        list[TrackingEvent]
            matched events (confirmed, tentative), then created events
            (confirmed, tentative), then removed events (tentative, confirmed)
        """
        evs: list[TrackingEvent]
        areas = bounding_boxes_area(bounding_boxes, self._bounding_boxes_format)
        # 10 x 10 is too small for a normal bounding box
        # filter out
        # TODO: use area as gating threshold
        too_small = areas <= AREA_FILTER_THRESHOLD
        if np.any(too_small):
            logger.trace(
                "too small bounding boxes; bboxes={}; areas={}",
                bounding_boxes,
                areas,
            )
            bounding_boxes = bounding_boxes[~too_small]

        measurements = bounding_boxes_to_center(
            bounding_boxes, self._bounding_boxes_format
        )
        self._confirmed_tracks = self._predict(self._confirmed_tracks, self._params.dt)
        self._tentative_tracks = self._predict(self._tentative_tracks, self._params.dt)
        c_evs, c_left_m, c_left_bb = (
            self._data_associate_and_update(  # pylint: disable=E1102
                TrackingState.Confirmed, measurements, bounding_boxes
            )
        )
        t_evs, t_left_m, t_left_bb = (
            self._data_associate_and_update(  # pylint: disable=E1102
                TrackingState.Tentative, c_left_m, c_left_bb
            )
        )
        create_c_evs = self._transfer_tentative_to_confirmed(
            self._params.survival_steps_threshold
        )
        # target initialize
        create_t_evs = self._tracks_from_past_measurements(  # pylint: disable=E1102
            t_left_m,
            t_left_bb,
            self._params.dt,
            self._params.forming_tracks_euclidean_threshold,
        )
        del_t_evs = self._track_cov_deleter(
            TrackingState.Tentative, self._params.cov_threshold
        )
        del_c_evs = self._track_cov_deleter(
            TrackingState.Confirmed, self._params.cov_threshold
        )
        evs = c_evs + t_evs + create_c_evs + create_t_evs + del_t_evs + del_c_evs
        return evs

    @property
    def confirmed_trackings(self):
        """The list of confirmed tracks (the internal list, not a copy)."""
        return self._confirmed_tracks

    @property
    def bounding_box_format(self):
        """The bounding box format given to the constructor."""
        return self._bounding_boxes_format

    @staticmethod
    def motion_model(dt: float = 1, q: float = 0.05) -> LinearMotionNoInputModel:
        """
        a constant velocity motion model

        `dt` is the time step used in the transition matrix and `q` scales the
        identity matrix used as process noise covariance.
        """
        # yapf: disable
        F = np.array([[1, 0, dt, 0],
                      [0, 1, 0, dt],
                      [0, 0, 1, 0],
                      [0, 0, 0, 1]])
        # yapf: enable
        Q = q * np.eye(4)
        return LinearMotionNoInputModel(F=F, Q=Q)

    @staticmethod
    def measurement_model(r: float = 0.75) -> LinearMeasurementModel:
        """
        a measurement model that observes the position only

        `r` scales the identity matrix used as measurement noise covariance.
        """
        # yapf: disable
        H = np.array([[1, 0, 0, 0],
                      [0, 1, 0, 0]])
        # yapf: enable
        R = r * np.eye(2)
        return LinearMeasurementModel(H=H, R=R)
return None + elif len(tracking) == 1: + return tracking[0].id + else: + i = np.argmax( + [ + bounding_box_area(tracker.last_n_bounding_boxes, format) + for tracker in tracking + ] + ) + return tracking[i].id + + +class SingleObjectTracker: + _tracker: BoxTracker + _overridden_tracking_id: Optional[int] = None + _selected_tracking_id: Optional[int] = None + _bounding_box_format: BoundingBoxFormat + _on_lost_tracking: Optional[Callable[[BoxTracking], None]] = None + """ + (tracking, tracking_id) -> None + """ + _on_tracking_acquired: Optional[Callable[[list[BoxTracking], TrackingId], None]] = ( + None + ) + """ + (trackings, tracking_id) -> None + """ + + def __init__( + self, + tracker_param: BoxTrackerConfig, + bounding_box_format: BoundingBoxFormat = "xyxy", + ): + self._selected_tracking_id = None + self._tracker = BoxTracker(tracker_param, bounding_box_format) + + def reset(self): + self._tracker.reset() + + @property + def confirmed(self): + return self._tracker.confirmed_trackings + + @property + def confirmed_trackings(self): + """ + alias of `confirmed` + """ + return self.confirmed + + def get_by_id( + self, tracking_id: int, trackings: list[BoxTracking] + ) -> Optional[BoxTracking]: + assert tracking_id is not None + try: + return next(filter(lambda x: x.id == tracking_id, trackings)) + except StopIteration: + return None + + def try_get_by_overridden_id(self) -> Optional[BoxTracking]: + """ + If successfully get the tracking, mutate self._selected_tracking_id. + Otherwise, set self._overridden_tracking_id to None. 
+ """ + overridden_id = self._overridden_tracking_id + if overridden_id is None: + return None + sel: Optional[BoxTracking] = self.get_by_id(overridden_id, self.confirmed) + if sel is None: + self._overridden_tracking_id = None + logger.trace( + "Overridden tracking id {} not found in {}", + overridden_id, + self.confirmed, + ) + else: + if ( + self._selected_tracking_id is None + or self._selected_tracking_id != overridden_id + ): + self._selected_tracking_id = overridden_id + logger.info("Acquired tracking id {} by override", overridden_id) + if self._on_tracking_acquired is not None: + self._on_tracking_acquired( + self.confirmed, (overridden_id, TrackingIdType.Overridden) + ) + return sel + + def try_get_by_selected_id(self) -> Optional[BoxTracking]: + """ + If no selected tracking, find the one with `find_suitable_tracking_id`. + """ + selected_id = self._selected_tracking_id + if selected_id is None: + selected_id = find_suitable_tracking_id( + self.confirmed, self._bounding_box_format + ) + if selected_id is None: + return None + sel: Optional[BoxTracking] = self.get_by_id(selected_id, self.confirmed) + if sel is None: + self._selected_tracking_id = None + logger.warning( + "Selected tracking id {} not found in {}", selected_id, self.confirmed + ) + else: + if ( + self._selected_tracking_id is None + or self._selected_tracking_id != selected_id + ): + self._selected_tracking_id = selected_id + logger.info("Acquired tracking id {}", selected_id) + if self._on_tracking_acquired is not None: + self._on_tracking_acquired( + self.confirmed, (selected_id, TrackingIdType.Selected) + ) + return sel + + @property + def bounding_box_format(self) -> BoundingBoxFormat: + return self._tracker.bounding_box_format + + def next_measurements(self, boxes: Num[NDArray, "N 4"]): + self._tracker.next_measurements(boxes) + return self.confirmed diff --git a/app/typing/__init__.py b/app/typing/__init__.py new file mode 100644 index 0000000..251f3a2 --- /dev/null +++ 
b/app/typing/__init__.py
@@ -0,0 +1,93 @@
+from typing import Any, Union
+import numpy as np
+from jaxtyping import Float, Int, Shaped, Num, jaxtyped
+from typing import (
+    Literal,
+    List,
+    Dict,
+    TypedDict,
+    cast,
+    Tuple,
+    Optional,
+    Sequence,
+    Deque,
+)
+
+try:
+    from cv2.typing import MatLike
+except ImportError:
+    MatLike = np.ndarray
+
+NDArray = np.ndarray
+
+BoundingBoxFormat = Literal["xyxy", "xywh"]
+
+
+class DetectionResult(TypedDict):
+    """
+    Detection result per frame
+
+    N is the number of detected objects
+    """
+
+    boxes_num: Num[NDArray, "1"]
+    boxes: Num[NDArray, "N 4"]
+    scores: Num[NDArray, "N"]
+    reference_frame_size: Tuple[int, int]
+    """
+    Height and width of reference frame.
+
+    The bounding box coordinates are relative to this frame.
+
+    If one resizes the reference frame, the bounding box and keypoint coordinates should be scaled accordingly.
+    """
+
+
+H36KeyPoints = Float[NDArray, "B F 17 2"]
+PersonBasedKeypointLike = Float[NDArray, "B F N 2"]
+BoundingBoxes = Float[NDArray, "B F 4"]
+
+
+class KeyPointDetectionResult(TypedDict):
+    """
+    keypoints, bounding boxes, and scores
+    """
+
+    skeleton_keypoints: Float[NDArray, "N 17 2"]
+    skeleton_keypoints_scores: Float[NDArray, "N 17 1"]
+    bboxes: Float[NDArray, "N 4"]
+    bboxes_scores: Optional[Float[NDArray, "N"]]
+    frame_number: int
+    """
+    The frame number in the video sequence.
+    -1 when the frame number is not available.
+    """
+    reference_frame_size: tuple[int, int]
+    """
+    Height and Width of the reference frame.
+
+    The bounding box coordinates and keypoint coordinates are relative to this frame.
+
+    If one resizes the reference frame, the bounding box and keypoint coordinates should be scaled accordingly.
+    """
+
+
+KeyPointDetectionTimeSeries = Sequence[KeyPointDetectionResult]
+"""
+Each item contains keypoint detection result for all people detected in a single frame.
+
+Intervals between frames are not guaranteed to be consistent
+"""
+
+
+class MixKeypoints(TypedDict):
+    # NOTE(review): field semantics are not visible in this module; confirm with the producer
+    MixThreeDkeypoints: Num[NDArray, "... N 3"]
+    Channels: Any
+    Header: Any
+
+
+class ReferenceFrameSize(TypedDict):
+    """width and height of a reference frame"""
+    width: int
+    height: int
diff --git a/app/typing/constant.py b/app/typing/constant.py
new file mode 100644
index 0000000..0132982
--- /dev/null
+++ b/app/typing/constant.py
@@ -0,0 +1 @@
+AREA_FILTER_THRESHOLD = 2000
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..80eb91e
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,6 @@
+jaxtyping==0.2.38
+loguru==0.5.3
+numpy==1.24.3
+pydantic==2.10.6
+scipy==1.10.1
+typeguard==2.13.3