feat: mqtt data static.

This commit is contained in:
2026-05-13 10:37:26 +08:00
parent 1b447c782d
commit b5b6d64f5e
2 changed files with 397 additions and 0 deletions
+395
View File
@@ -32,6 +32,47 @@ type analysisRecordListParams struct {
EndTime int64 `form:"endTime"`
}
type mqttMeasurementHistoryParams struct {
PageNum int `form:"pageNum,default=1"`
PageSize int `form:"pageSize,default=50"`
DataType string `form:"dataType"`
RegionID uint32 `form:"regionId"`
Addr string `form:"addr"`
ValueOperator string `form:"valueOperator"`
Value *int64 `form:"value"`
StartTime int64 `form:"startTime"`
EndTime int64 `form:"endTime"`
}
type mqttMeasurementHistoryItem struct {
ID uint `json:"id"`
DataType string `json:"dataType"`
Identifier string `json:"identifier"`
Topic string `json:"topic"`
RegionID uint32 `json:"regionId"`
Addr string `json:"addr"`
Value int64 `json:"value"`
ValueLabel string `json:"valueLabel"`
PacketNum uint32 `json:"packetNum"`
GatewayMAC string `json:"gatewayMac"`
Battery uint32 `json:"battery"`
IsActive bool `json:"isActive"`
IsOnSkin bool `json:"isOnSkin"`
SignalRSSINeg float64 `json:"signalRssiNeg"`
SNR float64 `json:"snr"`
ReceivedAt int64 `json:"receivedAt"`
CreatedAtMilli int64 `json:"createdAt"`
}
type mqttMeasurementHistorySummary struct {
AvgValue float64 `json:"avgValue"`
MaxValue int64 `json:"maxValue"`
MinValue int64 `json:"minValue"`
RecordCount int64 `json:"recordCount"`
UniqueAddrs int64 `json:"uniqueAddrs"`
ValueLabel string `json:"valueLabel"`
}
// --- 查询接口 ---
// @Summary 获取AI分析记录列表
@@ -94,6 +135,360 @@ func (sc *StatisticsController) ListAIAnalysisRecords(c *gin.Context) {
})
}
// @Summary 获取MQTT测量历史数据
// @Description 分页查询MQTT测量历史数据支持按数据类型、区域、地址、数值比较和时间范围筛选
// @Tags 统计管理
// @Produce json
// @Param pageNum query int false "页码(默认1)"
// @Param pageSize query int false "每页数量(默认50,最大200)"
// @Param dataType query string false "数据类型 heart_rate|step_count默认 heart_rate"
// @Param regionId query int false "区域ID"
// @Param addr query string false "地址筛选,模糊匹配 beltAddr"
// @Param valueOperator query string false "数值比较符 gt|gte|lt|lte|eq"
// @Param value query int false "数值比较值"
// @Param startTime query int false "开始时间(毫秒时间戳)"
// @Param endTime query int false "结束时间(毫秒时间戳)"
// @Security BearerAuth
// @Success 200 {object} SwagAPIResponse "查询成功"
// @Router /admin/statistics/mqtt-measurements [get]
func (sc *StatisticsController) ListMqttMeasurementHistory(c *gin.Context) {
var params mqttMeasurementHistoryParams
if err := c.ShouldBindQuery(&params); err != nil {
writeError(c, http.StatusBadRequest, err.Error())
return
}
if params.PageNum < 1 {
params.PageNum = 1
}
if params.PageSize < 1 || params.PageSize > 200 {
params.PageSize = 50
}
if strings.TrimSpace(params.DataType) == "" {
params.DataType = "heart_rate"
}
switch params.DataType {
case "heart_rate":
sc.listHeartRateMeasurementHistory(c, params)
case "step_count":
sc.listStepCountMeasurementHistory(c, params)
default:
writeError(c, http.StatusBadRequest, "unsupported dataType, only heart_rate or step_count is allowed")
}
}
// @Summary 获取MQTT测量图表数据
// @Description 查询MQTT测量图表数据返回同筛选条件下的完整历史序列不参与分页
// @Tags 统计管理
// @Produce json
// @Param dataType query string false "数据类型 heart_rate|step_count默认 heart_rate"
// @Param regionId query int false "区域ID"
// @Param addr query string false "地址筛选,模糊匹配 beltAddr"
// @Param valueOperator query string false "数值比较符 gt|gte|lt|lte|eq"
// @Param value query int false "数值比较值"
// @Param startTime query int false "开始时间(毫秒时间戳)"
// @Param endTime query int false "结束时间(毫秒时间戳)"
// @Security BearerAuth
// @Success 200 {object} SwagAPIResponse "查询成功"
// @Router /admin/statistics/mqtt-measurements/chart [get]
func (sc *StatisticsController) MqttMeasurementChartData(c *gin.Context) {
var params mqttMeasurementHistoryParams
if err := c.ShouldBindQuery(&params); err != nil {
writeError(c, http.StatusBadRequest, err.Error())
return
}
if strings.TrimSpace(params.DataType) == "" {
params.DataType = "heart_rate"
}
switch params.DataType {
case "heart_rate":
sc.chartHeartRateMeasurementHistory(c, params)
case "step_count":
sc.chartStepCountMeasurementHistory(c, params)
default:
writeError(c, http.StatusBadRequest, "unsupported dataType, only heart_rate or step_count is allowed")
}
}
func (sc *StatisticsController) listHeartRateMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) {
baseQuery := sc.DB.Model(&models.MqttHeartRateRecord{})
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "heart_rate", "received_at")
var total int64
if err := baseQuery.Session(&gorm.Session{}).Count(&total).Error; err != nil {
writeError(c, http.StatusInternalServerError, "failed to count mqtt heart rate records")
return
}
offset := (params.PageNum - 1) * params.PageSize
var records []models.MqttHeartRateRecord
if err := baseQuery.Session(&gorm.Session{}).
Order("received_at DESC").
Offset(offset).
Limit(params.PageSize).
Find(&records).Error; err != nil {
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate records")
return
}
var summary mqttMeasurementHistorySummary
summary.ValueLabel = "心率"
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "heart_rate", "belt_addr", &summary); err != nil {
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate summary")
return
}
items := make([]mqttMeasurementHistoryItem, 0, len(records))
for _, record := range records {
items = append(items, mqttMeasurementHistoryItem{
ID: record.ID,
DataType: "heart_rate",
Identifier: record.Identifier,
Topic: record.Topic,
RegionID: record.RegionID,
Addr: record.BeltAddr,
Value: int64(record.HeartRate),
ValueLabel: "心率",
PacketNum: record.PacketNum,
GatewayMAC: record.GatewayMAC,
Battery: record.Battery,
IsActive: record.IsActive,
IsOnSkin: record.IsOnSkin,
SignalRSSINeg: record.SignalRSSINeg,
SNR: record.SNR,
ReceivedAt: record.ReceivedAt,
CreatedAtMilli: record.CreatedAt.UnixMilli(),
})
}
writeSuccess(c, http.StatusOK, "query success", gin.H{
"list": items,
"pagination": gin.H{
"currentPage": params.PageNum,
"pageSize": params.PageSize,
"totalList": total,
"totalPage": int((total + int64(params.PageSize) - 1) / int64(params.PageSize)),
},
"summary": summary,
})
}
func (sc *StatisticsController) chartHeartRateMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) {
baseQuery := sc.DB.Model(&models.MqttHeartRateRecord{})
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "heart_rate", "received_at")
var records []models.MqttHeartRateRecord
if err := baseQuery.Session(&gorm.Session{}).
Order("received_at ASC").
Limit(10000).
Find(&records).Error; err != nil {
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate chart records")
return
}
var summary mqttMeasurementHistorySummary
summary.ValueLabel = "心率"
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "heart_rate", "belt_addr", &summary); err != nil {
writeError(c, http.StatusInternalServerError, "failed to query mqtt heart rate summary")
return
}
items := make([]mqttMeasurementHistoryItem, 0, len(records))
for _, record := range records {
items = append(items, mqttMeasurementHistoryItem{
ID: record.ID,
DataType: "heart_rate",
Identifier: record.Identifier,
Topic: record.Topic,
RegionID: record.RegionID,
Addr: record.BeltAddr,
Value: int64(record.HeartRate),
ValueLabel: "心率",
PacketNum: record.PacketNum,
GatewayMAC: record.GatewayMAC,
Battery: record.Battery,
IsActive: record.IsActive,
IsOnSkin: record.IsOnSkin,
SignalRSSINeg: record.SignalRSSINeg,
SNR: record.SNR,
ReceivedAt: record.ReceivedAt,
CreatedAtMilli: record.CreatedAt.UnixMilli(),
})
}
writeSuccess(c, http.StatusOK, "query success", gin.H{
"list": items,
"summary": summary,
})
}
func (sc *StatisticsController) listStepCountMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) {
baseQuery := sc.DB.Model(&models.MqttStepCountRecord{})
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "step_count", "received_at")
var total int64
if err := baseQuery.Session(&gorm.Session{}).Count(&total).Error; err != nil {
writeError(c, http.StatusInternalServerError, "failed to count mqtt step count records")
return
}
offset := (params.PageNum - 1) * params.PageSize
var records []models.MqttStepCountRecord
if err := baseQuery.Session(&gorm.Session{}).
Order("received_at DESC").
Offset(offset).
Limit(params.PageSize).
Find(&records).Error; err != nil {
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count records")
return
}
var summary mqttMeasurementHistorySummary
summary.ValueLabel = "步数"
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "step_count", "belt_addr", &summary); err != nil {
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count summary")
return
}
items := make([]mqttMeasurementHistoryItem, 0, len(records))
for _, record := range records {
items = append(items, mqttMeasurementHistoryItem{
ID: record.ID,
DataType: "step_count",
Identifier: record.Identifier,
Topic: record.Topic,
RegionID: record.RegionID,
Addr: record.BeltAddr,
Value: int64(record.StepCount),
ValueLabel: "步数",
PacketNum: record.PacketNum,
GatewayMAC: record.GatewayMAC,
Battery: 0,
IsActive: false,
IsOnSkin: false,
SignalRSSINeg: record.SignalRSSINeg,
SNR: record.SNR,
ReceivedAt: record.ReceivedAt,
CreatedAtMilli: record.CreatedAt.UnixMilli(),
})
}
writeSuccess(c, http.StatusOK, "query success", gin.H{
"list": items,
"pagination": gin.H{
"currentPage": params.PageNum,
"pageSize": params.PageSize,
"totalList": total,
"totalPage": int((total + int64(params.PageSize) - 1) / int64(params.PageSize)),
},
"summary": summary,
})
}
func (sc *StatisticsController) chartStepCountMeasurementHistory(c *gin.Context, params mqttMeasurementHistoryParams) {
baseQuery := sc.DB.Model(&models.MqttStepCountRecord{})
baseQuery = applyMqttMeasurementFilters(baseQuery, params, "belt_addr", "step_count", "received_at")
var records []models.MqttStepCountRecord
if err := baseQuery.Session(&gorm.Session{}).
Order("received_at ASC").
Limit(10000).
Find(&records).Error; err != nil {
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count chart records")
return
}
var summary mqttMeasurementHistorySummary
summary.ValueLabel = "步数"
if err := buildMqttMeasurementSummary(baseQuery.Session(&gorm.Session{}), "step_count", "belt_addr", &summary); err != nil {
writeError(c, http.StatusInternalServerError, "failed to query mqtt step count summary")
return
}
items := make([]mqttMeasurementHistoryItem, 0, len(records))
for _, record := range records {
items = append(items, mqttMeasurementHistoryItem{
ID: record.ID,
DataType: "step_count",
Identifier: record.Identifier,
Topic: record.Topic,
RegionID: record.RegionID,
Addr: record.BeltAddr,
Value: int64(record.StepCount),
ValueLabel: "步数",
PacketNum: record.PacketNum,
GatewayMAC: record.GatewayMAC,
Battery: 0,
IsActive: false,
IsOnSkin: false,
SignalRSSINeg: record.SignalRSSINeg,
SNR: record.SNR,
ReceivedAt: record.ReceivedAt,
CreatedAtMilli: record.CreatedAt.UnixMilli(),
})
}
writeSuccess(c, http.StatusOK, "query success", gin.H{
"list": items,
"summary": summary,
})
}
func applyMqttMeasurementFilters(query *gorm.DB, params mqttMeasurementHistoryParams, addrField, valueField, timeField string) *gorm.DB {
if params.RegionID > 0 {
query = query.Where("region_id = ?", params.RegionID)
}
if addr := strings.TrimSpace(params.Addr); addr != "" {
query = query.Where(addrField+" LIKE ?", "%"+addr+"%")
}
if params.StartTime > 0 {
query = query.Where(timeField+" >= ?", params.StartTime)
}
if params.EndTime > 0 {
query = query.Where(timeField+" <= ?", params.EndTime)
}
if params.Value != nil {
switch strings.TrimSpace(params.ValueOperator) {
case "", "eq":
query = query.Where(valueField+" = ?", *params.Value)
case "gt":
query = query.Where(valueField+" > ?", *params.Value)
case "gte":
query = query.Where(valueField+" >= ?", *params.Value)
case "lt":
query = query.Where(valueField+" < ?", *params.Value)
case "lte":
query = query.Where(valueField+" <= ?", *params.Value)
}
}
return query
}
func buildMqttMeasurementSummary(query *gorm.DB, valueField, addrField string, summary *mqttMeasurementHistorySummary) error {
type rawSummary struct {
RecordCount int64
UniqueAddrs int64
MinValue int64
MaxValue int64
AvgValue float64
}
var result rawSummary
if err := query.Session(&gorm.Session{}).Select(
"COUNT(*) as record_count, COUNT(DISTINCT " + addrField + ") as unique_addrs, COALESCE(MIN(" + valueField + "), 0) as min_value, COALESCE(MAX(" + valueField + "), 0) as max_value, COALESCE(AVG(" + valueField + "), 0) as avg_value",
).Scan(&result).Error; err != nil {
return err
}
summary.RecordCount = result.RecordCount
summary.UniqueAddrs = result.UniqueAddrs
summary.MinValue = result.MinValue
summary.MaxValue = result.MaxValue
summary.AvgValue = result.AvgValue
return nil
}
// --- 删除接口 ---
// @Summary 删除AI分析记录
+2
View File
@@ -165,6 +165,8 @@ func SetupRouter() *gin.Engine {
admin.GET("/statistics/ai-analysis", statisticsController.StatisticsByRegion)
admin.GET("/statistics/ai-analysis-timeline", statisticsController.TimelineStatistics)
admin.GET("/statistics/mqtt-training-sessions", statisticsController.TrainingSessionStatisticsByRegion)
admin.GET("/statistics/mqtt-measurements", statisticsController.ListMqttMeasurementHistory)
admin.GET("/statistics/mqtt-measurements/chart", statisticsController.MqttMeasurementChartData)
}
v1.GET("/admin/system-debug/mqtt/ws", systemDebugController.MqttWebSocket)