From 1d8159f481035f44b5368d1abeaad3f9ec88108a Mon Sep 17 00:00:00 2001 From: crosstyan Date: Tue, 12 Nov 2024 16:57:49 +0800 Subject: [PATCH] x --- .gitignore | 1 + .vscode/settings.json | 4 + main.py | 229 ++++++++++++++++++++++++++++++++++++++++++ parse_exported.ipynb | 26 +++++ 4 files changed, 260 insertions(+) create mode 100644 .gitignore create mode 100644 .vscode/settings.json create mode 100644 main.py create mode 100644 parse_exported.ipynb diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bccc145 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.parquet \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..c0e8e39 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "python.analysis.autoImportCompletions": true, + "python.analysis.typeCheckingMode": "standard" +} \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..2b032b1 --- /dev/null +++ b/main.py @@ -0,0 +1,229 @@ +from typing import ( + Annotated, + AsyncGenerator, + Final, + Generator, + List, + Literal, + Optional, + Tuple, + TypeVar, + TypedDict, + Any, + cast, +) + +from loguru import logger +import numpy as np +import plotly.graph_objects as go +import streamlit as st +import anyio +from anyio.abc import TaskGroup +from anyio import create_memory_object_stream +from anyio.streams.memory import MemoryObjectSendStream, MemoryObjectReceiveStream +from aiomqtt import Client as MqttClient, Message as MqttMessage +from threading import Thread +from time import sleep +from pydantic import BaseModel, computed_field +from datetime import datetime +import awkward as ak +from awkward import Array as AwkwardArray, Record as AwkwardRecord +import orjson + + +# https://handmadesoftware.medium.com/streamlit-asyncio-and-mongodb-f85f77aea825 +class AppState(TypedDict): + worker_thread: Thread + client: MqttClient + message_queue: MemoryObjectReceiveStream[MqttMessage] + task_group: TaskGroup + + +MQTT_BROKER: Final[str] = "192.168.2.189" +MQTT_BROKER_PORT: Final[int] = 1883 +TOPIC: Final[str] = "GwData" +NDArray = np.ndarray + +T = TypeVar("T") + + +def unwrap(value: Optional[T]) -> T: + if value is None: + raise ValueError("Value is None") + return value + + +@st.cache_resource +def resource(params: Any = None): + client: Optional[MqttClient] = None + tx, rx = create_memory_object_stream[MqttMessage]() + tg: Optional[TaskGroup] = None + + async def main(): + nonlocal tg + nonlocal client + tg = anyio.create_task_group() + async with tg: + client = MqttClient(MQTT_BROKER, port=MQTT_BROKER_PORT) + async with client: + await client.subscribe(TOPIC) + logger.info( + "Subscribed {}:{} to topic {}", MQTT_BROKER, MQTT_BROKER_PORT, TOPIC + ) + # https://aiomqtt.bo3hm.com/subscribing-to-a-topic.html + async for message in client.messages: + await tx.send(message) + + tr = Thread(target=anyio.run, args=(main,)) + tr.start() + sleep(0.1) + state: AppState = { + "worker_thread": tr, + "client": unwrap(client), + "message_queue": rx, + "task_group": unwrap(tg), + } + return state + + +class GwMessage(TypedDict): + v: int + mid: int + time: int + ip: str + mac: str + devices: list[Any] + rssi: int + + +class DeviceMessage(BaseModel): + mac: str + """ + Hex string, capital letters, e.g. "D6AF1CA9C491" + """ + service: str + """ + Hex string, capital letters, e.g. "180D" + """ + characteristic: str + """ + Hex string, capital letters, e.g. "2A37" + """ + value: str + """ + Hex string, capital letters, e.g. "0056" + """ + rssi: int + + @property + def value_bytes(self) -> bytes: + return bytes.fromhex(self.value) + + +def get_device_data(message: GwMessage) -> List[DeviceMessage]: + """ + devices + + [[5,"D6AF1CA9C491","180D","2A37","0056",-58],[5,"A09E1AE4E710","180D","2A37","0055",-50]] + + unknown, mac addr, service, characteristic, value (hex), rssi + """ + l: list[DeviceMessage] = [] + for d in message["devices"]: + x, mac, service, characteristic, value, rssi = d + l.append( + DeviceMessage( + mac=mac, + service=service, + characteristic=characteristic, + value=value, + rssi=rssi, + ) + ) + return l + + +def payload_to_hr(payload: bytes) -> int: + """ + ignore the first byte, parse the rest as a big-endian integer + + Bit 0 (Heart Rate Format) + 0: Heart rate value is 8 bits + 1: Heart rate value is 16 bits + Bit 3 (Energy Expended) + Indicates whether energy expended data is present + Bit 4 (RR Interval) + Indicates whether RR interval data is present + """ + flags = payload[0] + if flags & 0b00000001: + return int.from_bytes(payload[1:3], "big") + else: + return payload[1] + + +def main(): + state = resource() + logger.info("Resource created") + history: dict[str, AwkwardArray] = {} + + MAX_LENGTH = 500 + + def push_new_message(message: GwMessage): + dms = get_device_data(message) + now = datetime.now() + for dm in dms: + rec = AwkwardRecord( + { + "time": now, + "value": payload_to_hr(dm.value_bytes), + "rssi": dm.rssi, + } + ) + if dm.mac not in history: + history[dm.mac] = AwkwardArray([rec]) + else: + history[dm.mac] = ak.concatenate([history[dm.mac], [rec]]) + if len(history[dm.mac]) > MAX_LENGTH: + history[dm.mac] = AwkwardArray(history[dm.mac][-MAX_LENGTH:]) + + def on_export(): + now = datetime.now() + filename = f"export-{now.strftime('%Y-%m-%d-%H-%M-%S')}.parquet" + ak.to_parquet([history], filename) + logger.info("Export to {}", filename) + + export_btn = st.button( + "Export", help="Export the current data to a parquet file", on_click=on_export + ) + pannel = st.empty() + while True: + try: + message = state["message_queue"].receive_nowait() + except anyio.WouldBlock: + continue + m: str + if isinstance(message.payload, str): + m = message.payload + elif isinstance(message.payload, bytes): + m = message.payload.decode("utf-8") + else: + logger.warning("Unknown message type: {}", type(message.payload)) + continue + d = cast(GwMessage, orjson.loads(m)) + push_new_message(d) + + def to_scatter(key: str, dev_history: AwkwardArray): + x = ak.to_numpy(dev_history["time"]) + y = ak.to_numpy(dev_history["value"]) + return go.Scatter(x=x, y=y, mode="lines+markers", name=key) + + scatters = [to_scatter(k, el) for k, el in history.items()] + fig = go.Figure(scatters) + pannel.plotly_chart(fig) + + +if __name__ == "__main__": + main() + +# 1659A202 diff --git a/parse_exported.ipynb b/parse_exported.ipynb new file mode 100644 index 0000000..7c7c7b3 --- /dev/null +++ b/parse_exported.ipynb @@ -0,0 +1,26 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import awkward as ak" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.12.7" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}