456 lines
12 KiB
Go
456 lines
12 KiB
Go
package mqtt
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha1"
|
|
"crypto/tls"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"hr_receiver/config"
|
|
"hr_receiver/models"
|
|
whgw_hrpb "hr_receiver/proto"
|
|
"log"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
"google.golang.org/protobuf/proto"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
const (
|
|
maxReplayRecordCount = 20000
|
|
maxReplaySleepGap = 24 * time.Hour
|
|
replayMarkerRetention = 10 * time.Minute
|
|
)
|
|
|
|
type ReplayStartRequest struct {
|
|
Addr string `json:"addr"`
|
|
EndTime int64 `json:"endTime"`
|
|
RegionID uint32 `json:"regionId"`
|
|
StartTime int64 `json:"startTime"`
|
|
}
|
|
|
|
type ReplayStatus struct {
|
|
Addr string `json:"addr"`
|
|
CompletedAt int64 `json:"completedAt"`
|
|
CurrentAddr string `json:"currentAddr"`
|
|
CurrentHeartRate int `json:"currentHeartRate"`
|
|
EndTime int64 `json:"endTime"`
|
|
ErrorMessage string `json:"errorMessage"`
|
|
LastPublishedAt int64 `json:"lastPublishedAt"`
|
|
LastTopic string `json:"lastTopic"`
|
|
ProcessedCount int `json:"processedCount"`
|
|
RegionID uint32 `json:"regionId"`
|
|
Running bool `json:"running"`
|
|
StartedAt int64 `json:"startedAt"`
|
|
StartTime int64 `json:"startTime"`
|
|
StoppedAt int64 `json:"stoppedAt"`
|
|
TotalCount int `json:"totalCount"`
|
|
}
|
|
|
|
type ReplayService struct {
|
|
cfg config.MQTTConfig
|
|
db *gorm.DB
|
|
cancel context.CancelFunc
|
|
mu sync.RWMutex
|
|
status ReplayStatus
|
|
}
|
|
|
|
type replayFingerprintStore struct {
|
|
items map[string]time.Time
|
|
mu sync.Mutex
|
|
}
|
|
|
|
var (
|
|
globalReplayService = &ReplayService{}
|
|
replayFingerprints = &replayFingerprintStore{items: map[string]time.Time{}}
|
|
)
|
|
|
|
func InitReplayService(db *gorm.DB, cfg config.MQTTConfig) {
|
|
globalReplayService = &ReplayService{
|
|
cfg: cfg,
|
|
db: db,
|
|
}
|
|
}
|
|
|
|
func GetReplayService() *ReplayService {
|
|
return globalReplayService
|
|
}
|
|
|
|
func (s *ReplayService) Status() ReplayStatus {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.status
|
|
}
|
|
|
|
func (s *ReplayService) Start(req ReplayStartRequest) (ReplayStatus, error) {
|
|
req.Addr = strings.TrimSpace(req.Addr)
|
|
if req.StartTime <= 0 || req.EndTime <= 0 {
|
|
return ReplayStatus{}, fmt.Errorf("startTime and endTime are required")
|
|
}
|
|
req.EndTime = normalizeReplayEndTime(req.EndTime)
|
|
if req.EndTime < req.StartTime {
|
|
return ReplayStatus{}, fmt.Errorf("endTime must be greater than or equal to startTime")
|
|
}
|
|
if err := validateConfig(s.cfg); err != nil {
|
|
return ReplayStatus{}, err
|
|
}
|
|
|
|
s.mu.Lock()
|
|
if s.status.Running {
|
|
current := s.status
|
|
s.mu.Unlock()
|
|
return current, fmt.Errorf("mqtt replay is already running")
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
query := s.db.Model(&models.MqttHeartRateRecord{}).
|
|
Where("received_at >= ? AND received_at <= ?", req.StartTime, req.EndTime)
|
|
if req.RegionID > 0 {
|
|
query = query.Where("region_id = ?", req.RegionID)
|
|
}
|
|
if req.Addr != "" {
|
|
query = query.Where("belt_addr LIKE ?", "%"+req.Addr+"%")
|
|
}
|
|
|
|
var total int64
|
|
if err := query.Count(&total).Error; err != nil {
|
|
return ReplayStatus{}, err
|
|
}
|
|
if total == 0 {
|
|
log.Printf("mqtt replay found no records start=%d end=%d region=%d addr=%q", req.StartTime, req.EndTime, req.RegionID, req.Addr)
|
|
return ReplayStatus{}, fmt.Errorf("no heart rate records found in the selected range")
|
|
}
|
|
if total > maxReplayRecordCount {
|
|
return ReplayStatus{}, fmt.Errorf("replay record count exceeds limit: %d > %d", total, maxReplayRecordCount)
|
|
}
|
|
|
|
var records []models.MqttHeartRateRecord
|
|
if err := query.Order("received_at ASC, id ASC").Find(&records).Error; err != nil {
|
|
return ReplayStatus{}, err
|
|
}
|
|
|
|
status := ReplayStatus{
|
|
Addr: req.Addr,
|
|
EndTime: req.EndTime,
|
|
RegionID: req.RegionID,
|
|
Running: true,
|
|
StartedAt: time.Now().UnixMilli(),
|
|
StartTime: req.StartTime,
|
|
TotalCount: len(records),
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
s.mu.Lock()
|
|
s.cancel = cancel
|
|
s.status = status
|
|
s.mu.Unlock()
|
|
|
|
go s.run(ctx, records)
|
|
return status, nil
|
|
}
|
|
|
|
func (s *ReplayService) Stop() ReplayStatus {
|
|
s.mu.Lock()
|
|
cancel := s.cancel
|
|
if cancel != nil {
|
|
cancel()
|
|
s.cancel = nil
|
|
}
|
|
if s.status.Running {
|
|
s.status.Running = false
|
|
s.status.StoppedAt = time.Now().UnixMilli()
|
|
}
|
|
status := s.status
|
|
s.mu.Unlock()
|
|
return status
|
|
}
|
|
|
|
func (s *ReplayService) run(ctx context.Context, records []models.MqttHeartRateRecord) {
|
|
client, err := s.connectReplayClient()
|
|
if err != nil {
|
|
s.fail(err)
|
|
return
|
|
}
|
|
defer func() {
|
|
if client.IsConnected() {
|
|
client.Disconnect(250)
|
|
}
|
|
}()
|
|
|
|
for idx, record := range records {
|
|
if idx > 0 {
|
|
wait := time.Duration(record.ReceivedAt-records[idx-1].ReceivedAt) * time.Millisecond
|
|
if wait < 0 {
|
|
wait = 0
|
|
}
|
|
if wait > maxReplaySleepGap {
|
|
wait = maxReplaySleepGap
|
|
}
|
|
timer := time.NewTimer(wait)
|
|
select {
|
|
case <-ctx.Done():
|
|
timer.Stop()
|
|
s.stopWithContext()
|
|
return
|
|
case <-timer.C:
|
|
}
|
|
}
|
|
|
|
if err := s.publishRecord(client, record); err != nil {
|
|
s.fail(err)
|
|
return
|
|
}
|
|
s.updateProgress(record, idx+1)
|
|
}
|
|
|
|
s.complete()
|
|
}
|
|
|
|
func (s *ReplayService) connectReplayClient() (mqtt.Client, error) {
|
|
opts := mqtt.NewClientOptions()
|
|
scheme := "tcp"
|
|
if s.cfg.UseTLS {
|
|
scheme = "ssl"
|
|
opts.SetTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12})
|
|
}
|
|
broker := fmt.Sprintf("%s://%s:%d", scheme, s.cfg.Host, s.cfg.Port)
|
|
username := strings.TrimSpace(s.cfg.GWUsername)
|
|
password := s.cfg.GWPassword
|
|
if username == "" {
|
|
username = s.cfg.Username
|
|
password = s.cfg.Password
|
|
}
|
|
opts.AddBroker(broker)
|
|
opts.SetClientID(fmt.Sprintf("%s-replay-%d", s.cfg.ClientIDPrefix, time.Now().UnixNano()))
|
|
opts.SetUsername(username)
|
|
opts.SetPassword(password)
|
|
opts.SetKeepAlive(60 * time.Second)
|
|
opts.SetAutoReconnect(false)
|
|
opts.SetConnectRetry(false)
|
|
|
|
client := mqtt.NewClient(opts)
|
|
token := client.Connect()
|
|
if !token.WaitTimeout(15 * time.Second) {
|
|
return nil, fmt.Errorf("mqtt replay connect timeout")
|
|
}
|
|
if err := token.Error(); err != nil {
|
|
return nil, err
|
|
}
|
|
log.Printf("mqtt replay connected broker=%s username=%s", broker, username)
|
|
return client, nil
|
|
}
|
|
|
|
func (s *ReplayService) publishRecord(client mqtt.Client, record models.MqttHeartRateRecord) error {
|
|
payloadMessage := buildReplayHeartRateMessage(record)
|
|
payload, err := proto.Marshal(payloadMessage)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal replay heart rate payload: %w", err)
|
|
}
|
|
topic := strings.TrimSpace(record.Topic)
|
|
if topic == "" {
|
|
topic = fmt.Sprintf("/whgw/v2/region/%d/measurement/band/%d/hr", record.RegionID, record.BandID)
|
|
}
|
|
markReplayPayload(topic, payload)
|
|
token := client.Publish(topic, byte(s.cfg.QoS), false, payload)
|
|
if !token.WaitTimeout(10 * time.Second) {
|
|
return fmt.Errorf("mqtt replay publish timeout")
|
|
}
|
|
if err := token.Error(); err != nil {
|
|
return err
|
|
}
|
|
log.Printf("mqtt replay published region=%d addr=%s hr=%d packet=%d topic=%s", record.RegionID, record.BeltAddr, record.HeartRate, record.PacketNum, topic)
|
|
return nil
|
|
}
|
|
|
|
func buildReplayHeartRateMessage(record models.MqttHeartRateRecord) *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg {
|
|
return &whgw_hrpb.GatewaySlaveOutCloudMasterInMsg{
|
|
Choice: &whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfHrMeasurement{
|
|
NtfHrMeasurement: &whgw_hrpb.HrMeasurement{
|
|
HrPacket: &whgw_hrpb.HrPacket{
|
|
Hr: uint32(record.HeartRate),
|
|
Id: record.BandID,
|
|
PacketNum: record.PacketNum,
|
|
Status: &whgw_hrpb.StatusFlag{
|
|
Battery: record.Battery,
|
|
HrConfidence: whgw_hrpb.HrConfidence(record.HrConfidence),
|
|
IsActive: record.IsActive,
|
|
IsOnSkin: record.IsOnSkin,
|
|
},
|
|
},
|
|
PacketStatus: buildReplayPacketStatus(record),
|
|
GatewayInfo: &whgw_hrpb.GatewayInfo{
|
|
Extra: &whgw_hrpb.GatewayInfoExtra{
|
|
ActiveUplink: whgw_hrpb.NetworkUplinkKind(record.GatewayActiveUplink),
|
|
CellularModem: &whgw_hrpb.CellularModemInfo{
|
|
Ber: record.GatewayCellularBER,
|
|
Imei: record.GatewayCellularIMEI,
|
|
Rssi: record.GatewayCellularRSSI,
|
|
},
|
|
SchemaVersion: record.GatewaySchemaVersion,
|
|
},
|
|
GatewayMac: parseMAC(record.GatewayMAC),
|
|
RegionId: record.RegionID,
|
|
},
|
|
HubInfo: &whgw_hrpb.HubInfo{
|
|
BusId: record.HubBusID,
|
|
SubDevId: record.HubSubDevID,
|
|
RadioParameters: &whgw_hrpb.LoRaParameters{
|
|
Bw: whgw_hrpb.LoRaBW(record.HubRadioBW),
|
|
FrequencyMhz: float32(record.HubRadioFrequencyMHz),
|
|
Sf: record.HubRadioSF,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func buildReplayPacketStatus(record models.MqttHeartRateRecord) *whgw_hrpb.IPacketStatus {
|
|
switch strings.TrimSpace(record.PacketStatusSource) {
|
|
case "raw":
|
|
return &whgw_hrpb.IPacketStatus{
|
|
Choice: &whgw_hrpb.IPacketStatus_Raw{
|
|
Raw: &whgw_hrpb.RawPacketStatus{
|
|
SignalRssiX2Neg: record.RawSignalRSSIX2Neg,
|
|
SnrPktX4: record.RawSnrPktX4,
|
|
},
|
|
},
|
|
}
|
|
case "parsed":
|
|
return &whgw_hrpb.IPacketStatus{
|
|
Choice: &whgw_hrpb.IPacketStatus_Parsed{
|
|
Parsed: &whgw_hrpb.PacketStatus{
|
|
SignalRssiNeg: float32(record.SignalRSSINeg),
|
|
SnrPkt: float32(record.SNR),
|
|
},
|
|
},
|
|
}
|
|
default:
|
|
if record.RawSignalRSSIX2Neg > 0 || record.RawSnrPktX4 != 0 {
|
|
return &whgw_hrpb.IPacketStatus{
|
|
Choice: &whgw_hrpb.IPacketStatus_Raw{
|
|
Raw: &whgw_hrpb.RawPacketStatus{
|
|
SignalRssiX2Neg: record.RawSignalRSSIX2Neg,
|
|
SnrPktX4: record.RawSnrPktX4,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
return &whgw_hrpb.IPacketStatus{
|
|
Choice: &whgw_hrpb.IPacketStatus_Parsed{
|
|
Parsed: &whgw_hrpb.PacketStatus{
|
|
SignalRssiNeg: float32(record.SignalRSSINeg),
|
|
SnrPkt: float32(record.SNR),
|
|
},
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
func parseMAC(value string) []byte {
|
|
value = strings.TrimSpace(value)
|
|
if value == "" {
|
|
return nil
|
|
}
|
|
parts := strings.Split(value, ":")
|
|
result := make([]byte, 0, len(parts))
|
|
for _, part := range parts {
|
|
if len(part) == 1 {
|
|
part = "0" + part
|
|
}
|
|
decoded, err := hex.DecodeString(part)
|
|
if err != nil || len(decoded) != 1 {
|
|
return nil
|
|
}
|
|
result = append(result, decoded[0])
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (s *ReplayService) updateProgress(record models.MqttHeartRateRecord, processed int) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.status.CurrentAddr = record.BeltAddr
|
|
s.status.CurrentHeartRate = record.HeartRate
|
|
s.status.LastPublishedAt = time.Now().UnixMilli()
|
|
s.status.LastTopic = record.Topic
|
|
s.status.ProcessedCount = processed
|
|
}
|
|
|
|
func (s *ReplayService) complete() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.cancel = nil
|
|
s.status.CompletedAt = time.Now().UnixMilli()
|
|
s.status.Running = false
|
|
}
|
|
|
|
func (s *ReplayService) fail(err error) {
|
|
log.Printf("mqtt replay failed: %v", err)
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.cancel = nil
|
|
s.status.CompletedAt = time.Now().UnixMilli()
|
|
s.status.ErrorMessage = err.Error()
|
|
s.status.Running = false
|
|
}
|
|
|
|
func (s *ReplayService) stopWithContext() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.cancel = nil
|
|
if s.status.Running {
|
|
s.status.Running = false
|
|
s.status.StoppedAt = time.Now().UnixMilli()
|
|
}
|
|
}
|
|
|
|
func markReplayPayload(topic string, payload []byte) {
|
|
replayFingerprints.mu.Lock()
|
|
defer replayFingerprints.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
for key, expiresAt := range replayFingerprints.items {
|
|
if now.After(expiresAt) {
|
|
delete(replayFingerprints.items, key)
|
|
}
|
|
}
|
|
replayFingerprints.items[buildReplayFingerprint(topic, payload)] = now.Add(replayMarkerRetention)
|
|
}
|
|
|
|
func consumeReplayPayload(topic string, payload []byte) bool {
|
|
replayFingerprints.mu.Lock()
|
|
defer replayFingerprints.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
key := buildReplayFingerprint(topic, payload)
|
|
expiresAt, ok := replayFingerprints.items[key]
|
|
if !ok {
|
|
return false
|
|
}
|
|
delete(replayFingerprints.items, key)
|
|
if now.After(expiresAt) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func buildReplayFingerprint(topic string, payload []byte) string {
|
|
sum := sha1.Sum(append([]byte(topic+"|"), payload...))
|
|
return hex.EncodeToString(sum[:])
|
|
}
|
|
|
|
func normalizeReplayEndTime(endTime int64) int64 {
|
|
if endTime <= 0 {
|
|
return endTime
|
|
}
|
|
if endTime%1000 == 0 {
|
|
return endTime + 999
|
|
}
|
|
return endTime
|
|
}
|