89 lines
3.1 KiB
Python
89 lines
3.1 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any, Callable, cast
|
|
|
|
import numpy as np
|
|
import torch
|
|
from jaxtyping import Float, Int
|
|
|
|
from .base_model import BaseModel
|
|
from opengait.utils.common import list2var, np2var
|
|
|
|
|
|
class BaseModelBody(BaseModel):
    """Base model variant with a separate sequence-level body-prior input."""

    def inputs_pretreament(
        self,
        inputs: tuple[list[np.ndarray], list[int], list[str], list[str], np.ndarray | None],
    ) -> Any:
        """Convert one raw batch into model inputs plus a pooled body prior.

        The last entry of the sequence list (and the last transform) is
        treated as the body-prior modality; every entry before it is a visual
        modality.

        Args:
            inputs: 5-tuple of ``(sequences, labels, types, views, seqL)``.
                ``sequences`` holds one entry per modality and ``seqL`` may
                be ``None``.

        Returns:
            A 6-tuple ``(ipts, labs, types, views, seqL, body_features)``:
            the visual inputs (cut to ``seqL.sum()`` along dim 1 when ``seqL``
            is given), the labels converted via ``list2var(...).long()``, the
            untouched type and view lists, the converted ``seqL`` (or
            ``None``), and the result of ``aggregate_body_features`` with a
            singleton axis inserted at dim 1.

        Raises:
            ValueError: If the number of modalities differs from the number
                of transforms, or if fewer than two modalities are supplied.
        """
        sequences, labels, types, views, raw_lengths = inputs

        # Training and evaluation use different transform lists.
        if self.training:
            selected_trfs = self.trainer_trfs
        else:
            selected_trfs = self.evaluator_trfs
        transforms = cast(list[Callable[[Any], Any]], selected_trfs)

        n_inputs, n_transforms = len(sequences), len(transforms)
        if n_inputs != n_transforms:
            raise ValueError(
                "The number of types of input data and transform should be same. "
                f"But got {n_inputs} and {n_transforms}"
            )
        if n_inputs < 2:
            raise ValueError("BaseModelBody expects one visual input and one body-prior input.")

        track_grad = bool(self.training)

        def as_float_var(transform: Callable[[Any], Any], frames: Any) -> Any:
            # Apply the modality's transform frame by frame, then hand the
            # stacked array to the project helper `np2var`.
            stacked = np.asarray([transform(frame) for frame in frames])
            return np2var(stacked, requires_grad=track_grad).float()

        # Modalities are converted in order; the final one is the body prior.
        *visual_inputs, body_prior = (
            as_float_var(transform, frames)
            for transform, frames in zip(transforms, sequences)
        )

        label_var = list2var(labels).long()
        if raw_lengths is None:
            seq_lengths = None
        else:
            seq_lengths = np2var(raw_lengths).int()

        # Preserve a singleton modality axis so DRF can mirror the author
        # stub's `squeeze(1)` behavior while still accepting the same
        # sequence-level prior.
        pooled_prior = aggregate_body_features(body_prior, seq_lengths).unsqueeze(1)

        if seq_lengths is None:
            model_inputs = visual_inputs
        else:
            # Keep only the first sum(seqL) positions along dim 1.
            total_frames = int(seq_lengths.sum().data.cpu().numpy())
            model_inputs = [visual[:, :total_frames] for visual in visual_inputs]

        return model_inputs, label_var, types, views, seq_lengths, pooled_prior
|
|
|
|
|
|
def aggregate_body_features(
    sequence_features: Float[torch.Tensor, "..."],
    seqL: Int[torch.Tensor, "1 batch"] | None,
) -> Float[torch.Tensor, "batch pairs metrics"]:
    """Collapse a sampled body-prior sequence back to one vector per sequence.

    Two input layouts are supported, selected by ``seqL``:

    * ``seqL is None``: the tensor is averaged over dimension 1.
    * ``seqL`` given: dimension 0 is squeezed (a no-op unless it has size 1),
      then ``seqL[0]`` is read as consecutive segment lengths along the
      resulting first dimension. Each segment is averaged and the per-segment
      means are stacked along a new leading dimension. Positions past
      ``sum(seqL[0])`` are ignored.

    Args:
        sequence_features: Body-prior tensor; needs at least 3 dimensions
            when ``seqL`` is ``None`` and at least 4 otherwise.
        seqL: Optional tensor whose first row lists the segment lengths.

    Returns:
        One averaged feature tensor per sequence, stacked on dimension 0.

    Raises:
        ValueError: If ``sequence_features`` has too few dimensions, or if a
            segment described by ``seqL`` is empty or extends past the
            available positions (either would otherwise yield a silently
            truncated mean or NaN values).
    """
    if seqL is None:
        if sequence_features.ndim < 3:
            raise ValueError(f"Expected body prior with >=3 dims, got shape {tuple(sequence_features.shape)}")
        return sequence_features.mean(dim=1)

    if sequence_features.ndim < 4:
        raise ValueError(f"Expected packed body prior with >=4 dims, got shape {tuple(sequence_features.shape)}")

    lengths = [int(length) for length in seqL[0].tolist()]
    flattened = sequence_features.squeeze(0)
    available = int(flattened.shape[0])

    aggregated: list[torch.Tensor] = []
    start = 0
    for index, length in enumerate(lengths):
        end = start + length
        # Slicing never raises on out-of-range bounds, so validate explicitly:
        # an empty slice averages to NaN and an overrun silently truncates.
        if length <= 0 or end > available:
            raise ValueError(
                f"Invalid seqL entry {index}: segment [{start}, {end}) is empty or "
                f"exceeds the {available} available body-prior frames"
            )
        aggregated.append(flattened[start:end].mean(dim=0))
        start = end
    return torch.stack(aggregated, dim=0)
|
|
|
|
|
|
# Match the symbol name used by the author-provided DRF stub.
|
|
# NOTE: this rebinds the module-level name `BaseModel`, which was imported
# from `.base_model` at the top of the file. `BaseModelBody` was already
# defined with the imported class as its base, so inheritance is unaffected;
# only later lookups of `BaseModel` in this module resolve to the subclass.
BaseModel = BaseModelBody
|