package mqtt

import (
	"context"
	"crypto/sha1"
	"crypto/tls"
	"encoding/hex"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"hr_receiver/config"
	"hr_receiver/models"
	whgw_hrpb "hr_receiver/proto"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"google.golang.org/protobuf/proto"
	"gorm.io/gorm"
)

// Replay limits:
//   - maxReplayRecordCount caps how many records a single replay may load.
//   - maxReplaySleepGap caps the pause inserted between two consecutive records.
//   - replayMarkerRetention is how long a published payload fingerprint is kept.
const (
	maxReplayRecordCount  = 20000
	maxReplaySleepGap     = 24 * time.Hour
	replayMarkerRetention = 10 * time.Minute
)

// ReplayStartRequest selects the stored heart-rate records to replay.
// StartTime and EndTime bound received_at (inclusive) and are required;
// RegionID (when > 0) and Addr (when non-empty, substring match on belt_addr)
// narrow the selection further.
type ReplayStartRequest struct {
	Addr      string `json:"addr"`
	EndTime   int64  `json:"endTime"`
	RegionID  uint32 `json:"regionId"`
	StartTime int64  `json:"startTime"`
}

// ReplayStatus is a snapshot of the most recent replay: the request that
// started it, its progress, and how it ended. The *At fields are Unix
// milliseconds as produced by time.Now().UnixMilli().
type ReplayStatus struct {
	Addr             string `json:"addr"`
	CompletedAt      int64  `json:"completedAt"`
	CurrentAddr      string `json:"currentAddr"`
	CurrentHeartRate int    `json:"currentHeartRate"`
	EndTime          int64  `json:"endTime"`
	ErrorMessage     string `json:"errorMessage"`
	LastPublishedAt  int64  `json:"lastPublishedAt"`
	LastTopic        string `json:"lastTopic"`
	ProcessedCount   int    `json:"processedCount"`
	RegionID         uint32 `json:"regionId"`
	Running          bool   `json:"running"`
	StartedAt        int64  `json:"startedAt"`
	StartTime        int64  `json:"startTime"`
	StoppedAt        int64  `json:"stoppedAt"`
	TotalCount       int    `json:"totalCount"`
}

// ReplayService republishes stored heart-rate records to the MQTT broker,
// one replay at a time. mu guards cancel, status and runnerActive.
type ReplayService struct {
	cfg    config.MQTTConfig
	db     *gorm.DB
	cancel context.CancelFunc
	mu     sync.RWMutex
	status ReplayStatus

	// runnerActive is true from the moment Start installs a run until that
	// run's goroutine has returned. It outlives status.Running after Stop, so
	// Start can refuse to begin a new run while the previous goroutine could
	// still write to status.
	runnerActive bool
}

// replayFingerprintStore maps payload fingerprints to their expiry time.
// mu guards items.
type replayFingerprintStore struct {
	items map[string]time.Time
	mu    sync.Mutex
}

var (
	globalReplayService = &ReplayService{}
	replayFingerprints  = &replayFingerprintStore{items: map[string]time.Time{}}
)

// InitReplayService replaces the package-level replay service with one bound
// to db and cfg.
func InitReplayService(db *gorm.DB, cfg config.MQTTConfig) {
	globalReplayService = &ReplayService{
		cfg: cfg,
		db:  db,
	}
}

// GetReplayService returns the package-level replay service. Until
// InitReplayService has been called this is a zero-value service.
func GetReplayService() *ReplayService {
	return globalReplayService
}

// Status returns a copy of the current replay status.
func (s *ReplayService) Status() ReplayStatus {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.status
}

// Start validates req, loads the matching records ordered by received_at and
// id, and launches a background goroutine that republishes them. It returns
// the initial status of the new run.
//
// An error is returned when the request or MQTT config is invalid, the service
// has no database, no records match, more than maxReplayRecordCount records
// match, or a replay is already running or still shutting down. In the last
// two cases the current status is returned alongside the error.
func (s *ReplayService) Start(req ReplayStartRequest) (ReplayStatus, error) {
	req.Addr = strings.TrimSpace(req.Addr)
	if req.StartTime <= 0 || req.EndTime <= 0 {
		return ReplayStatus{}, fmt.Errorf("startTime and endTime are required")
	}
	req.EndTime = normalizeReplayEndTime(req.EndTime)
	if req.EndTime < req.StartTime {
		return ReplayStatus{}, fmt.Errorf("endTime must be greater than or equal to startTime")
	}
	if err := validateConfig(s.cfg); err != nil {
		return ReplayStatus{}, err
	}
	if s.db == nil {
		// The zero-value service returned before InitReplayService has no DB;
		// fail cleanly instead of panicking on a nil *gorm.DB.
		return ReplayStatus{}, fmt.Errorf("mqtt replay service is not initialized")
	}

	// Fast path: skip the database work when a replay is obviously in progress.
	s.mu.RLock()
	blocked := s.startBlockerLocked()
	current := s.status
	s.mu.RUnlock()
	if blocked != nil {
		return current, blocked
	}

	query := s.db.Model(&models.MqttHeartRateRecord{}).
		Where("received_at >= ? AND received_at <= ?", req.StartTime, req.EndTime)
	if req.RegionID > 0 {
		query = query.Where("region_id = ?", req.RegionID)
	}
	if req.Addr != "" {
		query = query.Where("belt_addr LIKE ?", "%"+req.Addr+"%")
	}

	var total int64
	if err := query.Count(&total).Error; err != nil {
		return ReplayStatus{}, err
	}
	if total == 0 {
		log.Printf("mqtt replay found no records start=%d end=%d region=%d addr=%q", req.StartTime, req.EndTime, req.RegionID, req.Addr)
		return ReplayStatus{}, fmt.Errorf("no heart rate records found in the selected range")
	}
	if total > maxReplayRecordCount {
		return ReplayStatus{}, fmt.Errorf("replay record count exceeds limit: %d > %d", total, maxReplayRecordCount)
	}

	var records []models.MqttHeartRateRecord
	if err := query.Order("received_at ASC, id ASC").Find(&records).Error; err != nil {
		return ReplayStatus{}, err
	}

	status := ReplayStatus{
		Addr:       req.Addr,
		EndTime:    req.EndTime,
		RegionID:   req.RegionID,
		Running:    true,
		StartedAt:  time.Now().UnixMilli(),
		StartTime:  req.StartTime,
		TotalCount: len(records),
	}

	// The lock was released during the queries, so another Start may have won
	// the race in the meantime. Re-check and install the run atomically.
	s.mu.Lock()
	if err := s.startBlockerLocked(); err != nil {
		current := s.status
		s.mu.Unlock()
		return current, err
	}
	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel
	s.runnerActive = true
	s.status = status
	s.mu.Unlock()

	go s.run(ctx, records)
	return status, nil
}

// startBlockerLocked returns the reason a new replay cannot start right now,
// or nil when it can. The caller must hold s.mu (read or write).
func (s *ReplayService) startBlockerLocked() error {
	switch {
	case s.status.Running:
		return fmt.Errorf("mqtt replay is already running")
	case s.runnerActive:
		// Stop has been called but the previous goroutine has not returned
		// yet; starting now would let it overwrite the new run's status.
		return fmt.Errorf("previous mqtt replay is still shutting down")
	}
	return nil
}

// Stop cancels the running replay, if any, marks the status as stopped and
// returns the resulting status. It does not wait for the replay goroutine to
// exit.
func (s *ReplayService) Stop() ReplayStatus {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.cancel != nil {
		s.cancel()
		s.cancel = nil
	}
	if s.status.Running {
		s.status.Running = false
		s.status.StoppedAt = time.Now().UnixMilli()
	}
	return s.status
}

// run connects a dedicated MQTT client and publishes records in order,
// sleeping between records for the difference of their ReceivedAt values
// (clamped to [0, maxReplaySleepGap]). It ends on cancellation, on the first
// publish error, or after the last record.
func (s *ReplayService) run(ctx context.Context, records []models.MqttHeartRateRecord) {
	// Registered first so it runs last: only once every status write below is
	// done may Start install another run.
	defer func() {
		s.mu.Lock()
		s.runnerActive = false
		s.mu.Unlock()
	}()

	client, err := s.connectReplayClient()
	if err != nil {
		s.fail(err)
		return
	}
	defer func() {
		if client.IsConnected() {
			client.Disconnect(250)
		}
	}()

	for idx, record := range records {
		var gap time.Duration
		if idx > 0 {
			gap = time.Duration(record.ReceivedAt-records[idx-1].ReceivedAt) * time.Millisecond
			if gap < 0 {
				gap = 0
			}
			if gap > maxReplaySleepGap {
				gap = maxReplaySleepGap
			}
		}
		// Checked before every record, including the first, so a cancelled
		// replay never publishes another message.
		if !waitReplayGap(ctx, gap) {
			s.stopWithContext()
			return
		}

		if err := s.publishRecord(client, record); err != nil {
			s.fail(err)
			return
		}
		s.updateProgress(record, idx+1)
	}

	s.complete()
}

// waitReplayGap blocks for gap or until ctx is cancelled. It returns false
// when ctx is cancelled (before or during the wait) and true otherwise.
func waitReplayGap(ctx context.Context, gap time.Duration) bool {
	// Check cancellation explicitly: with an already-fired timer a select
	// would pick between the two ready cases at random.
	if ctx.Err() != nil {
		return false
	}
	if gap <= 0 {
		return true
	}

	timer := time.NewTimer(gap)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

// connectReplayClient opens a new MQTT connection for replay publishing. It
// uses the gateway credentials when a gateway username is configured and the
// regular credentials otherwise, a unique client ID, and no automatic
// reconnect or connect retry. The connect attempt is given 15 seconds.
func (s *ReplayService) connectReplayClient() (mqtt.Client, error) {
	opts := mqtt.NewClientOptions()

	scheme := "tcp"
	if s.cfg.UseTLS {
		scheme = "ssl"
		opts.SetTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12})
	}
	broker := fmt.Sprintf("%s://%s:%d", scheme, s.cfg.Host, s.cfg.Port)

	username := strings.TrimSpace(s.cfg.GWUsername)
	password := s.cfg.GWPassword
	if username == "" {
		username = s.cfg.Username
		password = s.cfg.Password
	}

	opts.AddBroker(broker)
	opts.SetClientID(fmt.Sprintf("%s-replay-%d", s.cfg.ClientIDPrefix, time.Now().UnixNano()))
	opts.SetUsername(username)
	opts.SetPassword(password)
	opts.SetKeepAlive(60 * time.Second)
	opts.SetAutoReconnect(false)
	opts.SetConnectRetry(false)

	client := mqtt.NewClient(opts)
	token := client.Connect()
	if !token.WaitTimeout(15 * time.Second) {
		return nil, fmt.Errorf("mqtt replay connect timeout")
	}
	if err := token.Error(); err != nil {
		return nil, err
	}

	log.Printf("mqtt replay connected broker=%s username=%s", broker, username)
	return client, nil
}

// publishRecord rebuilds the protobuf message for record, registers its
// fingerprint, and publishes it (not retained) with the configured QoS,
// waiting up to 10 seconds for the publish to complete.
func (s *ReplayService) publishRecord(client mqtt.Client, record models.MqttHeartRateRecord) error {
	payload, err := proto.Marshal(buildReplayHeartRateMessage(record))
	if err != nil {
		return fmt.Errorf("marshal replay heart rate payload: %w", err)
	}

	topic := replayPublishTopic(record)

	// Mark before publishing so the fingerprint is already registered when the
	// message leaves this process.
	markReplayPayload(topic, payload)

	token := client.Publish(topic, byte(s.cfg.QoS), false, payload)
	if !token.WaitTimeout(10 * time.Second) {
		return fmt.Errorf("mqtt replay publish timeout")
	}
	if err := token.Error(); err != nil {
		return err
	}

	log.Printf("mqtt replay published region=%d addr=%s hr=%d packet=%d topic=%s", record.RegionID, record.BeltAddr, record.HeartRate, record.PacketNum, topic)
	return nil
}

// replayPublishTopic returns the topic record is replayed on: its stored
// topic with surrounding whitespace removed, or, when that is empty, the
// heart-rate topic derived from the record's region and band IDs.
func replayPublishTopic(record models.MqttHeartRateRecord) string {
	if topic := strings.TrimSpace(record.Topic); topic != "" {
		return topic
	}
	return fmt.Sprintf("/whgw/v2/region/%d/measurement/band/%d/hr", record.RegionID, record.BandID)
}

// buildReplayHeartRateMessage reconstructs the heart-rate measurement
// notification from the fields stored on record.
func buildReplayHeartRateMessage(record models.MqttHeartRateRecord) *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg {
	return &whgw_hrpb.GatewaySlaveOutCloudMasterInMsg{
		Choice: &whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfHrMeasurement{
			NtfHrMeasurement: &whgw_hrpb.HrMeasurement{
				HrPacket: &whgw_hrpb.HrPacket{
					Hr:        uint32(record.HeartRate),
					Id:        record.BandID,
					PacketNum: record.PacketNum,
					Status: &whgw_hrpb.StatusFlag{
						Battery:      record.Battery,
						HrConfidence: whgw_hrpb.HrConfidence(record.HrConfidence),
						IsActive:     record.IsActive,
						IsOnSkin:     record.IsOnSkin,
					},
				},
				PacketStatus: buildReplayPacketStatus(record),
				GatewayInfo: &whgw_hrpb.GatewayInfo{
					Extra: &whgw_hrpb.GatewayInfoExtra{
						ActiveUplink: whgw_hrpb.NetworkUplinkKind(record.GatewayActiveUplink),
						CellularModem: &whgw_hrpb.CellularModemInfo{
							Ber:  record.GatewayCellularBER,
							Imei: record.GatewayCellularIMEI,
							Rssi: record.GatewayCellularRSSI,
						},
						SchemaVersion: record.GatewaySchemaVersion,
					},
					GatewayMac: parseMAC(record.GatewayMAC),
					RegionId:   record.RegionID,
				},
				HubInfo: &whgw_hrpb.HubInfo{
					BusId:    record.HubBusID,
					SubDevId: record.HubSubDevID,
					RadioParameters: &whgw_hrpb.LoRaParameters{
						Bw:           whgw_hrpb.LoRaBW(record.HubRadioBW),
						FrequencyMhz: float32(record.HubRadioFrequencyMHz),
						Sf:           record.HubRadioSF,
					},
				},
			},
		},
	}
}

// buildReplayPacketStatus returns the packet status variant for record.
// PacketStatusSource "raw" or "parsed" selects the variant directly; for any
// other value the raw variant is used when either raw field is set
// (RawSignalRSSIX2Neg > 0 or RawSnrPktX4 != 0) and the parsed variant
// otherwise.
func buildReplayPacketStatus(record models.MqttHeartRateRecord) *whgw_hrpb.IPacketStatus {
	rawStatus := func() *whgw_hrpb.IPacketStatus {
		return &whgw_hrpb.IPacketStatus{
			Choice: &whgw_hrpb.IPacketStatus_Raw{
				Raw: &whgw_hrpb.RawPacketStatus{
					SignalRssiX2Neg: record.RawSignalRSSIX2Neg,
					SnrPktX4:        record.RawSnrPktX4,
				},
			},
		}
	}
	parsedStatus := func() *whgw_hrpb.IPacketStatus {
		return &whgw_hrpb.IPacketStatus{
			Choice: &whgw_hrpb.IPacketStatus_Parsed{
				Parsed: &whgw_hrpb.PacketStatus{
					SignalRssiNeg: float32(record.SignalRSSINeg),
					SnrPkt:        float32(record.SNR),
				},
			},
		}
	}

	switch strings.TrimSpace(record.PacketStatusSource) {
	case "raw":
		return rawStatus()
	case "parsed":
		return parsedStatus()
	}

	// Unknown or missing source: infer the variant from which fields are set.
	if record.RawSignalRSSIX2Neg > 0 || record.RawSnrPktX4 != 0 {
		return rawStatus()
	}
	return parsedStatus()
}

// parseMAC decodes a colon-separated hex string such as "0a:1b:2c" into
// bytes. Single-digit groups are accepted ("a" is read as "0a"). It returns
// nil for an empty string or when any group is not exactly one hex byte.
func parseMAC(value string) []byte {
	value = strings.TrimSpace(value)
	if value == "" {
		return nil
	}

	parts := strings.Split(value, ":")
	result := make([]byte, 0, len(parts))
	for _, part := range parts {
		if len(part) == 1 {
			part = "0" + part
		}
		decoded, err := hex.DecodeString(part)
		if err != nil || len(decoded) != 1 {
			return nil
		}
		result = append(result, decoded[0])
	}
	return result
}

// updateProgress records record as the most recently published one and sets
// the processed counter. LastTopic is the topic the record was actually
// published on, including the fallback used when the stored topic is empty.
func (s *ReplayService) updateProgress(record models.MqttHeartRateRecord, processed int) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.status.CurrentAddr = record.BeltAddr
	s.status.CurrentHeartRate = record.HeartRate
	s.status.LastPublishedAt = time.Now().UnixMilli()
	s.status.LastTopic = replayPublishTopic(record)
	s.status.ProcessedCount = processed
}

// complete marks the replay as finished after the last record.
func (s *ReplayService) complete() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.cancel = nil
	s.status.CompletedAt = time.Now().UnixMilli()
	s.status.Running = false
}

// fail logs err and marks the replay as finished with err as its error
// message.
func (s *ReplayService) fail(err error) {
	log.Printf("mqtt replay failed: %v", err)

	s.mu.Lock()
	defer s.mu.Unlock()

	s.cancel = nil
	s.status.CompletedAt = time.Now().UnixMilli()
	s.status.ErrorMessage = err.Error()
	s.status.Running = false
}

// stopWithContext records that the replay ended because its context was
// cancelled. When Stop has already marked the status as stopped, only the
// cancel func is cleared.
func (s *ReplayService) stopWithContext() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.cancel = nil
	if s.status.Running {
		s.status.Running = false
		s.status.StoppedAt = time.Now().UnixMilli()
	}
}

// markReplayPayload registers the fingerprint of (topic, payload) for
// replayMarkerRetention and drops every entry that has already expired.
func markReplayPayload(topic string, payload []byte) {
	// Hash before taking the lock; it does not touch shared state.
	key := buildReplayFingerprint(topic, payload)

	replayFingerprints.mu.Lock()
	defer replayFingerprints.mu.Unlock()

	now := time.Now()
	for existing, expiresAt := range replayFingerprints.items {
		if now.After(expiresAt) {
			delete(replayFingerprints.items, existing)
		}
	}
	replayFingerprints.items[key] = now.Add(replayMarkerRetention)
}

// consumeReplayPayload reports whether (topic, payload) carries a fingerprint
// registered by markReplayPayload that has not expired. A matching entry is
// removed whether or not it has expired, so each mark is reported at most once.
func consumeReplayPayload(topic string, payload []byte) bool {
	key := buildReplayFingerprint(topic, payload)

	replayFingerprints.mu.Lock()
	defer replayFingerprints.mu.Unlock()

	expiresAt, ok := replayFingerprints.items[key]
	if !ok {
		return false
	}
	delete(replayFingerprints.items, key)
	return !time.Now().After(expiresAt)
}

// buildReplayFingerprint returns the hex-encoded SHA-1 of topic, a "|"
// separator and payload. It is used only as an in-memory lookup key.
func buildReplayFingerprint(topic string, payload []byte) string {
	buf := make([]byte, 0, len(topic)+1+len(payload))
	buf = append(buf, topic...)
	buf = append(buf, '|')
	buf = append(buf, payload...)

	sum := sha1.Sum(buf)
	return hex.EncodeToString(sum[:])
}

// normalizeReplayEndTime adds 999 to a positive endTime that is an exact
// multiple of 1000, so that a whole-second bound in milliseconds covers that
// entire second. Other values are returned unchanged.
func normalizeReplayEndTime(endTime int64) int64 {
	if endTime > 0 && endTime%1000 == 0 {
		return endTime + 999
	}
	return endTime
}