from collections import deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from threading import Thread
from time import sleep
from typing import (
    Annotated,
    Any,
    AsyncGenerator,
    Final,
    Generator,
    List,
    Literal,
    Optional,
    Tuple,
    TypedDict,
    TypeVar,
    cast,
)

import anyio
import awkward as ak
import numpy as np
import plotly.graph_objects as go
import streamlit as st
from aiomqtt import Client as MqttClient, Message as MqttMessage
from anyio import create_memory_object_stream, create_udp_socket
from anyio.abc import TaskGroup, UDPSocket
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from awkward import Array as AwkwardArray, Record as AwkwardRecord
from loguru import logger
from pydantic import BaseModel, computed_field

from app.model import AlgoReport
from app.utils import Instant

SELECT_BAND_ID = 17
MQTT_BROKER: Final[str] = "192.168.2.189"
MQTT_BROKER_PORT: Final[int] = 1883
TOPIC: Final[str] = "GwData"

UDP_SERVER_HOST: Final[str] = "localhost"
UDP_SERVER_PORT: Final[int] = 50_000

# Capacity (maxlen) used for every history deque.
MAX_LENGTH = 600

NDArray = np.ndarray
T = TypeVar("T")


class AppHistory(TypedDict):
    """Bounded per-series buffers holding the recorded measurements."""

    timescape: deque[datetime]
    hr_data: deque[float]
    hr_conf: deque[float]  # in %


# https://handmadesoftware.medium.com/streamlit-asyncio-and-mongodb-f85f77aea825
class AppState(TypedDict):
    """Long-lived application state shared between script runs."""

    # Background thread that runs the packet polling coroutine.
    worker_thread: Thread
    # Receiving end of the in-memory stream fed by the worker thread.
    message_queue: MemoryObjectReceiveStream[bytes]
    # Task group created inside the worker thread's event loop.
    task_group: TaskGroup
    # Recorded measurement buffers.
    history: AppHistory
    # NOTE(review): `Instant` comes from app.utils; presumably a timer used to
    # throttle refreshes -- confirm against its definition.
    refresh_inst: Instant


def unwrap(value: Optional[T]) -> T:
    """Return ``value`` unchanged, rejecting ``None``.

    Args:
        value: The optional value to narrow.

    Returns:
        ``value`` itself when it is not ``None``.

    Raises:
        ValueError: If ``value`` is ``None``.
    """
    if value is None:
        raise ValueError("Value is None")
    return value


# /**
#  * @brief Structure of the Heart Rate Measurement characteristic
#  *
#  * @see https://www.bluetooth.com/specifications/gss/
#  * @see section 3.116 Heart Rate Measurement of the document: GATT Specification Supplement.
#  */
# struct ble_hr_measurement_flag_t {
#   // LSB first
#   /*
#    * 0: uint8_t
#    * 1: uint16_t
#    */
#   bool heart_rate_value_format : 1;
#   bool sensor_contact_detected : 1;
#   bool sensor_contact_supported : 1;
#   bool energy_expended_present : 1;
#   bool rr_interval_present : 1;
#   uint8_t reserved : 3;
# };
def parse_ble_hr_measurement(data: bytes) -> Optional[int]:
    """
    Parse BLE Heart Rate Measurement characteristic data according to
    Bluetooth specification.

    Only the flags byte and the heart rate value are decoded; any bytes that
    follow the heart rate value are ignored.

    Args:
        data: Raw bytes from the heart rate measurement characteristic

    Returns:
        Heart rate value in BPM, or None if parsing fails
    """
    # Need at least the flags byte plus one byte of heart rate value.
    if len(data) < 2:
        return None

    # First byte contains flags
    flags = data[0]
    # Bit 0: Heart Rate Value Format (0 = uint8, 1 = uint16)
    heart_rate_value_format = (flags & 0x01) != 0

    try:
        if heart_rate_value_format:
            # 16-bit heart rate value (little endian)
            if len(data) < 3:
                return None
            return int.from_bytes(data[1:3], byteorder="little")
        # 8-bit heart rate value
        return data[1]
    except (IndexError, ValueError):
        return None


@st.cache_resource
def resource(params: Any = None):
    """Start the UDP polling thread and build the shared application state.

    A worker thread runs an anyio event loop that binds a UDP socket on
    ``UDP_SERVER_HOST:UDP_SERVER_PORT`` and forwards every received packet
    into an in-memory object stream. The function blocks until the worker has
    created its task group, then returns the state dictionary.

    Args:
        params: Not used by the body.
            NOTE(review): presumably exists only as a cache key for
            ``st.cache_resource`` -- confirm.

    Returns:
        An ``AppState`` dictionary holding the worker thread, the receiving
        end of the packet stream, the worker's task group, empty history
        buffers and a fresh ``Instant``.

    Raises:
        RuntimeError: If the worker thread exits before signalling readiness.
    """
    # Imported locally because the module only imports `Thread` from threading.
    from threading import Event as ThreadEvent

    # The readiness flag is set in the worker thread and awaited in the
    # calling thread, so it must be a thread-level primitive; anyio's Event is
    # bound to an event loop and is not meant for cross-thread signalling.
    ready = ThreadEvent()
    tx, rx = create_memory_object_stream[bytes]()
    tg: Optional[TaskGroup] = None

    async def poll_task():
        nonlocal tg
        tg = anyio.create_task_group()
        # Publish the task group before entering it so the caller can proceed.
        ready.set()
        async with tg:
            async with await create_udp_socket(
                local_host=UDP_SERVER_HOST,
                local_port=UDP_SERVER_PORT,
                reuse_port=True,
            ) as udp:
                async for packet, _ in udp:
                    await tx.send(packet)

    tr = Thread(target=anyio.run, args=(poll_task,))
    tr.start()

    # Block (without spinning) until the worker is ready; bail out instead of
    # hanging forever if the worker died before it could signal.
    while not ready.wait(timeout=0.05):
        if not tr.is_alive() and not ready.is_set():
            raise RuntimeError("Poll task exited before initialization")
    logger.info("Poll task initialized")

    state: AppState = {
        "worker_thread": tr,
        "message_queue": rx,
        "task_group": unwrap(tg),
        "history": {
            "timescape": deque(maxlen=MAX_LENGTH),
            "hr_data": deque(maxlen=MAX_LENGTH),
            "hr_conf": deque(maxlen=MAX_LENGTH),
        },
        "refresh_inst": Instant(),
    }
    logger.info("Resource created")
    return state


def main():
    """Render the Streamlit page and plot heart rate packets as they arrive.

    Builds the title, the Export/Clear buttons and two placeholders, then
    loops forever: each packet taken from the message queue is parsed as a BLE
    Heart Rate Measurement, appended to the history and the chart is redrawn.
    """
    state = resource()
    history = state["history"]

    # Delay between queue polls while no packet is pending; keeps the render
    # loop from pinning a CPU core.
    idle_delay = 0.01

    def on_export():
        # Relative file name: the file is written to the current working
        # directory.
        file_name = f"history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.parquet"
        logger.info(f"Exporting to {file_name}")
        rec = ak.Record(history)
        ak.to_parquet(rec, file_name)

    def on_clear():
        logger.info("Clearing history")
        history["timescape"].clear()
        history["hr_data"].clear()
        history["hr_conf"].clear()

    # https://docs.streamlit.io/develop/api-reference/layout
    st.title("MAX-BAND Visualizer")
    with st.container(border=True):
        c1, c2 = st.columns(2)
        with c1:
            st.button(
                "Export",
                help="Export the current data to a parquet file",
                on_click=on_export,
            )
        with c2:
            st.button(
                "Clear",
                help="Clear the current data",
                on_click=on_clear,
            )

    placeholder = st.empty()
    md_placeholder = st.empty()

    while True:
        try:
            message = state["message_queue"].receive_nowait()
        except anyio.WouldBlock:
            # Nothing queued yet: yield the CPU instead of busy-waiting.
            sleep(idle_delay)
            continue

        hr_value = parse_ble_hr_measurement(message)
        if hr_value is None:
            logger.error("Failed to parse heart rate measurement data")
            continue

        with placeholder.container():
            # NOTE(review): history["timescape"] is never appended to in this
            # function, so it stays empty while the other series grow --
            # confirm whether a timestamp should be recorded per sample.
            history["hr_data"].append(float(hr_value))
            history["hr_conf"].append(
                100.0
            )  # Default confidence since we're not parsing it
            fig_hr, fig_pd = st.tabs(["Heart Rate", "PD"])
            with fig_hr:
                st.plotly_chart(
                    go.Figure(
                        data=[
                            go.Scatter(
                                y=list(history["hr_data"]),
                                mode="lines",
                                name="HR",
                            ),
                            go.Scatter(
                                y=list(history["hr_conf"]),
                                mode="lines",
                                name="HR Confidence",
                            ),
                        ]
                    )
                )


if __name__ == "__main__":
    main()