From 1f77e1dd6c0b30f3578115b4b9dcb459f9d2be76 Mon Sep 17 00:00:00 2001 From: laoboli <1293528695@qq.com> Date: Wed, 13 May 2026 19:56:05 +0800 Subject: [PATCH] feat: replay service --- config.sample.yaml | 2 + config/config.go | 2 + controllers/system_debug.go | 71 ++++++ main.go | 1 + mqtt/listener.go | 4 + mqtt/replay_service.go | 455 ++++++++++++++++++++++++++++++++++++ routes/routes.go | 3 + 7 files changed, 538 insertions(+) create mode 100644 mqtt/replay_service.go diff --git a/config.sample.yaml b/config.sample.yaml index 2fc49e7..b841864 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -17,6 +17,8 @@ mqtt: port: 10237 username: public_client password: uXC3M4ObO9KpdU + gw_username: "" + gw_password: "" client_id_prefix: hr-receiver region: "+" use_tls: true diff --git a/config/config.go b/config/config.go index 8e15d77..36df144 100644 --- a/config/config.go +++ b/config/config.go @@ -32,6 +32,8 @@ type MQTTConfig struct { Port int `mapstructure:"port" yaml:"port"` Username string `mapstructure:"username" yaml:"username"` Password string `mapstructure:"password" yaml:"password"` + GWUsername string `mapstructure:"gw_username" yaml:"gw_username"` + GWPassword string `mapstructure:"gw_password" yaml:"gw_password"` ClientIDPrefix string `mapstructure:"client_id_prefix" yaml:"client_id_prefix"` Region string `mapstructure:"region" yaml:"region"` UseTLS bool `mapstructure:"use_tls" yaml:"use_tls"` diff --git a/controllers/system_debug.go b/controllers/system_debug.go index 5415ca4..050f4a5 100644 --- a/controllers/system_debug.go +++ b/controllers/system_debug.go @@ -19,6 +19,13 @@ type mqttDebugStartRequest struct { PersistToDatabase bool `json:"persistToDatabase"` } +type mqttReplayStartRequest struct { + Addr string `json:"addr"` + EndTime int64 `json:"endTime"` + RegionID uint32 `json:"regionId"` + StartTime int64 `json:"startTime"` +} + var debugUpgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true @@ -89,6 +96,70 @@ func (sc *SystemDebugController) StopMqtt(c *gin.Context) { writeSuccess(c, http.StatusOK, "stop success", service.Status()) } +// @Summary 获取MQTT重放状态 +// @Description 获取心率历史数据 MQTT 重放状态 +// @Tags 系统调试 +// @Produce json +// @Security BearerAuth +// @Success 200 {object} SwagAPIResponse "查询成功" +// @Router /admin/system-debug/mqtt/replay/status [get] +func (sc *SystemDebugController) MqttReplayStatus(c *gin.Context) { + service := mqtt.GetReplayService() + if service == nil { + writeError(c, http.StatusServiceUnavailable, "mqtt replay service unavailable") + return + } + writeSuccess(c, http.StatusOK, "query success", service.Status()) +} + +// @Summary 启动MQTT历史数据重放 +// @Description 按指定时间范围重放历史心率数据到 MQTT +// @Tags 系统调试 +// @Accept json +// @Produce json +// @Security BearerAuth +// @Success 200 {object} SwagAPIResponse "启动成功" +// @Router /admin/system-debug/mqtt/replay/start [post] +func (sc *SystemDebugController) StartMqttReplay(c *gin.Context) { + service := mqtt.GetReplayService() + if service == nil { + writeError(c, http.StatusServiceUnavailable, "mqtt replay service unavailable") + return + } + var payload mqttReplayStartRequest + if err := c.ShouldBindJSON(&payload); err != nil { + writeError(c, http.StatusBadRequest, err.Error()) + return + } + status, err := service.Start(mqtt.ReplayStartRequest{ + Addr: payload.Addr, + EndTime: payload.EndTime, + RegionID: payload.RegionID, + StartTime: payload.StartTime, + }) + if err != nil { + writeError(c, http.StatusBadRequest, err.Error()) + return + } + writeSuccess(c, http.StatusOK, "start success", status) +} + +// @Summary 停止MQTT历史数据重放 +// @Description 停止当前正在执行的心率历史数据 MQTT 重放 +// @Tags 系统调试 +// @Produce json +// @Security BearerAuth +// @Success 200 {object} SwagAPIResponse "停止成功" +// @Router /admin/system-debug/mqtt/replay/stop [post] +func (sc *SystemDebugController) StopMqttReplay(c *gin.Context) { + service := mqtt.GetReplayService() + if service == nil { + writeError(c, http.StatusServiceUnavailable, "mqtt replay service unavailable") + return + } + writeSuccess(c, http.StatusOK, "stop success", service.Stop()) +} + // @Summary MQTT WebSocket连接 // @Description 通过WebSocket实时监听MQTT消息(需要SuperAdmin权限) // @Tags 系统调试 diff --git a/main.go b/main.go index 70ea30b..4899cca 100644 --- a/main.go +++ b/main.go @@ -87,6 +87,7 @@ func main() { log.Printf("mqtt listener start failed: %v", err) } mqtt.InitDebugService(config.DB, config.App.MQTT) + mqtt.InitReplayService(config.DB, config.App.MQTT) controllers.StartLessonPlanCleanupJob(config.DB) controllers.StartMqttMeasurementCleanupJob(config.DB) diff --git a/mqtt/listener.go b/mqtt/listener.go index 9111173..00f23dd 100644 --- a/mqtt/listener.go +++ b/mqtt/listener.go @@ -161,6 +161,10 @@ func (l *Listener) handleMessage(_ mqtt.Client, msg mqtt.Message) { if len(msg.Payload()) == 0 { return } + if consumeReplayPayload(msg.Topic(), msg.Payload()) { + log.Printf("mqtt listener skipped replay payload topic=%s", msg.Topic()) + return + } now := time.Now().UnixMilli() var packet whgw_hrpb.GatewaySlaveOutCloudMasterInMsg diff --git a/mqtt/replay_service.go b/mqtt/replay_service.go new file mode 100644 index 0000000..8c8b1f7 --- /dev/null +++ b/mqtt/replay_service.go @@ -0,0 +1,455 @@ +package mqtt + +import ( + "context" + "crypto/sha1" + "crypto/tls" + "encoding/hex" + "fmt" + "hr_receiver/config" + "hr_receiver/models" + whgw_hrpb "hr_receiver/proto" + "log" + "strings" + "sync" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "google.golang.org/protobuf/proto" + "gorm.io/gorm" +) + +const ( + maxReplayRecordCount = 20000 + maxReplaySleepGap = 24 * time.Hour + replayMarkerRetention = 10 * time.Minute +) + +type ReplayStartRequest struct { + Addr string `json:"addr"` + EndTime int64 `json:"endTime"` + RegionID uint32 `json:"regionId"` + StartTime int64 `json:"startTime"` +} + +type ReplayStatus struct { + Addr string `json:"addr"` + CompletedAt int64 `json:"completedAt"` + CurrentAddr string `json:"currentAddr"` + CurrentHeartRate int `json:"currentHeartRate"` + EndTime int64 `json:"endTime"` + ErrorMessage string `json:"errorMessage"` + LastPublishedAt int64 `json:"lastPublishedAt"` + LastTopic string `json:"lastTopic"` + ProcessedCount int `json:"processedCount"` + RegionID uint32 `json:"regionId"` + Running bool `json:"running"` + StartedAt int64 `json:"startedAt"` + StartTime int64 `json:"startTime"` + StoppedAt int64 `json:"stoppedAt"` + TotalCount int `json:"totalCount"` +} + +type ReplayService struct { + cfg config.MQTTConfig + db *gorm.DB + cancel context.CancelFunc + mu sync.RWMutex + status ReplayStatus +} + +type replayFingerprintStore struct { + items map[string]time.Time + mu sync.Mutex +} + +var ( + globalReplayService = &ReplayService{} + replayFingerprints = &replayFingerprintStore{items: map[string]time.Time{}} +) + +func InitReplayService(db *gorm.DB, cfg config.MQTTConfig) { + globalReplayService = &ReplayService{ + cfg: cfg, + db: db, + } +} + +func GetReplayService() *ReplayService { + return globalReplayService +} + +func (s *ReplayService) Status() ReplayStatus { + s.mu.RLock() + defer s.mu.RUnlock() + return s.status +} + +func (s *ReplayService) Start(req ReplayStartRequest) (ReplayStatus, error) { + req.Addr = strings.TrimSpace(req.Addr) + if req.StartTime <= 0 || req.EndTime <= 0 { + return ReplayStatus{}, fmt.Errorf("startTime and endTime are required") + } + req.EndTime = normalizeReplayEndTime(req.EndTime) + if req.EndTime < req.StartTime { + return ReplayStatus{}, fmt.Errorf("endTime must be greater than or equal to startTime") + } + if err := validateConfig(s.cfg); err != nil { + return ReplayStatus{}, err + } + + s.mu.Lock() + if s.status.Running { + current := s.status + s.mu.Unlock() + return current, fmt.Errorf("mqtt replay is already running") + } + s.mu.Unlock() + + query := s.db.Model(&models.MqttHeartRateRecord{}). + Where("received_at >= ? AND received_at <= ?", req.StartTime, req.EndTime) + if req.RegionID > 0 { + query = query.Where("region_id = ?", req.RegionID) + } + if req.Addr != "" { + query = query.Where("belt_addr LIKE ?", "%"+req.Addr+"%") + } + + var total int64 + if err := query.Count(&total).Error; err != nil { + return ReplayStatus{}, err + } + if total == 0 { + log.Printf("mqtt replay found no records start=%d end=%d region=%d addr=%q", req.StartTime, req.EndTime, req.RegionID, req.Addr) + return ReplayStatus{}, fmt.Errorf("no heart rate records found in the selected range") + } + if total > maxReplayRecordCount { + return ReplayStatus{}, fmt.Errorf("replay record count exceeds limit: %d > %d", total, maxReplayRecordCount) + } + + var records []models.MqttHeartRateRecord + if err := query.Order("received_at ASC, id ASC").Find(&records).Error; err != nil { + return ReplayStatus{}, err + } + + status := ReplayStatus{ + Addr: req.Addr, + EndTime: req.EndTime, + RegionID: req.RegionID, + Running: true, + StartedAt: time.Now().UnixMilli(), + StartTime: req.StartTime, + TotalCount: len(records), + } + + ctx, cancel := context.WithCancel(context.Background()) + + s.mu.Lock() + s.cancel = cancel + s.status = status + s.mu.Unlock() + + go s.run(ctx, records) + return status, nil +} + +func (s *ReplayService) Stop() ReplayStatus { + s.mu.Lock() + cancel := s.cancel + if cancel != nil { + cancel() + s.cancel = nil + } + if s.status.Running { + s.status.Running = false + s.status.StoppedAt = time.Now().UnixMilli() + } + status := s.status + s.mu.Unlock() + return status +} + +func (s *ReplayService) run(ctx context.Context, records []models.MqttHeartRateRecord) { + client, err := s.connectReplayClient() + if err != nil { + s.fail(err) + return + } + defer func() { + if client.IsConnected() { + client.Disconnect(250) + } + }() + + for idx, record := range records { + if idx > 0 { + wait := time.Duration(record.ReceivedAt-records[idx-1].ReceivedAt) * time.Millisecond + if wait < 0 { + wait = 0 + } + if wait > maxReplaySleepGap { + wait = maxReplaySleepGap + } + timer := time.NewTimer(wait) + select { + case <-ctx.Done(): + timer.Stop() + s.stopWithContext() + return + case <-timer.C: + } + } + + if err := s.publishRecord(client, record); err != nil { + s.fail(err) + return + } + s.updateProgress(record, idx+1) + } + + s.complete() +} + +func (s *ReplayService) connectReplayClient() (mqtt.Client, error) { + opts := mqtt.NewClientOptions() + scheme := "tcp" + if s.cfg.UseTLS { + scheme = "ssl" + opts.SetTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12}) + } + broker := fmt.Sprintf("%s://%s:%d", scheme, s.cfg.Host, s.cfg.Port) + username := strings.TrimSpace(s.cfg.GWUsername) + password := s.cfg.GWPassword + if username == "" { + username = s.cfg.Username + password = s.cfg.Password + } + opts.AddBroker(broker) + opts.SetClientID(fmt.Sprintf("%s-replay-%d", s.cfg.ClientIDPrefix, time.Now().UnixNano())) + opts.SetUsername(username) + opts.SetPassword(password) + opts.SetKeepAlive(60 * time.Second) + opts.SetAutoReconnect(false) + opts.SetConnectRetry(false) + + client := mqtt.NewClient(opts) + token := client.Connect() + if !token.WaitTimeout(15 * time.Second) { + return nil, fmt.Errorf("mqtt replay connect timeout") + } + if err := token.Error(); err != nil { + return nil, err + } + log.Printf("mqtt replay connected broker=%s username=%s", broker, username) + return client, nil +} + +func (s *ReplayService) publishRecord(client mqtt.Client, record models.MqttHeartRateRecord) error { + payloadMessage := buildReplayHeartRateMessage(record) + payload, err := proto.Marshal(payloadMessage) + if err != nil { + return fmt.Errorf("marshal replay heart rate payload: %w", err) + } + topic := strings.TrimSpace(record.Topic) + if topic == "" { + topic = fmt.Sprintf("/whgw/v2/region/%d/measurement/band/%d/hr", record.RegionID, record.BandID) + } + markReplayPayload(topic, payload) + token := client.Publish(topic, byte(s.cfg.QoS), false, payload) + if !token.WaitTimeout(10 * time.Second) { + return fmt.Errorf("mqtt replay publish timeout") + } + if err := token.Error(); err != nil { + return err + } + log.Printf("mqtt replay published region=%d addr=%s hr=%d packet=%d topic=%s", record.RegionID, record.BeltAddr, record.HeartRate, record.PacketNum, topic) + return nil +} + +func buildReplayHeartRateMessage(record models.MqttHeartRateRecord) *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg { + return &whgw_hrpb.GatewaySlaveOutCloudMasterInMsg{ + Choice: &whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfHrMeasurement{ + NtfHrMeasurement: &whgw_hrpb.HrMeasurement{ + HrPacket: &whgw_hrpb.HrPacket{ + Hr: uint32(record.HeartRate), + Id: record.BandID, + PacketNum: record.PacketNum, + Status: &whgw_hrpb.StatusFlag{ + Battery: record.Battery, + HrConfidence: whgw_hrpb.HrConfidence(record.HrConfidence), + IsActive: record.IsActive, + IsOnSkin: record.IsOnSkin, + }, + }, + PacketStatus: buildReplayPacketStatus(record), + GatewayInfo: &whgw_hrpb.GatewayInfo{ + Extra: &whgw_hrpb.GatewayInfoExtra{ + ActiveUplink: whgw_hrpb.NetworkUplinkKind(record.GatewayActiveUplink), + CellularModem: &whgw_hrpb.CellularModemInfo{ + Ber: record.GatewayCellularBER, + Imei: record.GatewayCellularIMEI, + Rssi: record.GatewayCellularRSSI, + }, + SchemaVersion: record.GatewaySchemaVersion, + }, + GatewayMac: parseMAC(record.GatewayMAC), + RegionId: record.RegionID, + }, + HubInfo: &whgw_hrpb.HubInfo{ + BusId: record.HubBusID, + SubDevId: record.HubSubDevID, + RadioParameters: &whgw_hrpb.LoRaParameters{ + Bw: whgw_hrpb.LoRaBW(record.HubRadioBW), + FrequencyMhz: float32(record.HubRadioFrequencyMHz), + Sf: record.HubRadioSF, + }, + }, + }, + }, + } +} + +func buildReplayPacketStatus(record models.MqttHeartRateRecord) *whgw_hrpb.IPacketStatus { + switch strings.TrimSpace(record.PacketStatusSource) { + case "raw": + return &whgw_hrpb.IPacketStatus{ + Choice: &whgw_hrpb.IPacketStatus_Raw{ + Raw: &whgw_hrpb.RawPacketStatus{ + SignalRssiX2Neg: record.RawSignalRSSIX2Neg, + SnrPktX4: record.RawSnrPktX4, + }, + }, + } + case "parsed": + return &whgw_hrpb.IPacketStatus{ + Choice: &whgw_hrpb.IPacketStatus_Parsed{ + Parsed: &whgw_hrpb.PacketStatus{ + SignalRssiNeg: float32(record.SignalRSSINeg), + SnrPkt: float32(record.SNR), + }, + }, + } + default: + if record.RawSignalRSSIX2Neg > 0 || record.RawSnrPktX4 != 0 { + return &whgw_hrpb.IPacketStatus{ + Choice: &whgw_hrpb.IPacketStatus_Raw{ + Raw: &whgw_hrpb.RawPacketStatus{ + SignalRssiX2Neg: record.RawSignalRSSIX2Neg, + SnrPktX4: record.RawSnrPktX4, + }, + }, + } + } + return &whgw_hrpb.IPacketStatus{ + Choice: &whgw_hrpb.IPacketStatus_Parsed{ + Parsed: &whgw_hrpb.PacketStatus{ + SignalRssiNeg: float32(record.SignalRSSINeg), + SnrPkt: float32(record.SNR), + }, + }, + } + } +} + +func parseMAC(value string) []byte { + value = strings.TrimSpace(value) + if value == "" { + return nil + } + parts := strings.Split(value, ":") + result := make([]byte, 0, len(parts)) + for _, part := range parts { + if len(part) == 1 { + part = "0" + part + } + decoded, err := hex.DecodeString(part) + if err != nil || len(decoded) != 1 { + return nil + } + result = append(result, decoded[0]) + } + return result +} + +func (s *ReplayService) updateProgress(record models.MqttHeartRateRecord, processed int) { + s.mu.Lock() + defer s.mu.Unlock() + s.status.CurrentAddr = record.BeltAddr + s.status.CurrentHeartRate = record.HeartRate + s.status.LastPublishedAt = time.Now().UnixMilli() + s.status.LastTopic = record.Topic + s.status.ProcessedCount = processed +} + +func (s *ReplayService) complete() { + s.mu.Lock() + defer s.mu.Unlock() + s.cancel = nil + s.status.CompletedAt = time.Now().UnixMilli() + s.status.Running = false +} + +func (s *ReplayService) fail(err error) { + log.Printf("mqtt replay failed: %v", err) + s.mu.Lock() + defer s.mu.Unlock() + s.cancel = nil + s.status.CompletedAt = time.Now().UnixMilli() + s.status.ErrorMessage = err.Error() + s.status.Running = false +} + +func (s *ReplayService) stopWithContext() { + s.mu.Lock() + defer s.mu.Unlock() + s.cancel = nil + if s.status.Running { + s.status.Running = false + s.status.StoppedAt = time.Now().UnixMilli() + } +} + +func markReplayPayload(topic string, payload []byte) { + replayFingerprints.mu.Lock() + defer replayFingerprints.mu.Unlock() + + now := time.Now() + for key, expiresAt := range replayFingerprints.items { + if now.After(expiresAt) { + delete(replayFingerprints.items, key) + } + } + replayFingerprints.items[buildReplayFingerprint(topic, payload)] = now.Add(replayMarkerRetention) +} + +func consumeReplayPayload(topic string, payload []byte) bool { + replayFingerprints.mu.Lock() + defer replayFingerprints.mu.Unlock() + + now := time.Now() + key := buildReplayFingerprint(topic, payload) + expiresAt, ok := replayFingerprints.items[key] + if !ok { + return false + } + delete(replayFingerprints.items, key) + if now.After(expiresAt) { + return false + } + return true +} + +func buildReplayFingerprint(topic string, payload []byte) string { + sum := sha1.Sum(append([]byte(topic+"|"), payload...)) + return hex.EncodeToString(sum[:]) +} + +func normalizeReplayEndTime(endTime int64) int64 { + if endTime <= 0 { + return endTime + } + if endTime%1000 == 0 { + return endTime + 999 + } + return endTime +} diff --git a/routes/routes.go b/routes/routes.go index c9c98a2..341a30d 100644 --- a/routes/routes.go +++ b/routes/routes.go @@ -156,6 +156,9 @@ func SetupRouter() *gin.Engine { admin.GET("/system-debug/mqtt/status", systemDebugController.MqttStatus) admin.POST("/system-debug/mqtt/start", systemDebugController.StartMqtt) admin.POST("/system-debug/mqtt/stop", systemDebugController.StopMqtt) + admin.GET("/system-debug/mqtt/replay/status", systemDebugController.MqttReplayStatus) + admin.POST("/system-debug/mqtt/replay/start", systemDebugController.StartMqttReplay) + admin.POST("/system-debug/mqtt/replay/stop", systemDebugController.StopMqttReplay) admin.GET("/system-debug/mqtt/listener-config", systemDebugController.GetMqttListenerConfig) admin.PUT("/system-debug/mqtt/listener-config", systemDebugController.UpdateMqttListenerConfig)