8 Commits

8 changed files with 658 additions and 155 deletions

179
.gitignore vendored
View File

@ -1 +1,178 @@
*.parquet # Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# user
.DS_Store
*.parquet

View File

@ -1,4 +1,7 @@
{ {
"python.analysis.autoImportCompletions": true, "python.analysis.autoImportCompletions": true,
"python.analysis.typeCheckingMode": "standard" "python.analysis.typeCheckingMode": "standard",
"cSpell.words": [
"timescape"
]
} }

213
app/model/__init__.py Normal file
View File

@ -0,0 +1,213 @@
from dataclasses import dataclass
from enum import IntEnum
import struct
from typing import ClassVar, Tuple, Final, LiteralString
from pydantic import BaseModel, Field, computed_field
class AlgoOpMode(IntEnum):
"""Equivalent to max::ALGO_OP_MODE"""
CONTINUOUS_HRM_CONTINUOUS_SPO2 = 0x00 # Continuous HRM, continuous SpO2
CONTINUOUS_HRM_ONE_SHOT_SPO2 = 0x01 # Continuous HRM, one-shot SpO2
CONTINUOUS_HRM = 0x02 # Continuous HRM
SAMPLED_HRM = 0x03 # Sampled HRM
SAMPLED_HRM_ONE_SHOT_SPO2 = 0x04 # Sampled HRM, one-shot SpO2
ACTIVITY_TRACKING_ONLY = 0x05 # Activity tracking only
SPO2_CALIBRATION = 0x06 # SpO2 calibration
class ActivateClass(IntEnum):
"""Equivalent to max::ACTIVATE_CLASS"""
REST = 0
WALK = 1
RUN = 2
BIKE = 3
class SPO2State(IntEnum):
"""Equivalent to max::SPO2_STATE"""
LED_ADJUSTMENT = 0
COMPUTATION = 1
SUCCESS = 2
TIMEOUT = 3
class SCDState(IntEnum):
"""Equivalent to max::SCD_STATE"""
UNDETECTED = 0
OFF_SKIN = 1
ON_SOME_SUBJECT = 2
ON_SKIN = 3
class AlgoModelData(BaseModel):
op_mode: AlgoOpMode
hr: int # uint16, 10x calculated heart rate
hr_conf: int # uint8, confidence level in %
rr: int # uint16, 10x RR interval in ms
rr_conf: int # uint8
activity_class: ActivateClass
r: int # uint16, 1000x SpO2 R value
spo2_conf: int # uint8
spo2: int # uint16, 10x SpO2 %
spo2_percent_complete: int # uint8
spo2_low_signal_quality_flag: int # uint8
spo2_motion_flag: int # uint8
spo2_low_pi_flag: int # uint8
spo2_unreliable_r_flag: int # uint8
spo2_state: SPO2State
scd_contact_state: SCDState
# don't include reserved into the struct
# uint32
_FORMAT: ClassVar[LiteralString] = "<BHBHBBHBHBBBBBBBI"
@computed_field
@property
def hr_f(self) -> float:
"""Heart rate in beats per minute"""
return self.hr / 10.0
@computed_field
@property
def spo2_f(self) -> float:
"""SpO2 percentage"""
return self.spo2 / 10.0
@computed_field
@property
def r_f(self) -> float:
"""SpO2 R value"""
return self.r / 1000.0
@computed_field
@property
def rr_f(self) -> float:
"""RR interval in milliseconds"""
return self.rr / 10.0
@classmethod
def unmarshal(cls, data: bytes) -> "AlgoModelData":
values = struct.unpack(cls._FORMAT, data)
return cls(
op_mode=values[0],
hr=values[1],
hr_conf=values[2],
rr=values[3],
rr_conf=values[4],
activity_class=values[5],
r=values[6],
spo2_conf=values[7],
spo2=values[8],
spo2_percent_complete=values[9],
spo2_low_signal_quality_flag=values[10],
spo2_motion_flag=values[11],
spo2_low_pi_flag=values[12],
spo2_unreliable_r_flag=values[13],
spo2_state=values[14],
scd_contact_state=values[15],
)
class AlgoReport(BaseModel):
led_1: int # uint32
led_2: int # uint32
led_3: int # uint32
accel_x: int # int16, in uint of g
accel_y: int # int16, in uint of g
accel_z: int # int16, in uint of g
data: AlgoModelData
@classmethod
def unmarshal(cls, buf: bytes) -> "AlgoReport":
FORMAT: Final[str] = "<IIIhhh"
led_1, led_2, led_3, accel_x, accel_y, accel_z = struct.unpack(
FORMAT, buf[: struct.calcsize(FORMAT)]
)
data = AlgoModelData.unmarshal(buf[struct.calcsize(FORMAT) :])
return cls(
led_1=led_1,
led_2=led_2,
led_3=led_3,
accel_x=accel_x,
accel_y=accel_y,
accel_z=accel_z,
data=data,
)
class HrConfidence(IntEnum):
"""Equivalent to max::HR_CONFIDENCE"""
ZERO = 0
LOW = 1
MEDIUM = 2
HIGH = 3
def hr_confidence_to_num(hr_confidence: HrConfidence) -> float:
if hr_confidence == HrConfidence.ZERO:
return 0
elif hr_confidence == HrConfidence.LOW:
return 25
elif hr_confidence == HrConfidence.MEDIUM:
return 62.5
elif hr_confidence == HrConfidence.HIGH:
return 100
else:
raise ValueError(f"Invalid HR confidence: {hr_confidence}")
@dataclass
class HrStatusFlags:
# 2 bits
hr_confidence: HrConfidence
# 1 bit
is_active: bool
# 1 bit
is_on_skin: bool
# 4 bits
battery_level: int
@staticmethod
def unmarshal(data: bytes) -> "HrStatusFlags":
val = data[0]
return HrStatusFlags(
hr_confidence=HrConfidence(val & 0b11),
is_active=(val & 0b100) != 0,
is_on_skin=(val & 0b1000) != 0,
battery_level=val >> 4,
)
@dataclass
class HrPacket:
# 8 bits
status: HrStatusFlags
# 8 bits
id: int
# 8 bits
hr: int
# 8 bits (as `n`) + n x 24 bits
raw_data: list[int]
@staticmethod
def unmarshal(data: bytes) -> "HrPacket":
status = HrStatusFlags.unmarshal(data[0:1])
id = data[1]
hr = data[2]
raw_data_count = data[3]
raw_data_payload = data[4:]
if len(raw_data_payload) != (expected_raw_data_len := raw_data_count * 3):
raise ValueError(
f"Invalid raw data payload length: {len(raw_data_payload)}, expected {expected_raw_data_len}"
)
raw_data = [
int.from_bytes(raw_data_payload[i : i + 3], "little")
for i in range(0, expected_raw_data_len, 3)
]
return HrPacket(status, id, hr, raw_data)

70
app/utils/__init__.py Normal file
View File

@ -0,0 +1,70 @@
import time
from datetime import timedelta
class Instant:
"""A measurement of a monotonically nondecreasing clock."""
_time: float
@staticmethod
def clock() -> float:
"""Get current clock time in microseconds."""
return time.monotonic()
def __init__(self):
"""Initialize with current clock time."""
self._time = self.clock()
@classmethod
def now(cls) -> "Instant":
"""Create new Instant with current time."""
return cls()
def elapsed(self) -> timedelta:
"""Get elapsed time as timedelta."""
now = self.clock()
diff = now - self._time
return timedelta(seconds=diff)
def elapsed_ms(self) -> int:
"""Get elapsed time in milliseconds."""
return int(self.elapsed().total_seconds() * 1000)
def has_elapsed_ms(self, ms: int) -> bool:
"""Check if specified milliseconds have elapsed."""
return self.elapsed_ms() >= ms
def mut_every_ms(self, ms: int) -> bool:
"""Check if time has elapsed and reset if true."""
if self.elapsed_ms() >= ms:
self.mut_reset()
return True
return False
def has_elapsed(self, duration: timedelta) -> bool:
"""Check if specified duration has elapsed."""
return self.elapsed() >= duration
def mut_every(self, duration: timedelta) -> bool:
"""Check if duration has elapsed and reset if true."""
if self.has_elapsed(duration):
self.mut_reset()
return True
return False
def mut_reset(self) -> None:
"""Reset the timer to current time."""
self._time = self.clock()
def mut_elapsed_and_reset(self) -> timedelta:
"""Get elapsed time and reset timer."""
now = self.clock()
diff = now - self._time
duration = timedelta(microseconds=diff)
self._time = now
return duration
def count(self) -> float:
"""Get the internal time counter value."""
return self._time

63
ble_forward.py Normal file
View File

@ -0,0 +1,63 @@
import anyio
from bleak import BleakScanner, BleakClient
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from typing import Final, Optional
from loguru import logger
from anyio import create_udp_socket, create_connected_udp_socket
DEVICE_NAME: Final[str] = "MAX-HUB"
UDP_SERVER_HOST: Final[str] = "localhost"
UDP_SERVER_PORT: Final[int] = 50_000
BLE_HR_SERVICE_UUID: Final[str] = "180D"
BLE_HR_CHARACTERISTIC_RAW_UUID: Final[str] = "ff241160-8a02-4626-b499-b1572d2b5a29"
async def main():
async def find_device():
while True:
device = await BleakScanner.find_device_by_name(DEVICE_NAME)
if device:
return device
else:
logger.info("Device not found, retrying...")
async with await create_connected_udp_socket(
remote_host=UDP_SERVER_HOST, remote_port=UDP_SERVER_PORT
) as udp:
device = await find_device()
async with BleakClient(device) as client:
logger.info("Connected to target device")
async def find_service(uuid: str):
services = await client.get_services()
for s in services:
if uuid.lower() in s.uuid.lower():
return s
for s in services:
logger.info("Service: {}", s.uuid)
raise ValueError(f"Service not found: {uuid}")
async def find_char(service_uuid: str, char_uuid: str):
service = await find_service(service_uuid)
char = service.get_characteristic(char_uuid)
if char is None:
raise ValueError(f"Characteristic not found: {char_uuid}")
return char
hr_raw_char = await find_char(
BLE_HR_SERVICE_UUID, BLE_HR_CHARACTERISTIC_RAW_UUID
)
async def on_hr_data(char: BleakGATTCharacteristic, data: bytearray):
logger.info("raw={}", data.hex())
await udp.send(data)
logger.info("Starting notify")
await client.start_notify(hr_raw_char, on_hr_data)
ev = anyio.Event()
await ev.wait()
if __name__ == "__main__":
anyio.run(main)

269
main.py
View File

@ -18,32 +18,43 @@ import numpy as np
import plotly.graph_objects as go import plotly.graph_objects as go
import streamlit as st import streamlit as st
import anyio import anyio
from anyio.abc import TaskGroup from anyio.abc import TaskGroup, UDPSocket
from anyio import create_memory_object_stream from anyio import create_memory_object_stream, create_udp_socket
from anyio.streams.memory import MemoryObjectSendStream, MemoryObjectReceiveStream from anyio.streams.memory import MemoryObjectSendStream, MemoryObjectReceiveStream
from aiomqtt import Client as MqttClient, Message as MqttMessage
from threading import Thread from threading import Thread
from time import sleep from time import sleep
from pydantic import BaseModel, computed_field from pydantic import BaseModel, computed_field
from datetime import datetime from datetime import datetime, timedelta
import awkward as ak import awkward as ak
from awkward import Array as AwkwardArray, Record as AwkwardRecord from awkward import Array as AwkwardArray, Record as AwkwardRecord
import orjson from app.model import AlgoReport, HrPacket, hr_confidence_to_num
from app.utils import Instant
from collections import deque
from dataclasses import dataclass
class AppHistory(TypedDict):
timescape: deque[datetime]
hr_data: deque[float]
hr_conf: deque[float] # in %
accel_x_data: deque[int]
accel_y_data: deque[int]
accel_z_data: deque[int]
pd_data: deque[int]
# https://handmadesoftware.medium.com/streamlit-asyncio-and-mongodb-f85f77aea825 # https://handmadesoftware.medium.com/streamlit-asyncio-and-mongodb-f85f77aea825
class AppState(TypedDict): class AppState(TypedDict):
worker_thread: Thread worker_thread: Thread
client: MqttClient message_queue: MemoryObjectReceiveStream[bytes]
message_queue: MemoryObjectReceiveStream[MqttMessage]
task_group: TaskGroup task_group: TaskGroup
history: dict[str, AwkwardArray] history: AppHistory
refresh_inst: Instant
MQTT_BROKER: Final[str] = "192.168.2.189" UDP_SERVER_HOST: Final[str] = "localhost"
MQTT_BROKER_PORT: Final[int] = 1883 UDP_SERVER_PORT: Final[int] = 50_000
MAX_LENGTH = 600 MAX_LENGTH = 600
TOPIC: Final[str] = "GwData"
NDArray = np.ndarray NDArray = np.ndarray
T = TypeVar("T") T = TypeVar("T")
@ -57,179 +68,133 @@ def unwrap(value: Optional[T]) -> T:
@st.cache_resource @st.cache_resource
def resource(params: Any = None): def resource(params: Any = None):
client: Optional[MqttClient] = None set_ev = anyio.Event()
tx, rx = create_memory_object_stream[MqttMessage]() tx, rx = create_memory_object_stream[bytes]()
tg: Optional[TaskGroup] = None tg: Optional[TaskGroup] = None
async def main(): async def poll_task():
nonlocal set_ev
nonlocal tg nonlocal tg
nonlocal client
tg = anyio.create_task_group() tg = anyio.create_task_group()
set_ev.set()
async with tg: async with tg:
client = MqttClient(MQTT_BROKER, port=MQTT_BROKER_PORT) async with await create_udp_socket(
async with client: local_host=UDP_SERVER_HOST, local_port=UDP_SERVER_PORT, reuse_port=True
await client.subscribe(TOPIC) ) as udp:
logger.info( async for packet, _ in udp:
"Subscribed {}:{} to topic {}", MQTT_BROKER, MQTT_BROKER_PORT, TOPIC await tx.send(packet)
)
# https://aiomqtt.bo3hm.com/subscribing-to-a-topic.html
async for message in client.messages:
await tx.send(message)
tr = Thread(target=anyio.run, args=(main,)) tr = Thread(target=anyio.run, args=(poll_task,))
tr.start() tr.start()
sleep(0.1) while not set_ev.is_set():
sleep(0.01)
logger.info("Poll task initialized")
state: AppState = { state: AppState = {
"worker_thread": tr, "worker_thread": tr,
"client": unwrap(client),
"message_queue": rx, "message_queue": rx,
"task_group": unwrap(tg), "task_group": unwrap(tg),
"history": {}, "history": {
"timescape": deque(maxlen=MAX_LENGTH),
"hr_data": deque(maxlen=MAX_LENGTH),
"hr_conf": deque(maxlen=MAX_LENGTH),
"accel_x_data": deque(maxlen=MAX_LENGTH),
"accel_y_data": deque(maxlen=MAX_LENGTH),
"accel_z_data": deque(maxlen=MAX_LENGTH),
"pd_data": deque(maxlen=MAX_LENGTH),
},
"refresh_inst": Instant(),
} }
logger.info("Resource created")
return state return state
class GwMessage(TypedDict):
v: int
mid: int
time: int
ip: str
mac: str
devices: list[Any]
rssi: int
class DeviceMessage(BaseModel):
mac: str
"""
Hex string, capital letters, e.g. "D6AF1CA9C491"
"""
service: str
"""
Hex string, capital letters, e.g. "180D"
"""
characteristic: str
"""
Hex string, capital letters, e.g. "2A37"
"""
value: str
"""
Hex string, capital letters, e.g. "0056"
"""
rssi: int
@property
def value_bytes(self) -> bytes:
return bytes.fromhex(self.value)
def get_device_data(message: GwMessage) -> List[DeviceMessage]:
"""
devices
[[5,"D6AF1CA9C491","180D","2A37","0056",-58],[5,"A09E1AE4E710","180D","2A37","0055",-50]]
unknown, mac addr, service, characteristic, value (hex), rssi
"""
l: list[DeviceMessage] = []
for d in message["devices"]:
x, mac, service, characteristic, value, rssi = d
l.append(
DeviceMessage(
mac=mac,
service=service,
characteristic=characteristic,
value=value,
rssi=rssi,
)
)
return l
def payload_to_hr(payload: bytes) -> int:
"""
ignore the first byte, parse the rest as a big-endian integer
Bit 0 (Heart Rate Format)
0: Heart rate value is 8 bits
1: Heart rate value is 16 bits
Bit 3 (Energy Expended)
Indicates whether energy expended data is present
Bit 4 (RR Interval)
Indicates whether RR interval data is present
"""
flags = payload[0]
if flags & 0b00000001:
return int.from_bytes(payload[1:3], "big")
else:
return payload[1]
def main(): def main():
state = resource() state = resource()
logger.info("Resource created")
history = state["history"] history = state["history"]
def push_new_message(message: GwMessage):
dms = get_device_data(message)
now = datetime.now()
for dm in dms:
rec = AwkwardRecord(
{
"time": now,
"value": payload_to_hr(dm.value_bytes),
"rssi": dm.rssi,
}
)
if dm.mac not in history:
history[dm.mac] = AwkwardArray([rec])
else:
history[dm.mac] = ak.concatenate([history[dm.mac], [rec]])
if len(history[dm.mac]) > MAX_LENGTH:
history[dm.mac] = AwkwardArray(history[dm.mac][-MAX_LENGTH:])
def on_export(): def on_export():
now = datetime.now() file_name = f"history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.parquet"
filename = f"export-{now.strftime('%Y-%m-%d-%H-%M-%S')}.parquet" logger.info(f"Exporting to {file_name}")
ak.to_parquet([history], filename) rec = ak.Record(history)
logger.info("Export to {}", filename) ak.to_parquet(rec, file_name)
def on_clear(): def on_clear():
history.clear() nonlocal history
logger.info("History cleared") logger.info("Clearing history")
history["timescape"].clear()
history["hr_data"].clear()
history["hr_conf"].clear()
history["accel_x_data"].clear()
history["accel_y_data"].clear()
history["accel_z_data"].clear()
# https://docs.streamlit.io/develop/api-reference/layout
st.title("MAX-BAND Visualizer")
with st.container(border=True):
c1, c2 = st.columns(2)
with c1:
st.button(
"Export",
help="Export the current data to a parquet file",
on_click=on_export,
)
with c2:
st.button(
"Clear",
help="Clear the current data",
on_click=on_clear,
)
placeholder = st.empty()
md_placeholder = st.empty()
st.button(
"Export", help="Export the current data to a parquet file", on_click=on_export
)
st.button("Clear", help="Clear the current data", on_click=on_clear)
pannel = st.empty()
while True: while True:
try: try:
message = state["message_queue"].receive_nowait() message = state["message_queue"].receive_nowait()
except anyio.WouldBlock: except anyio.WouldBlock:
continue continue
m: str try:
if isinstance(message.payload, str): packet = HrPacket.unmarshal(message)
m = message.payload except ValueError as e:
elif isinstance(message.payload, bytes): logger.error(f"bad packet: {e}")
m = message.payload.decode("utf-8")
else:
logger.warning("Unknown message type: {}", type(message.payload))
continue continue
d = cast(GwMessage, orjson.loads(m))
push_new_message(d)
def to_scatter(key: str, dev_history: AwkwardArray): with placeholder.container():
x = ak.to_numpy(dev_history["time"]) history["hr_data"].append(packet.hr)
y = ak.to_numpy(dev_history["value"]) history["hr_conf"].append(hr_confidence_to_num(packet.status.hr_confidence))
return go.Scatter(x=x, y=y, mode="lines+markers", name=key) history["pd_data"].extend(packet.raw_data)
fig_hr, fig_pd = st.tabs(["Heart Rate", "PD"])
scatters = [to_scatter(k, el) for k, el in history.items()] with fig_hr:
fig = go.Figure(scatters) st.plotly_chart(
pannel.plotly_chart(fig) go.Figure(
data=[
go.Scatter(
y=list(history["hr_data"]),
mode="lines",
name="HR",
),
go.Scatter(
y=list(history["hr_conf"]),
mode="lines",
name="HR Confidence",
),
]
)
)
with fig_pd:
st.plotly_chart(
go.Figure(
data=[
go.Scatter(
y=list(history["pd_data"]),
mode="lines",
name="PD",
)
]
)
)
if __name__ == "__main__": if __name__ == "__main__":
main() main()
# 1659A202

9
requirements.txt Normal file
View File

@ -0,0 +1,9 @@
aiomqtt==2.3.0
anyio==4.6.2.post1
awkward==2.7.4
loguru==0.7.2
numpy==2.2.2
orjson==3.10.15
plotly==5.19.0
pydantic==2.10.6
streamlit==1.40.1

5
run.sh
View File

@ -1 +1,4 @@
python -m streamlit run main.py #!/usr/bin/env bash
# python -m streamlit run main.py
python3.12 -m streamlit run main.py