Compare commits
3 Commits
25ed68be27
...
0d07dc653b
| Author | SHA1 | Date | |
|---|---|---|---|
| 0d07dc653b | |||
| b5b6d64f5e | |||
| 1b447c782d |
@@ -7,3 +7,4 @@ config.yaml
|
||||
*.md
|
||||
*.csv
|
||||
*.docx
|
||||
export*.sql
|
||||
|
||||
+1
-1
@@ -21,7 +21,7 @@ mqtt:
|
||||
region: "+"
|
||||
use_tls: true
|
||||
qos: 0
|
||||
enable_measurement_subscriptions: false
|
||||
enable_measurement_subscriptions: true
|
||||
enable_training_event_subscription: true
|
||||
swagger:
|
||||
enabled: true
|
||||
|
||||
@@ -60,6 +60,7 @@ func InitConfig() {
|
||||
viper.AddConfigPath("./")
|
||||
viper.SetConfigName("config")
|
||||
viper.SetConfigType("yaml")
|
||||
viper.SetDefault("mqtt.enable_measurement_subscriptions", true)
|
||||
if err := viper.ReadInConfig(); err != nil {
|
||||
panic("Failed to read config: " + err.Error())
|
||||
}
|
||||
|
||||
@@ -0,0 +1,94 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"hr_receiver/config"
|
||||
"hr_receiver/models"
|
||||
"hr_receiver/mqtt"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type mqttListenerConfigRequest struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
ExpireDays int `json:"expireDays"`
|
||||
DeleteExpired bool `json:"deleteExpired"`
|
||||
}
|
||||
|
||||
// @Summary 获取MQTT监听存储配置
|
||||
// @Description 获取测量数据监听启用状态、过期天数和过期删除开关
|
||||
// @Tags 系统调试
|
||||
// @Produce json
|
||||
// @Security BearerAuth
|
||||
// @Success 200 {object} SwagAPIResponse "查询成功"
|
||||
// @Router /admin/system-debug/mqtt/listener-config [get]
|
||||
func (sc *SystemDebugController) GetMqttListenerConfig(c *gin.Context) {
|
||||
writeSuccess(c, http.StatusOK, "query success", mqtt.GetListenerStorageConfig())
|
||||
}
|
||||
|
||||
// @Summary 更新MQTT监听存储配置
|
||||
// @Description 更新测量数据监听启用状态、过期天数和过期删除开关
|
||||
// @Tags 系统调试
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Security BearerAuth
|
||||
// @Success 200 {object} SwagAPIResponse "更新成功"
|
||||
// @Router /admin/system-debug/mqtt/listener-config [put]
|
||||
func (sc *SystemDebugController) UpdateMqttListenerConfig(c *gin.Context) {
|
||||
var payload mqttListenerConfigRequest
|
||||
if err := c.ShouldBindJSON(&payload); err != nil {
|
||||
writeError(c, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
cfg, err := mqtt.UpdateListenerStorageConfig(config.DB, mqtt.ListenerStorageConfig{
|
||||
Enabled: payload.Enabled,
|
||||
ExpireDays: payload.ExpireDays,
|
||||
DeleteExpired: payload.DeleteExpired,
|
||||
})
|
||||
if err != nil {
|
||||
writeError(c, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
writeSuccess(c, http.StatusOK, "update success", cfg)
|
||||
}
|
||||
|
||||
func StartMqttMeasurementCleanupJob(db *gorm.DB) {
|
||||
go func() {
|
||||
runCleanup := func() {
|
||||
if err := cleanupExpiredMqttMeasurementData(db); err != nil {
|
||||
log.Printf("mqtt measurement cleanup failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
runCleanup()
|
||||
|
||||
ticker := time.NewTicker(24 * time.Hour)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
runCleanup()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func cleanupExpiredMqttMeasurementData(db *gorm.DB) error {
|
||||
cfg := mqtt.GetListenerStorageConfig()
|
||||
if !cfg.DeleteExpired {
|
||||
return nil
|
||||
}
|
||||
|
||||
cutoffMillis := time.Now().AddDate(0, 0, -cfg.ExpireDays).UnixMilli()
|
||||
modelsToClean := []interface{}{
|
||||
&models.MqttHeartRateRecord{},
|
||||
&models.MqttStepCountRecord{},
|
||||
&models.MqttGatewayStatusRecord{},
|
||||
}
|
||||
for _, model := range modelsToClean {
|
||||
if err := db.Unscoped().Where("received_at < ?", cutoffMillis).Delete(model).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -32,6 +32,47 @@ type analysisRecordListParams struct {
|
||||
EndTime int64 `form:"endTime"`
|
||||
}
|
||||
|
||||
type mqttMeasurementHistoryParams struct {
|
||||
PageNum int `form:"pageNum,default=1"`
|
||||
PageSize int `form:"pageSize,default=50"`
|
||||
DataType string `form:"dataType"`
|
||||
RegionID uint32 `form:"regionId"`
|
||||
Addr string `form:"addr"`
|
||||
ValueOperator string `form:"valueOperator"`
|
||||
Value *int64 `form:"value"`
|
||||
StartTime int64 `form:"startTime"`
|
||||
EndTime int64 `form:"endTime"`
|
||||
}
|
||||
|
||||
type mqttMeasurementHistoryItem struct {
|
||||
ID uint `json:"id"`
|
||||
DataType string `json:"dataType"`
|
||||
Identifier string `json:"identifier"`
|
||||
Topic string `json:"topic"`
|
||||
RegionID uint32 `json:"regionId"`
|
||||
Addr string `json:"addr"`
|
||||
Value int64 `json:"value"`
|
||||
ValueLabel string `json:"valueLabel"`
|
||||
PacketNum uint32 `json:"packetNum"`
|
||||
GatewayMAC string `json:"gatewayMac"`
|
||||
Battery uint32 `json:"battery"`
|
||||
IsActive bool `json:"isActive"`
|
||||
IsOnSkin bool `json:"isOnSkin"`
|
||||
SignalRSSINeg float64 `json:"signalRssiNeg"`
|
||||
SNR float64 `json:"snr"`
|
||||
ReceivedAt int64 `json:"receivedAt"`
|
||||
CreatedAtMilli int64 `json:"createdAt"`
|
||||
}
|
||||
|
||||
type mqttMeasurementHistorySummary struct {
|
||||
AvgValue float64 `json:"avgValue"`
|
||||
MaxValue int64 `json:"maxValue"`
|
||||
MinValue int64 `json:"minValue"`
|
||||
RecordCount int64 `json:"recordCount"`
|
||||
UniqueAddrs int64 `json:"uniqueAddrs"`
|
||||
ValueLabel string `json:"valueLabel"`
|
||||
}
|
||||
|
||||
// --- 查询接口 ---
|
||||
|
||||
// @Summary 获取AI分析记录列表
|
||||
@@ -94,6 +135,360 @@ func (sc *StatisticsController) ListAIAnalysisRecords(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
|
||||
// @Summary 获取MQTT测量历史数据
|
||||
// @Description 分页查询MQTT测量历史数据,支持按数据类型、区域、地址、数值比较和时间范围筛选
|
||||
// @Tags 统计管理
|
||||
// @Produce json
|
||||
// @Param pageNum query int false "页码(默认1)"
|
||||
// @Param pageSize query int false "每页数量(默认50,最大200)"
|
||||
// @Param dataType query string false "数据类型 heart_rate|step_count,默认 heart_rate"
|
||||
// @Param regionId query int false "区域ID"
|
||||
// @Param addr query string false "地址筛选,模糊匹配 beltAddr"
|
||||
// @Param valueOperator query string false "数值比较符 gt|gte|lt|lte|eq"
|
||||
// @Param value query int false "数值比较值"
|
||||
// @Param startTime query int false "开始时间(毫秒时间戳)"
|
||||
// @Param endTime query int false "结束时间(毫秒时间戳)"
|
||||
// @Security BearerAuth
|
||||
// @Success 200 {object} SwagAPIResponse "查询成功"
|
||||
// @Router /admin/statistics/mqtt-measurements [get]
|
||||
func (sc *StatisticsController) ListMqttMeasurementHistory(c *gin.Context) {
|
||||
var params mqttMeasurementHistoryParams
|
||||
if err := c.ShouldBindQuery(¶ms); err != nil {
|
||||
writeError(c, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
if params.PageNum < 1 {
|
||||
params.PageNum = 1
|
||||
}
|
||||
if params.PageSize < 1 || params.PageSize > 200 {
|
||||
params.PageSize = 50
|
||||
}
|
||||
if strings.TrimSpace(params.DataType) == "" {
|
||||
params.DataType = "heart_rate"
|
||||
}
|
||||
|
||||
switch params.DataType {
|
||||
case "heart_rate":
|
||||
sc.listHeartRateMeasurementHistory(c, params)
|
||||
case "step_count":
|
||||
sc.listStepCountMeasurementHistory(c, params)
|
||||
default:
|
||||
writeError(c, http.StatusBadRequest, "unsupported dataType, only heart_rate or step_count is allowed")
|
||||
}
|
||||
}
|
||||
|
||||
// @Summary 获取MQTT测量图表数据
|
||||
// @Description 查询MQTT测量图表数据,返回同筛选条件下的完整历史序列,不参与分页
|
||||
// @Tags 统计管理
|
||||
// @Produce json
|
||||
// @Param dataType query string false "数据类型 heart_rate|step_count,默认 heart_rate"
|
||||
// @Param regionId query int false "区域ID"
|
||||
// @Param addr query string false "地址筛选,模糊匹配 beltAddr"
|
||||
// @Param valueOperator query string false "数值比较符 gt|gte|lt|lte|eq"
|
||||
// @Param value query int false "数值比较值"
|
||||
// @Param startTime query int false "开始时间(毫秒时间戳)"
|
||||
// @Param endTime query int false "结束时间(毫秒时间戳)"
|
||||
// @Security BearerAuth
|
||||
// @Success 200 {object} SwagAPIResponse "查询成功"
|
||||
// @Router /admin/statistics/mqtt-measurements/chart [get]
|
||||
func (sc *StatisticsController) MqttMeasurementChartData(c *gin.Context) {
|
||||
var params mqttMeasurementHistoryParams
|
||||
if err := c.ShouldBindQuery(¶ms); err != nil {
|
||||
writeError(c, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
if strings.TrimSpace(params.DataType) == "" {
|
||||
params.DataType = "heart_rate"
|
||||
}
|
||||
|
||||
switch params.DataType {
|
||||
case "heart_rate":
|
||||
sc.chartHeartRateMeasurementHistory(c, params)
|
||||
case "step_count":
|
||||
sc.chartStepCountMeasurementHistory(c, params)
|
||||
default:
|
||||
writeError(c, http.StatusBadRequest, "unsupported dataType, only heart_rate or step_count is allowed")
|
||||
}
|
||||
}
|
||||
|
||||
func (sc *StatisticsController) listHeartRateMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) {
|
||||
baseQuery := sc.DB.Model(&models.MqttHeartRateRecord{})
|
||||
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "heart_rate", "received_at")
|
||||
|
||||
var total int64
|
||||
if err := baseQuery.Session(&gorm.Session{}).Count(&total).Error; err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to count mqtt heart rate records")
|
||||
return
|
||||
}
|
||||
|
||||
offset := (params.PageNum - 1) * params.PageSize
|
||||
var records []models.MqttHeartRateRecord
|
||||
if err := baseQuery.Session(&gorm.Session{}).
|
||||
Order("received_at DESC").
|
||||
Offset(offset).
|
||||
Limit(params.PageSize).
|
||||
Find(&records).Error; err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate records")
|
||||
return
|
||||
}
|
||||
|
||||
var summary mqttMeasurementHistorySummary
|
||||
summary.ValueLabel = "心率"
|
||||
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "heart_rate", "belt_addr", &summary); err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate summary")
|
||||
return
|
||||
}
|
||||
|
||||
items := make([]mqttMeasurementHistoryItem, 0, len(records))
|
||||
for _, record := range records {
|
||||
items = append(items, mqttMeasurementHistoryItem{
|
||||
ID: record.ID,
|
||||
DataType: "heart_rate",
|
||||
Identifier: record.Identifier,
|
||||
Topic: record.Topic,
|
||||
RegionID: record.RegionID,
|
||||
Addr: record.BeltAddr,
|
||||
Value: int64(record.HeartRate),
|
||||
ValueLabel: "心率",
|
||||
PacketNum: record.PacketNum,
|
||||
GatewayMAC: record.GatewayMAC,
|
||||
Battery: record.Battery,
|
||||
IsActive: record.IsActive,
|
||||
IsOnSkin: record.IsOnSkin,
|
||||
SignalRSSINeg: record.SignalRSSINeg,
|
||||
SNR: record.SNR,
|
||||
ReceivedAt: record.ReceivedAt,
|
||||
CreatedAtMilli: record.CreatedAt.UnixMilli(),
|
||||
})
|
||||
}
|
||||
|
||||
writeSuccess(c, http.StatusOK, "query success", gin.H{
|
||||
"list": items,
|
||||
"pagination": gin.H{
|
||||
"currentPage": params.PageNum,
|
||||
"pageSize": params.PageSize,
|
||||
"totalList": total,
|
||||
"totalPage": int((total + int64(params.PageSize) - 1) / int64(params.PageSize)),
|
||||
},
|
||||
"summary": summary,
|
||||
})
|
||||
}
|
||||
|
||||
func (sc *StatisticsController) chartHeartRateMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) {
|
||||
baseQuery := sc.DB.Model(&models.MqttHeartRateRecord{})
|
||||
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "heart_rate", "received_at")
|
||||
|
||||
var records []models.MqttHeartRateRecord
|
||||
if err := baseQuery.Session(&gorm.Session{}).
|
||||
Order("received_at ASC").
|
||||
Limit(10000).
|
||||
Find(&records).Error; err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate chart records")
|
||||
return
|
||||
}
|
||||
|
||||
var summary mqttMeasurementHistorySummary
|
||||
summary.ValueLabel = "心率"
|
||||
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "heart_rate", "belt_addr", &summary); err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate summary")
|
||||
return
|
||||
}
|
||||
|
||||
items := make([]mqttMeasurementHistoryItem, 0, len(records))
|
||||
for _, record := range records {
|
||||
items = append(items, mqttMeasurementHistoryItem{
|
||||
ID: record.ID,
|
||||
DataType: "heart_rate",
|
||||
Identifier: record.Identifier,
|
||||
Topic: record.Topic,
|
||||
RegionID: record.RegionID,
|
||||
Addr: record.BeltAddr,
|
||||
Value: int64(record.HeartRate),
|
||||
ValueLabel: "心率",
|
||||
PacketNum: record.PacketNum,
|
||||
GatewayMAC: record.GatewayMAC,
|
||||
Battery: record.Battery,
|
||||
IsActive: record.IsActive,
|
||||
IsOnSkin: record.IsOnSkin,
|
||||
SignalRSSINeg: record.SignalRSSINeg,
|
||||
SNR: record.SNR,
|
||||
ReceivedAt: record.ReceivedAt,
|
||||
CreatedAtMilli: record.CreatedAt.UnixMilli(),
|
||||
})
|
||||
}
|
||||
|
||||
writeSuccess(c, http.StatusOK, "query success", gin.H{
|
||||
"list": items,
|
||||
"summary": summary,
|
||||
})
|
||||
}
|
||||
|
||||
func (sc *StatisticsController) listStepCountMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) {
|
||||
baseQuery := sc.DB.Model(&models.MqttStepCountRecord{})
|
||||
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "step_count", "received_at")
|
||||
|
||||
var total int64
|
||||
if err := baseQuery.Session(&gorm.Session{}).Count(&total).Error; err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to count mqtt step count records")
|
||||
return
|
||||
}
|
||||
|
||||
offset := (params.PageNum - 1) * params.PageSize
|
||||
var records []models.MqttStepCountRecord
|
||||
if err := baseQuery.Session(&gorm.Session{}).
|
||||
Order("received_at DESC").
|
||||
Offset(offset).
|
||||
Limit(params.PageSize).
|
||||
Find(&records).Error; err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count records")
|
||||
return
|
||||
}
|
||||
|
||||
var summary mqttMeasurementHistorySummary
|
||||
summary.ValueLabel = "步数"
|
||||
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "step_count", "belt_addr", &summary); err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count summary")
|
||||
return
|
||||
}
|
||||
|
||||
items := make([]mqttMeasurementHistoryItem, 0, len(records))
|
||||
for _, record := range records {
|
||||
items = append(items, mqttMeasurementHistoryItem{
|
||||
ID: record.ID,
|
||||
DataType: "step_count",
|
||||
Identifier: record.Identifier,
|
||||
Topic: record.Topic,
|
||||
RegionID: record.RegionID,
|
||||
Addr: record.BeltAddr,
|
||||
Value: int64(record.StepCount),
|
||||
ValueLabel: "步数",
|
||||
PacketNum: record.PacketNum,
|
||||
GatewayMAC: record.GatewayMAC,
|
||||
Battery: 0,
|
||||
IsActive: false,
|
||||
IsOnSkin: false,
|
||||
SignalRSSINeg: record.SignalRSSINeg,
|
||||
SNR: record.SNR,
|
||||
ReceivedAt: record.ReceivedAt,
|
||||
CreatedAtMilli: record.CreatedAt.UnixMilli(),
|
||||
})
|
||||
}
|
||||
|
||||
writeSuccess(c, http.StatusOK, "query success", gin.H{
|
||||
"list": items,
|
||||
"pagination": gin.H{
|
||||
"currentPage": params.PageNum,
|
||||
"pageSize": params.PageSize,
|
||||
"totalList": total,
|
||||
"totalPage": int((total + int64(params.PageSize) - 1) / int64(params.PageSize)),
|
||||
},
|
||||
"summary": summary,
|
||||
})
|
||||
}
|
||||
|
||||
func (sc *StatisticsController) chartStepCountMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) {
|
||||
baseQuery := sc.DB.Model(&models.MqttStepCountRecord{})
|
||||
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "step_count", "received_at")
|
||||
|
||||
var records []models.MqttStepCountRecord
|
||||
if err := baseQuery.Session(&gorm.Session{}).
|
||||
Order("received_at ASC").
|
||||
Limit(10000).
|
||||
Find(&records).Error; err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count chart records")
|
||||
return
|
||||
}
|
||||
|
||||
var summary mqttMeasurementHistorySummary
|
||||
summary.ValueLabel = "步数"
|
||||
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "step_count", "belt_addr", &summary); err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count summary")
|
||||
return
|
||||
}
|
||||
|
||||
items := make([]mqttMeasurementHistoryItem, 0, len(records))
|
||||
for _, record := range records {
|
||||
items = append(items, mqttMeasurementHistoryItem{
|
||||
ID: record.ID,
|
||||
DataType: "step_count",
|
||||
Identifier: record.Identifier,
|
||||
Topic: record.Topic,
|
||||
RegionID: record.RegionID,
|
||||
Addr: record.BeltAddr,
|
||||
Value: int64(record.StepCount),
|
||||
ValueLabel: "步数",
|
||||
PacketNum: record.PacketNum,
|
||||
GatewayMAC: record.GatewayMAC,
|
||||
Battery: 0,
|
||||
IsActive: false,
|
||||
IsOnSkin: false,
|
||||
SignalRSSINeg: record.SignalRSSINeg,
|
||||
SNR: record.SNR,
|
||||
ReceivedAt: record.ReceivedAt,
|
||||
CreatedAtMilli: record.CreatedAt.UnixMilli(),
|
||||
})
|
||||
}
|
||||
|
||||
writeSuccess(c, http.StatusOK, "query success", gin.H{
|
||||
"list": items,
|
||||
"summary": summary,
|
||||
})
|
||||
}
|
||||
|
||||
func applyMqttMeasurementFilters(query *gorm.DB, params mqttMeasurementHistoryParams, addrField, valueField, timeField string) *gorm.DB {
|
||||
if params.RegionID > 0 {
|
||||
query = query.Where("region_id = ?", params.RegionID)
|
||||
}
|
||||
if addr := strings.TrimSpace(params.Addr); addr != "" {
|
||||
query = query.Where(addrField+" LIKE ?", "%"+addr+"%")
|
||||
}
|
||||
if params.StartTime > 0 {
|
||||
query = query.Where(timeField+" >= ?", params.StartTime)
|
||||
}
|
||||
if params.EndTime > 0 {
|
||||
query = query.Where(timeField+" <= ?", params.EndTime)
|
||||
}
|
||||
if params.Value != nil {
|
||||
switch strings.TrimSpace(params.ValueOperator) {
|
||||
case "", "eq":
|
||||
query = query.Where(valueField+" = ?", *params.Value)
|
||||
case "gt":
|
||||
query = query.Where(valueField+" > ?", *params.Value)
|
||||
case "gte":
|
||||
query = query.Where(valueField+" >= ?", *params.Value)
|
||||
case "lt":
|
||||
query = query.Where(valueField+" < ?", *params.Value)
|
||||
case "lte":
|
||||
query = query.Where(valueField+" <= ?", *params.Value)
|
||||
}
|
||||
}
|
||||
return query
|
||||
}
|
||||
|
||||
func buildMqttMeasurementSummary(query *gorm.DB, valueField, addrField string, summary *mqttMeasurementHistorySummary) error {
|
||||
type rawSummary struct {
|
||||
RecordCount int64
|
||||
UniqueAddrs int64
|
||||
MinValue int64
|
||||
MaxValue int64
|
||||
AvgValue float64
|
||||
}
|
||||
|
||||
var result rawSummary
|
||||
if err := query.Session(&gorm.Session{}).Select(
|
||||
"COUNT(*) as record_count, COUNT(DISTINCT " + addrField + ") as unique_addrs, COALESCE(MIN(" + valueField + "), 0) as min_value, COALESCE(MAX(" + valueField + "), 0) as max_value, COALESCE(AVG(" + valueField + "), 0) as avg_value",
|
||||
).Scan(&result).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
summary.RecordCount = result.RecordCount
|
||||
summary.UniqueAddrs = result.UniqueAddrs
|
||||
summary.MinValue = result.MinValue
|
||||
summary.MaxValue = result.MaxValue
|
||||
summary.AvgValue = result.AvgValue
|
||||
return nil
|
||||
}
|
||||
|
||||
// --- 删除接口 ---
|
||||
|
||||
// @Summary 删除AI分析记录
|
||||
|
||||
@@ -50,6 +50,7 @@ func main() {
|
||||
&models.MqttStepCountRecord{},
|
||||
&models.MqttGatewayStatusRecord{},
|
||||
&models.MqttTrainingSessionRecord{},
|
||||
&models.MqttListenerSetting{},
|
||||
&models.Gateway{},
|
||||
&models.AIAnalysisRecord{},
|
||||
&models.AIPricingConfig{},
|
||||
@@ -72,15 +73,22 @@ func main() {
|
||||
if err := models.EnsureDefaultProductPrototypes(config.DB); err != nil {
|
||||
log.Printf("default product prototypes init failed: %v", err)
|
||||
}
|
||||
if err := models.EnsureDefaultMqttListenerSetting(config.DB); err != nil {
|
||||
log.Printf("default mqtt listener setting init failed: %v", err)
|
||||
}
|
||||
if err := models.EnsureDefaultProjectProductTemplates(config.DB); err != nil {
|
||||
log.Printf("default project product templates init failed: %v", err)
|
||||
}
|
||||
if err := mqtt.InitListenerStorageConfig(config.DB); err != nil {
|
||||
log.Printf("mqtt listener config init failed: %v", err)
|
||||
}
|
||||
|
||||
if err := mqtt.Start(config.DB, config.App.MQTT); err != nil {
|
||||
log.Printf("mqtt listener start failed: %v", err)
|
||||
}
|
||||
mqtt.InitDebugService(config.DB, config.App.MQTT)
|
||||
controllers.StartLessonPlanCleanupJob(config.DB)
|
||||
controllers.StartMqttMeasurementCleanupJob(config.DB)
|
||||
|
||||
// 启动服务
|
||||
r := routes.SetupRouter()
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
package models
|
||||
|
||||
import "gorm.io/gorm"
|
||||
|
||||
const (
|
||||
DefaultMqttMeasurementExpireDays = 7
|
||||
MqttListenerSettingSingletonID = 1
|
||||
)
|
||||
|
||||
type MqttListenerSetting struct {
|
||||
gorm.Model
|
||||
Enabled bool `gorm:"not null;default:true" json:"enabled"`
|
||||
ExpireDays int `gorm:"type:int;not null;default:7" json:"expireDays"`
|
||||
DeleteExpired bool `gorm:"not null;default:true" json:"deleteExpired"`
|
||||
}
|
||||
|
||||
func (MqttListenerSetting) TableName() string {
|
||||
return "mqtt_listener_settings"
|
||||
}
|
||||
|
||||
func DefaultMqttListenerSetting() MqttListenerSetting {
|
||||
return MqttListenerSetting{
|
||||
Model: gorm.Model{ID: MqttListenerSettingSingletonID},
|
||||
Enabled: true,
|
||||
ExpireDays: DefaultMqttMeasurementExpireDays,
|
||||
DeleteExpired: true,
|
||||
}
|
||||
}
|
||||
|
||||
func EnsureDefaultMqttListenerSetting(db *gorm.DB) error {
|
||||
defaults := DefaultMqttListenerSetting()
|
||||
|
||||
var existing MqttListenerSetting
|
||||
err := db.First(&existing, MqttListenerSettingSingletonID).Error
|
||||
if err == nil {
|
||||
updates := map[string]interface{}{}
|
||||
if existing.ExpireDays <= 0 {
|
||||
updates["expire_days"] = DefaultMqttMeasurementExpireDays
|
||||
}
|
||||
if len(updates) == 0 {
|
||||
return nil
|
||||
}
|
||||
return db.Model(&existing).Updates(updates).Error
|
||||
}
|
||||
if err != nil && err != gorm.ErrRecordNotFound {
|
||||
return err
|
||||
}
|
||||
return db.Create(&defaults).Error
|
||||
}
|
||||
@@ -245,8 +245,15 @@ func (s *DebugService) maybePersist(record interface{}) {
|
||||
if !enabled {
|
||||
return
|
||||
}
|
||||
if err := s.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record).Error; err != nil {
|
||||
log.Printf("mqtt debug persist failed type=%T err=%v", record, err)
|
||||
tx := s.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record)
|
||||
if tx.Error != nil {
|
||||
log.Printf("mqtt debug persist failed type=%T err=%v", record, tx.Error)
|
||||
return
|
||||
}
|
||||
if tx.RowsAffected > 0 {
|
||||
//logPersistResult("mqtt debug", "inserted", record)
|
||||
} else {
|
||||
//logPersistResult("mqtt debug", "skipped duplicate", record)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+54
-12
@@ -128,13 +128,13 @@ func (l *Listener) connect() error {
|
||||
|
||||
func (l *Listener) subscribe(client mqtt.Client) error {
|
||||
var topics []string
|
||||
if l.cfg.EnableMeasurementSubscriptions {
|
||||
topics = append(topics,
|
||||
fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/hr", l.cfg.Region),
|
||||
fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/step", l.cfg.Region),
|
||||
fmt.Sprintf("/whgw/v2/region/%s/gateway/+/status", l.cfg.Region),
|
||||
)
|
||||
}
|
||||
// Measurement topics stay subscribed so runtime config changes take effect
|
||||
// immediately without requiring an MQTT reconnect.
|
||||
topics = append(topics,
|
||||
fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/hr", l.cfg.Region),
|
||||
fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/step", l.cfg.Region),
|
||||
fmt.Sprintf("/whgw/v2/region/%s/gateway/+/status", l.cfg.Region),
|
||||
)
|
||||
if l.cfg.EnableTrainingEventSubscription {
|
||||
topics = append(topics, "/whgw/v2/region/test/+/+")
|
||||
}
|
||||
@@ -180,12 +180,21 @@ func (l *Listener) handleMessage(_ mqtt.Client, msg mqtt.Message) {
|
||||
|
||||
switch payload := packet.Choice.(type) {
|
||||
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfHrMeasurement:
|
||||
if !GetListenerStorageConfig().Enabled {
|
||||
return
|
||||
}
|
||||
record := buildHeartRateRecord(payload.NtfHrMeasurement, msg.Topic(), now)
|
||||
l.enqueue(&record)
|
||||
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfStepCountMeasurement:
|
||||
if !GetListenerStorageConfig().Enabled {
|
||||
return
|
||||
}
|
||||
record := buildStepCountRecord(payload.NtfStepCountMeasurement, msg.Topic(), now)
|
||||
l.enqueue(&record)
|
||||
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfGatewayStatus:
|
||||
if !GetListenerStorageConfig().Enabled {
|
||||
return
|
||||
}
|
||||
record := buildGatewayStatusRecord(payload.NtfGatewayStatus, msg.Topic(), now)
|
||||
l.enqueue(&record)
|
||||
default:
|
||||
@@ -222,7 +231,16 @@ func (l *Listener) persistRecord(record interface{}) error {
|
||||
case *models.MqttTrainingSessionRecord:
|
||||
return l.persistTrainingSession(r)
|
||||
default:
|
||||
return l.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record).Error
|
||||
tx := l.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record)
|
||||
if tx.Error != nil {
|
||||
return tx.Error
|
||||
}
|
||||
if tx.RowsAffected > 0 {
|
||||
//logPersistResult("mqtt listener", "inserted", record)
|
||||
} else {
|
||||
//logPersistResult("mqtt listener", "skipped duplicate", record)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -246,10 +264,34 @@ func (l *Listener) persistTrainingSession(record *models.MqttTrainingSessionReco
|
||||
assignments["ended_at"] = *record.EndedAt
|
||||
}
|
||||
|
||||
return l.db.Clauses(clause.OnConflict{
|
||||
tx := l.db.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "identifier"}},
|
||||
DoUpdates: clause.Assignments(assignments),
|
||||
}).Create(record).Error
|
||||
}).Create(record)
|
||||
if tx.Error != nil {
|
||||
return tx.Error
|
||||
}
|
||||
if tx.RowsAffected > 0 {
|
||||
//logPersistResult("mqtt listener", "upserted", record)
|
||||
} else {
|
||||
//logPersistResult("mqtt listener", "skipped duplicate", record)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func logPersistResult(source, action string, record interface{}) {
|
||||
switch r := record.(type) {
|
||||
case *models.MqttHeartRateRecord:
|
||||
log.Printf("%s %s heart_rate region=%d addr=%s hr=%d packet=%d identifier=%s topic=%s", source, action, r.RegionID, r.BeltAddr, r.HeartRate, r.PacketNum, r.Identifier, r.Topic)
|
||||
case *models.MqttStepCountRecord:
|
||||
log.Printf("%s %s step_count region=%d addr=%s steps=%d packet=%d identifier=%s topic=%s", source, action, r.RegionID, r.BeltAddr, r.StepCount, r.PacketNum, r.Identifier, r.Topic)
|
||||
case *models.MqttGatewayStatusRecord:
|
||||
log.Printf("%s %s gateway_status region=%d gateway=%s rxCount=%d uptimeMs=%d identifier=%s topic=%s", source, action, r.RegionID, r.GatewayMAC, r.RxCount, r.UptimeMs, r.Identifier, r.Topic)
|
||||
case *models.MqttTrainingSessionRecord:
|
||||
log.Printf("%s %s training_session region=%d testId=%s event=%s identifier=%s publishedAt=%d topic=%s", source, action, r.RegionID, r.TestID, r.EventType, r.Identifier, r.PublishedAt, r.Topic)
|
||||
default:
|
||||
log.Printf("%s %s record type=%T", source, action, record)
|
||||
}
|
||||
}
|
||||
|
||||
func buildHeartRateRecord(measurement *whgw_hrpb.HrMeasurement, topic string, now int64) models.MqttHeartRateRecord {
|
||||
@@ -265,7 +307,7 @@ func buildHeartRateRecord(measurement *whgw_hrpb.HrMeasurement, topic string, no
|
||||
beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId())
|
||||
|
||||
return models.MqttHeartRateRecord{
|
||||
Identifier: fmt.Sprintf("hr:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), packet.GetPacketNum()),
|
||||
Identifier: fmt.Sprintf("hr:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), now),
|
||||
Topic: topic,
|
||||
RegionID: regionID,
|
||||
GatewayMAC: gatewayMAC,
|
||||
@@ -309,7 +351,7 @@ func buildStepCountRecord(measurement *whgw_hrpb.StepCountMeasurement, topic str
|
||||
beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId())
|
||||
|
||||
return models.MqttStepCountRecord{
|
||||
Identifier: fmt.Sprintf("step:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), packet.GetPacketNum()),
|
||||
Identifier: fmt.Sprintf("step:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), now),
|
||||
Topic: topic,
|
||||
RegionID: regionID,
|
||||
GatewayMAC: gatewayMAC,
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"hr_receiver/models"
|
||||
"sync/atomic"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type ListenerStorageConfig struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
ExpireDays int `json:"expireDays"`
|
||||
DeleteExpired bool `json:"deleteExpired"`
|
||||
}
|
||||
|
||||
var listenerStorageConfig atomic.Value
|
||||
|
||||
func InitListenerStorageConfig(db *gorm.DB) error {
|
||||
cfg, err := loadListenerStorageConfig(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
storeListenerStorageConfig(cfg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetListenerStorageConfig() ListenerStorageConfig {
|
||||
if cfg, ok := listenerStorageConfig.Load().(ListenerStorageConfig); ok {
|
||||
return normalizeListenerStorageConfig(cfg)
|
||||
}
|
||||
return normalizeListenerStorageConfig(ListenerStorageConfig{
|
||||
Enabled: true,
|
||||
ExpireDays: models.DefaultMqttMeasurementExpireDays,
|
||||
DeleteExpired: true,
|
||||
})
|
||||
}
|
||||
|
||||
func UpdateListenerStorageConfig(db *gorm.DB, cfg ListenerStorageConfig) (ListenerStorageConfig, error) {
|
||||
cfg = normalizeListenerStorageConfig(cfg)
|
||||
if cfg.ExpireDays <= 0 {
|
||||
return ListenerStorageConfig{}, fmt.Errorf("expireDays must be greater than 0")
|
||||
}
|
||||
|
||||
record := models.MqttListenerSetting{
|
||||
Model: gorm.Model{ID: models.MqttListenerSettingSingletonID},
|
||||
Enabled: cfg.Enabled,
|
||||
ExpireDays: cfg.ExpireDays,
|
||||
DeleteExpired: cfg.DeleteExpired,
|
||||
}
|
||||
if err := db.Save(&record).Error; err != nil {
|
||||
return ListenerStorageConfig{}, err
|
||||
}
|
||||
storeListenerStorageConfig(cfg)
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func loadListenerStorageConfig(db *gorm.DB) (ListenerStorageConfig, error) {
|
||||
var record models.MqttListenerSetting
|
||||
if err := db.First(&record, models.MqttListenerSettingSingletonID).Error; err != nil {
|
||||
return ListenerStorageConfig{}, err
|
||||
}
|
||||
return normalizeListenerStorageConfig(ListenerStorageConfig{
|
||||
Enabled: record.Enabled,
|
||||
ExpireDays: record.ExpireDays,
|
||||
DeleteExpired: record.DeleteExpired,
|
||||
}), nil
|
||||
}
|
||||
|
||||
func storeListenerStorageConfig(cfg ListenerStorageConfig) {
|
||||
listenerStorageConfig.Store(normalizeListenerStorageConfig(cfg))
|
||||
}
|
||||
|
||||
func normalizeListenerStorageConfig(cfg ListenerStorageConfig) ListenerStorageConfig {
|
||||
if cfg.ExpireDays <= 0 {
|
||||
cfg.ExpireDays = models.DefaultMqttMeasurementExpireDays
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
@@ -156,6 +156,8 @@ func SetupRouter() *gin.Engine {
|
||||
admin.GET("/system-debug/mqtt/status", systemDebugController.MqttStatus)
|
||||
admin.POST("/system-debug/mqtt/start", systemDebugController.StartMqtt)
|
||||
admin.POST("/system-debug/mqtt/stop", systemDebugController.StopMqtt)
|
||||
admin.GET("/system-debug/mqtt/listener-config", systemDebugController.GetMqttListenerConfig)
|
||||
admin.PUT("/system-debug/mqtt/listener-config", systemDebugController.UpdateMqttListenerConfig)
|
||||
|
||||
admin.GET("/statistics/ai-analysis-records", statisticsController.ListAIAnalysisRecords)
|
||||
admin.GET("/statistics/ai-analysis-records/:id/pdf", statisticsController.DownloadAIAnalysisRecordPDF)
|
||||
@@ -163,6 +165,8 @@ func SetupRouter() *gin.Engine {
|
||||
admin.GET("/statistics/ai-analysis", statisticsController.StatisticsByRegion)
|
||||
admin.GET("/statistics/ai-analysis-timeline", statisticsController.TimelineStatistics)
|
||||
admin.GET("/statistics/mqtt-training-sessions", statisticsController.TrainingSessionStatisticsByRegion)
|
||||
admin.GET("/statistics/mqtt-measurements", statisticsController.ListMqttMeasurementHistory)
|
||||
admin.GET("/statistics/mqtt-measurements/chart", statisticsController.MqttMeasurementChartData)
|
||||
}
|
||||
|
||||
v1.GET("/admin/system-debug/mqtt/ws", systemDebugController.MqttWebSocket)
|
||||
|
||||
Reference in New Issue
Block a user