Compare commits
3 Commits
1d1170f19c
...
3d53f00ec9
| Author | SHA1 | Date | |
|---|---|---|---|
| 3d53f00ec9 | |||
| 3a15bd655e | |||
| 65cb8c201d |
337
main.py
337
main.py
@ -11,6 +11,8 @@ from typing import (
|
|||||||
TypedDict,
|
TypedDict,
|
||||||
Any,
|
Any,
|
||||||
cast,
|
cast,
|
||||||
|
Dict,
|
||||||
|
Union,
|
||||||
)
|
)
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
@ -25,6 +27,7 @@ from threading import Thread
|
|||||||
from time import sleep
|
from time import sleep
|
||||||
from pydantic import BaseModel, computed_field
|
from pydantic import BaseModel, computed_field
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
import struct
|
||||||
import awkward as ak
|
import awkward as ak
|
||||||
from awkward import Array as AwkwardArray, Record as AwkwardRecord
|
from awkward import Array as AwkwardArray, Record as AwkwardRecord
|
||||||
from app.model import AlgoReport
|
from app.model import AlgoReport
|
||||||
@ -32,35 +35,39 @@ from app.utils import Instant
|
|||||||
from collections import deque
|
from collections import deque
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from aiomqtt import Client as MqttClient, Message as MqttMessage
|
from aiomqtt import Client as MqttClient, Message as MqttMessage
|
||||||
|
from app.proto.hr_packet import HrPacket, HrOnlyPacket, HrPpgPacket, HrConfidence
|
||||||
|
import betterproto
|
||||||
|
|
||||||
|
|
||||||
SELECT_BAND_ID = 17
|
MQTT_BROKER: Final[str] = "weihua-iot.cn"
|
||||||
|
|
||||||
|
|
||||||
MQTT_BROKER: Final[str] = "192.168.2.189"
|
|
||||||
MQTT_BROKER_PORT: Final[int] = 1883
|
MQTT_BROKER_PORT: Final[int] = 1883
|
||||||
MAX_LENGTH = 600
|
MAX_LENGTH = 600
|
||||||
TOPIC: Final[str] = "GwData"
|
TOPIC: Final[str] = "/hr/region/1/band/#"
|
||||||
NDArray = np.ndarray
|
UDP_DEVICE_ID: Final[int] = 0xFF
|
||||||
|
|
||||||
|
NDArray = np.ndarray
|
||||||
T = TypeVar("T")
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
class AppHistory(TypedDict):
|
class DeviceHistory(TypedDict):
|
||||||
timescape: deque[datetime]
|
timescape: deque[datetime]
|
||||||
hr_data: deque[float]
|
hr_data: deque[float]
|
||||||
hr_conf: deque[float] # in %
|
|
||||||
|
|
||||||
|
|
||||||
# https://handmadesoftware.medium.com/streamlit-asyncio-and-mongodb-f85f77aea825
|
# https://handmadesoftware.medium.com/streamlit-asyncio-and-mongodb-f85f77aea825
|
||||||
class AppState(TypedDict):
|
class AppState(TypedDict):
|
||||||
worker_thread: Thread
|
worker_thread: Thread
|
||||||
message_queue: MemoryObjectReceiveStream[bytes]
|
message_queue: MemoryObjectReceiveStream[Tuple[datetime, bytes]]
|
||||||
|
mqtt_message_queue: MemoryObjectReceiveStream[Tuple[datetime, MqttMessage]]
|
||||||
task_group: TaskGroup
|
task_group: TaskGroup
|
||||||
history: AppHistory
|
device_histories: Dict[Union[int, str], DeviceHistory] # device_id -> DeviceHistory
|
||||||
refresh_inst: Instant
|
|
||||||
|
|
||||||
|
|
||||||
|
BUSY_POLLING_INTERVAL_S: Final[float] = 0.001
|
||||||
|
BATCH_MESSAGE_INTERVAL_S: Final[float] = 0.1
|
||||||
|
NORMAL_REFRESH_INTERVAL_S: Final[float] = 0.5
|
||||||
|
QUEUE_BUFFER_SIZE: Final[int] = 32
|
||||||
|
|
||||||
UDP_SERVER_HOST: Final[str] = "localhost"
|
UDP_SERVER_HOST: Final[str] = "localhost"
|
||||||
UDP_SERVER_PORT: Final[int] = 50_000
|
UDP_SERVER_PORT: Final[int] = 50_000
|
||||||
MAX_LENGTH = 600
|
MAX_LENGTH = 600
|
||||||
@ -75,28 +82,6 @@ def unwrap(value: Optional[T]) -> T:
|
|||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
# /**
|
|
||||||
# * @brief Structure of the Heart Rate Measurement characteristic
|
|
||||||
# *
|
|
||||||
# * @see https://www.bluetooth.com/specifications/gss/
|
|
||||||
# * @see section 3.116 Heart Rate Measurement of the document: GATT Specification Supplement.
|
|
||||||
# */
|
|
||||||
# struct ble_hr_measurement_flag_t {
|
|
||||||
# // LSB first
|
|
||||||
|
|
||||||
# /*
|
|
||||||
# * 0: uint8_t
|
|
||||||
# * 1: uint16_t
|
|
||||||
# */
|
|
||||||
# bool heart_rate_value_format : 1;
|
|
||||||
# bool sensor_contact_detected : 1;
|
|
||||||
# bool sensor_contact_supported : 1;
|
|
||||||
# bool energy_expended_present : 1;
|
|
||||||
# bool rr_interval_present : 1;
|
|
||||||
# uint8_t reserved : 3;
|
|
||||||
# };
|
|
||||||
|
|
||||||
|
|
||||||
def parse_ble_hr_measurement(data: bytes) -> Optional[int]:
|
def parse_ble_hr_measurement(data: bytes) -> Optional[int]:
|
||||||
"""
|
"""
|
||||||
Parse BLE Heart Rate Measurement characteristic data according to Bluetooth specification.
|
Parse BLE Heart Rate Measurement characteristic data according to Bluetooth specification.
|
||||||
@ -106,6 +91,29 @@ def parse_ble_hr_measurement(data: bytes) -> Optional[int]:
|
|||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Heart rate value in BPM, or None if parsing fails
|
Heart rate value in BPM, or None if parsing fails
|
||||||
|
Note:
|
||||||
|
```cpp
|
||||||
|
/**
|
||||||
|
* @brief Structure of the Heart Rate Measurement characteristic
|
||||||
|
*
|
||||||
|
* @see https://www.bluetooth.com/specifications/gss/
|
||||||
|
* @see section 3.116 Heart Rate Measurement of the document: GATT Specification Supplement.
|
||||||
|
*/
|
||||||
|
struct ble_hr_measurement_flag_t {
|
||||||
|
// LSB first
|
||||||
|
|
||||||
|
/*
|
||||||
|
* 0: uint8_t
|
||||||
|
* 1: uint16_t
|
||||||
|
*/
|
||||||
|
bool heart_rate_value_format : 1;
|
||||||
|
bool sensor_contact_detected : 1;
|
||||||
|
bool sensor_contact_supported : 1;
|
||||||
|
bool energy_expended_present : 1;
|
||||||
|
bool rr_interval_present : 1;
|
||||||
|
uint8_t reserved : 3;
|
||||||
|
};
|
||||||
|
```
|
||||||
"""
|
"""
|
||||||
if len(data) < 2:
|
if len(data) < 2:
|
||||||
return None
|
return None
|
||||||
@ -131,39 +139,93 @@ def parse_ble_hr_measurement(data: bytes) -> Optional[int]:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def hr_confidence_to_percentage(confidence: HrConfidence) -> float:
|
||||||
|
"""Convert HrConfidence enum to percentage value"""
|
||||||
|
if confidence == HrConfidence.ZERO:
|
||||||
|
return 0 # mid-point of [0,25)
|
||||||
|
elif confidence == HrConfidence.LOW:
|
||||||
|
return 37.5 # mid-point of [25,50)
|
||||||
|
elif confidence == HrConfidence.MEDIUM:
|
||||||
|
return 62.5 # mid-point of [50,75)
|
||||||
|
elif confidence == HrConfidence.HIGH:
|
||||||
|
return 100 # mid-point of (75,100]
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Invalid HrConfidence: {confidence}")
|
||||||
|
|
||||||
|
|
||||||
|
def create_device_history() -> DeviceHistory:
|
||||||
|
"""Create a new device history structure"""
|
||||||
|
return {
|
||||||
|
"timescape": deque(maxlen=MAX_LENGTH),
|
||||||
|
"hr_data": deque(maxlen=MAX_LENGTH),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def get_device_name(device_id: Union[int, str]) -> str:
|
||||||
|
"""Get display name for device"""
|
||||||
|
if device_id == UDP_DEVICE_ID:
|
||||||
|
return "UDP Device"
|
||||||
|
return f"Device {device_id}"
|
||||||
|
|
||||||
|
|
||||||
@st.cache_resource
|
@st.cache_resource
|
||||||
def resource(params: Any = None):
|
def resource(params: Any = None):
|
||||||
set_ev = anyio.Event()
|
set_ev = anyio.Event()
|
||||||
tx, rx = create_memory_object_stream[bytes]()
|
tx, rx = create_memory_object_stream[Tuple[datetime, bytes]](
|
||||||
|
max_buffer_size=QUEUE_BUFFER_SIZE
|
||||||
|
)
|
||||||
|
mqtt_tx, mqtt_rx = create_memory_object_stream[Tuple[datetime, MqttMessage]](
|
||||||
|
max_buffer_size=QUEUE_BUFFER_SIZE
|
||||||
|
)
|
||||||
tg: Optional[TaskGroup] = None
|
tg: Optional[TaskGroup] = None
|
||||||
|
|
||||||
async def poll_task():
|
async def udp_task():
|
||||||
nonlocal set_ev
|
async with await create_udp_socket(
|
||||||
nonlocal tg
|
local_host=UDP_SERVER_HOST, local_port=UDP_SERVER_PORT, reuse_port=True
|
||||||
|
) as udp:
|
||||||
|
logger.info(
|
||||||
|
"UDP server listening on {}:{}", UDP_SERVER_HOST, UDP_SERVER_PORT
|
||||||
|
)
|
||||||
|
async for packet, _ in udp:
|
||||||
|
timestamp = datetime.now()
|
||||||
|
await tx.send((timestamp, packet))
|
||||||
|
|
||||||
|
async def mqtt_task():
|
||||||
|
async with MqttClient(MQTT_BROKER, port=MQTT_BROKER_PORT) as client:
|
||||||
|
await client.subscribe(TOPIC)
|
||||||
|
logger.info(
|
||||||
|
"Subscribed to MQTT broker {}:{} topic {}",
|
||||||
|
MQTT_BROKER,
|
||||||
|
MQTT_BROKER_PORT,
|
||||||
|
TOPIC,
|
||||||
|
)
|
||||||
|
async for message in client.messages:
|
||||||
|
timestamp = datetime.now()
|
||||||
|
await mqtt_tx.send((timestamp, message))
|
||||||
|
|
||||||
|
async def combined_task():
|
||||||
|
nonlocal set_ev, tg
|
||||||
tg = anyio.create_task_group()
|
tg = anyio.create_task_group()
|
||||||
set_ev.set()
|
set_ev.set()
|
||||||
async with tg:
|
async with tg:
|
||||||
async with await create_udp_socket(
|
async with anyio.create_task_group() as inner_tg:
|
||||||
local_host=UDP_SERVER_HOST, local_port=UDP_SERVER_PORT, reuse_port=True
|
inner_tg.start_soon(udp_task)
|
||||||
) as udp:
|
inner_tg.start_soon(mqtt_task)
|
||||||
async for packet, _ in udp:
|
|
||||||
await tx.send(packet)
|
|
||||||
|
|
||||||
tr = Thread(target=anyio.run, args=(poll_task,))
|
tr = Thread(target=anyio.run, args=(combined_task,))
|
||||||
tr.start()
|
tr.start()
|
||||||
|
|
||||||
while not set_ev.is_set():
|
while not set_ev.is_set():
|
||||||
sleep(0.001)
|
sleep(BUSY_POLLING_INTERVAL_S)
|
||||||
logger.info("Poll task initialized")
|
|
||||||
|
logger.info("UDP and MQTT tasks initialized in single thread")
|
||||||
|
|
||||||
state: AppState = {
|
state: AppState = {
|
||||||
"worker_thread": tr,
|
"worker_thread": tr,
|
||||||
"message_queue": rx,
|
"message_queue": rx,
|
||||||
|
"mqtt_message_queue": mqtt_rx,
|
||||||
"task_group": unwrap(tg),
|
"task_group": unwrap(tg),
|
||||||
"history": {
|
"device_histories": {},
|
||||||
"timescape": deque(maxlen=MAX_LENGTH),
|
|
||||||
"hr_data": deque(maxlen=MAX_LENGTH),
|
|
||||||
"hr_conf": deque(maxlen=MAX_LENGTH),
|
|
||||||
},
|
|
||||||
"refresh_inst": Instant(),
|
|
||||||
}
|
}
|
||||||
logger.info("Resource created")
|
logger.info("Resource created")
|
||||||
return state
|
return state
|
||||||
@ -171,23 +233,27 @@ def resource(params: Any = None):
|
|||||||
|
|
||||||
def main():
|
def main():
|
||||||
state = resource()
|
state = resource()
|
||||||
history = state["history"]
|
device_histories = state["device_histories"]
|
||||||
|
|
||||||
def on_export():
|
def on_export():
|
||||||
file_name = f"history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.parquet"
|
file_name = f"history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.parquet"
|
||||||
logger.info(f"Exporting to {file_name}")
|
logger.info(f"Exporting to {file_name}")
|
||||||
rec = ak.Record(history)
|
|
||||||
|
# Export all device histories
|
||||||
|
export_data = {
|
||||||
|
device_id: ak.Record(dev_hist)
|
||||||
|
for device_id, dev_hist in device_histories.items()
|
||||||
|
}
|
||||||
|
rec = ak.Record(export_data)
|
||||||
ak.to_parquet(rec, file_name)
|
ak.to_parquet(rec, file_name)
|
||||||
|
|
||||||
def on_clear():
|
def on_clear():
|
||||||
nonlocal history
|
nonlocal device_histories
|
||||||
logger.info("Clearing history")
|
logger.info("Clearing history")
|
||||||
history["timescape"].clear()
|
device_histories.clear()
|
||||||
history["hr_data"].clear()
|
|
||||||
history["hr_conf"].clear()
|
|
||||||
|
|
||||||
# https://docs.streamlit.io/develop/api-reference/layout
|
# https://docs.streamlit.io/develop/api-reference/layout
|
||||||
st.title("MAX-BAND Visualizer")
|
st.title("HR Visualizer")
|
||||||
with st.container(border=True):
|
with st.container(border=True):
|
||||||
c1, c2 = st.columns(2)
|
c1, c2 = st.columns(2)
|
||||||
with c1:
|
with c1:
|
||||||
@ -203,43 +269,132 @@ def main():
|
|||||||
on_click=on_clear,
|
on_click=on_clear,
|
||||||
)
|
)
|
||||||
|
|
||||||
placeholder = st.empty()
|
# Device selection
|
||||||
md_placeholder = st.empty()
|
if device_histories:
|
||||||
|
selected_devices = st.multiselect(
|
||||||
|
"Select devices to display:",
|
||||||
|
options=list(device_histories.keys()),
|
||||||
|
default=list(device_histories.keys()),
|
||||||
|
format_func=get_device_name,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
selected_devices = []
|
||||||
|
|
||||||
while True:
|
# Process UDP messages (treat as device_id = 0)
|
||||||
try:
|
try:
|
||||||
message = state["message_queue"].receive_nowait()
|
while True:
|
||||||
except anyio.WouldBlock:
|
timestamp, packet = state["message_queue"].receive_nowait()
|
||||||
continue
|
hr_value = parse_ble_hr_measurement(packet)
|
||||||
hr_value = parse_ble_hr_measurement(message)
|
if hr_value is not None:
|
||||||
if hr_value is None:
|
now = timestamp
|
||||||
logger.error("Failed to parse heart rate measurement data")
|
|
||||||
continue
|
|
||||||
|
|
||||||
with placeholder.container():
|
if UDP_DEVICE_ID not in device_histories:
|
||||||
history["hr_data"].append(float(hr_value))
|
device_histories[UDP_DEVICE_ID] = create_device_history()
|
||||||
history["hr_conf"].append(
|
|
||||||
100.0
|
|
||||||
) # Default confidence since we're not parsing it
|
|
||||||
fig_hr, fig_pd = st.tabs(["Heart Rate", "PD"])
|
|
||||||
|
|
||||||
with fig_hr:
|
dev_hist = device_histories[UDP_DEVICE_ID]
|
||||||
st.plotly_chart(
|
dev_hist["timescape"].append(now)
|
||||||
go.Figure(
|
dev_hist["hr_data"].append(float(hr_value))
|
||||||
data=[
|
|
||||||
go.Scatter(
|
logger.debug("UDP Device: HR={}", hr_value)
|
||||||
y=list(history["hr_data"]),
|
except anyio.WouldBlock:
|
||||||
mode="lines",
|
pass
|
||||||
name="HR",
|
|
||||||
),
|
# Process MQTT messages
|
||||||
go.Scatter(
|
try:
|
||||||
y=list(history["hr_conf"]),
|
while True:
|
||||||
mode="lines",
|
timestamp, mqtt_message = state["mqtt_message_queue"].receive_nowait()
|
||||||
name="HR Confidence",
|
if mqtt_message.payload:
|
||||||
),
|
try:
|
||||||
]
|
payload_bytes = mqtt_message.payload
|
||||||
|
if isinstance(payload_bytes, str):
|
||||||
|
payload_bytes = payload_bytes.encode("utf-8")
|
||||||
|
elif not isinstance(payload_bytes, bytes):
|
||||||
|
continue
|
||||||
|
|
||||||
|
hr_packet = HrPacket()
|
||||||
|
hr_packet.parse(payload_bytes)
|
||||||
|
|
||||||
|
now = timestamp
|
||||||
|
|
||||||
|
device_id = None
|
||||||
|
hr_value = None
|
||||||
|
|
||||||
|
if hr_packet.hr_only_packet:
|
||||||
|
packet = hr_packet.hr_only_packet
|
||||||
|
device_id = packet.id
|
||||||
|
hr_value = packet.hr
|
||||||
|
elif hr_packet.hr_ppg_packet:
|
||||||
|
packet = hr_packet.hr_ppg_packet
|
||||||
|
device_id = packet.id
|
||||||
|
hr_value = packet.hr
|
||||||
|
|
||||||
|
if device_id is not None and hr_value is not None:
|
||||||
|
if device_id not in device_histories:
|
||||||
|
device_histories[device_id] = create_device_history()
|
||||||
|
|
||||||
|
dev_hist = device_histories[device_id]
|
||||||
|
dev_hist["timescape"].append(now)
|
||||||
|
dev_hist["hr_data"].append(float(hr_value))
|
||||||
|
|
||||||
|
logger.debug("Device {}: HR={}", device_id, hr_value)
|
||||||
|
except (ValueError, TypeError, IndexError, UnicodeDecodeError) as e:
|
||||||
|
logger.error("MQTT protobuf message parsing: {}", e)
|
||||||
|
except anyio.WouldBlock:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Update visualization - HR Graphs
|
||||||
|
if device_histories:
|
||||||
|
st.subheader("Heart Rate Data")
|
||||||
|
|
||||||
|
# Create plots for selected devices
|
||||||
|
traces = []
|
||||||
|
colors = [
|
||||||
|
"red",
|
||||||
|
"green",
|
||||||
|
"blue",
|
||||||
|
"orange",
|
||||||
|
"purple",
|
||||||
|
"brown",
|
||||||
|
"pink",
|
||||||
|
"gray",
|
||||||
|
"olive",
|
||||||
|
"cyan",
|
||||||
|
]
|
||||||
|
|
||||||
|
for i, device_id in enumerate(selected_devices):
|
||||||
|
if device_id in device_histories:
|
||||||
|
dev_hist = device_histories[device_id]
|
||||||
|
if dev_hist["hr_data"] and dev_hist["timescape"]:
|
||||||
|
color = colors[i % len(colors)]
|
||||||
|
traces.append(
|
||||||
|
go.Scatter(
|
||||||
|
x=list(dev_hist["timescape"]),
|
||||||
|
y=list(dev_hist["hr_data"]),
|
||||||
|
mode="lines+markers",
|
||||||
|
name=get_device_name(device_id),
|
||||||
|
line=dict(color=color),
|
||||||
|
marker=dict(size=4),
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
if traces:
|
||||||
|
fig = go.Figure(data=traces)
|
||||||
|
fig.update_layout(
|
||||||
|
title="Heart Rate Monitor",
|
||||||
|
xaxis_title="Time",
|
||||||
|
yaxis_title="Heart Rate (BPM)",
|
||||||
|
hovermode="x unified",
|
||||||
|
showlegend=True,
|
||||||
|
height=500,
|
||||||
|
)
|
||||||
|
st.plotly_chart(fig, use_container_width=True)
|
||||||
|
else:
|
||||||
|
st.info("No heart rate data available for selected devices")
|
||||||
|
else:
|
||||||
|
st.info("No devices connected yet. Waiting for data...")
|
||||||
|
|
||||||
|
sleep(NORMAL_REFRESH_INTERVAL_S)
|
||||||
|
st.rerun()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Reference in New Issue
Block a user