refactor: mqtt data model.

This commit is contained in:
2026-05-09 15:19:15 +08:00
parent 85f28b5660
commit cb7ed2c4cc
4 changed files with 1003 additions and 449 deletions
+93 -43
View File
@@ -23,6 +23,14 @@ const (
defaultWorkers = 4
)
type packetStatusSnapshot struct {
source string
signalRSSINeg float64
snr float64
rawSignalRSSIX2Neg uint32
rawSnrPktX4 int32
}
type Listener struct {
db *gorm.DB
cfg config.MQTTConfig
@@ -245,75 +253,107 @@ func (l *Listener) persistTrainingSession(record *models.MqttTrainingSessionReco
}
func buildHeartRateRecord(measurement *whgw_hrpb.HrMeasurement, topic string, now int64) models.MqttHeartRateRecord {
regionID := measurement.GetGatewayInfo().GetRegionId()
gatewayInfo := measurement.GetGatewayInfo()
hubInfo := measurement.GetHubInfo()
regionID := gatewayInfo.GetRegionId()
if regionID == 0 {
regionID = parseRegionFromTopic(topic)
}
gatewayMAC := formatMAC(measurement.GetGatewayInfo().GetGatewayMac())
gatewayMAC := formatMAC(gatewayInfo.GetGatewayMac())
packet := measurement.GetHrPacket()
rssi, snr := parsePacketStatus(measurement.GetPacketStatus())
status := parsePacketStatus(measurement.GetPacketStatus())
beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId())
return models.MqttHeartRateRecord{
Identifier: fmt.Sprintf("hr:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), packet.GetPacketNum()),
Topic: topic,
RegionID: regionID,
GatewayMAC: gatewayMAC,
BandID: packet.GetId(),
BeltAddr: beltAddr,
PacketNum: packet.GetPacketNum(),
HeartRate: int(packet.GetHr()),
HrConfidence: int(packet.GetStatus().GetHrConfidence().Number()),
IsActive: packet.GetStatus().GetIsActive(),
IsOnSkin: packet.GetStatus().GetIsOnSkin(),
Battery: packet.GetStatus().GetBattery(),
SignalRSSINeg: rssi,
SNR: snr,
HubBusID: measurement.GetHubInfo().GetBusId(),
HubSubDevID: measurement.GetHubInfo().GetSubDevId(),
ReceivedAt: now,
Identifier: fmt.Sprintf("hr:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), packet.GetPacketNum()),
Topic: topic,
RegionID: regionID,
GatewayMAC: gatewayMAC,
GatewaySchemaVersion: gatewayInfo.GetExtra().GetSchemaVersion(),
GatewayActiveUplink: int32(gatewayInfo.GetExtra().GetActiveUplink().Number()),
GatewayCellularIMEI: gatewayInfo.GetExtra().GetCellularModem().GetImei(),
GatewayCellularRSSI: gatewayInfo.GetExtra().GetCellularModem().GetRssi(),
GatewayCellularBER: gatewayInfo.GetExtra().GetCellularModem().GetBer(),
BandID: packet.GetId(),
BeltAddr: beltAddr,
PacketNum: packet.GetPacketNum(),
HeartRate: int(packet.GetHr()),
HrConfidence: int(packet.GetStatus().GetHrConfidence().Number()),
IsActive: packet.GetStatus().GetIsActive(),
IsOnSkin: packet.GetStatus().GetIsOnSkin(),
Battery: packet.GetStatus().GetBattery(),
PacketStatusSource: status.source,
SignalRSSINeg: status.signalRSSINeg,
SNR: status.snr,
RawSignalRSSIX2Neg: status.rawSignalRSSIX2Neg,
RawSnrPktX4: status.rawSnrPktX4,
HubBusID: hubInfo.GetBusId(),
HubSubDevID: hubInfo.GetSubDevId(),
HubRadioBW: int32(hubInfo.GetRadioParameters().GetBw().Number()),
HubRadioSF: hubInfo.GetRadioParameters().GetSf(),
HubRadioFrequencyMHz: float64(hubInfo.GetRadioParameters().GetFrequencyMhz()),
ReceivedAt: now,
}
}
func buildStepCountRecord(measurement *whgw_hrpb.StepCountMeasurement, topic string, now int64) models.MqttStepCountRecord {
regionID := measurement.GetGatewayInfo().GetRegionId()
gatewayInfo := measurement.GetGatewayInfo()
hubInfo := measurement.GetHubInfo()
regionID := gatewayInfo.GetRegionId()
if regionID == 0 {
regionID = parseRegionFromTopic(topic)
}
gatewayMAC := formatMAC(measurement.GetGatewayInfo().GetGatewayMac())
gatewayMAC := formatMAC(gatewayInfo.GetGatewayMac())
packet := measurement.GetStepCountPacket()
rssi, snr := parsePacketStatus(measurement.GetPacketStatus())
status := parsePacketStatus(measurement.GetPacketStatus())
beltAddr := fmt.Sprintf("%d-%d", regionID, packet.GetId())
return models.MqttStepCountRecord{
Identifier: fmt.Sprintf("step:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), packet.GetPacketNum()),
Topic: topic,
RegionID: regionID,
GatewayMAC: gatewayMAC,
BandID: packet.GetId(),
BeltAddr: beltAddr,
PacketNum: packet.GetPacketNum(),
StepCount: packet.GetStepCount(),
SignalRSSINeg: rssi,
SNR: snr,
HubBusID: measurement.GetHubInfo().GetBusId(),
HubSubDevID: measurement.GetHubInfo().GetSubDevId(),
ReceivedAt: now,
Identifier: fmt.Sprintf("step:%d:%s:%d:%d", regionID, gatewayMAC, packet.GetId(), packet.GetPacketNum()),
Topic: topic,
RegionID: regionID,
GatewayMAC: gatewayMAC,
GatewaySchemaVersion: gatewayInfo.GetExtra().GetSchemaVersion(),
GatewayActiveUplink: int32(gatewayInfo.GetExtra().GetActiveUplink().Number()),
GatewayCellularIMEI: gatewayInfo.GetExtra().GetCellularModem().GetImei(),
GatewayCellularRSSI: gatewayInfo.GetExtra().GetCellularModem().GetRssi(),
GatewayCellularBER: gatewayInfo.GetExtra().GetCellularModem().GetBer(),
BandID: packet.GetId(),
BeltAddr: beltAddr,
PacketNum: packet.GetPacketNum(),
StepCount: packet.GetStepCount(),
PacketStatusSource: status.source,
SignalRSSINeg: status.signalRSSINeg,
SNR: status.snr,
RawSignalRSSIX2Neg: status.rawSignalRSSIX2Neg,
RawSnrPktX4: status.rawSnrPktX4,
HubBusID: hubInfo.GetBusId(),
HubSubDevID: hubInfo.GetSubDevId(),
HubRadioBW: int32(hubInfo.GetRadioParameters().GetBw().Number()),
HubRadioSF: hubInfo.GetRadioParameters().GetSf(),
HubRadioFrequencyMHz: float64(hubInfo.GetRadioParameters().GetFrequencyMhz()),
ReceivedAt: now,
}
}
func buildGatewayStatusRecord(status *whgw_hrpb.GatewayStatus, topic string, now int64) models.MqttGatewayStatusRecord {
regionID := status.GetInfo().GetRegionId()
gatewayInfo := status.GetInfo()
regionID := gatewayInfo.GetRegionId()
if regionID == 0 {
regionID = parseRegionFromTopic(topic)
}
gatewayMAC := formatMAC(status.GetInfo().GetGatewayMac())
gatewayMAC := formatMAC(gatewayInfo.GetGatewayMac())
return models.MqttGatewayStatusRecord{
Identifier: fmt.Sprintf("gateway:%d:%s:%d:%d:%d", regionID, gatewayMAC, status.GetStat().GetBootCount(), status.GetStat().GetUptimeMs(), status.GetStat().GetRxCount()),
Topic: topic,
RegionID: regionID,
GatewayMAC: gatewayMAC,
GatewaySchemaVersion: gatewayInfo.GetExtra().GetSchemaVersion(),
GatewayActiveUplink: int32(gatewayInfo.GetExtra().GetActiveUplink().Number()),
GatewayCellularIMEI: gatewayInfo.GetExtra().GetCellularModem().GetImei(),
GatewayCellularRSSI: gatewayInfo.GetExtra().GetCellularModem().GetRssi(),
GatewayCellularBER: gatewayInfo.GetExtra().GetCellularModem().GetBer(),
BootCount: status.GetStat().GetBootCount(),
UptimeMs: status.GetStat().GetUptimeMs(),
DurationMsSinceLastPacket: status.GetStat().GetDurationMsSinceLastPacket(),
@@ -325,17 +365,27 @@ func buildGatewayStatusRecord(status *whgw_hrpb.GatewayStatus, topic string, now
}
}
func parsePacketStatus(status *whgw_hrpb.IPacketStatus) (float64, float64) {
func parsePacketStatus(status *whgw_hrpb.IPacketStatus) packetStatusSnapshot {
if status == nil {
return 0, 0
return packetStatusSnapshot{}
}
if parsed := status.GetParsed(); parsed != nil {
return float64(parsed.GetSignalRssiNeg()), float64(parsed.GetSnrPkt())
return packetStatusSnapshot{
source: "parsed",
signalRSSINeg: float64(parsed.GetSignalRssiNeg()),
snr: float64(parsed.GetSnrPkt()),
}
}
if raw := status.GetRaw(); raw != nil {
return -float64(raw.GetSignalRssiX2Neg()) / 2, float64(raw.GetSnrPktX4()) / 4
return packetStatusSnapshot{
source: "raw",
signalRSSINeg: -float64(raw.GetSignalRssiX2Neg()) / 2,
snr: float64(raw.GetSnrPktX4()) / 4,
rawSignalRSSIX2Neg: raw.GetSignalRssiX2Neg(),
rawSnrPktX4: raw.GetSnrPktX4(),
}
}
return 0, 0
return packetStatusSnapshot{}
}
func formatMAC(data []byte) string {