Compare commits
3 Commits
0d07dc653b
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| df27dfdb03 | |||
| 3f499ddf27 | |||
| 1f77e1dd6c |
@@ -17,6 +17,8 @@ mqtt:
|
||||
port: 10237
|
||||
username: public_client
|
||||
password: uXC3M4ObO9KpdU
|
||||
gw_username: ""
|
||||
gw_password: ""
|
||||
client_id_prefix: hr-receiver
|
||||
region: "+"
|
||||
use_tls: true
|
||||
|
||||
@@ -32,6 +32,8 @@ type MQTTConfig struct {
|
||||
Port int `mapstructure:"port" yaml:"port"`
|
||||
Username string `mapstructure:"username" yaml:"username"`
|
||||
Password string `mapstructure:"password" yaml:"password"`
|
||||
GWUsername string `mapstructure:"gw_username" yaml:"gw_username"`
|
||||
GWPassword string `mapstructure:"gw_password" yaml:"gw_password"`
|
||||
ClientIDPrefix string `mapstructure:"client_id_prefix" yaml:"client_id_prefix"`
|
||||
Region string `mapstructure:"region" yaml:"region"`
|
||||
UseTLS bool `mapstructure:"use_tls" yaml:"use_tls"`
|
||||
|
||||
+243
-60
@@ -4,6 +4,7 @@ import (
|
||||
"errors"
|
||||
"hr_receiver/config"
|
||||
"hr_receiver/models"
|
||||
"math"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
@@ -73,6 +74,13 @@ type mqttMeasurementHistorySummary struct {
|
||||
ValueLabel string `json:"valueLabel"`
|
||||
}
|
||||
|
||||
const (
|
||||
mqttChartRawPointLimit int64 = 5000
|
||||
mqttChartTargetPointsTotal int64 = 6000
|
||||
mqttChartMinPointsPerAddr int64 = 120
|
||||
mqttChartMaxPointsPerAddr int64 = 600
|
||||
)
|
||||
|
||||
// --- 查询接口 ---
|
||||
|
||||
// @Summary 获取AI分析记录列表
|
||||
@@ -278,15 +286,6 @@ func (sc *StatisticsController) chartHeartRateMeasurementHistory(c *gin.Context,
|
||||
baseQuery := sc.DB.Model(&models.MqttHeartRateRecord{})
|
||||
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "heart_rate", "received_at")
|
||||
|
||||
var records []models.MqttHeartRateRecord
|
||||
if err := baseQuery.Session(&gorm.Session{}).
|
||||
Order("received_at ASC").
|
||||
Limit(10000).
|
||||
Find(&records).Error; err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate chart records")
|
||||
return
|
||||
}
|
||||
|
||||
var summary mqttMeasurementHistorySummary
|
||||
summary.ValueLabel = "心率"
|
||||
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "heart_rate", "belt_addr", &summary); err != nil {
|
||||
@@ -294,32 +293,17 @@ func (sc *StatisticsController) chartHeartRateMeasurementHistory(c *gin.Context,
|
||||
return
|
||||
}
|
||||
|
||||
items := make([]mqttMeasurementHistoryItem, 0, len(records))
|
||||
for _, record := range records {
|
||||
items = append(items, mqttMeasurementHistoryItem{
|
||||
ID: record.ID,
|
||||
DataType: "heart_rate",
|
||||
Identifier: record.Identifier,
|
||||
Topic: record.Topic,
|
||||
RegionID: record.RegionID,
|
||||
Addr: record.BeltAddr,
|
||||
Value: int64(record.HeartRate),
|
||||
ValueLabel: "心率",
|
||||
PacketNum: record.PacketNum,
|
||||
GatewayMAC: record.GatewayMAC,
|
||||
Battery: record.Battery,
|
||||
IsActive: record.IsActive,
|
||||
IsOnSkin: record.IsOnSkin,
|
||||
SignalRSSINeg: record.SignalRSSINeg,
|
||||
SNR: record.SNR,
|
||||
ReceivedAt: record.ReceivedAt,
|
||||
CreatedAtMilli: record.CreatedAt.UnixMilli(),
|
||||
})
|
||||
items, bucketSizeMs, sampled, err := sc.buildHeartRateChartItems(baseQuery, params, summary)
|
||||
if err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate chart records")
|
||||
return
|
||||
}
|
||||
|
||||
writeSuccess(c, http.StatusOK, "query success", gin.H{
|
||||
"list": items,
|
||||
"summary": summary,
|
||||
"sampled": sampled,
|
||||
"bucketSizeMs": bucketSizeMs,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -390,15 +374,6 @@ func (sc *StatisticsController) chartStepCountMeasurementHistory(c *gin.Context,
|
||||
baseQuery := sc.DB.Model(&models.MqttStepCountRecord{})
|
||||
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "step_count", "received_at")
|
||||
|
||||
var records []models.MqttStepCountRecord
|
||||
if err := baseQuery.Session(&gorm.Session{}).
|
||||
Order("received_at ASC").
|
||||
Limit(10000).
|
||||
Find(&records).Error; err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count chart records")
|
||||
return
|
||||
}
|
||||
|
||||
var summary mqttMeasurementHistorySummary
|
||||
summary.ValueLabel = "步数"
|
||||
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "step_count", "belt_addr", &summary); err != nil {
|
||||
@@ -406,32 +381,17 @@ func (sc *StatisticsController) chartStepCountMeasurementHistory(c *gin.Context,
|
||||
return
|
||||
}
|
||||
|
||||
items := make([]mqttMeasurementHistoryItem, 0, len(records))
|
||||
for _, record := range records {
|
||||
items = append(items, mqttMeasurementHistoryItem{
|
||||
ID: record.ID,
|
||||
DataType: "step_count",
|
||||
Identifier: record.Identifier,
|
||||
Topic: record.Topic,
|
||||
RegionID: record.RegionID,
|
||||
Addr: record.BeltAddr,
|
||||
Value: int64(record.StepCount),
|
||||
ValueLabel: "步数",
|
||||
PacketNum: record.PacketNum,
|
||||
GatewayMAC: record.GatewayMAC,
|
||||
Battery: 0,
|
||||
IsActive: false,
|
||||
IsOnSkin: false,
|
||||
SignalRSSINeg: record.SignalRSSINeg,
|
||||
SNR: record.SNR,
|
||||
ReceivedAt: record.ReceivedAt,
|
||||
CreatedAtMilli: record.CreatedAt.UnixMilli(),
|
||||
})
|
||||
items, bucketSizeMs, sampled, err := sc.buildStepCountChartItems(baseQuery, params, summary)
|
||||
if err != nil {
|
||||
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count chart records")
|
||||
return
|
||||
}
|
||||
|
||||
writeSuccess(c, http.StatusOK, "query success", gin.H{
|
||||
"list": items,
|
||||
"summary": summary,
|
||||
"sampled": sampled,
|
||||
"bucketSizeMs": bucketSizeMs,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -489,6 +449,229 @@ func buildMqttMeasurementSummary(query *gorm.DB, valueField, addrField string, s
|
||||
return nil
|
||||
}
|
||||
|
||||
func estimateChartBucketSizeMs(params mqttMeasurementHistoryParams, summary mqttMeasurementHistorySummary) int64 {
|
||||
if params.StartTime <= 0 || params.EndTime <= params.StartTime {
|
||||
return 0
|
||||
}
|
||||
if summary.RecordCount <= mqttChartRawPointLimit {
|
||||
return 0
|
||||
}
|
||||
|
||||
uniqueAddrs := summary.UniqueAddrs
|
||||
if uniqueAddrs < 1 {
|
||||
uniqueAddrs = 1
|
||||
}
|
||||
|
||||
pointsPerAddr := mqttChartTargetPointsTotal / uniqueAddrs
|
||||
if pointsPerAddr < mqttChartMinPointsPerAddr {
|
||||
pointsPerAddr = mqttChartMinPointsPerAddr
|
||||
}
|
||||
if pointsPerAddr > mqttChartMaxPointsPerAddr {
|
||||
pointsPerAddr = mqttChartMaxPointsPerAddr
|
||||
}
|
||||
|
||||
rangeMs := params.EndTime - params.StartTime
|
||||
if rangeMs <= 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
bucketSizeMs := int64(math.Ceil(float64(rangeMs) / float64(pointsPerAddr)))
|
||||
return normalizeBucketSizeMs(bucketSizeMs)
|
||||
}
|
||||
|
||||
func normalizeBucketSizeMs(bucketSizeMs int64) int64 {
|
||||
if bucketSizeMs <= 1000 {
|
||||
return 1000
|
||||
}
|
||||
|
||||
candidates := []int64{
|
||||
1000,
|
||||
2000,
|
||||
5000,
|
||||
10 * 1000,
|
||||
15 * 1000,
|
||||
30 * 1000,
|
||||
60 * 1000,
|
||||
2 * 60 * 1000,
|
||||
5 * 60 * 1000,
|
||||
10 * 60 * 1000,
|
||||
15 * 60 * 1000,
|
||||
30 * 60 * 1000,
|
||||
60 * 60 * 1000,
|
||||
2 * 60 * 60 * 1000,
|
||||
6 * 60 * 60 * 1000,
|
||||
12 * 60 * 60 * 1000,
|
||||
24 * 60 * 60 * 1000,
|
||||
}
|
||||
for _, candidate := range candidates {
|
||||
if bucketSizeMs <= candidate {
|
||||
return candidate
|
||||
}
|
||||
}
|
||||
return 24 * 60 * 60 * 1000
|
||||
}
|
||||
|
||||
func buildChartBucketExpr(bucketSizeMs int64, timeField string) string {
|
||||
return "((" + timeField + " / " + strconv.FormatInt(bucketSizeMs, 10) + ") * " + strconv.FormatInt(bucketSizeMs, 10) + ")"
|
||||
}
|
||||
|
||||
func (sc *StatisticsController) buildHeartRateChartItems(baseQuery *gorm.DB, params mqttMeasurementHistoryParams, summary mqttMeasurementHistorySummary) ([]mqttMeasurementHistoryItem, int64, bool, error) {
|
||||
bucketSizeMs := estimateChartBucketSizeMs(params, summary)
|
||||
if bucketSizeMs == 0 {
|
||||
var records []models.MqttHeartRateRecord
|
||||
if err := baseQuery.Session(&gorm.Session{}).
|
||||
Order("received_at ASC").
|
||||
Find(&records).Error; err != nil {
|
||||
return nil, 0, false, err
|
||||
}
|
||||
items := make([]mqttMeasurementHistoryItem, 0, len(records))
|
||||
for _, record := range records {
|
||||
items = append(items, mqttMeasurementHistoryItem{
|
||||
ID: record.ID,
|
||||
DataType: "heart_rate",
|
||||
Identifier: record.Identifier,
|
||||
Topic: record.Topic,
|
||||
RegionID: record.RegionID,
|
||||
Addr: record.BeltAddr,
|
||||
Value: int64(record.HeartRate),
|
||||
ValueLabel: "心率",
|
||||
PacketNum: record.PacketNum,
|
||||
GatewayMAC: record.GatewayMAC,
|
||||
Battery: record.Battery,
|
||||
IsActive: record.IsActive,
|
||||
IsOnSkin: record.IsOnSkin,
|
||||
SignalRSSINeg: record.SignalRSSINeg,
|
||||
SNR: record.SNR,
|
||||
ReceivedAt: record.ReceivedAt,
|
||||
CreatedAtMilli: record.CreatedAt.UnixMilli(),
|
||||
})
|
||||
}
|
||||
return items, 0, false, nil
|
||||
}
|
||||
|
||||
bucketExpr := buildChartBucketExpr(bucketSizeMs, "received_at")
|
||||
type rawChartItem struct {
|
||||
RegionID uint32
|
||||
Addr string
|
||||
Value float64
|
||||
PacketNum int64
|
||||
SignalRSSINeg float64
|
||||
SNR float64
|
||||
ReceivedAt int64
|
||||
CreatedAtMilli int64
|
||||
}
|
||||
|
||||
var rows []rawChartItem
|
||||
if err := baseQuery.Session(&gorm.Session{}).
|
||||
Select(
|
||||
"region_id, belt_addr as addr, ROUND(AVG(heart_rate)) as value, COUNT(*) as packet_num, AVG(signal_rssi_neg) as signal_rssi_neg, AVG(snr) as snr, " + bucketExpr + " as received_at, MIN(EXTRACT(EPOCH FROM created_at) * 1000)::bigint as created_at_milli",
|
||||
).
|
||||
Group("region_id, belt_addr, " + bucketExpr).
|
||||
Order("received_at ASC, addr ASC").
|
||||
Scan(&rows).Error; err != nil {
|
||||
return nil, bucketSizeMs, true, err
|
||||
}
|
||||
|
||||
items := make([]mqttMeasurementHistoryItem, 0, len(rows))
|
||||
for _, row := range rows {
|
||||
items = append(items, mqttMeasurementHistoryItem{
|
||||
DataType: "heart_rate",
|
||||
RegionID: row.RegionID,
|
||||
Addr: row.Addr,
|
||||
Value: int64(math.Round(row.Value)),
|
||||
ValueLabel: "心率",
|
||||
PacketNum: uint32(maxInt64(row.PacketNum, 0)),
|
||||
SignalRSSINeg: row.SignalRSSINeg,
|
||||
SNR: row.SNR,
|
||||
ReceivedAt: row.ReceivedAt,
|
||||
CreatedAtMilli: row.CreatedAtMilli,
|
||||
})
|
||||
}
|
||||
return items, bucketSizeMs, true, nil
|
||||
}
|
||||
|
||||
func (sc *StatisticsController) buildStepCountChartItems(baseQuery *gorm.DB, params mqttMeasurementHistoryParams, summary mqttMeasurementHistorySummary) ([]mqttMeasurementHistoryItem, int64, bool, error) {
|
||||
bucketSizeMs := estimateChartBucketSizeMs(params, summary)
|
||||
if bucketSizeMs == 0 {
|
||||
var records []models.MqttStepCountRecord
|
||||
if err := baseQuery.Session(&gorm.Session{}).
|
||||
Order("received_at ASC").
|
||||
Find(&records).Error; err != nil {
|
||||
return nil, 0, false, err
|
||||
}
|
||||
items := make([]mqttMeasurementHistoryItem, 0, len(records))
|
||||
for _, record := range records {
|
||||
items = append(items, mqttMeasurementHistoryItem{
|
||||
ID: record.ID,
|
||||
DataType: "step_count",
|
||||
Identifier: record.Identifier,
|
||||
Topic: record.Topic,
|
||||
RegionID: record.RegionID,
|
||||
Addr: record.BeltAddr,
|
||||
Value: int64(record.StepCount),
|
||||
ValueLabel: "步数",
|
||||
PacketNum: record.PacketNum,
|
||||
GatewayMAC: record.GatewayMAC,
|
||||
Battery: 0,
|
||||
IsActive: false,
|
||||
IsOnSkin: false,
|
||||
SignalRSSINeg: record.SignalRSSINeg,
|
||||
SNR: record.SNR,
|
||||
ReceivedAt: record.ReceivedAt,
|
||||
CreatedAtMilli: record.CreatedAt.UnixMilli(),
|
||||
})
|
||||
}
|
||||
return items, 0, false, nil
|
||||
}
|
||||
|
||||
bucketExpr := buildChartBucketExpr(bucketSizeMs, "received_at")
|
||||
type rawChartItem struct {
|
||||
RegionID uint32
|
||||
Addr string
|
||||
Value float64
|
||||
PacketNum int64
|
||||
SignalRSSINeg float64
|
||||
SNR float64
|
||||
ReceivedAt int64
|
||||
CreatedAtMilli int64
|
||||
}
|
||||
|
||||
var rows []rawChartItem
|
||||
if err := baseQuery.Session(&gorm.Session{}).
|
||||
Select(
|
||||
"region_id, belt_addr as addr, ROUND(AVG(step_count)) as value, COUNT(*) as packet_num, AVG(signal_rssi_neg) as signal_rssi_neg, AVG(snr) as snr, " + bucketExpr + " as received_at, MIN(EXTRACT(EPOCH FROM created_at) * 1000)::bigint as created_at_milli",
|
||||
).
|
||||
Group("region_id, belt_addr, " + bucketExpr).
|
||||
Order("received_at ASC, addr ASC").
|
||||
Scan(&rows).Error; err != nil {
|
||||
return nil, bucketSizeMs, true, err
|
||||
}
|
||||
|
||||
items := make([]mqttMeasurementHistoryItem, 0, len(rows))
|
||||
for _, row := range rows {
|
||||
items = append(items, mqttMeasurementHistoryItem{
|
||||
DataType: "step_count",
|
||||
RegionID: row.RegionID,
|
||||
Addr: row.Addr,
|
||||
Value: int64(math.Round(row.Value)),
|
||||
ValueLabel: "步数",
|
||||
PacketNum: uint32(maxInt64(row.PacketNum, 0)),
|
||||
SignalRSSINeg: row.SignalRSSINeg,
|
||||
SNR: row.SNR,
|
||||
ReceivedAt: row.ReceivedAt,
|
||||
CreatedAtMilli: row.CreatedAtMilli,
|
||||
})
|
||||
}
|
||||
return items, bucketSizeMs, true, nil
|
||||
}
|
||||
|
||||
func maxInt64(value, floor int64) int64 {
|
||||
if value < floor {
|
||||
return floor
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
// --- 删除接口 ---
|
||||
|
||||
// @Summary 删除AI分析记录
|
||||
|
||||
@@ -19,6 +19,13 @@ type mqttDebugStartRequest struct {
|
||||
PersistToDatabase bool `json:"persistToDatabase"`
|
||||
}
|
||||
|
||||
type mqttReplayStartRequest struct {
|
||||
Addr string `json:"addr"`
|
||||
EndTime int64 `json:"endTime"`
|
||||
RegionID uint32 `json:"regionId"`
|
||||
StartTime int64 `json:"startTime"`
|
||||
}
|
||||
|
||||
var debugUpgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
return true
|
||||
@@ -89,6 +96,70 @@ func (sc *SystemDebugController) StopMqtt(c *gin.Context) {
|
||||
writeSuccess(c, http.StatusOK, "stop success", service.Status())
|
||||
}
|
||||
|
||||
// @Summary 获取MQTT重放状态
|
||||
// @Description 获取心率历史数据 MQTT 重放状态
|
||||
// @Tags 系统调试
|
||||
// @Produce json
|
||||
// @Security BearerAuth
|
||||
// @Success 200 {object} SwagAPIResponse "查询成功"
|
||||
// @Router /admin/system-debug/mqtt/replay/status [get]
|
||||
func (sc *SystemDebugController) MqttReplayStatus(c *gin.Context) {
|
||||
service := mqtt.GetReplayService()
|
||||
if service == nil {
|
||||
writeError(c, http.StatusServiceUnavailable, "mqtt replay service unavailable")
|
||||
return
|
||||
}
|
||||
writeSuccess(c, http.StatusOK, "query success", service.Status())
|
||||
}
|
||||
|
||||
// @Summary 启动MQTT历史数据重放
|
||||
// @Description 按指定时间范围重放历史心率数据到 MQTT
|
||||
// @Tags 系统调试
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Security BearerAuth
|
||||
// @Success 200 {object} SwagAPIResponse "启动成功"
|
||||
// @Router /admin/system-debug/mqtt/replay/start [post]
|
||||
func (sc *SystemDebugController) StartMqttReplay(c *gin.Context) {
|
||||
service := mqtt.GetReplayService()
|
||||
if service == nil {
|
||||
writeError(c, http.StatusServiceUnavailable, "mqtt replay service unavailable")
|
||||
return
|
||||
}
|
||||
var payload mqttReplayStartRequest
|
||||
if err := c.ShouldBindJSON(&payload); err != nil {
|
||||
writeError(c, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
status, err := service.Start(mqtt.ReplayStartRequest{
|
||||
Addr: payload.Addr,
|
||||
EndTime: payload.EndTime,
|
||||
RegionID: payload.RegionID,
|
||||
StartTime: payload.StartTime,
|
||||
})
|
||||
if err != nil {
|
||||
writeError(c, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
writeSuccess(c, http.StatusOK, "start success", status)
|
||||
}
|
||||
|
||||
// @Summary 停止MQTT历史数据重放
|
||||
// @Description 停止当前正在执行的心率历史数据 MQTT 重放
|
||||
// @Tags 系统调试
|
||||
// @Produce json
|
||||
// @Security BearerAuth
|
||||
// @Success 200 {object} SwagAPIResponse "停止成功"
|
||||
// @Router /admin/system-debug/mqtt/replay/stop [post]
|
||||
func (sc *SystemDebugController) StopMqttReplay(c *gin.Context) {
|
||||
service := mqtt.GetReplayService()
|
||||
if service == nil {
|
||||
writeError(c, http.StatusServiceUnavailable, "mqtt replay service unavailable")
|
||||
return
|
||||
}
|
||||
writeSuccess(c, http.StatusOK, "stop success", service.Stop())
|
||||
}
|
||||
|
||||
// @Summary MQTT WebSocket连接
|
||||
// @Description 通过WebSocket实时监听MQTT消息(需要SuperAdmin权限)
|
||||
// @Tags 系统调试
|
||||
|
||||
@@ -87,6 +87,7 @@ func main() {
|
||||
log.Printf("mqtt listener start failed: %v", err)
|
||||
}
|
||||
mqtt.InitDebugService(config.DB, config.App.MQTT)
|
||||
mqtt.InitReplayService(config.DB, config.App.MQTT)
|
||||
controllers.StartLessonPlanCleanupJob(config.DB)
|
||||
controllers.StartMqttMeasurementCleanupJob(config.DB)
|
||||
|
||||
|
||||
@@ -161,6 +161,10 @@ func (l *Listener) handleMessage(_ mqtt.Client, msg mqtt.Message) {
|
||||
if len(msg.Payload()) == 0 {
|
||||
return
|
||||
}
|
||||
if consumeReplayPayload(msg.Topic(), msg.Payload()) {
|
||||
log.Printf("mqtt listener skipped replay payload topic=%s", msg.Topic())
|
||||
return
|
||||
}
|
||||
now := time.Now().UnixMilli()
|
||||
|
||||
var packet whgw_hrpb.GatewaySlaveOutCloudMasterInMsg
|
||||
|
||||
@@ -0,0 +1,455 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha1"
|
||||
"crypto/tls"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"hr_receiver/config"
|
||||
"hr_receiver/models"
|
||||
whgw_hrpb "hr_receiver/proto"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
const (
|
||||
maxReplayRecordCount = 20000
|
||||
maxReplaySleepGap = 24 * time.Hour
|
||||
replayMarkerRetention = 10 * time.Minute
|
||||
)
|
||||
|
||||
type ReplayStartRequest struct {
|
||||
Addr string `json:"addr"`
|
||||
EndTime int64 `json:"endTime"`
|
||||
RegionID uint32 `json:"regionId"`
|
||||
StartTime int64 `json:"startTime"`
|
||||
}
|
||||
|
||||
type ReplayStatus struct {
|
||||
Addr string `json:"addr"`
|
||||
CompletedAt int64 `json:"completedAt"`
|
||||
CurrentAddr string `json:"currentAddr"`
|
||||
CurrentHeartRate int `json:"currentHeartRate"`
|
||||
EndTime int64 `json:"endTime"`
|
||||
ErrorMessage string `json:"errorMessage"`
|
||||
LastPublishedAt int64 `json:"lastPublishedAt"`
|
||||
LastTopic string `json:"lastTopic"`
|
||||
ProcessedCount int `json:"processedCount"`
|
||||
RegionID uint32 `json:"regionId"`
|
||||
Running bool `json:"running"`
|
||||
StartedAt int64 `json:"startedAt"`
|
||||
StartTime int64 `json:"startTime"`
|
||||
StoppedAt int64 `json:"stoppedAt"`
|
||||
TotalCount int `json:"totalCount"`
|
||||
}
|
||||
|
||||
type ReplayService struct {
|
||||
cfg config.MQTTConfig
|
||||
db *gorm.DB
|
||||
cancel context.CancelFunc
|
||||
mu sync.RWMutex
|
||||
status ReplayStatus
|
||||
}
|
||||
|
||||
type replayFingerprintStore struct {
|
||||
items map[string]time.Time
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
var (
|
||||
globalReplayService = &ReplayService{}
|
||||
replayFingerprints = &replayFingerprintStore{items: map[string]time.Time{}}
|
||||
)
|
||||
|
||||
func InitReplayService(db *gorm.DB, cfg config.MQTTConfig) {
|
||||
globalReplayService = &ReplayService{
|
||||
cfg: cfg,
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
func GetReplayService() *ReplayService {
|
||||
return globalReplayService
|
||||
}
|
||||
|
||||
func (s *ReplayService) Status() ReplayStatus {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.status
|
||||
}
|
||||
|
||||
func (s *ReplayService) Start(req ReplayStartRequest) (ReplayStatus, error) {
|
||||
req.Addr = strings.TrimSpace(req.Addr)
|
||||
if req.StartTime <= 0 || req.EndTime <= 0 {
|
||||
return ReplayStatus{}, fmt.Errorf("startTime and endTime are required")
|
||||
}
|
||||
req.EndTime = normalizeReplayEndTime(req.EndTime)
|
||||
if req.EndTime < req.StartTime {
|
||||
return ReplayStatus{}, fmt.Errorf("endTime must be greater than or equal to startTime")
|
||||
}
|
||||
if err := validateConfig(s.cfg); err != nil {
|
||||
return ReplayStatus{}, err
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
if s.status.Running {
|
||||
current := s.status
|
||||
s.mu.Unlock()
|
||||
return current, fmt.Errorf("mqtt replay is already running")
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
query := s.db.Model(&models.MqttHeartRateRecord{}).
|
||||
Where("received_at >= ? AND received_at <= ?", req.StartTime, req.EndTime)
|
||||
if req.RegionID > 0 {
|
||||
query = query.Where("region_id = ?", req.RegionID)
|
||||
}
|
||||
if req.Addr != "" {
|
||||
query = query.Where("belt_addr LIKE ?", "%"+req.Addr+"%")
|
||||
}
|
||||
|
||||
var total int64
|
||||
if err := query.Count(&total).Error; err != nil {
|
||||
return ReplayStatus{}, err
|
||||
}
|
||||
if total == 0 {
|
||||
log.Printf("mqtt replay found no records start=%d end=%d region=%d addr=%q", req.StartTime, req.EndTime, req.RegionID, req.Addr)
|
||||
return ReplayStatus{}, fmt.Errorf("no heart rate records found in the selected range")
|
||||
}
|
||||
if total > maxReplayRecordCount {
|
||||
return ReplayStatus{}, fmt.Errorf("replay record count exceeds limit: %d > %d", total, maxReplayRecordCount)
|
||||
}
|
||||
|
||||
var records []models.MqttHeartRateRecord
|
||||
if err := query.Order("received_at ASC, id ASC").Find(&records).Error; err != nil {
|
||||
return ReplayStatus{}, err
|
||||
}
|
||||
|
||||
status := ReplayStatus{
|
||||
Addr: req.Addr,
|
||||
EndTime: req.EndTime,
|
||||
RegionID: req.RegionID,
|
||||
Running: true,
|
||||
StartedAt: time.Now().UnixMilli(),
|
||||
StartTime: req.StartTime,
|
||||
TotalCount: len(records),
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
s.mu.Lock()
|
||||
s.cancel = cancel
|
||||
s.status = status
|
||||
s.mu.Unlock()
|
||||
|
||||
go s.run(ctx, records)
|
||||
return status, nil
|
||||
}
|
||||
|
||||
func (s *ReplayService) Stop() ReplayStatus {
|
||||
s.mu.Lock()
|
||||
cancel := s.cancel
|
||||
if cancel != nil {
|
||||
cancel()
|
||||
s.cancel = nil
|
||||
}
|
||||
if s.status.Running {
|
||||
s.status.Running = false
|
||||
s.status.StoppedAt = time.Now().UnixMilli()
|
||||
}
|
||||
status := s.status
|
||||
s.mu.Unlock()
|
||||
return status
|
||||
}
|
||||
|
||||
func (s *ReplayService) run(ctx context.Context, records []models.MqttHeartRateRecord) {
|
||||
client, err := s.connectReplayClient()
|
||||
if err != nil {
|
||||
s.fail(err)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if client.IsConnected() {
|
||||
client.Disconnect(250)
|
||||
}
|
||||
}()
|
||||
|
||||
for idx, record := range records {
|
||||
if idx > 0 {
|
||||
wait := time.Duration(record.ReceivedAt-records[idx-1].ReceivedAt) * time.Millisecond
|
||||
if wait < 0 {
|
||||
wait = 0
|
||||
}
|
||||
if wait > maxReplaySleepGap {
|
||||
wait = maxReplaySleepGap
|
||||
}
|
||||
timer := time.NewTimer(wait)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
s.stopWithContext()
|
||||
return
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.publishRecord(client, record); err != nil {
|
||||
s.fail(err)
|
||||
return
|
||||
}
|
||||
s.updateProgress(record, idx+1)
|
||||
}
|
||||
|
||||
s.complete()
|
||||
}
|
||||
|
||||
func (s *ReplayService) connectReplayClient() (mqtt.Client, error) {
|
||||
opts := mqtt.NewClientOptions()
|
||||
scheme := "tcp"
|
||||
if s.cfg.UseTLS {
|
||||
scheme = "ssl"
|
||||
opts.SetTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12})
|
||||
}
|
||||
broker := fmt.Sprintf("%s://%s:%d", scheme, s.cfg.Host, s.cfg.Port)
|
||||
username := strings.TrimSpace(s.cfg.GWUsername)
|
||||
password := s.cfg.GWPassword
|
||||
if username == "" {
|
||||
username = s.cfg.Username
|
||||
password = s.cfg.Password
|
||||
}
|
||||
opts.AddBroker(broker)
|
||||
opts.SetClientID(fmt.Sprintf("%s-replay-%d", s.cfg.ClientIDPrefix, time.Now().UnixNano()))
|
||||
opts.SetUsername(username)
|
||||
opts.SetPassword(password)
|
||||
opts.SetKeepAlive(60 * time.Second)
|
||||
opts.SetAutoReconnect(false)
|
||||
opts.SetConnectRetry(false)
|
||||
|
||||
client := mqtt.NewClient(opts)
|
||||
token := client.Connect()
|
||||
if !token.WaitTimeout(15 * time.Second) {
|
||||
return nil, fmt.Errorf("mqtt replay connect timeout")
|
||||
}
|
||||
if err := token.Error(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Printf("mqtt replay connected broker=%s username=%s", broker, username)
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (s *ReplayService) publishRecord(client mqtt.Client, record models.MqttHeartRateRecord) error {
|
||||
payloadMessage := buildReplayHeartRateMessage(record)
|
||||
payload, err := proto.Marshal(payloadMessage)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal replay heart rate payload: %w", err)
|
||||
}
|
||||
topic := strings.TrimSpace(record.Topic)
|
||||
if topic == "" {
|
||||
topic = fmt.Sprintf("/whgw/v2/region/%d/measurement/band/%d/hr", record.RegionID, record.BandID)
|
||||
}
|
||||
markReplayPayload(topic, payload)
|
||||
token := client.Publish(topic, byte(s.cfg.QoS), false, payload)
|
||||
if !token.WaitTimeout(10 * time.Second) {
|
||||
return fmt.Errorf("mqtt replay publish timeout")
|
||||
}
|
||||
if err := token.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("mqtt replay published region=%d addr=%s hr=%d packet=%d topic=%s", record.RegionID, record.BeltAddr, record.HeartRate, record.PacketNum, topic)
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildReplayHeartRateMessage(record models.MqttHeartRateRecord) *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg {
|
||||
return &whgw_hrpb.GatewaySlaveOutCloudMasterInMsg{
|
||||
Choice: &whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfHrMeasurement{
|
||||
NtfHrMeasurement: &whgw_hrpb.HrMeasurement{
|
||||
HrPacket: &whgw_hrpb.HrPacket{
|
||||
Hr: uint32(record.HeartRate),
|
||||
Id: record.BandID,
|
||||
PacketNum: record.PacketNum,
|
||||
Status: &whgw_hrpb.StatusFlag{
|
||||
Battery: record.Battery,
|
||||
HrConfidence: whgw_hrpb.HrConfidence(record.HrConfidence),
|
||||
IsActive: record.IsActive,
|
||||
IsOnSkin: record.IsOnSkin,
|
||||
},
|
||||
},
|
||||
PacketStatus: buildReplayPacketStatus(record),
|
||||
GatewayInfo: &whgw_hrpb.GatewayInfo{
|
||||
Extra: &whgw_hrpb.GatewayInfoExtra{
|
||||
ActiveUplink: whgw_hrpb.NetworkUplinkKind(record.GatewayActiveUplink),
|
||||
CellularModem: &whgw_hrpb.CellularModemInfo{
|
||||
Ber: record.GatewayCellularBER,
|
||||
Imei: record.GatewayCellularIMEI,
|
||||
Rssi: record.GatewayCellularRSSI,
|
||||
},
|
||||
SchemaVersion: record.GatewaySchemaVersion,
|
||||
},
|
||||
GatewayMac: parseMAC(record.GatewayMAC),
|
||||
RegionId: record.RegionID,
|
||||
},
|
||||
HubInfo: &whgw_hrpb.HubInfo{
|
||||
BusId: record.HubBusID,
|
||||
SubDevId: record.HubSubDevID,
|
||||
RadioParameters: &whgw_hrpb.LoRaParameters{
|
||||
Bw: whgw_hrpb.LoRaBW(record.HubRadioBW),
|
||||
FrequencyMhz: float32(record.HubRadioFrequencyMHz),
|
||||
Sf: record.HubRadioSF,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func buildReplayPacketStatus(record models.MqttHeartRateRecord) *whgw_hrpb.IPacketStatus {
|
||||
switch strings.TrimSpace(record.PacketStatusSource) {
|
||||
case "raw":
|
||||
return &whgw_hrpb.IPacketStatus{
|
||||
Choice: &whgw_hrpb.IPacketStatus_Raw{
|
||||
Raw: &whgw_hrpb.RawPacketStatus{
|
||||
SignalRssiX2Neg: record.RawSignalRSSIX2Neg,
|
||||
SnrPktX4: record.RawSnrPktX4,
|
||||
},
|
||||
},
|
||||
}
|
||||
case "parsed":
|
||||
return &whgw_hrpb.IPacketStatus{
|
||||
Choice: &whgw_hrpb.IPacketStatus_Parsed{
|
||||
Parsed: &whgw_hrpb.PacketStatus{
|
||||
SignalRssiNeg: float32(record.SignalRSSINeg),
|
||||
SnrPkt: float32(record.SNR),
|
||||
},
|
||||
},
|
||||
}
|
||||
default:
|
||||
if record.RawSignalRSSIX2Neg > 0 || record.RawSnrPktX4 != 0 {
|
||||
return &whgw_hrpb.IPacketStatus{
|
||||
Choice: &whgw_hrpb.IPacketStatus_Raw{
|
||||
Raw: &whgw_hrpb.RawPacketStatus{
|
||||
SignalRssiX2Neg: record.RawSignalRSSIX2Neg,
|
||||
SnrPktX4: record.RawSnrPktX4,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
return &whgw_hrpb.IPacketStatus{
|
||||
Choice: &whgw_hrpb.IPacketStatus_Parsed{
|
||||
Parsed: &whgw_hrpb.PacketStatus{
|
||||
SignalRssiNeg: float32(record.SignalRSSINeg),
|
||||
SnrPkt: float32(record.SNR),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func parseMAC(value string) []byte {
|
||||
value = strings.TrimSpace(value)
|
||||
if value == "" {
|
||||
return nil
|
||||
}
|
||||
parts := strings.Split(value, ":")
|
||||
result := make([]byte, 0, len(parts))
|
||||
for _, part := range parts {
|
||||
if len(part) == 1 {
|
||||
part = "0" + part
|
||||
}
|
||||
decoded, err := hex.DecodeString(part)
|
||||
if err != nil || len(decoded) != 1 {
|
||||
return nil
|
||||
}
|
||||
result = append(result, decoded[0])
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *ReplayService) updateProgress(record models.MqttHeartRateRecord, processed int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.status.CurrentAddr = record.BeltAddr
|
||||
s.status.CurrentHeartRate = record.HeartRate
|
||||
s.status.LastPublishedAt = time.Now().UnixMilli()
|
||||
s.status.LastTopic = record.Topic
|
||||
s.status.ProcessedCount = processed
|
||||
}
|
||||
|
||||
func (s *ReplayService) complete() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.cancel = nil
|
||||
s.status.CompletedAt = time.Now().UnixMilli()
|
||||
s.status.Running = false
|
||||
}
|
||||
|
||||
func (s *ReplayService) fail(err error) {
|
||||
log.Printf("mqtt replay failed: %v", err)
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.cancel = nil
|
||||
s.status.CompletedAt = time.Now().UnixMilli()
|
||||
s.status.ErrorMessage = err.Error()
|
||||
s.status.Running = false
|
||||
}
|
||||
|
||||
func (s *ReplayService) stopWithContext() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.cancel = nil
|
||||
if s.status.Running {
|
||||
s.status.Running = false
|
||||
s.status.StoppedAt = time.Now().UnixMilli()
|
||||
}
|
||||
}
|
||||
|
||||
func markReplayPayload(topic string, payload []byte) {
|
||||
replayFingerprints.mu.Lock()
|
||||
defer replayFingerprints.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
for key, expiresAt := range replayFingerprints.items {
|
||||
if now.After(expiresAt) {
|
||||
delete(replayFingerprints.items, key)
|
||||
}
|
||||
}
|
||||
replayFingerprints.items[buildReplayFingerprint(topic, payload)] = now.Add(replayMarkerRetention)
|
||||
}
|
||||
|
||||
func consumeReplayPayload(topic string, payload []byte) bool {
|
||||
replayFingerprints.mu.Lock()
|
||||
defer replayFingerprints.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
key := buildReplayFingerprint(topic, payload)
|
||||
expiresAt, ok := replayFingerprints.items[key]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
delete(replayFingerprints.items, key)
|
||||
if now.After(expiresAt) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func buildReplayFingerprint(topic string, payload []byte) string {
|
||||
sum := sha1.Sum(append([]byte(topic+"|"), payload...))
|
||||
return hex.EncodeToString(sum[:])
|
||||
}
|
||||
|
||||
func normalizeReplayEndTime(endTime int64) int64 {
|
||||
if endTime <= 0 {
|
||||
return endTime
|
||||
}
|
||||
if endTime%1000 == 0 {
|
||||
return endTime + 999
|
||||
}
|
||||
return endTime
|
||||
}
|
||||
@@ -156,6 +156,9 @@ func SetupRouter() *gin.Engine {
|
||||
admin.GET("/system-debug/mqtt/status", systemDebugController.MqttStatus)
|
||||
admin.POST("/system-debug/mqtt/start", systemDebugController.StartMqtt)
|
||||
admin.POST("/system-debug/mqtt/stop", systemDebugController.StopMqtt)
|
||||
admin.GET("/system-debug/mqtt/replay/status", systemDebugController.MqttReplayStatus)
|
||||
admin.POST("/system-debug/mqtt/replay/start", systemDebugController.StartMqttReplay)
|
||||
admin.POST("/system-debug/mqtt/replay/stop", systemDebugController.StopMqttReplay)
|
||||
admin.GET("/system-debug/mqtt/listener-config", systemDebugController.GetMqttListenerConfig)
|
||||
admin.PUT("/system-debug/mqtt/listener-config", systemDebugController.UpdateMqttListenerConfig)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user