fix: mqtt data identify.
This commit is contained in:
@@ -245,8 +245,15 @@ func (s *DebugService) maybePersist(record interface{}) {
|
|||||||
if !enabled {
|
if !enabled {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := s.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record).Error; err != nil {
|
tx := s.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record)
|
||||||
log.Printf("mqtt debug persist failed type=%T err=%v", record, err)
|
if tx.Error != nil {
|
||||||
|
log.Printf("mqtt debug persist failed type=%T err=%v", record, tx.Error)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if tx.RowsAffected > 0 {
|
||||||
|
//logPersistResult("mqtt debug", "inserted", record)
|
||||||
|
} else {
|
||||||
|
//logPersistResult("mqtt debug", "skipped duplicate", record)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+38
-5
@@ -231,7 +231,16 @@ func (l *Listener) persistRecord(record interface{}) error {
|
|||||||
case *models.MqttTrainingSessionRecord:
|
case *models.MqttTrainingSessionRecord:
|
||||||
return l.persistTrainingSession(r)
|
return l.persistTrainingSession(r)
|
||||||
default:
|
default:
|
||||||
return l.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record).Error
|
tx := l.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record)
|
||||||
|
if tx.Error != nil {
|
||||||
|
return tx.Error
|
||||||
|
}
|
||||||
|
if tx.RowsAffected > 0 {
|
||||||
|
//logPersistResult("mqtt listener", "inserted", record)
|
||||||
|
} else {
|
||||||
|
//logPersistResult("mqtt listener", "skipped duplicate", record)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -255,10 +264,34 @@ func (l *Listener) persistTrainingSession(record *models.MqttTrainingSessionReco
|
|||||||
assignments["ended_at"] = *record.EndedAt
|
assignments["ended_at"] = *record.EndedAt
|
||||||
}
|
}
|
||||||
|
|
||||||
return l.db.Clauses(clause.OnConflict{
|
tx := l.db.Clauses(clause.OnConflict{
|
||||||
Columns: []clause.Column{{Name: "identifier"}},
|
Columns: []clause.Column{{Name: "identifier"}},
|
||||||
DoUpdates: clause.Assignments(assignments),
|
DoUpdates: clause.Assignments(assignments),
|
||||||
}).Create(record).Error
|
}).Create(record)
|
||||||
|
if tx.Error != nil {
|
||||||
|
return tx.Error
|
||||||
|
}
|
||||||
|
if tx.RowsAffected > 0 {
|
||||||
|
//logPersistResult("mqtt listener", "upserted", record)
|
||||||
|
} else {
|
||||||
|
//logPersistResult("mqtt listener", "skipped duplicate", record)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func logPersistResult(source, action string, record interface{}) {
|
||||||
|
switch r := record.(type) {
|
||||||
|
case *models.MqttHeartRateRecord:
|
||||||
|
log.Printf("%s %s heart_rate region=%d addr=%s hr=%d packet=%d identifier=%s topic=%s", source, action, r.RegionID, r.BeltAddr, r.HeartRate, r.PacketNum, r.Identifier, r.Topic)
|
||||||
|
case *models.MqttStepCountRecord:
|
||||||
|
log.Printf("%s %s step_count region=%d addr=%s steps=%d packet=%d identifier=%s topic=%s", source, action, r.RegionID, r.BeltAddr, r.StepCount, r.PacketNum, r.Identifier, r.Topic)
|
||||||
|
case *models.MqttGatewayStatusRecord:
|
||||||
|
log.Printf("%s %s gateway_status region=%d gateway=%s rxCount=%d uptimeMs=%d identifier=%s topic=%s", source, action, r.RegionID, r.GatewayMAC, r.RxCount, r.UptimeMs, r.Identifier, r.Topic)
|
||||||
|
case *models.MqttTrainingSessionRecord:
|
||||||
|
log.Printf("%s %s training_session region=%d testId=%s event=%s identifier=%s publishedAt=%d topic=%s", source, action, r.RegionID, r.TestID, r.EventType, r.Identifier, r.PublishedAt, r.Topic)
|
||||||
|
default:
|
||||||
|
log.Printf("%s %s record type=%T", source, action, record)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildHeartRateRecord(measurement *whgw_hrpb.HrMeasurement, topic string, now int64) models.MqttHeartRateRecord {
|
func buildHeartRateRecord(measurement *whgw_hrpb.HrMeasurement, topic string, now int64) models.MqttHeartRateRecord {
|
||||||
@@ -274,7 +307,7 @@ func buildHeartRateRecord(measurement *whgw_hrpb.HrMeasurement, topic string, no
|
|||||||
beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId())
|
beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId())
|
||||||
|
|
||||||
return models.MqttHeartRateRecord{
|
return models.MqttHeartRateRecord{
|
||||||
Identifier: fmt.Sprintf("hr:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), packet.GetPacketNum()),
|
Identifier: fmt.Sprintf("hr:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), now),
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
RegionID: regionID,
|
RegionID: regionID,
|
||||||
GatewayMAC: gatewayMAC,
|
GatewayMAC: gatewayMAC,
|
||||||
@@ -318,7 +351,7 @@ func buildStepCountRecord(measurement *whgw_hrpb.StepCountMeasurement, topic str
|
|||||||
beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId())
|
beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId())
|
||||||
|
|
||||||
return models.MqttStepCountRecord{
|
return models.MqttStepCountRecord{
|
||||||
Identifier: fmt.Sprintf("step:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), packet.GetPacketNum()),
|
Identifier: fmt.Sprintf("step:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), now),
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
RegionID: regionID,
|
RegionID: regionID,
|
||||||
GatewayMAC: gatewayMAC,
|
GatewayMAC: gatewayMAC,
|
||||||
|
|||||||
Reference in New Issue
Block a user