From 84217c929e63f09a52a83448f2f4b42482d6402f Mon Sep 17 00:00:00 2001 From: laoboli <1293528695@qq.com> Date: Wed, 29 Apr 2026 09:02:35 +0800 Subject: [PATCH] feat: system debug. --- .gitignore | 1 + controllers/system_debug.go | 102 ++++++++++++++ main.go | 1 + mqtt/debug_service.go | 273 ++++++++++++++++++++++++++++++++++++ routes/routes.go | 6 + 5 files changed, 383 insertions(+) create mode 100644 controllers/system_debug.go create mode 100644 mqtt/debug_service.go diff --git a/.gitignore b/.gitignore index 186cd20..2e1ce1e 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ config.yaml *.pdf *.md *.csv +*.docx diff --git a/controllers/system_debug.go b/controllers/system_debug.go new file mode 100644 index 0000000..4c6f604 --- /dev/null +++ b/controllers/system_debug.go @@ -0,0 +1,102 @@ +package controllers + +import ( + "errors" + "hr_receiver/models" + "hr_receiver/mqtt" + "hr_receiver/util" + "net/http" + "strings" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" +) + +type SystemDebugController struct{} + +type mqttDebugStartRequest struct { + PersistToDatabase bool `json:"persistToDatabase"` +} + +var debugUpgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +func NewSystemDebugController() *SystemDebugController { + return &SystemDebugController{} +} + +func (sc *SystemDebugController) MqttStatus(c *gin.Context) { + service := mqtt.GetDebugService() + if service == nil { + writeError(c, http.StatusServiceUnavailable, "mqtt debug service unavailable") + return + } + writeSuccess(c, http.StatusOK, "query success", service.Status()) +} + +func (sc *SystemDebugController) StartMqtt(c *gin.Context) { + service := mqtt.GetDebugService() + if service == nil { + writeError(c, http.StatusServiceUnavailable, "mqtt debug service unavailable") + return + } + var payload mqttDebugStartRequest + if err := c.ShouldBindJSON(&payload); err != nil && !errors.Is(err, http.ErrBodyNotAllowed) { + writeError(c, http.StatusBadRequest, err.Error()) + return + } + if err := service.Start(payload.PersistToDatabase); err != nil { + writeError(c, http.StatusInternalServerError, err.Error()) + return + } + writeSuccess(c, http.StatusOK, "start success", service.Status()) +} + +func (sc *SystemDebugController) StopMqtt(c *gin.Context) { + service := mqtt.GetDebugService() + if service == nil { + writeError(c, http.StatusServiceUnavailable, "mqtt debug service unavailable") + return + } + service.Stop() + writeSuccess(c, http.StatusOK, "stop success", service.Status()) +} + +func (sc *SystemDebugController) MqttWebSocket(c *gin.Context) { + service := mqtt.GetDebugService() + if service == nil { + c.JSON(http.StatusServiceUnavailable, gin.H{"error": "mqtt debug service unavailable"}) + return + } + + token := strings.TrimSpace(c.Query("token")) + if token == "" { + c.JSON(http.StatusUnauthorized, gin.H{"error": "missing token"}) + return + } + claims, err := util.ParseToken(token) + if err != nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"}) + return + } + if claims.Role != models.UserRoleSuperAdmin { + c.JSON(http.StatusForbidden, gin.H{"error": "super admin required"}) + return + } + + conn, err := debugUpgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + return + } + service.AddSubscriber(conn) + defer service.RemoveSubscriber(conn) + + for { + if _, _, err := conn.ReadMessage(); err != nil { + break + } + } +} diff --git a/main.go b/main.go index 31bce31..6395027 100644 --- a/main.go +++ b/main.go @@ -47,6 +47,7 @@ func main() { if err := mqtt.Start(config.DB, config.App.MQTT); err != nil { log.Printf("mqtt listener start failed: %v", err) } + mqtt.InitDebugService(config.DB, config.App.MQTT) controllers.StartLessonPlanCleanupJob(config.DB) // 启动服务 diff --git a/mqtt/debug_service.go b/mqtt/debug_service.go new file mode 100644 index 0000000..a1207bf --- /dev/null +++ b/mqtt/debug_service.go @@ -0,0 +1,273 @@ +package mqtt + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "hr_receiver/config" + "hr_receiver/models" + "log" + "sync" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gorilla/websocket" + "google.golang.org/protobuf/proto" + "gorm.io/gorm" + "gorm.io/gorm/clause" + whgw_hrpb "hr_receiver/proto" +) + +type DebugStatus struct { + Active bool `json:"active"` + ClientConnected bool `json:"clientConnected"` + PersistToDatabase bool `json:"persistToDatabase"` + Region string `json:"region"` + SubscriberCount int `json:"subscriberCount"` +} + +type DebugEvent struct { + CardKey string `json:"cardKey"` + Kind string `json:"kind"` + RegionID uint32 `json:"regionId"` + ReceivedAt int64 `json:"receivedAt"` + Topic string `json:"topic"` + HeartRate *models.MqttHeartRateRecord `json:"heartRate,omitempty"` + StepCount *models.MqttStepCountRecord `json:"stepCount,omitempty"` + GatewayStatus *models.MqttGatewayStatusRecord `json:"gatewayStatus,omitempty"` +} + +type DebugService struct { + cfg config.MQTTConfig + client mqtt.Client + db *gorm.DB + mu sync.RWMutex + persistToDatabase bool + subscribers map[*websocket.Conn]struct{} + active bool +} + +var globalDebugService *DebugService + +func InitDebugService(db *gorm.DB, cfg config.MQTTConfig) { + globalDebugService = &DebugService{ + cfg: cfg, + db: db, + subscribers: make(map[*websocket.Conn]struct{}), + } +} + +func GetDebugService() *DebugService { + return globalDebugService +} + +func (s *DebugService) Status() DebugStatus { + s.mu.RLock() + defer s.mu.RUnlock() + return DebugStatus{ + Active: s.active, + ClientConnected: s.client != nil && s.client.IsConnected(), + PersistToDatabase: s.persistToDatabase, + Region: s.cfg.Region, + SubscriberCount: len(s.subscribers), + } +} + +func (s *DebugService) Start(persistToDatabase bool) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.active && s.client != nil && s.client.IsConnected() { + s.persistToDatabase = persistToDatabase + return nil + } + if err := validateConfig(s.cfg); err != nil { + return err + } + + client, err := s.connectLocked(persistToDatabase) + if err != nil { + return err + } + s.client = client + s.persistToDatabase = persistToDatabase + s.active = true + return nil +} + +func (s *DebugService) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + if s.client != nil && s.client.IsConnected() { + s.client.Disconnect(250) + } + s.client = nil + s.active = false + s.persistToDatabase = false +} + +func (s *DebugService) AddSubscriber(conn *websocket.Conn) { + s.mu.Lock() + s.subscribers[conn] = struct{}{} + s.mu.Unlock() +} + +func (s *DebugService) RemoveSubscriber(conn *websocket.Conn) { + s.mu.Lock() + delete(s.subscribers, conn) + s.mu.Unlock() + _ = conn.Close() +} + +func (s *DebugService) connectLocked(persistToDatabase bool) (mqtt.Client, error) { + opts := mqtt.NewClientOptions() + scheme := "tcp" + if s.cfg.UseTLS { + scheme = "ssl" + opts.SetTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12}) + } + broker := fmt.Sprintf("%s://%s:%d", scheme, s.cfg.Host, s.cfg.Port) + opts.AddBroker(broker) + opts.SetClientID(fmt.Sprintf("%s-debug-%d", s.cfg.ClientIDPrefix, time.Now().UnixNano())) + opts.SetUsername(s.cfg.Username) + opts.SetPassword(s.cfg.Password) + opts.SetKeepAlive(60 * time.Second) + opts.SetAutoReconnect(false) + opts.SetConnectRetry(false) + opts.SetDefaultPublishHandler(s.handleMessage) + opts.SetOnConnectHandler(func(client mqtt.Client) { + if err := s.subscribe(client); err != nil { + log.Printf("mqtt debug subscribe failed: %v", err) + return + } + log.Printf("mqtt debug connected to %s persist=%v", broker, persistToDatabase) + }) + opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { + log.Printf("mqtt debug connection lost: %v", err) + s.mu.Lock() + if s.client == client { + s.client = nil + s.active = false + } + s.mu.Unlock() + }) + + client := mqtt.NewClient(opts) + token := client.Connect() + if !token.WaitTimeout(15 * time.Second) { + return nil, fmt.Errorf("mqtt debug connect timeout") + } + if err := token.Error(); err != nil { + return nil, err + } + return client, nil +} + +func (s *DebugService) subscribe(client mqtt.Client) error { + topics := []string{ + fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/hr", s.cfg.Region), + fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/step", s.cfg.Region), + fmt.Sprintf("/whgw/v2/region/%s/gateway/+/status", s.cfg.Region), + } + for _, topic := range topics { + token := client.Subscribe(topic, byte(s.cfg.QoS), s.handleMessage) + if !token.WaitTimeout(10 * time.Second) { + return fmt.Errorf("mqtt debug subscribe timeout for topic %s", topic) + } + if err := token.Error(); err != nil { + return fmt.Errorf("mqtt debug subscribe topic %s: %w", topic, err) + } + log.Printf("mqtt debug subscribed: %s", topic) + } + return nil +} + +func (s *DebugService) handleMessage(_ mqtt.Client, msg mqtt.Message) { + defer func() { + if r := recover(); r != nil { + log.Printf("mqtt debug handle panic topic=%s err=%v", msg.Topic(), r) + } + }() + if len(msg.Payload()) == 0 { + return + } + + now := time.Now().UnixMilli() + var packet whgw_hrpb.GatewaySlaveOutCloudMasterInMsg + if err := proto.Unmarshal(msg.Payload(), &packet); err != nil { + log.Printf("mqtt debug payload parse failed topic=%s err=%v", msg.Topic(), err) + return + } + + switch payload := packet.Choice.(type) { + case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfHrMeasurement: + record := buildHeartRateRecord(payload.NtfHrMeasurement, msg.Topic(), now) + s.maybePersist(&record) + s.broadcast(DebugEvent{ + CardKey: fmt.Sprintf("%d-%d", record.RegionID, record.BandID), + HeartRate: &record, + Kind: "heart_rate", + ReceivedAt: now, + RegionID: record.RegionID, + Topic: msg.Topic(), + }) + case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfStepCountMeasurement: + record := buildStepCountRecord(payload.NtfStepCountMeasurement, msg.Topic(), now) + s.maybePersist(&record) + s.broadcast(DebugEvent{ + CardKey: fmt.Sprintf("%d-%d", record.RegionID, record.BandID), + Kind: "step_count", + ReceivedAt: now, + RegionID: record.RegionID, + StepCount: &record, + Topic: msg.Topic(), + }) + case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfGatewayStatus: + record := buildGatewayStatusRecord(payload.NtfGatewayStatus, msg.Topic(), now) + s.maybePersist(&record) + s.broadcast(DebugEvent{ + CardKey: fmt.Sprintf("%d-%s", record.RegionID, record.GatewayMAC), + GatewayStatus: &record, + Kind: "gateway_status", + ReceivedAt: now, + RegionID: record.RegionID, + Topic: msg.Topic(), + }) + default: + log.Printf("mqtt debug payload ignored topic=%s", msg.Topic()) + } +} + +func (s *DebugService) maybePersist(record interface{}) { + s.mu.RLock() + enabled := s.persistToDatabase + s.mu.RUnlock() + if !enabled { + return + } + if err := s.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record).Error; err != nil { + log.Printf("mqtt debug persist failed type=%T err=%v", record, err) + } +} + +func (s *DebugService) broadcast(event DebugEvent) { + payload, err := json.Marshal(event) + if err != nil { + log.Printf("mqtt debug marshal failed err=%v", err) + return + } + + s.mu.RLock() + conns := make([]*websocket.Conn, 0, len(s.subscribers)) + for conn := range s.subscribers { + conns = append(conns, conn) + } + s.mu.RUnlock() + + for _, conn := range conns { + if err := conn.WriteMessage(websocket.TextMessage, payload); err != nil { + log.Printf("mqtt debug websocket send failed err=%v", err) + s.RemoveSubscriber(conn) + } + } +} diff --git a/routes/routes.go b/routes/routes.go index e39e432..e09405b 100644 --- a/routes/routes.go +++ b/routes/routes.go @@ -16,6 +16,7 @@ func SetupRouter() *gin.Engine { lessonPlanController := controllers.NewLessonPlanController() kindergartenAdminController := controllers.NewKindergartenAdminController() userAdminController := controllers.NewUserAdminController() + systemDebugController := controllers.NewSystemDebugController() v1 := r.Group("/api/v1") { @@ -54,7 +55,12 @@ func SetupRouter() *gin.Engine { admin.POST("/users", userAdminController.Create) admin.PUT("/users/:id", userAdminController.Update) admin.DELETE("/users/:id", userAdminController.Delete) + + admin.GET("/system-debug/mqtt/status", systemDebugController.MqttStatus) + admin.POST("/system-debug/mqtt/start", systemDebugController.StartMqtt) + admin.POST("/system-debug/mqtt/stop", systemDebugController.StopMqtt) } + v1.GET("/admin/system-debug/mqtt/ws", systemDebugController.MqttWebSocket) v1.GET("/lesson-plans/share/:code/download", lessonPlanController.DownloadByShareCode) public := v1.Group("") {