From 6407307976050754c2a474de0bf6fa43319355d8 Mon Sep 17 00:00:00 2001 From: crosstyan Date: Tue, 11 Feb 2025 15:56:04 +0800 Subject: [PATCH] feat: Add PD data visualization and Instant utility class --- app/utils/__init__.py | 70 +++++++++++++++++++++++++++++++++++++++++++ main.py | 48 +++++++++++++++++++++-------- 2 files changed, 105 insertions(+), 13 deletions(-) create mode 100644 app/utils/__init__.py diff --git a/app/utils/__init__.py b/app/utils/__init__.py new file mode 100644 index 0000000..ef7ef87 --- /dev/null +++ b/app/utils/__init__.py @@ -0,0 +1,70 @@ +import time +from datetime import timedelta + + +class Instant: + """A measurement of a monotonically nondecreasing clock.""" + + _time: float + + @staticmethod + def clock() -> float: + """Get current clock time in microseconds.""" + return time.monotonic() + + def __init__(self): + """Initialize with current clock time.""" + self._time = self.clock() + + @classmethod + def now(cls) -> "Instant": + """Create new Instant with current time.""" + return cls() + + def elapsed(self) -> timedelta: + """Get elapsed time as timedelta.""" + now = self.clock() + diff = now - self._time + return timedelta(seconds=diff) + + def elapsed_ms(self) -> int: + """Get elapsed time in milliseconds.""" + return int(self.elapsed().total_seconds() * 1000) + + def has_elapsed_ms(self, ms: int) -> bool: + """Check if specified milliseconds have elapsed.""" + return self.elapsed_ms() >= ms + + def mut_every_ms(self, ms: int) -> bool: + """Check if time has elapsed and reset if true.""" + if self.elapsed_ms() >= ms: + self.mut_reset() + return True + return False + + def has_elapsed(self, duration: timedelta) -> bool: + """Check if specified duration has elapsed.""" + return self.elapsed() >= duration + + def mut_every(self, duration: timedelta) -> bool: + """Check if duration has elapsed and reset if true.""" + if self.has_elapsed(duration): + self.mut_reset() + return True + return False + + def mut_reset(self) -> None: + """Reset the timer to current time.""" + self._time = self.clock() + + def mut_elapsed_and_reset(self) -> timedelta: + """Get elapsed time and reset timer.""" + now = self.clock() + diff = now - self._time + duration = timedelta(microseconds=diff) + self._time = now + return duration + + def count(self) -> float: + """Get the internal time counter value.""" + return self._time diff --git a/main.py b/main.py index 0efc8b8..5458820 100644 --- a/main.py +++ b/main.py @@ -24,11 +24,13 @@ from anyio.streams.memory import MemoryObjectSendStream, MemoryObjectReceiveStre from threading import Thread from time import sleep from pydantic import BaseModel, computed_field -from datetime import datetime +from datetime import datetime, timedelta import awkward as ak from awkward import Array as AwkwardArray, Record as AwkwardRecord from app.model import AlgoReport +from app.utils import Instant from collections import deque +from dataclasses import dataclass class AppHistory(TypedDict): @@ -38,6 +40,7 @@ class AppHistory(TypedDict): accel_x_data: deque[int] accel_y_data: deque[int] accel_z_data: deque[int] + pd_data: deque[int] # https://handmadesoftware.medium.com/streamlit-asyncio-and-mongodb-f85f77aea825 @@ -46,6 +49,7 @@ class AppState(TypedDict): message_queue: MemoryObjectReceiveStream[bytes] task_group: TaskGroup history: AppHistory + refresh_inst: Instant UDP_SERVER_HOST: Final[str] = "localhost" @@ -96,7 +100,9 @@ def resource(params: Any = None): "accel_x_data": deque(maxlen=MAX_LENGTH), "accel_y_data": deque(maxlen=MAX_LENGTH), "accel_z_data": deque(maxlen=MAX_LENGTH), + "pd_data": deque(maxlen=MAX_LENGTH), }, + "refresh_inst": Instant(), } logger.info("Resource created") return state @@ -140,32 +146,35 @@ def main(): ) placeholder = st.empty() + md_placeholder = st.empty() while True: try: message = state["message_queue"].receive_nowait() except anyio.WouldBlock: continue + report = AlgoReport.unmarshal(message) + if state["refresh_inst"].mut_every_ms(500): + md_placeholder.markdown( + f""" + - HR: {report.data.hr_f}bpm + - HR CONF: {report.data.hr_conf}% + - ACTIVITY: {report.data.activity_class.name} + - SCD: {report.data.scd_contact_state.name} + """ + ) with placeholder.container(): - report = AlgoReport.unmarshal(message) history["timescape"].append(datetime.now()) history["hr_data"].append(report.data.hr_f) history["hr_conf"].append(report.data.hr_conf) history["accel_x_data"].append(report.accel_x) history["accel_y_data"].append(report.accel_y) history["accel_z_data"].append(report.accel_z) - - # with st.container(): - # c1, c2 = st.columns(2) - # with c1: - # c1.write(f"HR: {report.data.hr_f}") - # with c2: - # c2.write(f"HR Confidence: {report.data.hr_conf}") - - fig_hr, fig_accel = st.tabs(["Heart Rate", "Accelerometer"]) + history["pd_data"].append(report.led_2) + fig_hr, fig_accel, fig_pd = st.tabs(["Heart Rate", "Accelerometer", "PD"]) with fig_hr: - fig_hr.plotly_chart( + st.plotly_chart( go.Figure( data=[ go.Scatter( @@ -184,7 +193,7 @@ def main(): ) ) with fig_accel: - fig_accel.plotly_chart( + st.plotly_chart( go.Figure( data=[ go.Scatter( @@ -208,6 +217,19 @@ def main(): ] ) ) + with fig_pd: + st.plotly_chart( + go.Figure( + data=[ + go.Scatter( + x=list(history["timescape"]), + y=list(history["pd_data"]), + mode="lines", + name="PD", + ) + ] + ) + ) if __name__ == "__main__":