feat: system debug.
This commit is contained in:
@@ -0,0 +1,273 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"hr_receiver/config"
|
||||
"hr_receiver/models"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/gorilla/websocket"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
whgw_hrpb "hr_receiver/proto"
|
||||
)
|
||||
|
||||
type DebugStatus struct {
|
||||
Active bool `json:"active"`
|
||||
ClientConnected bool `json:"clientConnected"`
|
||||
PersistToDatabase bool `json:"persistToDatabase"`
|
||||
Region string `json:"region"`
|
||||
SubscriberCount int `json:"subscriberCount"`
|
||||
}
|
||||
|
||||
type DebugEvent struct {
|
||||
CardKey string `json:"cardKey"`
|
||||
Kind string `json:"kind"`
|
||||
RegionID uint32 `json:"regionId"`
|
||||
ReceivedAt int64 `json:"receivedAt"`
|
||||
Topic string `json:"topic"`
|
||||
HeartRate *models.MqttHeartRateRecord `json:"heartRate,omitempty"`
|
||||
StepCount *models.MqttStepCountRecord `json:"stepCount,omitempty"`
|
||||
GatewayStatus *models.MqttGatewayStatusRecord `json:"gatewayStatus,omitempty"`
|
||||
}
|
||||
|
||||
type DebugService struct {
|
||||
cfg config.MQTTConfig
|
||||
client mqtt.Client
|
||||
db *gorm.DB
|
||||
mu sync.RWMutex
|
||||
persistToDatabase bool
|
||||
subscribers map[*websocket.Conn]struct{}
|
||||
active bool
|
||||
}
|
||||
|
||||
var globalDebugService *DebugService
|
||||
|
||||
func InitDebugService(db *gorm.DB, cfg config.MQTTConfig) {
|
||||
globalDebugService = &DebugService{
|
||||
cfg: cfg,
|
||||
db: db,
|
||||
subscribers: make(map[*websocket.Conn]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func GetDebugService() *DebugService {
|
||||
return globalDebugService
|
||||
}
|
||||
|
||||
func (s *DebugService) Status() DebugStatus {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return DebugStatus{
|
||||
Active: s.active,
|
||||
ClientConnected: s.client != nil && s.client.IsConnected(),
|
||||
PersistToDatabase: s.persistToDatabase,
|
||||
Region: s.cfg.Region,
|
||||
SubscriberCount: len(s.subscribers),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DebugService) Start(persistToDatabase bool) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.active && s.client != nil && s.client.IsConnected() {
|
||||
s.persistToDatabase = persistToDatabase
|
||||
return nil
|
||||
}
|
||||
if err := validateConfig(s.cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, err := s.connectLocked(persistToDatabase)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.client = client
|
||||
s.persistToDatabase = persistToDatabase
|
||||
s.active = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *DebugService) Stop() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.client != nil && s.client.IsConnected() {
|
||||
s.client.Disconnect(250)
|
||||
}
|
||||
s.client = nil
|
||||
s.active = false
|
||||
s.persistToDatabase = false
|
||||
}
|
||||
|
||||
func (s *DebugService) AddSubscriber(conn *websocket.Conn) {
|
||||
s.mu.Lock()
|
||||
s.subscribers[conn] = struct{}{}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *DebugService) RemoveSubscriber(conn *websocket.Conn) {
|
||||
s.mu.Lock()
|
||||
delete(s.subscribers, conn)
|
||||
s.mu.Unlock()
|
||||
_ = conn.Close()
|
||||
}
|
||||
|
||||
func (s *DebugService) connectLocked(persistToDatabase bool) (mqtt.Client, error) {
|
||||
opts := mqtt.NewClientOptions()
|
||||
scheme := "tcp"
|
||||
if s.cfg.UseTLS {
|
||||
scheme = "ssl"
|
||||
opts.SetTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12})
|
||||
}
|
||||
broker := fmt.Sprintf("%s://%s:%d", scheme, s.cfg.Host, s.cfg.Port)
|
||||
opts.AddBroker(broker)
|
||||
opts.SetClientID(fmt.Sprintf("%s-debug-%d", s.cfg.ClientIDPrefix, time.Now().UnixNano()))
|
||||
opts.SetUsername(s.cfg.Username)
|
||||
opts.SetPassword(s.cfg.Password)
|
||||
opts.SetKeepAlive(60 * time.Second)
|
||||
opts.SetAutoReconnect(false)
|
||||
opts.SetConnectRetry(false)
|
||||
opts.SetDefaultPublishHandler(s.handleMessage)
|
||||
opts.SetOnConnectHandler(func(client mqtt.Client) {
|
||||
if err := s.subscribe(client); err != nil {
|
||||
log.Printf("mqtt debug subscribe failed: %v", err)
|
||||
return
|
||||
}
|
||||
log.Printf("mqtt debug connected to %s persist=%v", broker, persistToDatabase)
|
||||
})
|
||||
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
|
||||
log.Printf("mqtt debug connection lost: %v", err)
|
||||
s.mu.Lock()
|
||||
if s.client == client {
|
||||
s.client = nil
|
||||
s.active = false
|
||||
}
|
||||
s.mu.Unlock()
|
||||
})
|
||||
|
||||
client := mqtt.NewClient(opts)
|
||||
token := client.Connect()
|
||||
if !token.WaitTimeout(15 * time.Second) {
|
||||
return nil, fmt.Errorf("mqtt debug connect timeout")
|
||||
}
|
||||
if err := token.Error(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (s *DebugService) subscribe(client mqtt.Client) error {
|
||||
topics := []string{
|
||||
fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/hr", s.cfg.Region),
|
||||
fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/step", s.cfg.Region),
|
||||
fmt.Sprintf("/whgw/v2/region/%s/gateway/+/status", s.cfg.Region),
|
||||
}
|
||||
for _, topic := range topics {
|
||||
token := client.Subscribe(topic, byte(s.cfg.QoS), s.handleMessage)
|
||||
if !token.WaitTimeout(10 * time.Second) {
|
||||
return fmt.Errorf("mqtt debug subscribe timeout for topic %s", topic)
|
||||
}
|
||||
if err := token.Error(); err != nil {
|
||||
return fmt.Errorf("mqtt debug subscribe topic %s: %w", topic, err)
|
||||
}
|
||||
log.Printf("mqtt debug subscribed: %s", topic)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *DebugService) handleMessage(_ mqtt.Client, msg mqtt.Message) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("mqtt debug handle panic topic=%s err=%v", msg.Topic(), r)
|
||||
}
|
||||
}()
|
||||
if len(msg.Payload()) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now().UnixMilli()
|
||||
var packet whgw_hrpb.GatewaySlaveOutCloudMasterInMsg
|
||||
if err := proto.Unmarshal(msg.Payload(), &packet); err != nil {
|
||||
log.Printf("mqtt debug payload parse failed topic=%s err=%v", msg.Topic(), err)
|
||||
return
|
||||
}
|
||||
|
||||
switch payload := packet.Choice.(type) {
|
||||
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfHrMeasurement:
|
||||
record := buildHeartRateRecord(payload.NtfHrMeasurement, msg.Topic(), now)
|
||||
s.maybePersist(&record)
|
||||
s.broadcast(DebugEvent{
|
||||
CardKey: fmt.Sprintf("%d-%d", record.RegionID, record.BandID),
|
||||
HeartRate: &record,
|
||||
Kind: "heart_rate",
|
||||
ReceivedAt: now,
|
||||
RegionID: record.RegionID,
|
||||
Topic: msg.Topic(),
|
||||
})
|
||||
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfStepCountMeasurement:
|
||||
record := buildStepCountRecord(payload.NtfStepCountMeasurement, msg.Topic(), now)
|
||||
s.maybePersist(&record)
|
||||
s.broadcast(DebugEvent{
|
||||
CardKey: fmt.Sprintf("%d-%d", record.RegionID, record.BandID),
|
||||
Kind: "step_count",
|
||||
ReceivedAt: now,
|
||||
RegionID: record.RegionID,
|
||||
StepCount: &record,
|
||||
Topic: msg.Topic(),
|
||||
})
|
||||
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfGatewayStatus:
|
||||
record := buildGatewayStatusRecord(payload.NtfGatewayStatus, msg.Topic(), now)
|
||||
s.maybePersist(&record)
|
||||
s.broadcast(DebugEvent{
|
||||
CardKey: fmt.Sprintf("%d-%s", record.RegionID, record.GatewayMAC),
|
||||
GatewayStatus: &record,
|
||||
Kind: "gateway_status",
|
||||
ReceivedAt: now,
|
||||
RegionID: record.RegionID,
|
||||
Topic: msg.Topic(),
|
||||
})
|
||||
default:
|
||||
log.Printf("mqtt debug payload ignored topic=%s", msg.Topic())
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DebugService) maybePersist(record interface{}) {
|
||||
s.mu.RLock()
|
||||
enabled := s.persistToDatabase
|
||||
s.mu.RUnlock()
|
||||
if !enabled {
|
||||
return
|
||||
}
|
||||
if err := s.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record).Error; err != nil {
|
||||
log.Printf("mqtt debug persist failed type=%T err=%v", record, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DebugService) broadcast(event DebugEvent) {
|
||||
payload, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
log.Printf("mqtt debug marshal failed err=%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
conns := make([]*websocket.Conn, 0, len(s.subscribers))
|
||||
for conn := range s.subscribers {
|
||||
conns = append(conns, conn)
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
|
||||
for _, conn := range conns {
|
||||
if err := conn.WriteMessage(websocket.TextMessage, payload); err != nil {
|
||||
log.Printf("mqtt debug websocket send failed err=%v", err)
|
||||
s.RemoveSubscriber(conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user