Compare commits
6 Commits
25ed68be27
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| df27dfdb03 | |||
| 3f499ddf27 | |||
| 1f77e1dd6c | |||
| 0d07dc653b | |||
| b5b6d64f5e | |||
| 1b447c782d |
@@ -7,3 +7,4 @@ config.yaml
|
|||||||
*.md
|
*.md
|
||||||
*.csv
|
*.csv
|
||||||
*.docx
|
*.docx
|
||||||
|
export*.sql
|
||||||
|
|||||||
+3
-1
@@ -17,11 +17,13 @@ mqtt:
|
|||||||
port: 10237
|
port: 10237
|
||||||
username: public_client
|
username: public_client
|
||||||
password: uXC3M4ObO9KpdU
|
password: uXC3M4ObO9KpdU
|
||||||
|
gw_username: ""
|
||||||
|
gw_password: ""
|
||||||
client_id_prefix: hr-receiver
|
client_id_prefix: hr-receiver
|
||||||
region: "+"
|
region: "+"
|
||||||
use_tls: true
|
use_tls: true
|
||||||
qos: 0
|
qos: 0
|
||||||
enable_measurement_subscriptions: false
|
enable_measurement_subscriptions: true
|
||||||
enable_training_event_subscription: true
|
enable_training_event_subscription: true
|
||||||
swagger:
|
swagger:
|
||||||
enabled: true
|
enabled: true
|
||||||
|
|||||||
@@ -32,6 +32,8 @@ type MQTTConfig struct {
|
|||||||
Port int `mapstructure:"port" yaml:"port"`
|
Port int `mapstructure:"port" yaml:"port"`
|
||||||
Username string `mapstructure:"username" yaml:"username"`
|
Username string `mapstructure:"username" yaml:"username"`
|
||||||
Password string `mapstructure:"password" yaml:"password"`
|
Password string `mapstructure:"password" yaml:"password"`
|
||||||
|
GWUsername string `mapstructure:"gw_username" yaml:"gw_username"`
|
||||||
|
GWPassword string `mapstructure:"gw_password" yaml:"gw_password"`
|
||||||
ClientIDPrefix string `mapstructure:"client_id_prefix" yaml:"client_id_prefix"`
|
ClientIDPrefix string `mapstructure:"client_id_prefix" yaml:"client_id_prefix"`
|
||||||
Region string `mapstructure:"region" yaml:"region"`
|
Region string `mapstructure:"region" yaml:"region"`
|
||||||
UseTLS bool `mapstructure:"use_tls" yaml:"use_tls"`
|
UseTLS bool `mapstructure:"use_tls" yaml:"use_tls"`
|
||||||
@@ -60,6 +62,7 @@ func InitConfig() {
|
|||||||
viper.AddConfigPath("./")
|
viper.AddConfigPath("./")
|
||||||
viper.SetConfigName("config")
|
viper.SetConfigName("config")
|
||||||
viper.SetConfigType("yaml")
|
viper.SetConfigType("yaml")
|
||||||
|
viper.SetDefault("mqtt.enable_measurement_subscriptions", true)
|
||||||
if err := viper.ReadInConfig(); err != nil {
|
if err := viper.ReadInConfig(); err != nil {
|
||||||
panic("Failed to read config: " + err.Error())
|
panic("Failed to read config: " + err.Error())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,94 @@
|
|||||||
|
package controllers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"hr_receiver/config"
|
||||||
|
"hr_receiver/models"
|
||||||
|
"hr_receiver/mqtt"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mqttListenerConfigRequest struct {
|
||||||
|
Enabled bool `json:"enabled"`
|
||||||
|
ExpireDays int `json:"expireDays"`
|
||||||
|
DeleteExpired bool `json:"deleteExpired"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Summary 获取MQTT监听存储配置
|
||||||
|
// @Description 获取测量数据监听启用状态、过期天数和过期删除开关
|
||||||
|
// @Tags 系统调试
|
||||||
|
// @Produce json
|
||||||
|
// @Security BearerAuth
|
||||||
|
// @Success 200 {object} SwagAPIResponse "查询成功"
|
||||||
|
// @Router /admin/system-debug/mqtt/listener-config [get]
|
||||||
|
func (sc *SystemDebugController) GetMqttListenerConfig(c *gin.Context) {
|
||||||
|
writeSuccess(c, http.StatusOK, "query success", mqtt.GetListenerStorageConfig())
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Summary 更新MQTT监听存储配置
|
||||||
|
// @Description 更新测量数据监听启用状态、过期天数和过期删除开关
|
||||||
|
// @Tags 系统调试
|
||||||
|
// @Accept json
|
||||||
|
// @Produce json
|
||||||
|
// @Security BearerAuth
|
||||||
|
// @Success 200 {object} SwagAPIResponse "更新成功"
|
||||||
|
// @Router /admin/system-debug/mqtt/listener-config [put]
|
||||||
|
func (sc *SystemDebugController) UpdateMqttListenerConfig(c *gin.Context) {
|
||||||
|
var payload mqttListenerConfigRequest
|
||||||
|
if err := c.ShouldBindJSON(&payload); err != nil {
|
||||||
|
writeError(c, http.StatusBadRequest, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cfg, err := mqtt.UpdateListenerStorageConfig(config.DB, mqtt.ListenerStorageConfig{
|
||||||
|
Enabled: payload.Enabled,
|
||||||
|
ExpireDays: payload.ExpireDays,
|
||||||
|
DeleteExpired: payload.DeleteExpired,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
writeError(c, http.StatusBadRequest, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeSuccess(c, http.StatusOK, "update success", cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func StartMqttMeasurementCleanupJob(db *gorm.DB) {
|
||||||
|
go func() {
|
||||||
|
runCleanup := func() {
|
||||||
|
if err := cleanupExpiredMqttMeasurementData(db); err != nil {
|
||||||
|
log.Printf("mqtt measurement cleanup failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
runCleanup()
|
||||||
|
|
||||||
|
ticker := time.NewTicker(24 * time.Hour)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for range ticker.C {
|
||||||
|
runCleanup()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func cleanupExpiredMqttMeasurementData(db *gorm.DB) error {
|
||||||
|
cfg := mqtt.GetListenerStorageConfig()
|
||||||
|
if !cfg.DeleteExpired {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cutoffMillis := time.Now().AddDate(0, 0, -cfg.ExpireDays).UnixMilli()
|
||||||
|
modelsToClean := []interface{}{
|
||||||
|
&models.MqttHeartRateRecord{},
|
||||||
|
&models.MqttStepCountRecord{},
|
||||||
|
&models.MqttGatewayStatusRecord{},
|
||||||
|
}
|
||||||
|
for _, model := range modelsToClean {
|
||||||
|
if err := db.Unscoped().Where("received_at < ?", cutoffMillis).Delete(model).Error; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"hr_receiver/config"
|
"hr_receiver/config"
|
||||||
"hr_receiver/models"
|
"hr_receiver/models"
|
||||||
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
@@ -32,6 +33,54 @@ type analysisRecordListParams struct {
|
|||||||
EndTime int64 `form:"endTime"`
|
EndTime int64 `form:"endTime"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type mqttMeasurementHistoryParams struct {
|
||||||
|
PageNum int `form:"pageNum,default=1"`
|
||||||
|
PageSize int `form:"pageSize,default=50"`
|
||||||
|
DataType string `form:"dataType"`
|
||||||
|
RegionID uint32 `form:"regionId"`
|
||||||
|
Addr string `form:"addr"`
|
||||||
|
ValueOperator string `form:"valueOperator"`
|
||||||
|
Value *int64 `form:"value"`
|
||||||
|
StartTime int64 `form:"startTime"`
|
||||||
|
EndTime int64 `form:"endTime"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type mqttMeasurementHistoryItem struct {
|
||||||
|
ID uint `json:"id"`
|
||||||
|
DataType string `json:"dataType"`
|
||||||
|
Identifier string `json:"identifier"`
|
||||||
|
Topic string `json:"topic"`
|
||||||
|
RegionID uint32 `json:"regionId"`
|
||||||
|
Addr string `json:"addr"`
|
||||||
|
Value int64 `json:"value"`
|
||||||
|
ValueLabel string `json:"valueLabel"`
|
||||||
|
PacketNum uint32 `json:"packetNum"`
|
||||||
|
GatewayMAC string `json:"gatewayMac"`
|
||||||
|
Battery uint32 `json:"battery"`
|
||||||
|
IsActive bool `json:"isActive"`
|
||||||
|
IsOnSkin bool `json:"isOnSkin"`
|
||||||
|
SignalRSSINeg float64 `json:"signalRssiNeg"`
|
||||||
|
SNR float64 `json:"snr"`
|
||||||
|
ReceivedAt int64 `json:"receivedAt"`
|
||||||
|
CreatedAtMilli int64 `json:"createdAt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type mqttMeasurementHistorySummary struct {
|
||||||
|
AvgValue float64 `json:"avgValue"`
|
||||||
|
MaxValue int64 `json:"maxValue"`
|
||||||
|
MinValue int64 `json:"minValue"`
|
||||||
|
RecordCount int64 `json:"recordCount"`
|
||||||
|
UniqueAddrs int64 `json:"uniqueAddrs"`
|
||||||
|
ValueLabel string `json:"valueLabel"`
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
mqttChartRawPointLimit int64 = 5000
|
||||||
|
mqttChartTargetPointsTotal int64 = 6000
|
||||||
|
mqttChartMinPointsPerAddr int64 = 120
|
||||||
|
mqttChartMaxPointsPerAddr int64 = 600
|
||||||
|
)
|
||||||
|
|
||||||
// --- 查询接口 ---
|
// --- 查询接口 ---
|
||||||
|
|
||||||
// @Summary 获取AI分析记录列表
|
// @Summary 获取AI分析记录列表
|
||||||
@@ -94,6 +143,535 @@ func (sc *StatisticsController) ListAIAnalysisRecords(c *gin.Context) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// @Summary 获取MQTT测量历史数据
|
||||||
|
// @Description 分页查询MQTT测量历史数据,支持按数据类型、区域、地址、数值比较和时间范围筛选
|
||||||
|
// @Tags 统计管理
|
||||||
|
// @Produce json
|
||||||
|
// @Param pageNum query int false "页码(默认1)"
|
||||||
|
// @Param pageSize query int false "每页数量(默认50,最大200)"
|
||||||
|
// @Param dataType query string false "数据类型 heart_rate|step_count,默认 heart_rate"
|
||||||
|
// @Param regionId query int false "区域ID"
|
||||||
|
// @Param addr query string false "地址筛选,模糊匹配 beltAddr"
|
||||||
|
// @Param valueOperator query string false "数值比较符 gt|gte|lt|lte|eq"
|
||||||
|
// @Param value query int false "数值比较值"
|
||||||
|
// @Param startTime query int false "开始时间(毫秒时间戳)"
|
||||||
|
// @Param endTime query int false "结束时间(毫秒时间戳)"
|
||||||
|
// @Security BearerAuth
|
||||||
|
// @Success 200 {object} SwagAPIResponse "查询成功"
|
||||||
|
// @Router /admin/statistics/mqtt-measurements [get]
|
||||||
|
func (sc *StatisticsController) ListMqttMeasurementHistory(c *gin.Context) {
|
||||||
|
var params mqttMeasurementHistoryParams
|
||||||
|
if err := c.ShouldBindQuery(¶ms); err != nil {
|
||||||
|
writeError(c, http.StatusBadRequest, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if params.PageNum < 1 {
|
||||||
|
params.PageNum = 1
|
||||||
|
}
|
||||||
|
if params.PageSize < 1 || params.PageSize > 200 {
|
||||||
|
params.PageSize = 50
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(params.DataType) == "" {
|
||||||
|
params.DataType = "heart_rate"
|
||||||
|
}
|
||||||
|
|
||||||
|
switch params.DataType {
|
||||||
|
case "heart_rate":
|
||||||
|
sc.listHeartRateMeasurementHistory(c, params)
|
||||||
|
case "step_count":
|
||||||
|
sc.listStepCountMeasurementHistory(c, params)
|
||||||
|
default:
|
||||||
|
writeError(c, http.StatusBadRequest, "unsupported dataType, only heart_rate or step_count is allowed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Summary 获取MQTT测量图表数据
|
||||||
|
// @Description 查询MQTT测量图表数据,返回同筛选条件下的完整历史序列,不参与分页
|
||||||
|
// @Tags 统计管理
|
||||||
|
// @Produce json
|
||||||
|
// @Param dataType query string false "数据类型 heart_rate|step_count,默认 heart_rate"
|
||||||
|
// @Param regionId query int false "区域ID"
|
||||||
|
// @Param addr query string false "地址筛选,模糊匹配 beltAddr"
|
||||||
|
// @Param valueOperator query string false "数值比较符 gt|gte|lt|lte|eq"
|
||||||
|
// @Param value query int false "数值比较值"
|
||||||
|
// @Param startTime query int false "开始时间(毫秒时间戳)"
|
||||||
|
// @Param endTime query int false "结束时间(毫秒时间戳)"
|
||||||
|
// @Security BearerAuth
|
||||||
|
// @Success 200 {object} SwagAPIResponse "查询成功"
|
||||||
|
// @Router /admin/statistics/mqtt-measurements/chart [get]
|
||||||
|
func (sc *StatisticsController) MqttMeasurementChartData(c *gin.Context) {
|
||||||
|
var params mqttMeasurementHistoryParams
|
||||||
|
if err := c.ShouldBindQuery(¶ms); err != nil {
|
||||||
|
writeError(c, http.StatusBadRequest, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(params.DataType) == "" {
|
||||||
|
params.DataType = "heart_rate"
|
||||||
|
}
|
||||||
|
|
||||||
|
switch params.DataType {
|
||||||
|
case "heart_rate":
|
||||||
|
sc.chartHeartRateMeasurementHistory(c, params)
|
||||||
|
case "step_count":
|
||||||
|
sc.chartStepCountMeasurementHistory(c, params)
|
||||||
|
default:
|
||||||
|
writeError(c, http.StatusBadRequest, "unsupported dataType, only heart_rate or step_count is allowed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sc *StatisticsController) listHeartRateMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) {
|
||||||
|
baseQuery := sc.DB.Model(&models.MqttHeartRateRecord{})
|
||||||
|
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "heart_rate", "received_at")
|
||||||
|
|
||||||
|
var total int64
|
||||||
|
if err := baseQuery.Session(&gorm.Session{}).Count(&total).Error; err != nil {
|
||||||
|
writeError(c, http.StatusInternalServerError, "failed to count mqtt heart rate records")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
offset := (params.PageNum - 1) * params.PageSize
|
||||||
|
var records []models.MqttHeartRateRecord
|
||||||
|
if err := baseQuery.Session(&gorm.Session{}).
|
||||||
|
Order("received_at DESC").
|
||||||
|
Offset(offset).
|
||||||
|
Limit(params.PageSize).
|
||||||
|
Find(&records).Error; err != nil {
|
||||||
|
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate records")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var summary mqttMeasurementHistorySummary
|
||||||
|
summary.ValueLabel = "心率"
|
||||||
|
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "heart_rate", "belt_addr", &summary); err != nil {
|
||||||
|
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate summary")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
items := make([]mqttMeasurementHistoryItem, 0, len(records))
|
||||||
|
for _, record := range records {
|
||||||
|
items = append(items, mqttMeasurementHistoryItem{
|
||||||
|
ID: record.ID,
|
||||||
|
DataType: "heart_rate",
|
||||||
|
Identifier: record.Identifier,
|
||||||
|
Topic: record.Topic,
|
||||||
|
RegionID: record.RegionID,
|
||||||
|
Addr: record.BeltAddr,
|
||||||
|
Value: int64(record.HeartRate),
|
||||||
|
ValueLabel: "心率",
|
||||||
|
PacketNum: record.PacketNum,
|
||||||
|
GatewayMAC: record.GatewayMAC,
|
||||||
|
Battery: record.Battery,
|
||||||
|
IsActive: record.IsActive,
|
||||||
|
IsOnSkin: record.IsOnSkin,
|
||||||
|
SignalRSSINeg: record.SignalRSSINeg,
|
||||||
|
SNR: record.SNR,
|
||||||
|
ReceivedAt: record.ReceivedAt,
|
||||||
|
CreatedAtMilli: record.CreatedAt.UnixMilli(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
writeSuccess(c, http.StatusOK, "query success", gin.H{
|
||||||
|
"list": items,
|
||||||
|
"pagination": gin.H{
|
||||||
|
"currentPage": params.PageNum,
|
||||||
|
"pageSize": params.PageSize,
|
||||||
|
"totalList": total,
|
||||||
|
"totalPage": int((total + int64(params.PageSize) - 1) / int64(params.PageSize)),
|
||||||
|
},
|
||||||
|
"summary": summary,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sc *StatisticsController) chartHeartRateMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) {
|
||||||
|
baseQuery := sc.DB.Model(&models.MqttHeartRateRecord{})
|
||||||
|
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "heart_rate", "received_at")
|
||||||
|
|
||||||
|
var summary mqttMeasurementHistorySummary
|
||||||
|
summary.ValueLabel = "心率"
|
||||||
|
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "heart_rate", "belt_addr", &summary); err != nil {
|
||||||
|
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate summary")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
items, bucketSizeMs, sampled, err := sc.buildHeartRateChartItems(baseQuery, params, summary)
|
||||||
|
if err != nil {
|
||||||
|
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate chart records")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
writeSuccess(c, http.StatusOK, "query success", gin.H{
|
||||||
|
"list": items,
|
||||||
|
"summary": summary,
|
||||||
|
"sampled": sampled,
|
||||||
|
"bucketSizeMs": bucketSizeMs,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sc *StatisticsController) listStepCountMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) {
|
||||||
|
baseQuery := sc.DB.Model(&models.MqttStepCountRecord{})
|
||||||
|
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "step_count", "received_at")
|
||||||
|
|
||||||
|
var total int64
|
||||||
|
if err := baseQuery.Session(&gorm.Session{}).Count(&total).Error; err != nil {
|
||||||
|
writeError(c, http.StatusInternalServerError, "failed to count mqtt step count records")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
offset := (params.PageNum - 1) * params.PageSize
|
||||||
|
var records []models.MqttStepCountRecord
|
||||||
|
if err := baseQuery.Session(&gorm.Session{}).
|
||||||
|
Order("received_at DESC").
|
||||||
|
Offset(offset).
|
||||||
|
Limit(params.PageSize).
|
||||||
|
Find(&records).Error; err != nil {
|
||||||
|
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count records")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var summary mqttMeasurementHistorySummary
|
||||||
|
summary.ValueLabel = "步数"
|
||||||
|
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "step_count", "belt_addr", &summary); err != nil {
|
||||||
|
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count summary")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
items := make([]mqttMeasurementHistoryItem, 0, len(records))
|
||||||
|
for _, record := range records {
|
||||||
|
items = append(items, mqttMeasurementHistoryItem{
|
||||||
|
ID: record.ID,
|
||||||
|
DataType: "step_count",
|
||||||
|
Identifier: record.Identifier,
|
||||||
|
Topic: record.Topic,
|
||||||
|
RegionID: record.RegionID,
|
||||||
|
Addr: record.BeltAddr,
|
||||||
|
Value: int64(record.StepCount),
|
||||||
|
ValueLabel: "步数",
|
||||||
|
PacketNum: record.PacketNum,
|
||||||
|
GatewayMAC: record.GatewayMAC,
|
||||||
|
Battery: 0,
|
||||||
|
IsActive: false,
|
||||||
|
IsOnSkin: false,
|
||||||
|
SignalRSSINeg: record.SignalRSSINeg,
|
||||||
|
SNR: record.SNR,
|
||||||
|
ReceivedAt: record.ReceivedAt,
|
||||||
|
CreatedAtMilli: record.CreatedAt.UnixMilli(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
writeSuccess(c, http.StatusOK, "query success", gin.H{
|
||||||
|
"list": items,
|
||||||
|
"pagination": gin.H{
|
||||||
|
"currentPage": params.PageNum,
|
||||||
|
"pageSize": params.PageSize,
|
||||||
|
"totalList": total,
|
||||||
|
"totalPage": int((total + int64(params.PageSize) - 1) / int64(params.PageSize)),
|
||||||
|
},
|
||||||
|
"summary": summary,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sc *StatisticsController) chartStepCountMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) {
|
||||||
|
baseQuery := sc.DB.Model(&models.MqttStepCountRecord{})
|
||||||
|
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "step_count", "received_at")
|
||||||
|
|
||||||
|
var summary mqttMeasurementHistorySummary
|
||||||
|
summary.ValueLabel = "步数"
|
||||||
|
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "step_count", "belt_addr", &summary); err != nil {
|
||||||
|
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count summary")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
items, bucketSizeMs, sampled, err := sc.buildStepCountChartItems(baseQuery, params, summary)
|
||||||
|
if err != nil {
|
||||||
|
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count chart records")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
writeSuccess(c, http.StatusOK, "query success", gin.H{
|
||||||
|
"list": items,
|
||||||
|
"summary": summary,
|
||||||
|
"sampled": sampled,
|
||||||
|
"bucketSizeMs": bucketSizeMs,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyMqttMeasurementFilters(query *gorm.DB, params mqttMeasurementHistoryParams, addrField, valueField, timeField string) *gorm.DB {
|
||||||
|
if params.RegionID > 0 {
|
||||||
|
query = query.Where("region_id = ?", params.RegionID)
|
||||||
|
}
|
||||||
|
if addr := strings.TrimSpace(params.Addr); addr != "" {
|
||||||
|
query = query.Where(addrField+" LIKE ?", "%"+addr+"%")
|
||||||
|
}
|
||||||
|
if params.StartTime > 0 {
|
||||||
|
query = query.Where(timeField+" >= ?", params.StartTime)
|
||||||
|
}
|
||||||
|
if params.EndTime > 0 {
|
||||||
|
query = query.Where(timeField+" <= ?", params.EndTime)
|
||||||
|
}
|
||||||
|
if params.Value != nil {
|
||||||
|
switch strings.TrimSpace(params.ValueOperator) {
|
||||||
|
case "", "eq":
|
||||||
|
query = query.Where(valueField+" = ?", *params.Value)
|
||||||
|
case "gt":
|
||||||
|
query = query.Where(valueField+" > ?", *params.Value)
|
||||||
|
case "gte":
|
||||||
|
query = query.Where(valueField+" >= ?", *params.Value)
|
||||||
|
case "lt":
|
||||||
|
query = query.Where(valueField+" < ?", *params.Value)
|
||||||
|
case "lte":
|
||||||
|
query = query.Where(valueField+" <= ?", *params.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return query
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildMqttMeasurementSummary(query *gorm.DB, valueField, addrField string, summary *mqttMeasurementHistorySummary) error {
|
||||||
|
type rawSummary struct {
|
||||||
|
RecordCount int64
|
||||||
|
UniqueAddrs int64
|
||||||
|
MinValue int64
|
||||||
|
MaxValue int64
|
||||||
|
AvgValue float64
|
||||||
|
}
|
||||||
|
|
||||||
|
var result rawSummary
|
||||||
|
if err := query.Session(&gorm.Session{}).Select(
|
||||||
|
"COUNT(*) as record_count, COUNT(DISTINCT " + addrField + ") as unique_addrs, COALESCE(MIN(" + valueField + "), 0) as min_value, COALESCE(MAX(" + valueField + "), 0) as max_value, COALESCE(AVG(" + valueField + "), 0) as avg_value",
|
||||||
|
).Scan(&result).Error; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
summary.RecordCount = result.RecordCount
|
||||||
|
summary.UniqueAddrs = result.UniqueAddrs
|
||||||
|
summary.MinValue = result.MinValue
|
||||||
|
summary.MaxValue = result.MaxValue
|
||||||
|
summary.AvgValue = result.AvgValue
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func estimateChartBucketSizeMs(params mqttMeasurementHistoryParams, summary mqttMeasurementHistorySummary) int64 {
|
||||||
|
if params.StartTime <= 0 || params.EndTime <= params.StartTime {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if summary.RecordCount <= mqttChartRawPointLimit {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
uniqueAddrs := summary.UniqueAddrs
|
||||||
|
if uniqueAddrs < 1 {
|
||||||
|
uniqueAddrs = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
pointsPerAddr := mqttChartTargetPointsTotal / uniqueAddrs
|
||||||
|
if pointsPerAddr < mqttChartMinPointsPerAddr {
|
||||||
|
pointsPerAddr = mqttChartMinPointsPerAddr
|
||||||
|
}
|
||||||
|
if pointsPerAddr > mqttChartMaxPointsPerAddr {
|
||||||
|
pointsPerAddr = mqttChartMaxPointsPerAddr
|
||||||
|
}
|
||||||
|
|
||||||
|
rangeMs := params.EndTime - params.StartTime
|
||||||
|
if rangeMs <= 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketSizeMs := int64(math.Ceil(float64(rangeMs) / float64(pointsPerAddr)))
|
||||||
|
return normalizeBucketSizeMs(bucketSizeMs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeBucketSizeMs(bucketSizeMs int64) int64 {
|
||||||
|
if bucketSizeMs <= 1000 {
|
||||||
|
return 1000
|
||||||
|
}
|
||||||
|
|
||||||
|
candidates := []int64{
|
||||||
|
1000,
|
||||||
|
2000,
|
||||||
|
5000,
|
||||||
|
10 * 1000,
|
||||||
|
15 * 1000,
|
||||||
|
30 * 1000,
|
||||||
|
60 * 1000,
|
||||||
|
2 * 60 * 1000,
|
||||||
|
5 * 60 * 1000,
|
||||||
|
10 * 60 * 1000,
|
||||||
|
15 * 60 * 1000,
|
||||||
|
30 * 60 * 1000,
|
||||||
|
60 * 60 * 1000,
|
||||||
|
2 * 60 * 60 * 1000,
|
||||||
|
6 * 60 * 60 * 1000,
|
||||||
|
12 * 60 * 60 * 1000,
|
||||||
|
24 * 60 * 60 * 1000,
|
||||||
|
}
|
||||||
|
for _, candidate := range candidates {
|
||||||
|
if bucketSizeMs <= candidate {
|
||||||
|
return candidate
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 24 * 60 * 60 * 1000
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildChartBucketExpr(bucketSizeMs int64, timeField string) string {
|
||||||
|
return "((" + timeField + " / " + strconv.FormatInt(bucketSizeMs, 10) + ") * " + strconv.FormatInt(bucketSizeMs, 10) + ")"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sc *StatisticsController) buildHeartRateChartItems(baseQuery *gorm.DB, params mqttMeasurementHistoryParams, summary mqttMeasurementHistorySummary) ([]mqttMeasurementHistoryItem, int64, bool, error) {
|
||||||
|
bucketSizeMs := estimateChartBucketSizeMs(params, summary)
|
||||||
|
if bucketSizeMs == 0 {
|
||||||
|
var records []models.MqttHeartRateRecord
|
||||||
|
if err := baseQuery.Session(&gorm.Session{}).
|
||||||
|
Order("received_at ASC").
|
||||||
|
Find(&records).Error; err != nil {
|
||||||
|
return nil, 0, false, err
|
||||||
|
}
|
||||||
|
items := make([]mqttMeasurementHistoryItem, 0, len(records))
|
||||||
|
for _, record := range records {
|
||||||
|
items = append(items, mqttMeasurementHistoryItem{
|
||||||
|
ID: record.ID,
|
||||||
|
DataType: "heart_rate",
|
||||||
|
Identifier: record.Identifier,
|
||||||
|
Topic: record.Topic,
|
||||||
|
RegionID: record.RegionID,
|
||||||
|
Addr: record.BeltAddr,
|
||||||
|
Value: int64(record.HeartRate),
|
||||||
|
ValueLabel: "心率",
|
||||||
|
PacketNum: record.PacketNum,
|
||||||
|
GatewayMAC: record.GatewayMAC,
|
||||||
|
Battery: record.Battery,
|
||||||
|
IsActive: record.IsActive,
|
||||||
|
IsOnSkin: record.IsOnSkin,
|
||||||
|
SignalRSSINeg: record.SignalRSSINeg,
|
||||||
|
SNR: record.SNR,
|
||||||
|
ReceivedAt: record.ReceivedAt,
|
||||||
|
CreatedAtMilli: record.CreatedAt.UnixMilli(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return items, 0, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketExpr := buildChartBucketExpr(bucketSizeMs, "received_at")
|
||||||
|
type rawChartItem struct {
|
||||||
|
RegionID uint32
|
||||||
|
Addr string
|
||||||
|
Value float64
|
||||||
|
PacketNum int64
|
||||||
|
SignalRSSINeg float64
|
||||||
|
SNR float64
|
||||||
|
ReceivedAt int64
|
||||||
|
CreatedAtMilli int64
|
||||||
|
}
|
||||||
|
|
||||||
|
var rows []rawChartItem
|
||||||
|
if err := baseQuery.Session(&gorm.Session{}).
|
||||||
|
Select(
|
||||||
|
"region_id, belt_addr as addr, ROUND(AVG(heart_rate)) as value, COUNT(*) as packet_num, AVG(signal_rssi_neg) as signal_rssi_neg, AVG(snr) as snr, " + bucketExpr + " as received_at, MIN(EXTRACT(EPOCH FROM created_at) * 1000)::bigint as created_at_milli",
|
||||||
|
).
|
||||||
|
Group("region_id, belt_addr, " + bucketExpr).
|
||||||
|
Order("received_at ASC, addr ASC").
|
||||||
|
Scan(&rows).Error; err != nil {
|
||||||
|
return nil, bucketSizeMs, true, err
|
||||||
|
}
|
||||||
|
|
||||||
|
items := make([]mqttMeasurementHistoryItem, 0, len(rows))
|
||||||
|
for _, row := range rows {
|
||||||
|
items = append(items, mqttMeasurementHistoryItem{
|
||||||
|
DataType: "heart_rate",
|
||||||
|
RegionID: row.RegionID,
|
||||||
|
Addr: row.Addr,
|
||||||
|
Value: int64(math.Round(row.Value)),
|
||||||
|
ValueLabel: "心率",
|
||||||
|
PacketNum: uint32(maxInt64(row.PacketNum, 0)),
|
||||||
|
SignalRSSINeg: row.SignalRSSINeg,
|
||||||
|
SNR: row.SNR,
|
||||||
|
ReceivedAt: row.ReceivedAt,
|
||||||
|
CreatedAtMilli: row.CreatedAtMilli,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return items, bucketSizeMs, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sc *StatisticsController) buildStepCountChartItems(baseQuery *gorm.DB, params mqttMeasurementHistoryParams, summary mqttMeasurementHistorySummary) ([]mqttMeasurementHistoryItem, int64, bool, error) {
|
||||||
|
bucketSizeMs := estimateChartBucketSizeMs(params, summary)
|
||||||
|
if bucketSizeMs == 0 {
|
||||||
|
var records []models.MqttStepCountRecord
|
||||||
|
if err := baseQuery.Session(&gorm.Session{}).
|
||||||
|
Order("received_at ASC").
|
||||||
|
Find(&records).Error; err != nil {
|
||||||
|
return nil, 0, false, err
|
||||||
|
}
|
||||||
|
items := make([]mqttMeasurementHistoryItem, 0, len(records))
|
||||||
|
for _, record := range records {
|
||||||
|
items = append(items, mqttMeasurementHistoryItem{
|
||||||
|
ID: record.ID,
|
||||||
|
DataType: "step_count",
|
||||||
|
Identifier: record.Identifier,
|
||||||
|
Topic: record.Topic,
|
||||||
|
RegionID: record.RegionID,
|
||||||
|
Addr: record.BeltAddr,
|
||||||
|
Value: int64(record.StepCount),
|
||||||
|
ValueLabel: "步数",
|
||||||
|
PacketNum: record.PacketNum,
|
||||||
|
GatewayMAC: record.GatewayMAC,
|
||||||
|
Battery: 0,
|
||||||
|
IsActive: false,
|
||||||
|
IsOnSkin: false,
|
||||||
|
SignalRSSINeg: record.SignalRSSINeg,
|
||||||
|
SNR: record.SNR,
|
||||||
|
ReceivedAt: record.ReceivedAt,
|
||||||
|
CreatedAtMilli: record.CreatedAt.UnixMilli(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return items, 0, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketExpr := buildChartBucketExpr(bucketSizeMs, "received_at")
|
||||||
|
type rawChartItem struct {
|
||||||
|
RegionID uint32
|
||||||
|
Addr string
|
||||||
|
Value float64
|
||||||
|
PacketNum int64
|
||||||
|
SignalRSSINeg float64
|
||||||
|
SNR float64
|
||||||
|
ReceivedAt int64
|
||||||
|
CreatedAtMilli int64
|
||||||
|
}
|
||||||
|
|
||||||
|
var rows []rawChartItem
|
||||||
|
if err := baseQuery.Session(&gorm.Session{}).
|
||||||
|
Select(
|
||||||
|
"region_id, belt_addr as addr, ROUND(AVG(step_count)) as value, COUNT(*) as packet_num, AVG(signal_rssi_neg) as signal_rssi_neg, AVG(snr) as snr, " + bucketExpr + " as received_at, MIN(EXTRACT(EPOCH FROM created_at) * 1000)::bigint as created_at_milli",
|
||||||
|
).
|
||||||
|
Group("region_id, belt_addr, " + bucketExpr).
|
||||||
|
Order("received_at ASC, addr ASC").
|
||||||
|
Scan(&rows).Error; err != nil {
|
||||||
|
return nil, bucketSizeMs, true, err
|
||||||
|
}
|
||||||
|
|
||||||
|
items := make([]mqttMeasurementHistoryItem, 0, len(rows))
|
||||||
|
for _, row := range rows {
|
||||||
|
items = append(items, mqttMeasurementHistoryItem{
|
||||||
|
DataType: "step_count",
|
||||||
|
RegionID: row.RegionID,
|
||||||
|
Addr: row.Addr,
|
||||||
|
Value: int64(math.Round(row.Value)),
|
||||||
|
ValueLabel: "步数",
|
||||||
|
PacketNum: uint32(maxInt64(row.PacketNum, 0)),
|
||||||
|
SignalRSSINeg: row.SignalRSSINeg,
|
||||||
|
SNR: row.SNR,
|
||||||
|
ReceivedAt: row.ReceivedAt,
|
||||||
|
CreatedAtMilli: row.CreatedAtMilli,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return items, bucketSizeMs, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func maxInt64(value, floor int64) int64 {
|
||||||
|
if value < floor {
|
||||||
|
return floor
|
||||||
|
}
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
// --- 删除接口 ---
|
// --- 删除接口 ---
|
||||||
|
|
||||||
// @Summary 删除AI分析记录
|
// @Summary 删除AI分析记录
|
||||||
|
|||||||
@@ -19,6 +19,13 @@ type mqttDebugStartRequest struct {
|
|||||||
PersistToDatabase bool `json:"persistToDatabase"`
|
PersistToDatabase bool `json:"persistToDatabase"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type mqttReplayStartRequest struct {
|
||||||
|
Addr string `json:"addr"`
|
||||||
|
EndTime int64 `json:"endTime"`
|
||||||
|
RegionID uint32 `json:"regionId"`
|
||||||
|
StartTime int64 `json:"startTime"`
|
||||||
|
}
|
||||||
|
|
||||||
var debugUpgrader = websocket.Upgrader{
|
var debugUpgrader = websocket.Upgrader{
|
||||||
CheckOrigin: func(r *http.Request) bool {
|
CheckOrigin: func(r *http.Request) bool {
|
||||||
return true
|
return true
|
||||||
@@ -89,6 +96,70 @@ func (sc *SystemDebugController) StopMqtt(c *gin.Context) {
|
|||||||
writeSuccess(c, http.StatusOK, "stop success", service.Status())
|
writeSuccess(c, http.StatusOK, "stop success", service.Status())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// @Summary 获取MQTT重放状态
|
||||||
|
// @Description 获取心率历史数据 MQTT 重放状态
|
||||||
|
// @Tags 系统调试
|
||||||
|
// @Produce json
|
||||||
|
// @Security BearerAuth
|
||||||
|
// @Success 200 {object} SwagAPIResponse "查询成功"
|
||||||
|
// @Router /admin/system-debug/mqtt/replay/status [get]
|
||||||
|
func (sc *SystemDebugController) MqttReplayStatus(c *gin.Context) {
|
||||||
|
service := mqtt.GetReplayService()
|
||||||
|
if service == nil {
|
||||||
|
writeError(c, http.StatusServiceUnavailable, "mqtt replay service unavailable")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeSuccess(c, http.StatusOK, "query success", service.Status())
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Summary 启动MQTT历史数据重放
|
||||||
|
// @Description 按指定时间范围重放历史心率数据到 MQTT
|
||||||
|
// @Tags 系统调试
|
||||||
|
// @Accept json
|
||||||
|
// @Produce json
|
||||||
|
// @Security BearerAuth
|
||||||
|
// @Success 200 {object} SwagAPIResponse "启动成功"
|
||||||
|
// @Router /admin/system-debug/mqtt/replay/start [post]
|
||||||
|
func (sc *SystemDebugController) StartMqttReplay(c *gin.Context) {
|
||||||
|
service := mqtt.GetReplayService()
|
||||||
|
if service == nil {
|
||||||
|
writeError(c, http.StatusServiceUnavailable, "mqtt replay service unavailable")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var payload mqttReplayStartRequest
|
||||||
|
if err := c.ShouldBindJSON(&payload); err != nil {
|
||||||
|
writeError(c, http.StatusBadRequest, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
status, err := service.Start(mqtt.ReplayStartRequest{
|
||||||
|
Addr: payload.Addr,
|
||||||
|
EndTime: payload.EndTime,
|
||||||
|
RegionID: payload.RegionID,
|
||||||
|
StartTime: payload.StartTime,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
writeError(c, http.StatusBadRequest, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeSuccess(c, http.StatusOK, "start success", status)
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Summary 停止MQTT历史数据重放
|
||||||
|
// @Description 停止当前正在执行的心率历史数据 MQTT 重放
|
||||||
|
// @Tags 系统调试
|
||||||
|
// @Produce json
|
||||||
|
// @Security BearerAuth
|
||||||
|
// @Success 200 {object} SwagAPIResponse "停止成功"
|
||||||
|
// @Router /admin/system-debug/mqtt/replay/stop [post]
|
||||||
|
func (sc *SystemDebugController) StopMqttReplay(c *gin.Context) {
|
||||||
|
service := mqtt.GetReplayService()
|
||||||
|
if service == nil {
|
||||||
|
writeError(c, http.StatusServiceUnavailable, "mqtt replay service unavailable")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeSuccess(c, http.StatusOK, "stop success", service.Stop())
|
||||||
|
}
|
||||||
|
|
||||||
// @Summary MQTT WebSocket连接
|
// @Summary MQTT WebSocket连接
|
||||||
// @Description 通过WebSocket实时监听MQTT消息(需要SuperAdmin权限)
|
// @Description 通过WebSocket实时监听MQTT消息(需要SuperAdmin权限)
|
||||||
// @Tags 系统调试
|
// @Tags 系统调试
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ func main() {
|
|||||||
&models.MqttStepCountRecord{},
|
&models.MqttStepCountRecord{},
|
||||||
&models.MqttGatewayStatusRecord{},
|
&models.MqttGatewayStatusRecord{},
|
||||||
&models.MqttTrainingSessionRecord{},
|
&models.MqttTrainingSessionRecord{},
|
||||||
|
&models.MqttListenerSetting{},
|
||||||
&models.Gateway{},
|
&models.Gateway{},
|
||||||
&models.AIAnalysisRecord{},
|
&models.AIAnalysisRecord{},
|
||||||
&models.AIPricingConfig{},
|
&models.AIPricingConfig{},
|
||||||
@@ -72,15 +73,23 @@ func main() {
|
|||||||
if err := models.EnsureDefaultProductPrototypes(config.DB); err != nil {
|
if err := models.EnsureDefaultProductPrototypes(config.DB); err != nil {
|
||||||
log.Printf("default product prototypes init failed: %v", err)
|
log.Printf("default product prototypes init failed: %v", err)
|
||||||
}
|
}
|
||||||
|
if err := models.EnsureDefaultMqttListenerSetting(config.DB); err != nil {
|
||||||
|
log.Printf("default mqtt listener setting init failed: %v", err)
|
||||||
|
}
|
||||||
if err := models.EnsureDefaultProjectProductTemplates(config.DB); err != nil {
|
if err := models.EnsureDefaultProjectProductTemplates(config.DB); err != nil {
|
||||||
log.Printf("default project product templates init failed: %v", err)
|
log.Printf("default project product templates init failed: %v", err)
|
||||||
}
|
}
|
||||||
|
if err := mqtt.InitListenerStorageConfig(config.DB); err != nil {
|
||||||
|
log.Printf("mqtt listener config init failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := mqtt.Start(config.DB, config.App.MQTT); err != nil {
|
if err := mqtt.Start(config.DB, config.App.MQTT); err != nil {
|
||||||
log.Printf("mqtt listener start failed: %v", err)
|
log.Printf("mqtt listener start failed: %v", err)
|
||||||
}
|
}
|
||||||
mqtt.InitDebugService(config.DB, config.App.MQTT)
|
mqtt.InitDebugService(config.DB, config.App.MQTT)
|
||||||
|
mqtt.InitReplayService(config.DB, config.App.MQTT)
|
||||||
controllers.StartLessonPlanCleanupJob(config.DB)
|
controllers.StartLessonPlanCleanupJob(config.DB)
|
||||||
|
controllers.StartMqttMeasurementCleanupJob(config.DB)
|
||||||
|
|
||||||
// 启动服务
|
// 启动服务
|
||||||
r := routes.SetupRouter()
|
r := routes.SetupRouter()
|
||||||
|
|||||||
@@ -0,0 +1,49 @@
|
|||||||
|
package models
|
||||||
|
|
||||||
|
import "gorm.io/gorm"
|
||||||
|
|
||||||
|
const (
|
||||||
|
DefaultMqttMeasurementExpireDays = 7
|
||||||
|
MqttListenerSettingSingletonID = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
type MqttListenerSetting struct {
|
||||||
|
gorm.Model
|
||||||
|
Enabled bool `gorm:"not null;default:true" json:"enabled"`
|
||||||
|
ExpireDays int `gorm:"type:int;not null;default:7" json:"expireDays"`
|
||||||
|
DeleteExpired bool `gorm:"not null;default:true" json:"deleteExpired"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (MqttListenerSetting) TableName() string {
|
||||||
|
return "mqtt_listener_settings"
|
||||||
|
}
|
||||||
|
|
||||||
|
func DefaultMqttListenerSetting() MqttListenerSetting {
|
||||||
|
return MqttListenerSetting{
|
||||||
|
Model: gorm.Model{ID: MqttListenerSettingSingletonID},
|
||||||
|
Enabled: true,
|
||||||
|
ExpireDays: DefaultMqttMeasurementExpireDays,
|
||||||
|
DeleteExpired: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func EnsureDefaultMqttListenerSetting(db *gorm.DB) error {
|
||||||
|
defaults := DefaultMqttListenerSetting()
|
||||||
|
|
||||||
|
var existing MqttListenerSetting
|
||||||
|
err := db.First(&existing, MqttListenerSettingSingletonID).Error
|
||||||
|
if err == nil {
|
||||||
|
updates := map[string]interface{}{}
|
||||||
|
if existing.ExpireDays <= 0 {
|
||||||
|
updates["expire_days"] = DefaultMqttMeasurementExpireDays
|
||||||
|
}
|
||||||
|
if len(updates) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return db.Model(&existing).Updates(updates).Error
|
||||||
|
}
|
||||||
|
if err != nil && err != gorm.ErrRecordNotFound {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return db.Create(&defaults).Error
|
||||||
|
}
|
||||||
@@ -245,8 +245,15 @@ func (s *DebugService) maybePersist(record interface{}) {
|
|||||||
if !enabled {
|
if !enabled {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := s.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record).Error; err != nil {
|
tx := s.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record)
|
||||||
log.Printf("mqtt debug persist failed type=%T err=%v", record, err)
|
if tx.Error != nil {
|
||||||
|
log.Printf("mqtt debug persist failed type=%T err=%v", record, tx.Error)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if tx.RowsAffected > 0 {
|
||||||
|
//logPersistResult("mqtt debug", "inserted", record)
|
||||||
|
} else {
|
||||||
|
//logPersistResult("mqtt debug", "skipped duplicate", record)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+58
-12
@@ -128,13 +128,13 @@ func (l *Listener) connect() error {
|
|||||||
|
|
||||||
func (l *Listener) subscribe(client mqtt.Client) error {
|
func (l *Listener) subscribe(client mqtt.Client) error {
|
||||||
var topics []string
|
var topics []string
|
||||||
if l.cfg.EnableMeasurementSubscriptions {
|
// Measurement topics stay subscribed so runtime config changes take effect
|
||||||
topics = append(topics,
|
// immediately without requiring an MQTT reconnect.
|
||||||
fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/hr", l.cfg.Region),
|
topics = append(topics,
|
||||||
fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/step", l.cfg.Region),
|
fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/hr", l.cfg.Region),
|
||||||
fmt.Sprintf("/whgw/v2/region/%s/gateway/+/status", l.cfg.Region),
|
fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/step", l.cfg.Region),
|
||||||
)
|
fmt.Sprintf("/whgw/v2/region/%s/gateway/+/status", l.cfg.Region),
|
||||||
}
|
)
|
||||||
if l.cfg.EnableTrainingEventSubscription {
|
if l.cfg.EnableTrainingEventSubscription {
|
||||||
topics = append(topics, "/whgw/v2/region/test/+/+")
|
topics = append(topics, "/whgw/v2/region/test/+/+")
|
||||||
}
|
}
|
||||||
@@ -161,6 +161,10 @@ func (l *Listener) handleMessage(_ mqtt.Client, msg mqtt.Message) {
|
|||||||
if len(msg.Payload()) == 0 {
|
if len(msg.Payload()) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if consumeReplayPayload(msg.Topic(), msg.Payload()) {
|
||||||
|
log.Printf("mqtt listener skipped replay payload topic=%s", msg.Topic())
|
||||||
|
return
|
||||||
|
}
|
||||||
now := time.Now().UnixMilli()
|
now := time.Now().UnixMilli()
|
||||||
|
|
||||||
var packet whgw_hrpb.GatewaySlaveOutCloudMasterInMsg
|
var packet whgw_hrpb.GatewaySlaveOutCloudMasterInMsg
|
||||||
@@ -180,12 +184,21 @@ func (l *Listener) handleMessage(_ mqtt.Client, msg mqtt.Message) {
|
|||||||
|
|
||||||
switch payload := packet.Choice.(type) {
|
switch payload := packet.Choice.(type) {
|
||||||
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfHrMeasurement:
|
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfHrMeasurement:
|
||||||
|
if !GetListenerStorageConfig().Enabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
record := buildHeartRateRecord(payload.NtfHrMeasurement, msg.Topic(), now)
|
record := buildHeartRateRecord(payload.NtfHrMeasurement, msg.Topic(), now)
|
||||||
l.enqueue(&record)
|
l.enqueue(&record)
|
||||||
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfStepCountMeasurement:
|
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfStepCountMeasurement:
|
||||||
|
if !GetListenerStorageConfig().Enabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
record := buildStepCountRecord(payload.NtfStepCountMeasurement, msg.Topic(), now)
|
record := buildStepCountRecord(payload.NtfStepCountMeasurement, msg.Topic(), now)
|
||||||
l.enqueue(&record)
|
l.enqueue(&record)
|
||||||
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfGatewayStatus:
|
case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfGatewayStatus:
|
||||||
|
if !GetListenerStorageConfig().Enabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
record := buildGatewayStatusRecord(payload.NtfGatewayStatus, msg.Topic(), now)
|
record := buildGatewayStatusRecord(payload.NtfGatewayStatus, msg.Topic(), now)
|
||||||
l.enqueue(&record)
|
l.enqueue(&record)
|
||||||
default:
|
default:
|
||||||
@@ -222,7 +235,16 @@ func (l *Listener) persistRecord(record interface{}) error {
|
|||||||
case *models.MqttTrainingSessionRecord:
|
case *models.MqttTrainingSessionRecord:
|
||||||
return l.persistTrainingSession(r)
|
return l.persistTrainingSession(r)
|
||||||
default:
|
default:
|
||||||
return l.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record).Error
|
tx := l.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record)
|
||||||
|
if tx.Error != nil {
|
||||||
|
return tx.Error
|
||||||
|
}
|
||||||
|
if tx.RowsAffected > 0 {
|
||||||
|
//logPersistResult("mqtt listener", "inserted", record)
|
||||||
|
} else {
|
||||||
|
//logPersistResult("mqtt listener", "skipped duplicate", record)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -246,10 +268,34 @@ func (l *Listener) persistTrainingSession(record *models.MqttTrainingSessionReco
|
|||||||
assignments["ended_at"] = *record.EndedAt
|
assignments["ended_at"] = *record.EndedAt
|
||||||
}
|
}
|
||||||
|
|
||||||
return l.db.Clauses(clause.OnConflict{
|
tx := l.db.Clauses(clause.OnConflict{
|
||||||
Columns: []clause.Column{{Name: "identifier"}},
|
Columns: []clause.Column{{Name: "identifier"}},
|
||||||
DoUpdates: clause.Assignments(assignments),
|
DoUpdates: clause.Assignments(assignments),
|
||||||
}).Create(record).Error
|
}).Create(record)
|
||||||
|
if tx.Error != nil {
|
||||||
|
return tx.Error
|
||||||
|
}
|
||||||
|
if tx.RowsAffected > 0 {
|
||||||
|
//logPersistResult("mqtt listener", "upserted", record)
|
||||||
|
} else {
|
||||||
|
//logPersistResult("mqtt listener", "skipped duplicate", record)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func logPersistResult(source, action string, record interface{}) {
|
||||||
|
switch r := record.(type) {
|
||||||
|
case *models.MqttHeartRateRecord:
|
||||||
|
log.Printf("%s %s heart_rate region=%d addr=%s hr=%d packet=%d identifier=%s topic=%s", source, action, r.RegionID, r.BeltAddr, r.HeartRate, r.PacketNum, r.Identifier, r.Topic)
|
||||||
|
case *models.MqttStepCountRecord:
|
||||||
|
log.Printf("%s %s step_count region=%d addr=%s steps=%d packet=%d identifier=%s topic=%s", source, action, r.RegionID, r.BeltAddr, r.StepCount, r.PacketNum, r.Identifier, r.Topic)
|
||||||
|
case *models.MqttGatewayStatusRecord:
|
||||||
|
log.Printf("%s %s gateway_status region=%d gateway=%s rxCount=%d uptimeMs=%d identifier=%s topic=%s", source, action, r.RegionID, r.GatewayMAC, r.RxCount, r.UptimeMs, r.Identifier, r.Topic)
|
||||||
|
case *models.MqttTrainingSessionRecord:
|
||||||
|
log.Printf("%s %s training_session region=%d testId=%s event=%s identifier=%s publishedAt=%d topic=%s", source, action, r.RegionID, r.TestID, r.EventType, r.Identifier, r.PublishedAt, r.Topic)
|
||||||
|
default:
|
||||||
|
log.Printf("%s %s record type=%T", source, action, record)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildHeartRateRecord(measurement *whgw_hrpb.HrMeasurement, topic string, now int64) models.MqttHeartRateRecord {
|
func buildHeartRateRecord(measurement *whgw_hrpb.HrMeasurement, topic string, now int64) models.MqttHeartRateRecord {
|
||||||
@@ -265,7 +311,7 @@ func buildHeartRateRecord(measurement *whgw_hrpb.HrMeasurement, topic string, no
|
|||||||
beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId())
|
beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId())
|
||||||
|
|
||||||
return models.MqttHeartRateRecord{
|
return models.MqttHeartRateRecord{
|
||||||
Identifier: fmt.Sprintf("hr:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), packet.GetPacketNum()),
|
Identifier: fmt.Sprintf("hr:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), now),
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
RegionID: regionID,
|
RegionID: regionID,
|
||||||
GatewayMAC: gatewayMAC,
|
GatewayMAC: gatewayMAC,
|
||||||
@@ -309,7 +355,7 @@ func buildStepCountRecord(measurement *whgw_hrpb.StepCountMeasurement, topic str
|
|||||||
beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId())
|
beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId())
|
||||||
|
|
||||||
return models.MqttStepCountRecord{
|
return models.MqttStepCountRecord{
|
||||||
Identifier: fmt.Sprintf("step:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), packet.GetPacketNum()),
|
Identifier: fmt.Sprintf("step:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), now),
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
RegionID: regionID,
|
RegionID: regionID,
|
||||||
GatewayMAC: gatewayMAC,
|
GatewayMAC: gatewayMAC,
|
||||||
|
|||||||
@@ -0,0 +1,79 @@
|
|||||||
|
package mqtt
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"hr_receiver/models"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ListenerStorageConfig struct {
|
||||||
|
Enabled bool `json:"enabled"`
|
||||||
|
ExpireDays int `json:"expireDays"`
|
||||||
|
DeleteExpired bool `json:"deleteExpired"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var listenerStorageConfig atomic.Value
|
||||||
|
|
||||||
|
func InitListenerStorageConfig(db *gorm.DB) error {
|
||||||
|
cfg, err := loadListenerStorageConfig(db)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
storeListenerStorageConfig(cfg)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetListenerStorageConfig() ListenerStorageConfig {
|
||||||
|
if cfg, ok := listenerStorageConfig.Load().(ListenerStorageConfig); ok {
|
||||||
|
return normalizeListenerStorageConfig(cfg)
|
||||||
|
}
|
||||||
|
return normalizeListenerStorageConfig(ListenerStorageConfig{
|
||||||
|
Enabled: true,
|
||||||
|
ExpireDays: models.DefaultMqttMeasurementExpireDays,
|
||||||
|
DeleteExpired: true,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func UpdateListenerStorageConfig(db *gorm.DB, cfg ListenerStorageConfig) (ListenerStorageConfig, error) {
|
||||||
|
cfg = normalizeListenerStorageConfig(cfg)
|
||||||
|
if cfg.ExpireDays <= 0 {
|
||||||
|
return ListenerStorageConfig{}, fmt.Errorf("expireDays must be greater than 0")
|
||||||
|
}
|
||||||
|
|
||||||
|
record := models.MqttListenerSetting{
|
||||||
|
Model: gorm.Model{ID: models.MqttListenerSettingSingletonID},
|
||||||
|
Enabled: cfg.Enabled,
|
||||||
|
ExpireDays: cfg.ExpireDays,
|
||||||
|
DeleteExpired: cfg.DeleteExpired,
|
||||||
|
}
|
||||||
|
if err := db.Save(&record).Error; err != nil {
|
||||||
|
return ListenerStorageConfig{}, err
|
||||||
|
}
|
||||||
|
storeListenerStorageConfig(cfg)
|
||||||
|
return cfg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadListenerStorageConfig(db *gorm.DB) (ListenerStorageConfig, error) {
|
||||||
|
var record models.MqttListenerSetting
|
||||||
|
if err := db.First(&record, models.MqttListenerSettingSingletonID).Error; err != nil {
|
||||||
|
return ListenerStorageConfig{}, err
|
||||||
|
}
|
||||||
|
return normalizeListenerStorageConfig(ListenerStorageConfig{
|
||||||
|
Enabled: record.Enabled,
|
||||||
|
ExpireDays: record.ExpireDays,
|
||||||
|
DeleteExpired: record.DeleteExpired,
|
||||||
|
}), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func storeListenerStorageConfig(cfg ListenerStorageConfig) {
|
||||||
|
listenerStorageConfig.Store(normalizeListenerStorageConfig(cfg))
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeListenerStorageConfig(cfg ListenerStorageConfig) ListenerStorageConfig {
|
||||||
|
if cfg.ExpireDays <= 0 {
|
||||||
|
cfg.ExpireDays = models.DefaultMqttMeasurementExpireDays
|
||||||
|
}
|
||||||
|
return cfg
|
||||||
|
}
|
||||||
@@ -0,0 +1,455 @@
|
|||||||
|
package mqtt
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/sha1"
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"hr_receiver/config"
|
||||||
|
"hr_receiver/models"
|
||||||
|
whgw_hrpb "hr_receiver/proto"
|
||||||
|
"log"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
maxReplayRecordCount = 20000
|
||||||
|
maxReplaySleepGap = 24 * time.Hour
|
||||||
|
replayMarkerRetention = 10 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
|
type ReplayStartRequest struct {
|
||||||
|
Addr string `json:"addr"`
|
||||||
|
EndTime int64 `json:"endTime"`
|
||||||
|
RegionID uint32 `json:"regionId"`
|
||||||
|
StartTime int64 `json:"startTime"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReplayStatus struct {
|
||||||
|
Addr string `json:"addr"`
|
||||||
|
CompletedAt int64 `json:"completedAt"`
|
||||||
|
CurrentAddr string `json:"currentAddr"`
|
||||||
|
CurrentHeartRate int `json:"currentHeartRate"`
|
||||||
|
EndTime int64 `json:"endTime"`
|
||||||
|
ErrorMessage string `json:"errorMessage"`
|
||||||
|
LastPublishedAt int64 `json:"lastPublishedAt"`
|
||||||
|
LastTopic string `json:"lastTopic"`
|
||||||
|
ProcessedCount int `json:"processedCount"`
|
||||||
|
RegionID uint32 `json:"regionId"`
|
||||||
|
Running bool `json:"running"`
|
||||||
|
StartedAt int64 `json:"startedAt"`
|
||||||
|
StartTime int64 `json:"startTime"`
|
||||||
|
StoppedAt int64 `json:"stoppedAt"`
|
||||||
|
TotalCount int `json:"totalCount"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReplayService struct {
|
||||||
|
cfg config.MQTTConfig
|
||||||
|
db *gorm.DB
|
||||||
|
cancel context.CancelFunc
|
||||||
|
mu sync.RWMutex
|
||||||
|
status ReplayStatus
|
||||||
|
}
|
||||||
|
|
||||||
|
type replayFingerprintStore struct {
|
||||||
|
items map[string]time.Time
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
globalReplayService = &ReplayService{}
|
||||||
|
replayFingerprints = &replayFingerprintStore{items: map[string]time.Time{}}
|
||||||
|
)
|
||||||
|
|
||||||
|
func InitReplayService(db *gorm.DB, cfg config.MQTTConfig) {
|
||||||
|
globalReplayService = &ReplayService{
|
||||||
|
cfg: cfg,
|
||||||
|
db: db,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetReplayService() *ReplayService {
|
||||||
|
return globalReplayService
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReplayService) Status() ReplayStatus {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.status
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReplayService) Start(req ReplayStartRequest) (ReplayStatus, error) {
|
||||||
|
req.Addr = strings.TrimSpace(req.Addr)
|
||||||
|
if req.StartTime <= 0 || req.EndTime <= 0 {
|
||||||
|
return ReplayStatus{}, fmt.Errorf("startTime and endTime are required")
|
||||||
|
}
|
||||||
|
req.EndTime = normalizeReplayEndTime(req.EndTime)
|
||||||
|
if req.EndTime < req.StartTime {
|
||||||
|
return ReplayStatus{}, fmt.Errorf("endTime must be greater than or equal to startTime")
|
||||||
|
}
|
||||||
|
if err := validateConfig(s.cfg); err != nil {
|
||||||
|
return ReplayStatus{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
if s.status.Running {
|
||||||
|
current := s.status
|
||||||
|
s.mu.Unlock()
|
||||||
|
return current, fmt.Errorf("mqtt replay is already running")
|
||||||
|
}
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
query := s.db.Model(&models.MqttHeartRateRecord{}).
|
||||||
|
Where("received_at >= ? AND received_at <= ?", req.StartTime, req.EndTime)
|
||||||
|
if req.RegionID > 0 {
|
||||||
|
query = query.Where("region_id = ?", req.RegionID)
|
||||||
|
}
|
||||||
|
if req.Addr != "" {
|
||||||
|
query = query.Where("belt_addr LIKE ?", "%"+req.Addr+"%")
|
||||||
|
}
|
||||||
|
|
||||||
|
var total int64
|
||||||
|
if err := query.Count(&total).Error; err != nil {
|
||||||
|
return ReplayStatus{}, err
|
||||||
|
}
|
||||||
|
if total == 0 {
|
||||||
|
log.Printf("mqtt replay found no records start=%d end=%d region=%d addr=%q", req.StartTime, req.EndTime, req.RegionID, req.Addr)
|
||||||
|
return ReplayStatus{}, fmt.Errorf("no heart rate records found in the selected range")
|
||||||
|
}
|
||||||
|
if total > maxReplayRecordCount {
|
||||||
|
return ReplayStatus{}, fmt.Errorf("replay record count exceeds limit: %d > %d", total, maxReplayRecordCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
var records []models.MqttHeartRateRecord
|
||||||
|
if err := query.Order("received_at ASC, id ASC").Find(&records).Error; err != nil {
|
||||||
|
return ReplayStatus{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
status := ReplayStatus{
|
||||||
|
Addr: req.Addr,
|
||||||
|
EndTime: req.EndTime,
|
||||||
|
RegionID: req.RegionID,
|
||||||
|
Running: true,
|
||||||
|
StartedAt: time.Now().UnixMilli(),
|
||||||
|
StartTime: req.StartTime,
|
||||||
|
TotalCount: len(records),
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
s.cancel = cancel
|
||||||
|
s.status = status
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
go s.run(ctx, records)
|
||||||
|
return status, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReplayService) Stop() ReplayStatus {
|
||||||
|
s.mu.Lock()
|
||||||
|
cancel := s.cancel
|
||||||
|
if cancel != nil {
|
||||||
|
cancel()
|
||||||
|
s.cancel = nil
|
||||||
|
}
|
||||||
|
if s.status.Running {
|
||||||
|
s.status.Running = false
|
||||||
|
s.status.StoppedAt = time.Now().UnixMilli()
|
||||||
|
}
|
||||||
|
status := s.status
|
||||||
|
s.mu.Unlock()
|
||||||
|
return status
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReplayService) run(ctx context.Context, records []models.MqttHeartRateRecord) {
|
||||||
|
client, err := s.connectReplayClient()
|
||||||
|
if err != nil {
|
||||||
|
s.fail(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if client.IsConnected() {
|
||||||
|
client.Disconnect(250)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for idx, record := range records {
|
||||||
|
if idx > 0 {
|
||||||
|
wait := time.Duration(record.ReceivedAt-records[idx-1].ReceivedAt) * time.Millisecond
|
||||||
|
if wait < 0 {
|
||||||
|
wait = 0
|
||||||
|
}
|
||||||
|
if wait > maxReplaySleepGap {
|
||||||
|
wait = maxReplaySleepGap
|
||||||
|
}
|
||||||
|
timer := time.NewTimer(wait)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
timer.Stop()
|
||||||
|
s.stopWithContext()
|
||||||
|
return
|
||||||
|
case <-timer.C:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.publishRecord(client, record); err != nil {
|
||||||
|
s.fail(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.updateProgress(record, idx+1)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.complete()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReplayService) connectReplayClient() (mqtt.Client, error) {
|
||||||
|
opts := mqtt.NewClientOptions()
|
||||||
|
scheme := "tcp"
|
||||||
|
if s.cfg.UseTLS {
|
||||||
|
scheme = "ssl"
|
||||||
|
opts.SetTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12})
|
||||||
|
}
|
||||||
|
broker := fmt.Sprintf("%s://%s:%d", scheme, s.cfg.Host, s.cfg.Port)
|
||||||
|
username := strings.TrimSpace(s.cfg.GWUsername)
|
||||||
|
password := s.cfg.GWPassword
|
||||||
|
if username == "" {
|
||||||
|
username = s.cfg.Username
|
||||||
|
password = s.cfg.Password
|
||||||
|
}
|
||||||
|
opts.AddBroker(broker)
|
||||||
|
opts.SetClientID(fmt.Sprintf("%s-replay-%d", s.cfg.ClientIDPrefix, time.Now().UnixNano()))
|
||||||
|
opts.SetUsername(username)
|
||||||
|
opts.SetPassword(password)
|
||||||
|
opts.SetKeepAlive(60 * time.Second)
|
||||||
|
opts.SetAutoReconnect(false)
|
||||||
|
opts.SetConnectRetry(false)
|
||||||
|
|
||||||
|
client := mqtt.NewClient(opts)
|
||||||
|
token := client.Connect()
|
||||||
|
if !token.WaitTimeout(15 * time.Second) {
|
||||||
|
return nil, fmt.Errorf("mqtt replay connect timeout")
|
||||||
|
}
|
||||||
|
if err := token.Error(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
log.Printf("mqtt replay connected broker=%s username=%s", broker, username)
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReplayService) publishRecord(client mqtt.Client, record models.MqttHeartRateRecord) error {
|
||||||
|
payloadMessage := buildReplayHeartRateMessage(record)
|
||||||
|
payload, err := proto.Marshal(payloadMessage)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal replay heart rate payload: %w", err)
|
||||||
|
}
|
||||||
|
topic := strings.TrimSpace(record.Topic)
|
||||||
|
if topic == "" {
|
||||||
|
topic = fmt.Sprintf("/whgw/v2/region/%d/measurement/band/%d/hr", record.RegionID, record.BandID)
|
||||||
|
}
|
||||||
|
markReplayPayload(topic, payload)
|
||||||
|
token := client.Publish(topic, byte(s.cfg.QoS), false, payload)
|
||||||
|
if !token.WaitTimeout(10 * time.Second) {
|
||||||
|
return fmt.Errorf("mqtt replay publish timeout")
|
||||||
|
}
|
||||||
|
if err := token.Error(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Printf("mqtt replay published region=%d addr=%s hr=%d packet=%d topic=%s", record.RegionID, record.BeltAddr, record.HeartRate, record.PacketNum, topic)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildReplayHeartRateMessage(record models.MqttHeartRateRecord) *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg {
|
||||||
|
return &whgw_hrpb.GatewaySlaveOutCloudMasterInMsg{
|
||||||
|
Choice: &whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfHrMeasurement{
|
||||||
|
NtfHrMeasurement: &whgw_hrpb.HrMeasurement{
|
||||||
|
HrPacket: &whgw_hrpb.HrPacket{
|
||||||
|
Hr: uint32(record.HeartRate),
|
||||||
|
Id: record.BandID,
|
||||||
|
PacketNum: record.PacketNum,
|
||||||
|
Status: &whgw_hrpb.StatusFlag{
|
||||||
|
Battery: record.Battery,
|
||||||
|
HrConfidence: whgw_hrpb.HrConfidence(record.HrConfidence),
|
||||||
|
IsActive: record.IsActive,
|
||||||
|
IsOnSkin: record.IsOnSkin,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
PacketStatus: buildReplayPacketStatus(record),
|
||||||
|
GatewayInfo: &whgw_hrpb.GatewayInfo{
|
||||||
|
Extra: &whgw_hrpb.GatewayInfoExtra{
|
||||||
|
ActiveUplink: whgw_hrpb.NetworkUplinkKind(record.GatewayActiveUplink),
|
||||||
|
CellularModem: &whgw_hrpb.CellularModemInfo{
|
||||||
|
Ber: record.GatewayCellularBER,
|
||||||
|
Imei: record.GatewayCellularIMEI,
|
||||||
|
Rssi: record.GatewayCellularRSSI,
|
||||||
|
},
|
||||||
|
SchemaVersion: record.GatewaySchemaVersion,
|
||||||
|
},
|
||||||
|
GatewayMac: parseMAC(record.GatewayMAC),
|
||||||
|
RegionId: record.RegionID,
|
||||||
|
},
|
||||||
|
HubInfo: &whgw_hrpb.HubInfo{
|
||||||
|
BusId: record.HubBusID,
|
||||||
|
SubDevId: record.HubSubDevID,
|
||||||
|
RadioParameters: &whgw_hrpb.LoRaParameters{
|
||||||
|
Bw: whgw_hrpb.LoRaBW(record.HubRadioBW),
|
||||||
|
FrequencyMhz: float32(record.HubRadioFrequencyMHz),
|
||||||
|
Sf: record.HubRadioSF,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildReplayPacketStatus(record models.MqttHeartRateRecord) *whgw_hrpb.IPacketStatus {
|
||||||
|
switch strings.TrimSpace(record.PacketStatusSource) {
|
||||||
|
case "raw":
|
||||||
|
return &whgw_hrpb.IPacketStatus{
|
||||||
|
Choice: &whgw_hrpb.IPacketStatus_Raw{
|
||||||
|
Raw: &whgw_hrpb.RawPacketStatus{
|
||||||
|
SignalRssiX2Neg: record.RawSignalRSSIX2Neg,
|
||||||
|
SnrPktX4: record.RawSnrPktX4,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
case "parsed":
|
||||||
|
return &whgw_hrpb.IPacketStatus{
|
||||||
|
Choice: &whgw_hrpb.IPacketStatus_Parsed{
|
||||||
|
Parsed: &whgw_hrpb.PacketStatus{
|
||||||
|
SignalRssiNeg: float32(record.SignalRSSINeg),
|
||||||
|
SnrPkt: float32(record.SNR),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
if record.RawSignalRSSIX2Neg > 0 || record.RawSnrPktX4 != 0 {
|
||||||
|
return &whgw_hrpb.IPacketStatus{
|
||||||
|
Choice: &whgw_hrpb.IPacketStatus_Raw{
|
||||||
|
Raw: &whgw_hrpb.RawPacketStatus{
|
||||||
|
SignalRssiX2Neg: record.RawSignalRSSIX2Neg,
|
||||||
|
SnrPktX4: record.RawSnrPktX4,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &whgw_hrpb.IPacketStatus{
|
||||||
|
Choice: &whgw_hrpb.IPacketStatus_Parsed{
|
||||||
|
Parsed: &whgw_hrpb.PacketStatus{
|
||||||
|
SignalRssiNeg: float32(record.SignalRSSINeg),
|
||||||
|
SnrPkt: float32(record.SNR),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseMAC(value string) []byte {
|
||||||
|
value = strings.TrimSpace(value)
|
||||||
|
if value == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
parts := strings.Split(value, ":")
|
||||||
|
result := make([]byte, 0, len(parts))
|
||||||
|
for _, part := range parts {
|
||||||
|
if len(part) == 1 {
|
||||||
|
part = "0" + part
|
||||||
|
}
|
||||||
|
decoded, err := hex.DecodeString(part)
|
||||||
|
if err != nil || len(decoded) != 1 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
result = append(result, decoded[0])
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReplayService) updateProgress(record models.MqttHeartRateRecord, processed int) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.status.CurrentAddr = record.BeltAddr
|
||||||
|
s.status.CurrentHeartRate = record.HeartRate
|
||||||
|
s.status.LastPublishedAt = time.Now().UnixMilli()
|
||||||
|
s.status.LastTopic = record.Topic
|
||||||
|
s.status.ProcessedCount = processed
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReplayService) complete() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.cancel = nil
|
||||||
|
s.status.CompletedAt = time.Now().UnixMilli()
|
||||||
|
s.status.Running = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReplayService) fail(err error) {
|
||||||
|
log.Printf("mqtt replay failed: %v", err)
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.cancel = nil
|
||||||
|
s.status.CompletedAt = time.Now().UnixMilli()
|
||||||
|
s.status.ErrorMessage = err.Error()
|
||||||
|
s.status.Running = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReplayService) stopWithContext() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.cancel = nil
|
||||||
|
if s.status.Running {
|
||||||
|
s.status.Running = false
|
||||||
|
s.status.StoppedAt = time.Now().UnixMilli()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func markReplayPayload(topic string, payload []byte) {
|
||||||
|
replayFingerprints.mu.Lock()
|
||||||
|
defer replayFingerprints.mu.Unlock()
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
for key, expiresAt := range replayFingerprints.items {
|
||||||
|
if now.After(expiresAt) {
|
||||||
|
delete(replayFingerprints.items, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
replayFingerprints.items[buildReplayFingerprint(topic, payload)] = now.Add(replayMarkerRetention)
|
||||||
|
}
|
||||||
|
|
||||||
|
func consumeReplayPayload(topic string, payload []byte) bool {
|
||||||
|
replayFingerprints.mu.Lock()
|
||||||
|
defer replayFingerprints.mu.Unlock()
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
key := buildReplayFingerprint(topic, payload)
|
||||||
|
expiresAt, ok := replayFingerprints.items[key]
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
delete(replayFingerprints.items, key)
|
||||||
|
if now.After(expiresAt) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildReplayFingerprint(topic string, payload []byte) string {
|
||||||
|
sum := sha1.Sum(append([]byte(topic+"|"), payload...))
|
||||||
|
return hex.EncodeToString(sum[:])
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeReplayEndTime(endTime int64) int64 {
|
||||||
|
if endTime <= 0 {
|
||||||
|
return endTime
|
||||||
|
}
|
||||||
|
if endTime%1000 == 0 {
|
||||||
|
return endTime + 999
|
||||||
|
}
|
||||||
|
return endTime
|
||||||
|
}
|
||||||
@@ -156,6 +156,11 @@ func SetupRouter() *gin.Engine {
|
|||||||
admin.GET("/system-debug/mqtt/status", systemDebugController.MqttStatus)
|
admin.GET("/system-debug/mqtt/status", systemDebugController.MqttStatus)
|
||||||
admin.POST("/system-debug/mqtt/start", systemDebugController.StartMqtt)
|
admin.POST("/system-debug/mqtt/start", systemDebugController.StartMqtt)
|
||||||
admin.POST("/system-debug/mqtt/stop", systemDebugController.StopMqtt)
|
admin.POST("/system-debug/mqtt/stop", systemDebugController.StopMqtt)
|
||||||
|
admin.GET("/system-debug/mqtt/replay/status", systemDebugController.MqttReplayStatus)
|
||||||
|
admin.POST("/system-debug/mqtt/replay/start", systemDebugController.StartMqttReplay)
|
||||||
|
admin.POST("/system-debug/mqtt/replay/stop", systemDebugController.StopMqttReplay)
|
||||||
|
admin.GET("/system-debug/mqtt/listener-config", systemDebugController.GetMqttListenerConfig)
|
||||||
|
admin.PUT("/system-debug/mqtt/listener-config", systemDebugController.UpdateMqttListenerConfig)
|
||||||
|
|
||||||
admin.GET("/statistics/ai-analysis-records", statisticsController.ListAIAnalysisRecords)
|
admin.GET("/statistics/ai-analysis-records", statisticsController.ListAIAnalysisRecords)
|
||||||
admin.GET("/statistics/ai-analysis-records/:id/pdf", statisticsController.DownloadAIAnalysisRecordPDF)
|
admin.GET("/statistics/ai-analysis-records/:id/pdf", statisticsController.DownloadAIAnalysisRecordPDF)
|
||||||
@@ -163,6 +168,8 @@ func SetupRouter() *gin.Engine {
|
|||||||
admin.GET("/statistics/ai-analysis", statisticsController.StatisticsByRegion)
|
admin.GET("/statistics/ai-analysis", statisticsController.StatisticsByRegion)
|
||||||
admin.GET("/statistics/ai-analysis-timeline", statisticsController.TimelineStatistics)
|
admin.GET("/statistics/ai-analysis-timeline", statisticsController.TimelineStatistics)
|
||||||
admin.GET("/statistics/mqtt-training-sessions", statisticsController.TrainingSessionStatisticsByRegion)
|
admin.GET("/statistics/mqtt-training-sessions", statisticsController.TrainingSessionStatisticsByRegion)
|
||||||
|
admin.GET("/statistics/mqtt-measurements", statisticsController.ListMqttMeasurementHistory)
|
||||||
|
admin.GET("/statistics/mqtt-measurements/chart", statisticsController.MqttMeasurementChartData)
|
||||||
}
|
}
|
||||||
|
|
||||||
v1.GET("/admin/system-debug/mqtt/ws", systemDebugController.MqttWebSocket)
|
v1.GET("/admin/system-debug/mqtt/ws", systemDebugController.MqttWebSocket)
|
||||||
|
|||||||
Reference in New Issue
Block a user