refactor: Improve UDP server configuration and AlgoReport parsing

This commit is contained in:
2025-02-11 12:25:41 +08:00
parent 21e1598072
commit af26793eef
2 changed files with 19 additions and 30 deletions

View File

@ -1,7 +1,7 @@
from dataclasses import dataclass
from enum import IntEnum
import struct
from typing import ClassVar, Tuple
from typing import ClassVar, Tuple, Final, LiteralString
from pydantic import BaseModel, Field, computed_field
@ -61,10 +61,10 @@ class AlgoModelData(BaseModel):
spo2_unreliable_r_flag: int # uint8
spo2_state: SPO2State
scd_contact_state: SCDState
reserved: int # uint32
# don't include reserved into the struct
# uint32
# Format string for struct.unpack
_FORMAT: ClassVar[str] = "<BHBHBBHBBBBBBBBBL" # < for little-endian
_FORMAT: ClassVar[LiteralString] = "<BHBHBBHBHBBBBBBBI"
@computed_field
def hr_f(self) -> float:
@ -87,7 +87,7 @@ class AlgoModelData(BaseModel):
return self.rr / 10.0
@classmethod
def from_bytes(cls, data: bytes) -> "AlgoModelData":
def unmarshal(cls, data: bytes) -> "AlgoModelData":
values = struct.unpack(cls._FORMAT, data)
return cls(
op_mode=values[0],
@ -106,7 +106,6 @@ class AlgoModelData(BaseModel):
spo2_unreliable_r_flag=values[13],
spo2_state=values[14],
scd_contact_state=values[15],
reserved=values[16],
)
@ -114,31 +113,18 @@ class AlgoReport(BaseModel):
led_1: int # uint32
led_2: int # uint32
led_3: int # uint32
accel_x: int # int16
accel_y: int # int16
accel_z: int # int16
accel_x: int # int16, in uint of g
accel_y: int # int16, in uint of g
accel_z: int # int16, in uint of g
data: AlgoModelData
@classmethod
def unmarshal(cls, buf: bytes) -> "AlgoReport":
if len(buf) < 24 + struct.calcsize(AlgoModelData._FORMAT):
raise ValueError("Buffer too small")
# Parse PPG values (3 bytes each, MSB first)
led_1 = int.from_bytes(buf[0:3], byteorder="little")
led_2 = int.from_bytes(buf[3:6], byteorder="little")
led_3 = int.from_bytes(buf[6:9], byteorder="little")
# Skip unused PPG values (bytes 9-17)
# Parse accelerometer values (2 bytes each, MSB first)
accel_x = int.from_bytes(buf[18:20], byteorder="little", signed=True)
accel_y = int.from_bytes(buf[20:22], byteorder="little", signed=True)
accel_z = int.from_bytes(buf[22:24], byteorder="little", signed=True)
# Parse algorithm data
algo_data = AlgoModelData.from_bytes(buf[24:])
FORMAT: Final[str] = "<IIIhhh"
led_1, led_2, led_3, accel_x, accel_y, accel_z = struct.unpack(
FORMAT, buf[: struct.calcsize(FORMAT)]
)
data = AlgoModelData.unmarshal(buf[struct.calcsize(FORMAT) :])
return cls(
led_1=led_1,
led_2=led_2,
@ -146,5 +132,5 @@ class AlgoReport(BaseModel):
accel_x=accel_x,
accel_y=accel_y,
accel_z=accel_z,
data=algo_data,
data=data,
)