From 1b447c782d1bea14a805d5ca13077ddea4800bab Mon Sep 17 00:00:00 2001 From: laoboli <1293528695@qq.com> Date: Wed, 13 May 2026 10:04:48 +0800 Subject: [PATCH] feat:data debug config. --- .gitignore | 1 + config.sample.yaml | 2 +- config/config.go | 1 + controllers/mqtt_listener_config.go | 94 +++++++++++++++++++++++++++++ main.go | 8 +++ models/mqtt_listener_setting.go | 49 +++++++++++++++ mqtt/listener.go | 23 ++++--- mqtt/listener_settings.go | 79 ++++++++++++++++++++++++ routes/routes.go | 2 + 9 files changed, 251 insertions(+), 8 deletions(-) create mode 100644 controllers/mqtt_listener_config.go create mode 100644 models/mqtt_listener_setting.go create mode 100644 mqtt/listener_settings.go diff --git a/.gitignore b/.gitignore index 2e1ce1e..8dd1759 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ config.yaml *.md *.csv *.docx +export*.sql diff --git a/config.sample.yaml b/config.sample.yaml index c607f2f..2fc49e7 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -21,7 +21,7 @@ mqtt: region: "+" use_tls: true qos: 0 - enable_measurement_subscriptions: false + enable_measurement_subscriptions: true enable_training_event_subscription: true swagger: enabled: true diff --git a/config/config.go b/config/config.go index 994dd92..8e15d77 100644 --- a/config/config.go +++ b/config/config.go @@ -60,6 +60,7 @@ func InitConfig() { viper.AddConfigPath("./") viper.SetConfigName("config") viper.SetConfigType("yaml") + viper.SetDefault("mqtt.enable_measurement_subscriptions", true) if err := viper.ReadInConfig(); err != nil { panic("Failed to read config: " + err.Error()) } diff --git a/controllers/mqtt_listener_config.go b/controllers/mqtt_listener_config.go new file mode 100644 index 0000000..cfed800 --- /dev/null +++ b/controllers/mqtt_listener_config.go @@ -0,0 +1,94 @@ +package controllers + +import ( + "hr_receiver/config" + "hr_receiver/models" + "hr_receiver/mqtt" + "log" + "net/http" + "time" + + "github.com/gin-gonic/gin" + "gorm.io/gorm" +) + +type mqttListenerConfigRequest struct { + Enabled bool `json:"enabled"` + ExpireDays int `json:"expireDays"` + DeleteExpired bool `json:"deleteExpired"` +} + +// @Summary 获取MQTT监听存储配置 +// @Description 获取测量数据监听启用状态、过期天数和过期删除开关 +// @Tags 系统调试 +// @Produce json +// @Security BearerAuth +// @Success 200 {object} SwagAPIResponse "查询成功" +// @Router /admin/system-debug/mqtt/listener-config [get] +func (sc *SystemDebugController) GetMqttListenerConfig(c *gin.Context) { + writeSuccess(c, http.StatusOK, "query success", mqtt.GetListenerStorageConfig()) +} + +// @Summary 更新MQTT监听存储配置 +// @Description 更新测量数据监听启用状态、过期天数和过期删除开关 +// @Tags 系统调试 +// @Accept json +// @Produce json +// @Security BearerAuth +// @Success 200 {object} SwagAPIResponse "更新成功" +// @Router /admin/system-debug/mqtt/listener-config [put] +func (sc *SystemDebugController) UpdateMqttListenerConfig(c *gin.Context) { + var payload mqttListenerConfigRequest + if err := c.ShouldBindJSON(&payload); err != nil { + writeError(c, http.StatusBadRequest, err.Error()) + return + } + cfg, err := mqtt.UpdateListenerStorageConfig(config.DB, mqtt.ListenerStorageConfig{ + Enabled: payload.Enabled, + ExpireDays: payload.ExpireDays, + DeleteExpired: payload.DeleteExpired, + }) + if err != nil { + writeError(c, http.StatusBadRequest, err.Error()) + return + } + writeSuccess(c, http.StatusOK, "update success", cfg) +} + +func StartMqttMeasurementCleanupJob(db *gorm.DB) { + go func() { + runCleanup := func() { + if err := cleanupExpiredMqttMeasurementData(db); err != nil { + log.Printf("mqtt measurement cleanup failed: %v", err) + } + } + + runCleanup() + + ticker := time.NewTicker(24 * time.Hour) + defer ticker.Stop() + for range ticker.C { + runCleanup() + } + }() +} + +func cleanupExpiredMqttMeasurementData(db *gorm.DB) error { + cfg := mqtt.GetListenerStorageConfig() + if !cfg.DeleteExpired { + return nil + } + + cutoffMillis := time.Now().AddDate(0, 0, -cfg.ExpireDays).UnixMilli() + modelsToClean := []interface{}{ + &models.MqttHeartRateRecord{}, + &models.MqttStepCountRecord{}, + &models.MqttGatewayStatusRecord{}, + } + for _, model := range modelsToClean { + if err := db.Unscoped().Where("received_at < ?", cutoffMillis).Delete(model).Error; err != nil { + return err + } + } + return nil +} diff --git a/main.go b/main.go index 696362f..70ea30b 100644 --- a/main.go +++ b/main.go @@ -50,6 +50,7 @@ func main() { &models.MqttStepCountRecord{}, &models.MqttGatewayStatusRecord{}, &models.MqttTrainingSessionRecord{}, + &models.MqttListenerSetting{}, &models.Gateway{}, &models.AIAnalysisRecord{}, &models.AIPricingConfig{}, @@ -72,15 +73,22 @@ func main() { if err := models.EnsureDefaultProductPrototypes(config.DB); err != nil { log.Printf("default product prototypes init failed: %v", err) } + if err := models.EnsureDefaultMqttListenerSetting(config.DB); err != nil { + log.Printf("default mqtt listener setting init failed: %v", err) + } if err := models.EnsureDefaultProjectProductTemplates(config.DB); err != nil { log.Printf("default project product templates init failed: %v", err) } + if err := mqtt.InitListenerStorageConfig(config.DB); err != nil { + log.Printf("mqtt listener config init failed: %v", err) + } if err := mqtt.Start(config.DB, config.App.MQTT); err != nil { log.Printf("mqtt listener start failed: %v", err) } mqtt.InitDebugService(config.DB, config.App.MQTT) controllers.StartLessonPlanCleanupJob(config.DB) + controllers.StartMqttMeasurementCleanupJob(config.DB) // 启动服务 r := routes.SetupRouter() diff --git a/models/mqtt_listener_setting.go b/models/mqtt_listener_setting.go new file mode 100644 index 0000000..52788b4 --- /dev/null +++ b/models/mqtt_listener_setting.go @@ -0,0 +1,49 @@ +package models + +import "gorm.io/gorm" + +const ( + DefaultMqttMeasurementExpireDays = 7 + MqttListenerSettingSingletonID = 1 +) + +type MqttListenerSetting struct { + gorm.Model + Enabled bool `gorm:"not null;default:true" json:"enabled"` + ExpireDays int `gorm:"type:int;not null;default:7" json:"expireDays"` + DeleteExpired bool `gorm:"not null;default:true" json:"deleteExpired"` +} + +func (MqttListenerSetting) TableName() string { + return "mqtt_listener_settings" +} + +func DefaultMqttListenerSetting() MqttListenerSetting { + return MqttListenerSetting{ + Model: gorm.Model{ID: MqttListenerSettingSingletonID}, + Enabled: true, + ExpireDays: DefaultMqttMeasurementExpireDays, + DeleteExpired: true, + } +} + +func EnsureDefaultMqttListenerSetting(db *gorm.DB) error { + defaults := DefaultMqttListenerSetting() + + var existing MqttListenerSetting + err := db.First(&existing, MqttListenerSettingSingletonID).Error + if err == nil { + updates := map[string]interface{}{} + if existing.ExpireDays <= 0 { + updates["expire_days"] = DefaultMqttMeasurementExpireDays + } + if len(updates) == 0 { + return nil + } + return db.Model(&existing).Updates(updates).Error + } + if err != nil && err != gorm.ErrRecordNotFound { + return err + } + return db.Create(&defaults).Error +} diff --git a/mqtt/listener.go b/mqtt/listener.go index 99b9120..172a809 100644 --- a/mqtt/listener.go +++ b/mqtt/listener.go @@ -128,13 +128,13 @@ func (l *Listener) connect() error { func (l *Listener) subscribe(client mqtt.Client) error { var topics []string - if l.cfg.EnableMeasurementSubscriptions { - topics = append(topics, - fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/hr", l.cfg.Region), - fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/step", l.cfg.Region), - fmt.Sprintf("/whgw/v2/region/%s/gateway/+/status", l.cfg.Region), - ) - } + // Measurement topics stay subscribed so runtime config changes take effect + // immediately without requiring an MQTT reconnect. + topics = append(topics, + fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/hr", l.cfg.Region), + fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/step", l.cfg.Region), + fmt.Sprintf("/whgw/v2/region/%s/gateway/+/status", l.cfg.Region), + ) if l.cfg.EnableTrainingEventSubscription { topics = append(topics, "/whgw/v2/region/test/+/+") } @@ -180,12 +180,21 @@ func (l *Listener) handleMessage(_ mqtt.Client, msg mqtt.Message) { switch payload := packet.Choice.(type) { case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfHrMeasurement: + if !GetListenerStorageConfig().Enabled { + return + } record := buildHeartRateRecord(payload.NtfHrMeasurement, msg.Topic(), now) l.enqueue(&record) case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfStepCountMeasurement: + if !GetListenerStorageConfig().Enabled { + return + } record := buildStepCountRecord(payload.NtfStepCountMeasurement, msg.Topic(), now) l.enqueue(&record) case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfGatewayStatus: + if !GetListenerStorageConfig().Enabled { + return + } record := buildGatewayStatusRecord(payload.NtfGatewayStatus, msg.Topic(), now) l.enqueue(&record) default: diff --git a/mqtt/listener_settings.go b/mqtt/listener_settings.go new file mode 100644 index 0000000..215bad3 --- /dev/null +++ b/mqtt/listener_settings.go @@ -0,0 +1,79 @@ +package mqtt + +import ( + "fmt" + "hr_receiver/models" + "sync/atomic" + + "gorm.io/gorm" +) + +type ListenerStorageConfig struct { + Enabled bool `json:"enabled"` + ExpireDays int `json:"expireDays"` + DeleteExpired bool `json:"deleteExpired"` +} + +var listenerStorageConfig atomic.Value + +func InitListenerStorageConfig(db *gorm.DB) error { + cfg, err := loadListenerStorageConfig(db) + if err != nil { + return err + } + storeListenerStorageConfig(cfg) + return nil +} + +func GetListenerStorageConfig() ListenerStorageConfig { + if cfg, ok := listenerStorageConfig.Load().(ListenerStorageConfig); ok { + return normalizeListenerStorageConfig(cfg) + } + return normalizeListenerStorageConfig(ListenerStorageConfig{ + Enabled: true, + ExpireDays: models.DefaultMqttMeasurementExpireDays, + DeleteExpired: true, + }) +} + +func UpdateListenerStorageConfig(db *gorm.DB, cfg ListenerStorageConfig) (ListenerStorageConfig, error) { + cfg = normalizeListenerStorageConfig(cfg) + if cfg.ExpireDays <= 0 { + return ListenerStorageConfig{}, fmt.Errorf("expireDays must be greater than 0") + } + + record := models.MqttListenerSetting{ + Model: gorm.Model{ID: models.MqttListenerSettingSingletonID}, + Enabled: cfg.Enabled, + ExpireDays: cfg.ExpireDays, + DeleteExpired: cfg.DeleteExpired, + } + if err := db.Save(&record).Error; err != nil { + return ListenerStorageConfig{}, err + } + storeListenerStorageConfig(cfg) + return cfg, nil +} + +func loadListenerStorageConfig(db *gorm.DB) (ListenerStorageConfig, error) { + var record models.MqttListenerSetting + if err := db.First(&record, models.MqttListenerSettingSingletonID).Error; err != nil { + return ListenerStorageConfig{}, err + } + return normalizeListenerStorageConfig(ListenerStorageConfig{ + Enabled: record.Enabled, + ExpireDays: record.ExpireDays, + DeleteExpired: record.DeleteExpired, + }), nil +} + +func storeListenerStorageConfig(cfg ListenerStorageConfig) { + listenerStorageConfig.Store(normalizeListenerStorageConfig(cfg)) +} + +func normalizeListenerStorageConfig(cfg ListenerStorageConfig) ListenerStorageConfig { + if cfg.ExpireDays <= 0 { + cfg.ExpireDays = models.DefaultMqttMeasurementExpireDays + } + return cfg +} diff --git a/routes/routes.go b/routes/routes.go index 508064c..5aebe39 100644 --- a/routes/routes.go +++ b/routes/routes.go @@ -156,6 +156,8 @@ func SetupRouter() *gin.Engine { admin.GET("/system-debug/mqtt/status", systemDebugController.MqttStatus) admin.POST("/system-debug/mqtt/start", systemDebugController.StartMqtt) admin.POST("/system-debug/mqtt/stop", systemDebugController.StopMqtt) + admin.GET("/system-debug/mqtt/listener-config", systemDebugController.GetMqttListenerConfig) + admin.PUT("/system-debug/mqtt/listener-config", systemDebugController.UpdateMqttListenerConfig) admin.GET("/statistics/ai-analysis-records", statisticsController.ListAIAnalysisRecords) admin.GET("/statistics/ai-analysis-records/:id/pdf", statisticsController.DownloadAIAnalysisRecordPDF)