diff --git a/controllers/statistics.go b/controllers/statistics.go index 2ee4942..5043034 100644 --- a/controllers/statistics.go +++ b/controllers/statistics.go @@ -4,6 +4,7 @@ import ( "errors" "hr_receiver/config" "hr_receiver/models" + "math" "net/http" "sort" "strconv" @@ -73,6 +74,13 @@ type mqttMeasurementHistorySummary struct { ValueLabel string `json:"valueLabel"` } +const ( + mqttChartRawPointLimit int64 = 5000 + mqttChartTargetPointsTotal int64 = 6000 + mqttChartMinPointsPerAddr int64 = 120 + mqttChartMaxPointsPerAddr int64 = 600 +) + // --- 查询接口 --- // @Summary 获取AI分析记录列表 @@ -278,14 +286,6 @@ func (sc *StatisticsController) chartHeartRateMeasurementHistory(c *gin.Context, baseQuery := sc.DB.Model(&models.MqttHeartRateRecord{}) baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "heart_rate", "received_at") - var records []models.MqttHeartRateRecord - if err := baseQuery.Session(&gorm.Session{}). - Order("received_at ASC"). - Find(&records).Error; err != nil { - writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate chart records") - return - } - var summary mqttMeasurementHistorySummary summary.ValueLabel = "心率" if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "heart_rate", "belt_addr", &summary); err != nil { @@ -293,32 +293,17 @@ func (sc *StatisticsController) chartHeartRateMeasurementHistory(c *gin.Context, return } - items := make([]mqttMeasurementHistoryItem, 0, len(records)) - for _, record := range records { - items = append(items, mqttMeasurementHistoryItem{ - ID: record.ID, - DataType: "heart_rate", - Identifier: record.Identifier, - Topic: record.Topic, - RegionID: record.RegionID, - Addr: record.BeltAddr, - Value: int64(record.HeartRate), - ValueLabel: "心率", - PacketNum: record.PacketNum, - GatewayMAC: record.GatewayMAC, - Battery: record.Battery, - IsActive: record.IsActive, - IsOnSkin: record.IsOnSkin, - SignalRSSINeg: record.SignalRSSINeg, - SNR: record.SNR, - ReceivedAt: record.ReceivedAt, - CreatedAtMilli: record.CreatedAt.UnixMilli(), - }) + items, bucketSizeMs, sampled, err := sc.buildHeartRateChartItems(baseQuery, params, summary) + if err != nil { + writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate chart records") + return } writeSuccess(c, http.StatusOK, "query success", gin.H{ - "list": items, - "summary": summary, + "list": items, + "summary": summary, + "sampled": sampled, + "bucketSizeMs": bucketSizeMs, }) } @@ -389,14 +374,6 @@ func (sc *StatisticsController) chartStepCountMeasurementHistory(c *gin.Context, baseQuery := sc.DB.Model(&models.MqttStepCountRecord{}) baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "step_count", "received_at") - var records []models.MqttStepCountRecord - if err := baseQuery.Session(&gorm.Session{}). - Order("received_at ASC"). - Find(&records).Error; err != nil { - writeError(c, http.StatusInternalServerError, "failed to query mqtt step count chart records") - return - } - var summary mqttMeasurementHistorySummary summary.ValueLabel = "步数" if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "step_count", "belt_addr", &summary); err != nil { @@ -404,32 +381,17 @@ func (sc *StatisticsController) chartStepCountMeasurementHistory(c *gin.Context, return } - items := make([]mqttMeasurementHistoryItem, 0, len(records)) - for _, record := range records { - items = append(items, mqttMeasurementHistoryItem{ - ID: record.ID, - DataType: "step_count", - Identifier: record.Identifier, - Topic: record.Topic, - RegionID: record.RegionID, - Addr: record.BeltAddr, - Value: int64(record.StepCount), - ValueLabel: "步数", - PacketNum: record.PacketNum, - GatewayMAC: record.GatewayMAC, - Battery: 0, - IsActive: false, - IsOnSkin: false, - SignalRSSINeg: record.SignalRSSINeg, - SNR: record.SNR, - ReceivedAt: record.ReceivedAt, - CreatedAtMilli: record.CreatedAt.UnixMilli(), - }) + items, bucketSizeMs, sampled, err := sc.buildStepCountChartItems(baseQuery, params, summary) + if err != nil { + writeError(c, http.StatusInternalServerError, "failed to query mqtt step count chart records") + return } writeSuccess(c, http.StatusOK, "query success", gin.H{ - "list": items, - "summary": summary, + "list": items, + "summary": summary, + "sampled": sampled, + "bucketSizeMs": bucketSizeMs, }) } @@ -487,6 +449,229 @@ func buildMqttMeasurementSummary(query *gorm.DB, valueField, addrField string, s return nil } +func estimateChartBucketSizeMs(params mqttMeasurementHistoryParams, summary mqttMeasurementHistorySummary) int64 { + if params.StartTime <= 0 || params.EndTime <= params.StartTime { + return 0 + } + if summary.RecordCount <= mqttChartRawPointLimit { + return 0 + } + + uniqueAddrs := summary.UniqueAddrs + if uniqueAddrs < 1 { + uniqueAddrs = 1 + } + + pointsPerAddr := mqttChartTargetPointsTotal / uniqueAddrs + if pointsPerAddr < mqttChartMinPointsPerAddr { + pointsPerAddr = mqttChartMinPointsPerAddr + } + if pointsPerAddr > mqttChartMaxPointsPerAddr { + pointsPerAddr = mqttChartMaxPointsPerAddr + } + + rangeMs := params.EndTime - params.StartTime + if rangeMs <= 0 { + return 0 + } + + bucketSizeMs := int64(math.Ceil(float64(rangeMs) / float64(pointsPerAddr))) + return normalizeBucketSizeMs(bucketSizeMs) +} + +func normalizeBucketSizeMs(bucketSizeMs int64) int64 { + if bucketSizeMs <= 1000 { + return 1000 + } + + candidates := []int64{ + 1000, + 2000, + 5000, + 10 * 1000, + 15 * 1000, + 30 * 1000, + 60 * 1000, + 2 * 60 * 1000, + 5 * 60 * 1000, + 10 * 60 * 1000, + 15 * 60 * 1000, + 30 * 60 * 1000, + 60 * 60 * 1000, + 2 * 60 * 60 * 1000, + 6 * 60 * 60 * 1000, + 12 * 60 * 60 * 1000, + 24 * 60 * 60 * 1000, + } + for _, candidate := range candidates { + if bucketSizeMs <= candidate { + return candidate + } + } + return 24 * 60 * 60 * 1000 +} + +func buildChartBucketExpr(bucketSizeMs int64, timeField string) string { + return "((" + timeField + " / " + strconv.FormatInt(bucketSizeMs, 10) + ") * " + strconv.FormatInt(bucketSizeMs, 10) + ")" +} + +func (sc *StatisticsController) buildHeartRateChartItems(baseQuery *gorm.DB, params mqttMeasurementHistoryParams, summary mqttMeasurementHistorySummary) ([]mqttMeasurementHistoryItem, int64, bool, error) { + bucketSizeMs := estimateChartBucketSizeMs(params, summary) + if bucketSizeMs == 0 { + var records []models.MqttHeartRateRecord + if err := baseQuery.Session(&gorm.Session{}). + Order("received_at ASC"). + Find(&records).Error; err != nil { + return nil, 0, false, err + } + items := make([]mqttMeasurementHistoryItem, 0, len(records)) + for _, record := range records { + items = append(items, mqttMeasurementHistoryItem{ + ID: record.ID, + DataType: "heart_rate", + Identifier: record.Identifier, + Topic: record.Topic, + RegionID: record.RegionID, + Addr: record.BeltAddr, + Value: int64(record.HeartRate), + ValueLabel: "心率", + PacketNum: record.PacketNum, + GatewayMAC: record.GatewayMAC, + Battery: record.Battery, + IsActive: record.IsActive, + IsOnSkin: record.IsOnSkin, + SignalRSSINeg: record.SignalRSSINeg, + SNR: record.SNR, + ReceivedAt: record.ReceivedAt, + CreatedAtMilli: record.CreatedAt.UnixMilli(), + }) + } + return items, 0, false, nil + } + + bucketExpr := buildChartBucketExpr(bucketSizeMs, "received_at") + type rawChartItem struct { + RegionID uint32 + Addr string + Value float64 + PacketNum int64 + SignalRSSINeg float64 + SNR float64 + ReceivedAt int64 + CreatedAtMilli int64 + } + + var rows []rawChartItem + if err := baseQuery.Session(&gorm.Session{}). + Select( + "region_id, belt_addr as addr, ROUND(AVG(heart_rate)) as value, COUNT(*) as packet_num, AVG(signal_rssi_neg) as signal_rssi_neg, AVG(snr) as snr, " + bucketExpr + " as received_at, MIN(EXTRACT(EPOCH FROM created_at) * 1000)::bigint as created_at_milli", + ). + Group("region_id, belt_addr, " + bucketExpr). + Order("received_at ASC, addr ASC"). + Scan(&rows).Error; err != nil { + return nil, bucketSizeMs, true, err + } + + items := make([]mqttMeasurementHistoryItem, 0, len(rows)) + for _, row := range rows { + items = append(items, mqttMeasurementHistoryItem{ + DataType: "heart_rate", + RegionID: row.RegionID, + Addr: row.Addr, + Value: int64(math.Round(row.Value)), + ValueLabel: "心率", + PacketNum: uint32(maxInt64(row.PacketNum, 0)), + SignalRSSINeg: row.SignalRSSINeg, + SNR: row.SNR, + ReceivedAt: row.ReceivedAt, + CreatedAtMilli: row.CreatedAtMilli, + }) + } + return items, bucketSizeMs, true, nil +} + +func (sc *StatisticsController) buildStepCountChartItems(baseQuery *gorm.DB, params mqttMeasurementHistoryParams, summary mqttMeasurementHistorySummary) ([]mqttMeasurementHistoryItem, int64, bool, error) { + bucketSizeMs := estimateChartBucketSizeMs(params, summary) + if bucketSizeMs == 0 { + var records []models.MqttStepCountRecord + if err := baseQuery.Session(&gorm.Session{}). + Order("received_at ASC"). + Find(&records).Error; err != nil { + return nil, 0, false, err + } + items := make([]mqttMeasurementHistoryItem, 0, len(records)) + for _, record := range records { + items = append(items, mqttMeasurementHistoryItem{ + ID: record.ID, + DataType: "step_count", + Identifier: record.Identifier, + Topic: record.Topic, + RegionID: record.RegionID, + Addr: record.BeltAddr, + Value: int64(record.StepCount), + ValueLabel: "步数", + PacketNum: record.PacketNum, + GatewayMAC: record.GatewayMAC, + Battery: 0, + IsActive: false, + IsOnSkin: false, + SignalRSSINeg: record.SignalRSSINeg, + SNR: record.SNR, + ReceivedAt: record.ReceivedAt, + CreatedAtMilli: record.CreatedAt.UnixMilli(), + }) + } + return items, 0, false, nil + } + + bucketExpr := buildChartBucketExpr(bucketSizeMs, "received_at") + type rawChartItem struct { + RegionID uint32 + Addr string + Value float64 + PacketNum int64 + SignalRSSINeg float64 + SNR float64 + ReceivedAt int64 + CreatedAtMilli int64 + } + + var rows []rawChartItem + if err := baseQuery.Session(&gorm.Session{}). + Select( + "region_id, belt_addr as addr, ROUND(AVG(step_count)) as value, COUNT(*) as packet_num, AVG(signal_rssi_neg) as signal_rssi_neg, AVG(snr) as snr, " + bucketExpr + " as received_at, MIN(EXTRACT(EPOCH FROM created_at) * 1000)::bigint as created_at_milli", + ). + Group("region_id, belt_addr, " + bucketExpr). + Order("received_at ASC, addr ASC"). + Scan(&rows).Error; err != nil { + return nil, bucketSizeMs, true, err + } + + items := make([]mqttMeasurementHistoryItem, 0, len(rows)) + for _, row := range rows { + items = append(items, mqttMeasurementHistoryItem{ + DataType: "step_count", + RegionID: row.RegionID, + Addr: row.Addr, + Value: int64(math.Round(row.Value)), + ValueLabel: "步数", + PacketNum: uint32(maxInt64(row.PacketNum, 0)), + SignalRSSINeg: row.SignalRSSINeg, + SNR: row.SNR, + ReceivedAt: row.ReceivedAt, + CreatedAtMilli: row.CreatedAtMilli, + }) + } + return items, bucketSizeMs, true, nil +} + +func maxInt64(value, floor int64) int64 { + if value < floor { + return floor + } + return value +} + // --- 删除接口 --- // @Summary 删除AI分析记录