8 Commits

8 changed files with 658 additions and 155 deletions

177
.gitignore vendored
View File

@@ -1 +1,178 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# user
.DS_Store
*.parquet

View File

@@ -1,4 +1,7 @@
{
"python.analysis.autoImportCompletions": true,
"python.analysis.typeCheckingMode": "standard"
"python.analysis.typeCheckingMode": "standard",
"cSpell.words": [
"timescape"
]
}

213
app/model/__init__.py Normal file
View File

@@ -0,0 +1,213 @@
from dataclasses import dataclass
from enum import IntEnum
import struct
from typing import ClassVar, Tuple, Final, LiteralString
from pydantic import BaseModel, Field, computed_field
class AlgoOpMode(IntEnum):
    """Equivalent to max::ALGO_OP_MODE — sensor-hub algorithm operating mode."""

    CONTINUOUS_HRM_CONTINUOUS_SPO2 = 0  # continuous HRM + continuous SpO2
    CONTINUOUS_HRM_ONE_SHOT_SPO2 = 1  # continuous HRM + one-shot SpO2
    CONTINUOUS_HRM = 2  # continuous HRM only
    SAMPLED_HRM = 3  # sampled HRM
    SAMPLED_HRM_ONE_SHOT_SPO2 = 4  # sampled HRM + one-shot SpO2
    ACTIVITY_TRACKING_ONLY = 5  # activity tracking only
    SPO2_CALIBRATION = 6  # SpO2 calibration
class ActivateClass(IntEnum):
    """Equivalent to max::ACTIVATE_CLASS — detected activity classification."""

    # NOTE(review): "Activate" mirrors the firmware-side name; "Activity"
    # would be clearer, but renaming would break existing callers.
    REST = 0
    WALK = 1
    RUN = 2
    BIKE = 3
class SPO2State(IntEnum):
    """Equivalent to max::SPO2_STATE — progress of an SpO2 measurement."""

    LED_ADJUSTMENT = 0
    COMPUTATION = 1
    SUCCESS = 2
    TIMEOUT = 3
class SCDState(IntEnum):
    """Equivalent to max::SCD_STATE — skin-contact detection result."""

    UNDETECTED = 0
    OFF_SKIN = 1
    ON_SOME_SUBJECT = 2
    ON_SKIN = 3
class AlgoModelData(BaseModel):
    """Decoded algorithm output block of a sensor-hub report.

    Integer fields carry fixed-point scaled values; the ``*_f`` computed
    properties expose the physical quantities.
    """

    op_mode: AlgoOpMode
    hr: int  # uint16, 10x calculated heart rate
    hr_conf: int  # uint8, confidence level in %
    rr: int  # uint16, 10x RR interval in ms
    rr_conf: int  # uint8
    activity_class: ActivateClass
    r: int  # uint16, 1000x SpO2 R value
    spo2_conf: int  # uint8
    spo2: int  # uint16, 10x SpO2 %
    spo2_percent_complete: int  # uint8
    spo2_low_signal_quality_flag: int  # uint8
    spo2_motion_flag: int  # uint8
    spo2_low_pi_flag: int  # uint8
    spo2_unreliable_r_flag: int  # uint8
    spo2_state: SPO2State
    scd_contact_state: SCDState
    # Little-endian wire layout; the trailing "I" is a reserved uint32 that
    # is present on the wire but intentionally not modeled as a field.
    _FORMAT: ClassVar[LiteralString] = "<BHBHBBHBHBBBBBBBI"

    @computed_field
    @property
    def hr_f(self) -> float:
        """Heart rate in beats per minute (hr / 10)."""
        return self.hr / 10.0

    @computed_field
    @property
    def spo2_f(self) -> float:
        """SpO2 percentage (spo2 / 10)."""
        return self.spo2 / 10.0

    @computed_field
    @property
    def r_f(self) -> float:
        """SpO2 R value (r / 1000)."""
        return self.r / 1000.0

    @computed_field
    @property
    def rr_f(self) -> float:
        """RR interval in milliseconds (rr / 10)."""
        return self.rr / 10.0

    @classmethod
    def unmarshal(cls, data: bytes) -> "AlgoModelData":
        """Unpack a little-endian buffer into a model instance.

        ``struct.error`` propagates when ``data`` has the wrong length.
        The trailing reserved uint32 is unpacked but discarded: ``zip``
        stops at the shorter of the two sequences.
        """
        names = (
            "op_mode",
            "hr",
            "hr_conf",
            "rr",
            "rr_conf",
            "activity_class",
            "r",
            "spo2_conf",
            "spo2",
            "spo2_percent_complete",
            "spo2_low_signal_quality_flag",
            "spo2_motion_flag",
            "spo2_low_pi_flag",
            "spo2_unreliable_r_flag",
            "spo2_state",
            "scd_contact_state",
        )
        return cls(**dict(zip(names, struct.unpack(cls._FORMAT, data))))
class AlgoReport(BaseModel):
    """One full sensor report: raw LED counts, accelerometer axes, algo data."""

    led_1: int  # uint32
    led_2: int  # uint32
    led_3: int  # uint32
    accel_x: int  # int16, in units of g
    accel_y: int  # int16, in units of g
    accel_z: int  # int16, in units of g
    data: AlgoModelData

    @classmethod
    def unmarshal(cls, buf: bytes) -> "AlgoReport":
        """Split ``buf`` into the fixed-size header and the trailing algo block."""
        HEADER: Final[str] = "<IIIhhh"
        header_len = struct.calcsize(HEADER)
        # unpack_from reads only the header bytes; the rest of the buffer
        # belongs to the AlgoModelData payload.
        led_1, led_2, led_3, ax, ay, az = struct.unpack_from(HEADER, buf)
        return cls(
            led_1=led_1,
            led_2=led_2,
            led_3=led_3,
            accel_x=ax,
            accel_y=ay,
            accel_z=az,
            data=AlgoModelData.unmarshal(buf[header_len:]),
        )
class HrConfidence(IntEnum):
    """Equivalent to max::HR_CONFIDENCE — 2-bit confidence class."""

    ZERO = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3


# Representative percentage for each 2-bit confidence class.
_HR_CONFIDENCE_PERCENT = {
    HrConfidence.ZERO: 0,
    HrConfidence.LOW: 25,
    HrConfidence.MEDIUM: 62.5,
    HrConfidence.HIGH: 100,
}


def hr_confidence_to_num(hr_confidence: HrConfidence) -> float:
    """Map a confidence class to a percentage in [0, 100].

    Raises ValueError for any value outside the four defined classes.
    """
    for level, percent in _HR_CONFIDENCE_PERCENT.items():
        if hr_confidence == level:
            return percent
    raise ValueError(f"Invalid HR confidence: {hr_confidence}")


@dataclass
class HrStatusFlags:
    """Unpacked status byte of an HR packet.

    Bit layout (LSB first): confidence[1:0], active[2], on_skin[3],
    battery_level[7:4].
    """

    hr_confidence: HrConfidence  # bits 0-1
    is_active: bool  # bit 2
    is_on_skin: bool  # bit 3
    battery_level: int  # bits 4-7

    @staticmethod
    def unmarshal(data: bytes) -> "HrStatusFlags":
        """Decode the first byte of ``data`` into individual flag fields."""
        b = data[0]
        return HrStatusFlags(
            hr_confidence=HrConfidence(b & 0x03),
            is_active=bool(b & 0x04),
            is_on_skin=bool(b & 0x08),
            battery_level=(b >> 4) & 0x0F,
        )


@dataclass
class HrPacket:
    """On-air HR packet: status byte, packet id, HR value, raw PD samples."""

    status: HrStatusFlags  # 8 bits
    id: int  # 8 bits
    hr: int  # 8 bits
    # 8-bit sample count `n`, then n x 24-bit little-endian samples
    raw_data: list[int]

    @staticmethod
    def unmarshal(data: bytes) -> "HrPacket":
        """Parse one packet.

        Raises ValueError when the sample payload length disagrees with
        the declared sample count.
        """
        flags = HrStatusFlags.unmarshal(data[0:1])
        packet_id = data[1]
        hr_bpm = data[2]
        sample_count = data[3]
        payload = data[4:]
        expected = sample_count * 3
        if len(payload) != expected:
            raise ValueError(
                f"Invalid raw data payload length: {len(payload)}, expected {expected}"
            )
        samples = [
            int.from_bytes(payload[off : off + 3], "little")
            for off in range(0, expected, 3)
        ]
        return HrPacket(flags, packet_id, hr_bpm, samples)

70
app/utils/__init__.py Normal file
View File

@@ -0,0 +1,70 @@
import time
from datetime import timedelta
class Instant:
    """A measurement of a monotonically nondecreasing clock.

    Captures a point on the monotonic clock and answers "how long since
    then" queries. Methods prefixed ``mut_`` also reset the stored
    reference time as a side effect.
    """

    _time: float  # monotonic reference point, in seconds

    @staticmethod
    def clock() -> float:
        """Get the current monotonic clock reading in seconds.

        Bug fix (docs): the original docstring claimed microseconds, but
        time.monotonic() returns fractional seconds.
        """
        return time.monotonic()

    def __init__(self) -> None:
        """Capture the current clock time as the reference point."""
        self._time = self.clock()

    @classmethod
    def now(cls) -> "Instant":
        """Create a new Instant anchored at the current time."""
        return cls()

    def elapsed(self) -> timedelta:
        """Get the time elapsed since the reference point as a timedelta."""
        now = self.clock()
        diff = now - self._time
        return timedelta(seconds=diff)

    def elapsed_ms(self) -> int:
        """Get the elapsed time in whole milliseconds (truncated)."""
        return int(self.elapsed().total_seconds() * 1000)

    def has_elapsed_ms(self, ms: int) -> bool:
        """Check whether at least ``ms`` milliseconds have elapsed."""
        return self.elapsed_ms() >= ms

    def mut_every_ms(self, ms: int) -> bool:
        """Return True (and reset the timer) once every ``ms`` milliseconds."""
        if self.elapsed_ms() >= ms:
            self.mut_reset()
            return True
        return False

    def has_elapsed(self, duration: timedelta) -> bool:
        """Check whether at least ``duration`` has elapsed."""
        return self.elapsed() >= duration

    def mut_every(self, duration: timedelta) -> bool:
        """Return True (and reset the timer) once every ``duration``."""
        if self.has_elapsed(duration):
            self.mut_reset()
            return True
        return False

    def mut_reset(self) -> None:
        """Reset the reference point to the current time."""
        self._time = self.clock()

    def mut_elapsed_and_reset(self) -> timedelta:
        """Get the elapsed time and reset the reference point in one step.

        Bug fix: the clock difference is in seconds, but the original
        built ``timedelta(microseconds=diff)``, under-reporting the
        elapsed time by a factor of 1e6 and disagreeing with elapsed().
        """
        now = self.clock()
        diff = now - self._time
        self._time = now
        return timedelta(seconds=diff)

    def count(self) -> float:
        """Get the internal reference time (monotonic seconds)."""
        return self._time

63
ble_forward.py Normal file
View File

@@ -0,0 +1,63 @@
import anyio
from bleak import BleakScanner, BleakClient
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from typing import Final, Optional
from loguru import logger
from anyio import create_udp_socket, create_connected_udp_socket
# Advertised name of the BLE peripheral to bridge.
DEVICE_NAME: Final[str] = "MAX-HUB"
# Local UDP endpoint that received notification payloads are forwarded to.
UDP_SERVER_HOST: Final[str] = "localhost"
UDP_SERVER_PORT: Final[int] = 50_000
# 16-bit Bluetooth SIG Heart Rate service UUID (matched by substring below).
BLE_HR_SERVICE_UUID: Final[str] = "180D"
# Vendor-specific characteristic carrying the raw HR packet stream.
BLE_HR_CHARACTERISTIC_RAW_UUID: Final[str] = "ff241160-8a02-4626-b499-b1572d2b5a29"
async def main():
    """Bridge BLE heart-rate notifications to a local UDP endpoint.

    Scans for the device named DEVICE_NAME, subscribes to the raw HR
    characteristic, and forwards every notification payload verbatim over
    a connected UDP socket to UDP_SERVER_HOST:UDP_SERVER_PORT. Runs until
    cancelled: the trailing event wait never completes.
    """

    async def find_device():
        # Poll the scanner until the named device appears; the scanner
        # returns a falsy result on timeout, so loop and retry forever.
        while True:
            device = await BleakScanner.find_device_by_name(DEVICE_NAME)
            if device:
                return device
            else:
                logger.info("Device not found, retrying...")

    async with await create_connected_udp_socket(
        remote_host=UDP_SERVER_HOST, remote_port=UDP_SERVER_PORT
    ) as udp:
        device = await find_device()
        async with BleakClient(device) as client:
            logger.info("Connected to target device")

            async def find_service(uuid: str):
                # NOTE(review): BleakClient.get_services() is deprecated in
                # recent bleak releases in favor of the `services` property —
                # confirm against the pinned bleak version.
                services = await client.get_services()
                # Substring match (case-insensitive) so a short 16-bit UUID
                # like "180D" matches the expanded 128-bit form the backend
                # reports.
                for s in services:
                    if uuid.lower() in s.uuid.lower():
                        return s
                # Not found: log every discovered service to aid debugging
                # before failing.
                for s in services:
                    logger.info("Service: {}", s.uuid)
                raise ValueError(f"Service not found: {uuid}")

            async def find_char(service_uuid: str, char_uuid: str):
                # Resolve the characteristic within the resolved service.
                service = await find_service(service_uuid)
                char = service.get_characteristic(char_uuid)
                if char is None:
                    raise ValueError(f"Characteristic not found: {char_uuid}")
                return char

            hr_raw_char = await find_char(
                BLE_HR_SERVICE_UUID, BLE_HR_CHARACTERISTIC_RAW_UUID
            )

            async def on_hr_data(char: BleakGATTCharacteristic, data: bytearray):
                # Notification callback: forward the payload unchanged.
                logger.info("raw={}", data.hex())
                await udp.send(data)

            logger.info("Starting notify")
            await client.start_notify(hr_raw_char, on_hr_data)
            # Block forever: the event is never set, so notifications keep
            # flowing until the process is interrupted.
            ev = anyio.Event()
            await ev.wait()
if __name__ == "__main__":
    # Drive the async entry point on anyio's default (asyncio) backend.
    anyio.run(main)

265
main.py
View File

@@ -18,32 +18,43 @@ import numpy as np
import plotly.graph_objects as go
import streamlit as st
import anyio
from anyio.abc import TaskGroup
from anyio import create_memory_object_stream
from anyio.abc import TaskGroup, UDPSocket
from anyio import create_memory_object_stream, create_udp_socket
from anyio.streams.memory import MemoryObjectSendStream, MemoryObjectReceiveStream
from aiomqtt import Client as MqttClient, Message as MqttMessage
from threading import Thread
from time import sleep
from pydantic import BaseModel, computed_field
from datetime import datetime
from datetime import datetime, timedelta
import awkward as ak
from awkward import Array as AwkwardArray, Record as AwkwardRecord
import orjson
from app.model import AlgoReport, HrPacket, hr_confidence_to_num
from app.utils import Instant
from collections import deque
from dataclasses import dataclass
class AppHistory(TypedDict):
timescape: deque[datetime]
hr_data: deque[float]
hr_conf: deque[float] # in %
accel_x_data: deque[int]
accel_y_data: deque[int]
accel_z_data: deque[int]
pd_data: deque[int]
# https://handmadesoftware.medium.com/streamlit-asyncio-and-mongodb-f85f77aea825
class AppState(TypedDict):
worker_thread: Thread
client: MqttClient
message_queue: MemoryObjectReceiveStream[MqttMessage]
message_queue: MemoryObjectReceiveStream[bytes]
task_group: TaskGroup
history: dict[str, AwkwardArray]
history: AppHistory
refresh_inst: Instant
MQTT_BROKER: Final[str] = "192.168.2.189"
MQTT_BROKER_PORT: Final[int] = 1883
UDP_SERVER_HOST: Final[str] = "localhost"
UDP_SERVER_PORT: Final[int] = 50_000
MAX_LENGTH = 600
TOPIC: Final[str] = "GwData"
NDArray = np.ndarray
T = TypeVar("T")
@@ -57,179 +68,133 @@ def unwrap(value: Optional[T]) -> T:
@st.cache_resource
def resource(params: Any = None):
client: Optional[MqttClient] = None
tx, rx = create_memory_object_stream[MqttMessage]()
set_ev = anyio.Event()
tx, rx = create_memory_object_stream[bytes]()
tg: Optional[TaskGroup] = None
async def main():
async def poll_task():
nonlocal set_ev
nonlocal tg
nonlocal client
tg = anyio.create_task_group()
set_ev.set()
async with tg:
client = MqttClient(MQTT_BROKER, port=MQTT_BROKER_PORT)
async with client:
await client.subscribe(TOPIC)
logger.info(
"Subscribed {}:{} to topic {}", MQTT_BROKER, MQTT_BROKER_PORT, TOPIC
)
# https://aiomqtt.bo3hm.com/subscribing-to-a-topic.html
async for message in client.messages:
await tx.send(message)
async with await create_udp_socket(
local_host=UDP_SERVER_HOST, local_port=UDP_SERVER_PORT, reuse_port=True
) as udp:
async for packet, _ in udp:
await tx.send(packet)
tr = Thread(target=anyio.run, args=(main,))
tr = Thread(target=anyio.run, args=(poll_task,))
tr.start()
sleep(0.1)
while not set_ev.is_set():
sleep(0.01)
logger.info("Poll task initialized")
state: AppState = {
"worker_thread": tr,
"client": unwrap(client),
"message_queue": rx,
"task_group": unwrap(tg),
"history": {},
"history": {
"timescape": deque(maxlen=MAX_LENGTH),
"hr_data": deque(maxlen=MAX_LENGTH),
"hr_conf": deque(maxlen=MAX_LENGTH),
"accel_x_data": deque(maxlen=MAX_LENGTH),
"accel_y_data": deque(maxlen=MAX_LENGTH),
"accel_z_data": deque(maxlen=MAX_LENGTH),
"pd_data": deque(maxlen=MAX_LENGTH),
},
"refresh_inst": Instant(),
}
logger.info("Resource created")
return state
class GwMessage(TypedDict):
v: int
mid: int
time: int
ip: str
mac: str
devices: list[Any]
rssi: int
class DeviceMessage(BaseModel):
mac: str
"""
Hex string, capital letters, e.g. "D6AF1CA9C491"
"""
service: str
"""
Hex string, capital letters, e.g. "180D"
"""
characteristic: str
"""
Hex string, capital letters, e.g. "2A37"
"""
value: str
"""
Hex string, capital letters, e.g. "0056"
"""
rssi: int
@property
def value_bytes(self) -> bytes:
return bytes.fromhex(self.value)
def get_device_data(message: GwMessage) -> List[DeviceMessage]:
"""
devices
[[5,"D6AF1CA9C491","180D","2A37","0056",-58],[5,"A09E1AE4E710","180D","2A37","0055",-50]]
unknown, mac addr, service, characteristic, value (hex), rssi
"""
l: list[DeviceMessage] = []
for d in message["devices"]:
x, mac, service, characteristic, value, rssi = d
l.append(
DeviceMessage(
mac=mac,
service=service,
characteristic=characteristic,
value=value,
rssi=rssi,
)
)
return l
def payload_to_hr(payload: bytes) -> int:
"""
ignore the first byte, parse the rest as a big-endian integer
Bit 0 (Heart Rate Format)
0: Heart rate value is 8 bits
1: Heart rate value is 16 bits
Bit 3 (Energy Expended)
Indicates whether energy expended data is present
Bit 4 (RR Interval)
Indicates whether RR interval data is present
"""
flags = payload[0]
if flags & 0b00000001:
return int.from_bytes(payload[1:3], "big")
else:
return payload[1]
def main():
state = resource()
logger.info("Resource created")
history = state["history"]
def push_new_message(message: GwMessage):
dms = get_device_data(message)
now = datetime.now()
for dm in dms:
rec = AwkwardRecord(
{
"time": now,
"value": payload_to_hr(dm.value_bytes),
"rssi": dm.rssi,
}
)
if dm.mac not in history:
history[dm.mac] = AwkwardArray([rec])
else:
history[dm.mac] = ak.concatenate([history[dm.mac], [rec]])
if len(history[dm.mac]) > MAX_LENGTH:
history[dm.mac] = AwkwardArray(history[dm.mac][-MAX_LENGTH:])
def on_export():
now = datetime.now()
filename = f"export-{now.strftime('%Y-%m-%d-%H-%M-%S')}.parquet"
ak.to_parquet([history], filename)
logger.info("Export to {}", filename)
file_name = f"history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.parquet"
logger.info(f"Exporting to {file_name}")
rec = ak.Record(history)
ak.to_parquet(rec, file_name)
def on_clear():
history.clear()
logger.info("History cleared")
nonlocal history
logger.info("Clearing history")
history["timescape"].clear()
history["hr_data"].clear()
history["hr_conf"].clear()
history["accel_x_data"].clear()
history["accel_y_data"].clear()
history["accel_z_data"].clear()
# https://docs.streamlit.io/develop/api-reference/layout
st.title("MAX-BAND Visualizer")
with st.container(border=True):
c1, c2 = st.columns(2)
with c1:
st.button(
"Export", help="Export the current data to a parquet file", on_click=on_export
"Export",
help="Export the current data to a parquet file",
on_click=on_export,
)
st.button("Clear", help="Clear the current data", on_click=on_clear)
pannel = st.empty()
with c2:
st.button(
"Clear",
help="Clear the current data",
on_click=on_clear,
)
placeholder = st.empty()
md_placeholder = st.empty()
while True:
try:
message = state["message_queue"].receive_nowait()
except anyio.WouldBlock:
continue
m: str
if isinstance(message.payload, str):
m = message.payload
elif isinstance(message.payload, bytes):
m = message.payload.decode("utf-8")
else:
logger.warning("Unknown message type: {}", type(message.payload))
try:
packet = HrPacket.unmarshal(message)
except ValueError as e:
logger.error(f"bad packet: {e}")
continue
d = cast(GwMessage, orjson.loads(m))
push_new_message(d)
def to_scatter(key: str, dev_history: AwkwardArray):
x = ak.to_numpy(dev_history["time"])
y = ak.to_numpy(dev_history["value"])
return go.Scatter(x=x, y=y, mode="lines+markers", name=key)
with placeholder.container():
history["hr_data"].append(packet.hr)
history["hr_conf"].append(hr_confidence_to_num(packet.status.hr_confidence))
history["pd_data"].extend(packet.raw_data)
fig_hr, fig_pd = st.tabs(["Heart Rate", "PD"])
scatters = [to_scatter(k, el) for k, el in history.items()]
fig = go.Figure(scatters)
pannel.plotly_chart(fig)
with fig_hr:
st.plotly_chart(
go.Figure(
data=[
go.Scatter(
y=list(history["hr_data"]),
mode="lines",
name="HR",
),
go.Scatter(
y=list(history["hr_conf"]),
mode="lines",
name="HR Confidence",
),
]
)
)
with fig_pd:
st.plotly_chart(
go.Figure(
data=[
go.Scatter(
y=list(history["pd_data"]),
mode="lines",
name="PD",
)
]
)
)
if __name__ == "__main__":
main()
# 1659A202

9
requirements.txt Normal file
View File

@@ -0,0 +1,9 @@
aiomqtt==2.3.0
anyio==4.6.2.post1
awkward==2.7.4
loguru==0.7.2
numpy==2.2.2
orjson==3.10.15
plotly==5.19.0
pydantic==2.10.6
streamlit==1.40.1

5
run.sh
View File

@@ -1 +1,4 @@
python -m streamlit run main.py
#!/usr/bin/env bash
# python -m streamlit run main.py
python3.12 -m streamlit run main.py