From b5b6d64f5e8c7dc41c983b661555507b4c9d236f Mon Sep 17 00:00:00 2001 From: laoboli <1293528695@qq.com> Date: Wed, 13 May 2026 10:37:26 +0800 Subject: [PATCH] feat: mqtt data static. --- controllers/statistics.go | 395 ++++++++++++++++++++++++++++++++++++++ routes/routes.go | 2 + 2 files changed, 397 insertions(+) diff --git a/controllers/statistics.go b/controllers/statistics.go index d035cad..bc6b1e8 100644 --- a/controllers/statistics.go +++ b/controllers/statistics.go @@ -32,6 +32,47 @@ type analysisRecordListParams struct { EndTime int64 `form:"endTime"` } +type mqttMeasurementHistoryParams struct { + PageNum int `form:"pageNum,default=1"` + PageSize int `form:"pageSize,default=50"` + DataType string `form:"dataType"` + RegionID uint32 `form:"regionId"` + Addr string `form:"addr"` + ValueOperator string `form:"valueOperator"` + Value *int64 `form:"value"` + StartTime int64 `form:"startTime"` + EndTime int64 `form:"endTime"` +} + +type mqttMeasurementHistoryItem struct { + ID uint `json:"id"` + DataType string `json:"dataType"` + Identifier string `json:"identifier"` + Topic string `json:"topic"` + RegionID uint32 `json:"regionId"` + Addr string `json:"addr"` + Value int64 `json:"value"` + ValueLabel string `json:"valueLabel"` + PacketNum uint32 `json:"packetNum"` + GatewayMAC string `json:"gatewayMac"` + Battery uint32 `json:"battery"` + IsActive bool `json:"isActive"` + IsOnSkin bool `json:"isOnSkin"` + SignalRSSINeg float64 `json:"signalRssiNeg"` + SNR float64 `json:"snr"` + ReceivedAt int64 `json:"receivedAt"` + CreatedAtMilli int64 `json:"createdAt"` +} + +type mqttMeasurementHistorySummary struct { + AvgValue float64 `json:"avgValue"` + MaxValue int64 `json:"maxValue"` + MinValue int64 `json:"minValue"` + RecordCount int64 `json:"recordCount"` + UniqueAddrs int64 `json:"uniqueAddrs"` + ValueLabel string `json:"valueLabel"` +} + // --- 查询接口 --- // @Summary 获取AI分析记录列表 @@ -94,6 +135,360 @@ func (sc *StatisticsController) ListAIAnalysisRecords(c *gin.Context) { }) } +// @Summary 获取MQTT测量历史数据 +// @Description 分页查询MQTT测量历史数据,支持按数据类型、区域、地址、数值比较和时间范围筛选 +// @Tags 统计管理 +// @Produce json +// @Param pageNum query int false "页码(默认1)" +// @Param pageSize query int false "每页数量(默认50,最大200)" +// @Param dataType query string false "数据类型 heart_rate|step_count,默认 heart_rate" +// @Param regionId query int false "区域ID" +// @Param addr query string false "地址筛选,模糊匹配 beltAddr" +// @Param valueOperator query string false "数值比较符 gt|gte|lt|lte|eq" +// @Param value query int false "数值比较值" +// @Param startTime query int false "开始时间(毫秒时间戳)" +// @Param endTime query int false "结束时间(毫秒时间戳)" +// @Security BearerAuth +// @Success 200 {object} SwagAPIResponse "查询成功" +// @Router /admin/statistics/mqtt-measurements [get] +func (sc *StatisticsController) ListMqttMeasurementHistory(c *gin.Context) { + var params mqttMeasurementHistoryParams + if err := c.ShouldBindQuery(¶ms); err != nil { + writeError(c, http.StatusBadRequest, err.Error()) + return + } + if params.PageNum < 1 { + params.PageNum = 1 + } + if params.PageSize < 1 || params.PageSize > 200 { + params.PageSize = 50 + } + if strings.TrimSpace(params.DataType) == "" { + params.DataType = "heart_rate" + } + + switch params.DataType { + case "heart_rate": + sc.listHeartRateMeasurementHistory(c, params) + case "step_count": + sc.listStepCountMeasurementHistory(c, params) + default: + writeError(c, http.StatusBadRequest, "unsupported dataType, only heart_rate or step_count is allowed") + } +} + +// @Summary 获取MQTT测量图表数据 +// @Description 查询MQTT测量图表数据,返回同筛选条件下的完整历史序列,不参与分页 +// @Tags 统计管理 +// @Produce json +// @Param dataType query string false "数据类型 heart_rate|step_count,默认 heart_rate" +// @Param regionId query int false "区域ID" +// @Param addr query string false "地址筛选,模糊匹配 beltAddr" +// @Param valueOperator query string false "数值比较符 gt|gte|lt|lte|eq" +// @Param value query int false "数值比较值" +// @Param startTime query int false "开始时间(毫秒时间戳)" +// @Param endTime query int false "结束时间(毫秒时间戳)" +// @Security BearerAuth +// @Success 200 {object} SwagAPIResponse "查询成功" +// @Router /admin/statistics/mqtt-measurements/chart [get] +func (sc *StatisticsController) MqttMeasurementChartData(c *gin.Context) { + var params mqttMeasurementHistoryParams + if err := c.ShouldBindQuery(¶ms); err != nil { + writeError(c, http.StatusBadRequest, err.Error()) + return + } + if strings.TrimSpace(params.DataType) == "" { + params.DataType = "heart_rate" + } + + switch params.DataType { + case "heart_rate": + sc.chartHeartRateMeasurementHistory(c, params) + case "step_count": + sc.chartStepCountMeasurementHistory(c, params) + default: + writeError(c, http.StatusBadRequest, "unsupported dataType, only heart_rate or step_count is allowed") + } +} + +func (sc *StatisticsController) listHeartRateMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) { + baseQuery := sc.DB.Model(&models.MqttHeartRateRecord{}) + baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "heart_rate", "received_at") + + var total int64 + if err := baseQuery.Session(&gorm.Session{}).Count(&total).Error; err != nil { + writeError(c, http.StatusInternalServerError, "failed to count mqtt heart rate records") + return + } + + offset := (params.PageNum - 1) * params.PageSize + var records []models.MqttHeartRateRecord + if err := baseQuery.Session(&gorm.Session{}). + Order("received_at DESC"). + Offset(offset). + Limit(params.PageSize). + Find(&records).Error; err != nil { + writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate records") + return + } + + var summary mqttMeasurementHistorySummary + summary.ValueLabel = "心率" + if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "heart_rate", "belt_addr", &summary); err != nil { + writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate summary") + return + } + + items := make([]mqttMeasurementHistoryItem, 0, len(records)) + for _, record := range records { + items = append(items, mqttMeasurementHistoryItem{ + ID: record.ID, + DataType: "heart_rate", + Identifier: record.Identifier, + Topic: record.Topic, + RegionID: record.RegionID, + Addr: record.BeltAddr, + Value: int64(record.HeartRate), + ValueLabel: "心率", + PacketNum: record.PacketNum, + GatewayMAC: record.GatewayMAC, + Battery: record.Battery, + IsActive: record.IsActive, + IsOnSkin: record.IsOnSkin, + SignalRSSINeg: record.SignalRSSINeg, + SNR: record.SNR, + ReceivedAt: record.ReceivedAt, + CreatedAtMilli: record.CreatedAt.UnixMilli(), + }) + } + + writeSuccess(c, http.StatusOK, "query success", gin.H{ + "list": items, + "pagination": gin.H{ + "currentPage": params.PageNum, + "pageSize": params.PageSize, + "totalList": total, + "totalPage": int((total + int64(params.PageSize) - 1) / int64(params.PageSize)), + }, + "summary": summary, + }) +} + +func (sc *StatisticsController) chartHeartRateMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) { + baseQuery := sc.DB.Model(&models.MqttHeartRateRecord{}) + baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "heart_rate", "received_at") + + var records []models.MqttHeartRateRecord + if err := baseQuery.Session(&gorm.Session{}). + Order("received_at ASC"). + Limit(10000). + Find(&records).Error; err != nil { + writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate chart records") + return + } + + var summary mqttMeasurementHistorySummary + summary.ValueLabel = "心率" + if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "heart_rate", "belt_addr", &summary); err != nil { + writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate summary") + return + } + + items := make([]mqttMeasurementHistoryItem, 0, len(records)) + for _, record := range records { + items = append(items, mqttMeasurementHistoryItem{ + ID: record.ID, + DataType: "heart_rate", + Identifier: record.Identifier, + Topic: record.Topic, + RegionID: record.RegionID, + Addr: record.BeltAddr, + Value: int64(record.HeartRate), + ValueLabel: "心率", + PacketNum: record.PacketNum, + GatewayMAC: record.GatewayMAC, + Battery: record.Battery, + IsActive: record.IsActive, + IsOnSkin: record.IsOnSkin, + SignalRSSINeg: record.SignalRSSINeg, + SNR: record.SNR, + ReceivedAt: record.ReceivedAt, + CreatedAtMilli: record.CreatedAt.UnixMilli(), + }) + } + + writeSuccess(c, http.StatusOK, "query success", gin.H{ + "list": items, + "summary": summary, + }) +} + +func (sc *StatisticsController) listStepCountMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) { + baseQuery := sc.DB.Model(&models.MqttStepCountRecord{}) + baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "step_count", "received_at") + + var total int64 + if err := baseQuery.Session(&gorm.Session{}).Count(&total).Error; err != nil { + writeError(c, http.StatusInternalServerError, "failed to count mqtt step count records") + return + } + + offset := (params.PageNum - 1) * params.PageSize + var records []models.MqttStepCountRecord + if err := baseQuery.Session(&gorm.Session{}). + Order("received_at DESC"). + Offset(offset). + Limit(params.PageSize). + Find(&records).Error; err != nil { + writeError(c, http.StatusInternalServerError, "failed to query mqtt step count records") + return + } + + var summary mqttMeasurementHistorySummary + summary.ValueLabel = "步数" + if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "step_count", "belt_addr", &summary); err != nil { + writeError(c, http.StatusInternalServerError, "failed to query mqtt step count summary") + return + } + + items := make([]mqttMeasurementHistoryItem, 0, len(records)) + for _, record := range records { + items = append(items, mqttMeasurementHistoryItem{ + ID: record.ID, + DataType: "step_count", + Identifier: record.Identifier, + Topic: record.Topic, + RegionID: record.RegionID, + Addr: record.BeltAddr, + Value: int64(record.StepCount), + ValueLabel: "步数", + PacketNum: record.PacketNum, + GatewayMAC: record.GatewayMAC, + Battery: 0, + IsActive: false, + IsOnSkin: false, + SignalRSSINeg: record.SignalRSSINeg, + SNR: record.SNR, + ReceivedAt: record.ReceivedAt, + CreatedAtMilli: record.CreatedAt.UnixMilli(), + }) + } + + writeSuccess(c, http.StatusOK, "query success", gin.H{ + "list": items, + "pagination": gin.H{ + "currentPage": params.PageNum, + "pageSize": params.PageSize, + "totalList": total, + "totalPage": int((total + int64(params.PageSize) - 1) / int64(params.PageSize)), + }, + "summary": summary, + }) +} + +func (sc *StatisticsController) chartStepCountMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) { + baseQuery := sc.DB.Model(&models.MqttStepCountRecord{}) + baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "step_count", "received_at") + + var records []models.MqttStepCountRecord + if err := baseQuery.Session(&gorm.Session{}). + Order("received_at ASC"). + Limit(10000). + Find(&records).Error; err != nil { + writeError(c, http.StatusInternalServerError, "failed to query mqtt step count chart records") + return + } + + var summary mqttMeasurementHistorySummary + summary.ValueLabel = "步数" + if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "step_count", "belt_addr", &summary); err != nil { + writeError(c, http.StatusInternalServerError, "failed to query mqtt step count summary") + return + } + + items := make([]mqttMeasurementHistoryItem, 0, len(records)) + for _, record := range records { + items = append(items, mqttMeasurementHistoryItem{ + ID: record.ID, + DataType: "step_count", + Identifier: record.Identifier, + Topic: record.Topic, + RegionID: record.RegionID, + Addr: record.BeltAddr, + Value: int64(record.StepCount), + ValueLabel: "步数", + PacketNum: record.PacketNum, + GatewayMAC: record.GatewayMAC, + Battery: 0, + IsActive: false, + IsOnSkin: false, + SignalRSSINeg: record.SignalRSSINeg, + SNR: record.SNR, + ReceivedAt: record.ReceivedAt, + CreatedAtMilli: record.CreatedAt.UnixMilli(), + }) + } + + writeSuccess(c, http.StatusOK, "query success", gin.H{ + "list": items, + "summary": summary, + }) +} + +func applyMqttMeasurementFilters(query *gorm.DB, params mqttMeasurementHistoryParams, addrField, valueField, timeField string) *gorm.DB { + if params.RegionID > 0 { + query = query.Where("region_id = ?", params.RegionID) + } + if addr := strings.TrimSpace(params.Addr); addr != "" { + query = query.Where(addrField+" LIKE ?", "%"+addr+"%") + } + if params.StartTime > 0 { + query = query.Where(timeField+" >= ?", params.StartTime) + } + if params.EndTime > 0 { + query = query.Where(timeField+" <= ?", params.EndTime) + } + if params.Value != nil { + switch strings.TrimSpace(params.ValueOperator) { + case "", "eq": + query = query.Where(valueField+" = ?", *params.Value) + case "gt": + query = query.Where(valueField+" > ?", *params.Value) + case "gte": + query = query.Where(valueField+" >= ?", *params.Value) + case "lt": + query = query.Where(valueField+" < ?", *params.Value) + case "lte": + query = query.Where(valueField+" <= ?", *params.Value) + } + } + return query +} + +func buildMqttMeasurementSummary(query *gorm.DB, valueField, addrField string, summary *mqttMeasurementHistorySummary) error { + type rawSummary struct { + RecordCount int64 + UniqueAddrs int64 + MinValue int64 + MaxValue int64 + AvgValue float64 + } + + var result rawSummary + if err := query.Session(&gorm.Session{}).Select( + "COUNT(*) as record_count, COUNT(DISTINCT " + addrField + ") as unique_addrs, COALESCE(MIN(" + valueField + "), 0) as min_value, COALESCE(MAX(" + valueField + "), 0) as max_value, COALESCE(AVG(" + valueField + "), 0) as avg_value", + ).Scan(&result).Error; err != nil { + return err + } + + summary.RecordCount = result.RecordCount + summary.UniqueAddrs = result.UniqueAddrs + summary.MinValue = result.MinValue + summary.MaxValue = result.MaxValue + summary.AvgValue = result.AvgValue + return nil +} + // --- 删除接口 --- // @Summary 删除AI分析记录 diff --git a/routes/routes.go b/routes/routes.go index 5aebe39..c9c98a2 100644 --- a/routes/routes.go +++ b/routes/routes.go @@ -165,6 +165,8 @@ func SetupRouter() *gin.Engine { admin.GET("/statistics/ai-analysis", statisticsController.StatisticsByRegion) admin.GET("/statistics/ai-analysis-timeline", statisticsController.TimelineStatistics) admin.GET("/statistics/mqtt-training-sessions", statisticsController.TrainingSessionStatisticsByRegion) + admin.GET("/statistics/mqtt-measurements", statisticsController.ListMqttMeasurementHistory) + admin.GET("/statistics/mqtt-measurements/chart", statisticsController.MqttMeasurementChartData) } v1.GET("/admin/system-debug/mqtt/ws", systemDebugController.MqttWebSocket)