64 lines
2.3 KiB
Python
64 lines
2.3 KiB
Python
import anyio
|
|
from bleak import BleakScanner, BleakClient
|
|
from bleak.backends.characteristic import BleakGATTCharacteristic
|
|
from bleak.backends.device import BLEDevice
|
|
from typing import Final, Optional
|
|
from loguru import logger
|
|
from anyio import create_udp_socket, create_connected_udp_socket
|
|
|
|
DEVICE_NAME: Final[str] = "Polar Sense E4E71028"
|
|
UDP_SERVER_HOST: Final[str] = "localhost"
|
|
UDP_SERVER_PORT: Final[int] = 50_000
|
|
BLE_HR_SERVICE_UUID: Final[str] = "180D"
|
|
BLE_HR_MEASUREMENT_CHARACTERISTIC_UUID: Final[str] = "2A37"
|
|
|
|
|
|
async def main():
|
|
async def find_device():
|
|
while True:
|
|
device = await BleakScanner.find_device_by_name(DEVICE_NAME)
|
|
if device:
|
|
return device
|
|
else:
|
|
logger.info("Device not found, retrying...")
|
|
|
|
async with await create_connected_udp_socket(
|
|
remote_host=UDP_SERVER_HOST, remote_port=UDP_SERVER_PORT
|
|
) as udp:
|
|
device = await find_device()
|
|
async with BleakClient(device) as client:
|
|
logger.info("Connected to target device")
|
|
|
|
async def find_service(uuid: str):
|
|
services = await client.get_services()
|
|
for s in services:
|
|
if uuid.lower() in s.uuid.lower():
|
|
return s
|
|
for s in services:
|
|
logger.info("Service: {}", s.uuid)
|
|
raise ValueError(f"Service not found: {uuid}")
|
|
|
|
async def find_char(service_uuid: str, char_uuid: str):
|
|
service = await find_service(service_uuid)
|
|
char = service.get_characteristic(char_uuid)
|
|
if char is None:
|
|
raise ValueError(f"Characteristic not found: {char_uuid}")
|
|
return char
|
|
|
|
hr_measurement_char = await find_char(
|
|
BLE_HR_SERVICE_UUID, BLE_HR_MEASUREMENT_CHARACTERISTIC_UUID
|
|
)
|
|
|
|
async def on_hr_data(char: BleakGATTCharacteristic, data: bytearray):
|
|
logger.info("hr_measurement={}", data.hex())
|
|
await udp.send(data)
|
|
|
|
logger.info("Starting notify")
|
|
await client.start_notify(hr_measurement_char, on_hr_data)
|
|
ev = anyio.Event()
|
|
await ev.wait()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
anyio.run(main)
|