diff --git a/mqtt/debug_service.go b/mqtt/debug_service.go index a1207bf..d9aee91 100644 --- a/mqtt/debug_service.go +++ b/mqtt/debug_service.go @@ -245,8 +245,15 @@ func (s *DebugService) maybePersist(record interface{}) { if !enabled { return } - if err := s.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record).Error; err != nil { - log.Printf("mqtt debug persist failed type=%T err=%v", record, err) + tx := s.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record) + if tx.Error != nil { + log.Printf("mqtt debug persist failed type=%T err=%v", record, tx.Error) + return + } + if tx.RowsAffected > 0 { + //logPersistResult("mqtt debug", "inserted", record) + } else { + //logPersistResult("mqtt debug", "skipped duplicate", record) } } diff --git a/mqtt/listener.go b/mqtt/listener.go index 172a809..9111173 100644 --- a/mqtt/listener.go +++ b/mqtt/listener.go @@ -231,7 +231,16 @@ func (l *Listener) persistRecord(record interface{}) error { case *models.MqttTrainingSessionRecord: return l.persistTrainingSession(r) default: - return l.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record).Error + tx := l.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record) + if tx.Error != nil { + return tx.Error + } + if tx.RowsAffected > 0 { + //logPersistResult("mqtt listener", "inserted", record) + } else { + //logPersistResult("mqtt listener", "skipped duplicate", record) + } + return nil } } @@ -255,10 +264,34 @@ func (l *Listener) persistTrainingSession(record *models.MqttTrainingSessionReco assignments["ended_at"] = *record.EndedAt } - return l.db.Clauses(clause.OnConflict{ + tx := l.db.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "identifier"}}, DoUpdates: clause.Assignments(assignments), - }).Create(record).Error + }).Create(record) + if tx.Error != nil { + return tx.Error + } + if tx.RowsAffected > 0 { + //logPersistResult("mqtt listener", "upserted", record) + } else { + //logPersistResult("mqtt listener", "skipped duplicate", record) + } + return nil +} + +func logPersistResult(source, action string, record interface{}) { + switch r := record.(type) { + case *models.MqttHeartRateRecord: + log.Printf("%s %s heart_rate region=%d addr=%s hr=%d packet=%d identifier=%s topic=%s", source, action, r.RegionID, r.BeltAddr, r.HeartRate, r.PacketNum, r.Identifier, r.Topic) + case *models.MqttStepCountRecord: + log.Printf("%s %s step_count region=%d addr=%s steps=%d packet=%d identifier=%s topic=%s", source, action, r.RegionID, r.BeltAddr, r.StepCount, r.PacketNum, r.Identifier, r.Topic) + case *models.MqttGatewayStatusRecord: + log.Printf("%s %s gateway_status region=%d gateway=%s rxCount=%d uptimeMs=%d identifier=%s topic=%s", source, action, r.RegionID, r.GatewayMAC, r.RxCount, r.UptimeMs, r.Identifier, r.Topic) + case *models.MqttTrainingSessionRecord: + log.Printf("%s %s training_session region=%d testId=%s event=%s identifier=%s publishedAt=%d topic=%s", source, action, r.RegionID, r.TestID, r.EventType, r.Identifier, r.PublishedAt, r.Topic) + default: + log.Printf("%s %s record type=%T", source, action, record) + } } func buildHeartRateRecord(measurement *whgw_hrpb.HrMeasurement, topic string, now int64) models.MqttHeartRateRecord { @@ -274,7 +307,7 @@ func buildHeartRateRecord(measurement *whgw_hrpb.HrMeasurement, topic string, no beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId()) return models.MqttHeartRateRecord{ - Identifier: fmt.Sprintf("hr:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), packet.GetPacketNum()), + Identifier: fmt.Sprintf("hr:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), now), Topic: topic, RegionID: regionID, GatewayMAC: gatewayMAC, @@ -318,7 +351,7 @@ func buildStepCountRecord(measurement *whgw_hrpb.StepCountMeasurement, topic str beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId()) return models.MqttStepCountRecord{ - Identifier: fmt.Sprintf("step:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), packet.GetPacketNum()), + Identifier: fmt.Sprintf("step:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), now), Topic: topic, RegionID: regionID, GatewayMAC: gatewayMAC,