feat:data debug config.
This commit is contained in:
@@ -7,3 +7,4 @@ config.yaml
|
||||
*.md
|
||||
*.csv
|
||||
*.docx
|
||||
export*.sql
|
||||
|
||||
+1
-1
@@ -21,7 +21,7 @@ mqtt:
|
||||
region: "+"
|
||||
use_tls: true
|
||||
qos: 0
|
||||
enable_measurement_subscriptions: false
|
||||
enable_measurement_subscriptions: true
|
||||
enable_training_event_subscription: true
|
||||
swagger:
|
||||
enabled: true
|
||||
|
||||
@@ -60,6 +60,7 @@ func InitConfig() {
|
||||
viper.AddConfigPath("./")
|
||||
viper.SetConfigName("config")
|
||||
viper.SetConfigType("yaml")
|
||||
viper.SetDefault("mqtt.enable_measurement_subscriptions", true)
|
||||
if err := viper.ReadInConfig(); err != nil {
|
||||
panic("Failed to read config: " + err.Error())
|
||||
}
|
||||
|
||||
@@ -0,0 +1,94 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"hr_receiver/config"
|
||||
"hr_receiver/models"
|
||||
"hr_receiver/mqtt"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type mqttListenerConfigRequest struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
ExpireDays int `json:"expireDays"`
|
||||
DeleteExpired bool `json:"deleteExpired"`
|
||||
}
|
||||
|
||||
// @Summary 获取MQTT监听存储配置
|
||||
// @Description 获取测量数据监听启用状态、过期天数和过期删除开关
|
||||
// @Tags 系统调试
|
||||
// @Produce json
|
||||
// @Security BearerAuth
|
||||
// @Success 200 {object} SwagAPIResponse "查询成功"
|
||||
// @Router /admin/system-debug/mqtt/listener-config [get]
|
||||
func (sc *SystemDebugController) GetMqttListenerConfig(c *gin.Context) {
|
||||
writeSuccess(c, http.StatusOK, "query success", mqtt.GetListenerStorageConfig())
|
||||
}
|
||||
|
||||
// @Summary 更新MQTT监听存储配置
|
||||
// @Description 更新测量数据监听启用状态、过期天数和过期删除开关
|
||||
// @Tags 系统调试
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Security BearerAuth
|
||||
// @Success 200 {object} SwagAPIResponse "更新成功"
|
||||
// @Router /admin/system-debug/mqtt/listener-config [put]
|
||||
func (sc *SystemDebugController) UpdateMqttListenerConfig(c *gin.Context) {
|
||||
var payload mqttListenerConfigRequest
|
||||
if err := c.ShouldBindJSON(&payload); err != nil {
|
||||
writeError(c, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
cfg, err := mqtt.UpdateListenerStorageConfig(config.DB, mqtt.ListenerStorageConfig{
|
||||
Enabled: payload.Enabled,
|
||||
ExpireDays: payload.ExpireDays,
|
||||
DeleteExpired: payload.DeleteExpired,
|
||||
})
|
||||
if err != nil {
|
||||
writeError(c, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
writeSuccess(c, http.StatusOK, "update success", cfg)
|
||||
}
|
||||
|
||||
func StartMqttMeasurementCleanupJob(db *gorm.DB) {
|
||||
go func() {
|
||||
runCleanup := func() {
|
||||
if err := cleanupExpiredMqttMeasurementData(db); err != nil {
|
||||
log.Printf("mqtt measurement cleanup failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
runCleanup()
|
||||
|
||||
ticker := time.NewTicker(24 * time.Hour)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
runCleanup()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func cleanupExpiredMqttMeasurementData(db *gorm.DB) error {
|
||||
cfg := mqtt.GetListenerStorageConfig()
|
||||
if !cfg.DeleteExpired {
|
||||
return nil
|
||||
}
|
||||
|
||||
cutoffMillis := time.Now().AddDate(0, 0, -cfg.ExpireDays).UnixMilli()
|
||||
modelsToClean := []interface{}{
|
||||
&models.MqttHeartRateRecord{},
|
||||
&models.MqttStepCountRecord{},
|
||||
&models.MqttGatewayStatusRecord{},
|
||||
}
|
||||
for _, model := range modelsToClean {
|
||||
if err := db.Unscoped().Where("received_at < ?", cutoffMillis).Delete(model).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -50,6 +50,7 @@ func main() {
|
||||
&models.MqttStepCountRecord{},
|
||||
&models.MqttGatewayStatusRecord{},
|
||||
&models.MqttTrainingSessionRecord{},
|
||||
&models.MqttListenerSetting{},
|
||||
&models.Gateway{},
|
||||
&models.AIAnalysisRecord{},
|
||||
&models.AIPricingConfig{},
|
||||
@@ -72,15 +73,22 @@ func main() {
|
||||
if err := models.EnsureDefaultProductPrototypes(config.DB); err != nil {
|
||||
log.Printf("default product prototypes init failed: %v", err)
|
||||
}
|
||||
if err := models.EnsureDefaultMqttListenerSetting(config.DB); err != nil {
|
||||
log.Printf("default mqtt listener setting init failed: %v", err)
|
||||
}
|
||||
if err := models.EnsureDefaultProjectProductTemplates(config.DB); err != nil {
|
||||
log.Printf("default project product templates init failed: %v", err)
|
||||
}
|
||||
if err := mqtt.InitListenerStorageConfig(config.DB); err != nil {
|
||||
log.Printf("mqtt listener config init failed: %v", err)
|
||||
}
|
||||
|
||||
if err := mqtt.Start(config.DB, config.App.MQTT); err != nil {
|
||||
log.Printf("mqtt listener start failed: %v", err)
|
||||
}
|
||||
mqtt.InitDebugService(config.DB, config.App.MQTT)
|
||||
controllers.StartLessonPlanCleanupJob(config.DB)
|
||||
controllers.StartMqttMeasurementCleanupJob(config.DB)
|
||||
|
||||
// 启动服务
|
||||
r := routes.SetupRouter()
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
package models
|
||||
|
||||
import "gorm.io/gorm"
|
||||
|
||||
const (
|
||||
DefaultMqttMeasurementExpireDays = 7
|
||||
MqttListenerSettingSingletonID = 1
|
||||
)
|
||||
|
||||
type MqttListenerSetting struct {
|
||||
gorm.Model
|
||||
Enabled bool `gorm:"not null;default:true" json:"enabled"`
|
||||
ExpireDays int `gorm:"type:int;not null;default:7" json:"expireDays"`
|
||||
DeleteExpired bool `gorm:"not null;default:true" json:"deleteExpired"`
|
||||
}
|
||||
|
||||
func (MqttListenerSetting) TableName() string {
|
||||
return "mqtt_listener_settings"
|
||||
}
|
||||
|
||||
func DefaultMqttListenerSetting() MqttListenerSetting {
|
||||
return MqttListenerSetting{
|
||||
Model: gorm.Model{ID: MqttListenerSettingSingletonID},
|
||||
Enabled: true,
|
||||
ExpireDays: DefaultMqttMeasurementExpireDays,
|
||||
DeleteExpired: true,
|
||||
}
|
||||
}
|
||||
|
||||
func EnsureDefaultMqttListenerSetting(db *gorm.DB) error {
|
||||
defaults := DefaultMqttListenerSetting()
|
||||
|
||||
var existing MqttListenerSetting
|
||||
err := db.First(&existing, MqttListenerSettingSingletonID).Error
|
||||
if err == nil {
|
||||
updates := map[string]interface{}{}
|
||||
if existing.ExpireDays <= 0 {
|
||||
updates["expire_days"] = DefaultMqttMeasurementExpireDays
|
||||
}
|
||||
if len(updates) == 0 {
|
||||
return nil
|
||||
}
|
||||
return db.Model(&existing).Updates(updates).Error
|
||||
}
|
||||
if err != nil && err != gorm.ErrRecordNotFound {
|
||||
return err
|
||||
}
|
||||
return db.Create(&defaults).Error
|
||||
}
|
||||
+11
-2
@@ -128,13 +128,13 @@ func (l *Listener) connect() error {
|
||||
|
||||
func (l *Listener) subscribe(client mqtt.Client) error {
|
||||
var topics []string
|
||||
if l.cfg.EnableMeasurementSubscriptions {
|
||||
// Measurement topics stay subscribed so runtime config changes take effect
|
||||
// immediately without requiring an MQTT reconnect.
|
||||
topics = append(topics,
|
||||
fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/hr", l.cfg.Region),
|
||||
fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/step", l.cfg.Region),
|
||||
fmt.Sprintf("/whgw/v2/region/%s/gateway/+/status", l.cfg.Region),
|
||||
)
|
||||
}
|
||||
if l.cfg.EnableTrainingEventSubscription {
|
||||
topics = append(topics, "/whgw/v2/region/test/+/+")
|
||||
}
|
||||
@@ -180,12 +180,21 @@ func (l *Listener) handleMessage(_ mqtt.Client, msg mqtt.Message) {
|
||||
|
||||
switch payload := packet.Choice.(type) {
|
||||
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfHrMeasurement:
|
||||
if !GetListenerStorageConfig().Enabled {
|
||||
return
|
||||
}
|
||||
record := buildHeartRateRecord(payload.NtfHrMeasurement, msg.Topic(), now)
|
||||
l.enqueue(&record)
|
||||
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfStepCountMeasurement:
|
||||
if !GetListenerStorageConfig().Enabled {
|
||||
return
|
||||
}
|
||||
record := buildStepCountRecord(payload.NtfStepCountMeasurement, msg.Topic(), now)
|
||||
l.enqueue(&record)
|
||||
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfGatewayStatus:
|
||||
if !GetListenerStorageConfig().Enabled {
|
||||
return
|
||||
}
|
||||
record := buildGatewayStatusRecord(payload.NtfGatewayStatus, msg.Topic(), now)
|
||||
l.enqueue(&record)
|
||||
default:
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"hr_receiver/models"
|
||||
"sync/atomic"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type ListenerStorageConfig struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
ExpireDays int `json:"expireDays"`
|
||||
DeleteExpired bool `json:"deleteExpired"`
|
||||
}
|
||||
|
||||
var listenerStorageConfig atomic.Value
|
||||
|
||||
func InitListenerStorageConfig(db *gorm.DB) error {
|
||||
cfg, err := loadListenerStorageConfig(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
storeListenerStorageConfig(cfg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetListenerStorageConfig() ListenerStorageConfig {
|
||||
if cfg, ok := listenerStorageConfig.Load().(ListenerStorageConfig); ok {
|
||||
return normalizeListenerStorageConfig(cfg)
|
||||
}
|
||||
return normalizeListenerStorageConfig(ListenerStorageConfig{
|
||||
Enabled: true,
|
||||
ExpireDays: models.DefaultMqttMeasurementExpireDays,
|
||||
DeleteExpired: true,
|
||||
})
|
||||
}
|
||||
|
||||
func UpdateListenerStorageConfig(db *gorm.DB, cfg ListenerStorageConfig) (ListenerStorageConfig, error) {
|
||||
cfg = normalizeListenerStorageConfig(cfg)
|
||||
if cfg.ExpireDays <= 0 {
|
||||
return ListenerStorageConfig{}, fmt.Errorf("expireDays must be greater than 0")
|
||||
}
|
||||
|
||||
record := models.MqttListenerSetting{
|
||||
Model: gorm.Model{ID: models.MqttListenerSettingSingletonID},
|
||||
Enabled: cfg.Enabled,
|
||||
ExpireDays: cfg.ExpireDays,
|
||||
DeleteExpired: cfg.DeleteExpired,
|
||||
}
|
||||
if err := db.Save(&record).Error; err != nil {
|
||||
return ListenerStorageConfig{}, err
|
||||
}
|
||||
storeListenerStorageConfig(cfg)
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func loadListenerStorageConfig(db *gorm.DB) (ListenerStorageConfig, error) {
|
||||
var record models.MqttListenerSetting
|
||||
if err := db.First(&record, models.MqttListenerSettingSingletonID).Error; err != nil {
|
||||
return ListenerStorageConfig{}, err
|
||||
}
|
||||
return normalizeListenerStorageConfig(ListenerStorageConfig{
|
||||
Enabled: record.Enabled,
|
||||
ExpireDays: record.ExpireDays,
|
||||
DeleteExpired: record.DeleteExpired,
|
||||
}), nil
|
||||
}
|
||||
|
||||
func storeListenerStorageConfig(cfg ListenerStorageConfig) {
|
||||
listenerStorageConfig.Store(normalizeListenerStorageConfig(cfg))
|
||||
}
|
||||
|
||||
func normalizeListenerStorageConfig(cfg ListenerStorageConfig) ListenerStorageConfig {
|
||||
if cfg.ExpireDays <= 0 {
|
||||
cfg.ExpireDays = models.DefaultMqttMeasurementExpireDays
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
@@ -156,6 +156,8 @@ func SetupRouter() *gin.Engine {
|
||||
admin.GET("/system-debug/mqtt/status", systemDebugController.MqttStatus)
|
||||
admin.POST("/system-debug/mqtt/start", systemDebugController.StartMqtt)
|
||||
admin.POST("/system-debug/mqtt/stop", systemDebugController.StopMqtt)
|
||||
admin.GET("/system-debug/mqtt/listener-config", systemDebugController.GetMqttListenerConfig)
|
||||
admin.PUT("/system-debug/mqtt/listener-config", systemDebugController.UpdateMqttListenerConfig)
|
||||
|
||||
admin.GET("/statistics/ai-analysis-records", statisticsController.ListAIAnalysisRecords)
|
||||
admin.GET("/statistics/ai-analysis-records/:id/pdf", statisticsController.DownloadAIAnalysisRecordPDF)
|
||||
|
||||
Reference in New Issue
Block a user