From 9a951304884e02b681c372952e299cffc022ff6b Mon Sep 17 00:00:00 2001 From: laoboli <1293528695@qq.com> Date: Tue, 28 Apr 2026 18:52:03 +0800 Subject: [PATCH] feat: start record. --- config.sample.yaml | 2 + config/config.go | 20 +++--- main.go | 1 + models/mqtt_data.go | 21 ++++++ mqtt/listener.go | 172 ++++++++++++++++++++++++++++++++++++++++++-- test/a.md | 25 +++++++ test/b.md | 25 +++++++ 7 files changed, 251 insertions(+), 15 deletions(-) create mode 100644 test/a.md create mode 100644 test/b.md diff --git a/config.sample.yaml b/config.sample.yaml index d58bbb9..293c2a5 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -18,3 +18,5 @@ mqtt: region: "+" use_tls: true qos: 0 + enable_measurement_subscriptions: false + enable_training_event_subscription: true diff --git a/config/config.go b/config/config.go index f144d4a..ff57b04 100644 --- a/config/config.go +++ b/config/config.go @@ -27,15 +27,17 @@ type AIConfig struct { } type MQTTConfig struct { - Enabled bool `mapstructure:"enabled" yaml:"enabled"` - Host string `mapstructure:"host" yaml:"host"` - Port int `mapstructure:"port" yaml:"port"` - Username string `mapstructure:"username" yaml:"username"` - Password string `mapstructure:"password" yaml:"password"` - ClientIDPrefix string `mapstructure:"client_id_prefix" yaml:"client_id_prefix"` - Region string `mapstructure:"region" yaml:"region"` - UseTLS bool `mapstructure:"use_tls" yaml:"use_tls"` - QoS int `mapstructure:"qos" yaml:"qos"` + Enabled bool `mapstructure:"enabled" yaml:"enabled"` + Host string `mapstructure:"host" yaml:"host"` + Port int `mapstructure:"port" yaml:"port"` + Username string `mapstructure:"username" yaml:"username"` + Password string `mapstructure:"password" yaml:"password"` + ClientIDPrefix string `mapstructure:"client_id_prefix" yaml:"client_id_prefix"` + Region string `mapstructure:"region" yaml:"region"` + UseTLS bool `mapstructure:"use_tls" yaml:"use_tls"` + QoS int `mapstructure:"qos" yaml:"qos"` + EnableMeasurementSubscriptions bool `mapstructure:"enable_measurement_subscriptions" yaml:"enable_measurement_subscriptions"` + EnableTrainingEventSubscription bool `mapstructure:"enable_training_event_subscription" yaml:"enable_training_event_subscription"` } type AppConfig struct { diff --git a/main.go b/main.go index f109f18..09d6da9 100644 --- a/main.go +++ b/main.go @@ -30,6 +30,7 @@ func main() { &models.MqttHeartRateRecord{}, &models.MqttStepCountRecord{}, &models.MqttGatewayStatusRecord{}, + &models.MqttTrainingSessionRecord{}, ) if err := mqtt.Start(config.DB, config.App.MQTT); err != nil { diff --git a/models/mqtt_data.go b/models/mqtt_data.go index be0330d..dca78e7 100644 --- a/models/mqtt_data.go +++ b/models/mqtt_data.go @@ -55,3 +55,24 @@ type MqttGatewayStatusRecord struct { ChargingRatePercentage int32 `json:"chargingRatePercentage"` ReceivedAt int64 `gorm:"type:bigint;index" json:"receivedAt"` } + +type MqttTrainingSessionRecord struct { + gorm.Model + Identifier string `gorm:"uniqueIndex;size:255" json:"identifier"` + Topic string `gorm:"size:255;index" json:"topic"` + TestID string `gorm:"size:255;index" json:"testId"` + EventType string `gorm:"size:32;index" json:"eventType"` + RegionID uint32 `gorm:"index" json:"regionId"` + FlavorType string `gorm:"size:64;index" json:"flavorType"` + RawFlavor string `gorm:"size:64" json:"rawFlavor"` + AppName string `gorm:"size:255" json:"appName"` + StartedAt *int64 `gorm:"type:bigint;index" json:"startedAt"` + EndedAt *int64 `gorm:"type:bigint;index" json:"endedAt"` + PublishedAt int64 `gorm:"type:bigint;index" json:"publishedAt"` + ReceivedAt int64 `gorm:"type:bigint;index" json:"receivedAt"` + RawPayload string `gorm:"type:text" json:"rawPayload"` +} + +func (MqttTrainingSessionRecord) TableName() string { + return "mqtt_training_sessions" +} diff --git a/mqtt/listener.go b/mqtt/listener.go index 83c7bce..ea06184 100644 --- a/mqtt/listener.go +++ b/mqtt/listener.go @@ -2,6 +2,7 @@ package mqtt import ( "crypto/tls" + "encoding/json" "fmt" "hr_receiver/config" "hr_receiver/models" @@ -14,6 +15,7 @@ import ( "google.golang.org/protobuf/proto" "gorm.io/gorm" "gorm.io/gorm/clause" + "gorm.io/gorm/schema" ) const ( @@ -28,6 +30,16 @@ type Listener struct { writeCh chan interface{} } +type trainingSessionPayload struct { + Type string `json:"type"` + EventType string `json:"eventType"` + TestID string `json:"testId"` + RegionID string `json:"regionId"` + Flavor string `json:"flavor"` + AppName string `json:"appName"` + Timestamp string `json:"timestamp"` +} + func Start(db *gorm.DB, cfg config.MQTTConfig) error { if !cfg.Enabled { log.Println("mqtt listener disabled") @@ -106,10 +118,16 @@ func (l *Listener) connect() error { } func (l *Listener) subscribe(client mqtt.Client) error { - topics := []string{ - fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/hr", l.cfg.Region), - fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/step", l.cfg.Region), - fmt.Sprintf("/whgw/v2/region/%s/gateway/+/status", l.cfg.Region), + var topics []string + if l.cfg.EnableMeasurementSubscriptions { + topics = append(topics, + fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/hr", l.cfg.Region), + fmt.Sprintf("/whgw/v2/region/%s/measurement/band/+/step", l.cfg.Region), + fmt.Sprintf("/whgw/v2/region/%s/gateway/+/status", l.cfg.Region), + ) + } + if l.cfg.EnableTrainingEventSubscription { + topics = append(topics, "/whgw/v2/region/test/+/+") } for _, topic := range topics { token := client.Subscribe(topic, byte(l.cfg.QoS), l.handleMessage) @@ -134,14 +152,23 @@ func (l *Listener) handleMessage(_ mqtt.Client, msg mqtt.Message) { if len(msg.Payload()) == 0 { return } + now := time.Now().UnixMilli() var packet whgw_hrpb.GatewaySlaveOutCloudMasterInMsg + if isTrainingEventTopic(msg.Topic()) { + record, ok := buildTrainingSessionRecord(msg.Topic(), msg.Payload(), now) + if !ok { + return + } + l.enqueue(record) + return + } + if err := proto.Unmarshal(msg.Payload(), &packet); err != nil { log.Printf("mqtt payload parse failed, topic=%s err=%v", msg.Topic(), err) return } - now := time.Now().UnixMilli() switch payload := packet.Choice.(type) { case *whgw_hrpb.GatewaySlaveOutCloudMasterInMsg_NtfHrMeasurement: record := buildHeartRateRecord(payload.NtfHrMeasurement, msg.Topic(), now) @@ -174,13 +201,48 @@ func (l *Listener) writeWorker() { } }() - if err := l.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record).Error; err != nil { + if err := l.persistRecord(record); err != nil { log.Printf("mqtt record persist failed, type=%T err=%v", record, err) } }() } } +func (l *Listener) persistRecord(record interface{}) error { + switch r := record.(type) { + case *models.MqttTrainingSessionRecord: + return l.persistTrainingSession(r) + default: + return l.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record).Error + } +} + +func (l *Listener) persistTrainingSession(record *models.MqttTrainingSessionRecord) error { + assignments := map[string]interface{}{ + "topic": record.Topic, + "event_type": record.EventType, + "region_id": record.RegionID, + "flavor_type": record.FlavorType, + "raw_flavor": record.RawFlavor, + "app_name": record.AppName, + "published_at": record.PublishedAt, + "received_at": record.ReceivedAt, + "raw_payload": record.RawPayload, + "updated_at": time.Now(), + } + if record.StartedAt != nil { + assignments["started_at"] = *record.StartedAt + } + if record.EndedAt != nil { + assignments["ended_at"] = *record.EndedAt + } + + return l.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "identifier"}}, + DoUpdates: clause.Assignments(assignments), + }).Create(record).Error +} + func buildHeartRateRecord(measurement *whgw_hrpb.HrMeasurement, topic string, now int64) models.MqttHeartRateRecord { regionID := measurement.GetGatewayInfo().GetRegionId() if regionID == 0 { @@ -298,3 +360,101 @@ func parseRegionFromTopic(topic string) uint32 { } return 0 } + +func isTrainingEventTopic(topic string) bool { + return strings.HasPrefix(topic, "/whgw/v2/region/test/") +} + +func buildTrainingSessionRecord(topic string, payload []byte, now int64) (*models.MqttTrainingSessionRecord, bool) { + var event trainingSessionPayload + if err := json.Unmarshal(payload, &event); err != nil { + log.Printf("mqtt training event parse failed, topic=%s err=%v", topic, err) + return nil, false + } + if event.Type != "mqtt_test" { + log.Printf("mqtt training event ignored, unsupported type topic=%s type=%s", topic, event.Type) + return nil, false + } + + flavorType := normalizeFlavor(event.Flavor) + if flavorType != "heartrate" { + log.Printf("mqtt training event ignored, unsupported flavor topic=%s flavor=%s", topic, event.Flavor) + return nil, false + } + + regionID := parseUint32(event.RegionID) + if regionID == 0 { + regionID = parseRegionFromTrainingTopic(topic) + } + publishedAt := parseRFC3339Milli(event.Timestamp) + if publishedAt == 0 { + publishedAt = now + } + + record := &models.MqttTrainingSessionRecord{ + Identifier: buildTrainingSessionIdentifier(flavorType, regionID, event.TestID), + Topic: topic, + TestID: event.TestID, + EventType: event.EventType, + RegionID: regionID, + FlavorType: flavorType, + RawFlavor: event.Flavor, + AppName: event.AppName, + PublishedAt: publishedAt, + ReceivedAt: now, + RawPayload: string(payload), + } + switch event.EventType { + case "start_test": + record.StartedAt = &publishedAt + case "stop_test": + record.EndedAt = &publishedAt + default: + log.Printf("mqtt training event ignored, unsupported event topic=%s event=%s", topic, event.EventType) + return nil, false + } + return record, true +} + +func buildTrainingSessionIdentifier(flavorType string, regionID uint32, testID string) string { + return schema.NamingStrategy{}.IndexName( + "mqtt_training_session", + fmt.Sprintf("%s_%d_%s", flavorType, regionID, testID), + ) +} + +func normalizeFlavor(flavor string) string { + switch strings.ToLower(strings.TrimSpace(flavor)) { + case "hr", "heartrate": + return "heartrate" + default: + return strings.ToLower(strings.TrimSpace(flavor)) + } +} + +func parseUint32(value string) uint32 { + var result uint32 + if _, err := fmt.Sscanf(strings.TrimSpace(value), "%d", &result); err == nil { + return result + } + return 0 +} + +func parseRegionFromTrainingTopic(topic string) uint32 { + parts := strings.Split(topic, "/") + if len(parts) >= 6 { + return parseUint32(parts[5]) + } + return 0 +} + +func parseRFC3339Milli(value string) int64 { + if strings.TrimSpace(value) == "" { + return 0 + } + t, err := time.Parse(time.RFC3339, value) + if err != nil { + return 0 + } + return t.UnixMilli() +} diff --git a/test/a.md b/test/a.md new file mode 100644 index 0000000..56e0699 --- /dev/null +++ b/test/a.md @@ -0,0 +1,25 @@ +泉州市丰泽区幼儿园园际教研组(健康领域)听课记录表 + +## 表格 +| 活动时间 | 2026年1月16日(上午) | 活动地点 | 丰泽区 东海实验幼儿园 | 班级 | 大三班 | +| --- | --- | --- | --- | --- | --- | +活动内容 | 大班健康活动 《勇敢小兵》(动作发展) | 执教者 | 丰泽区东海实验 幼儿园 贺晓影 | 指导者 | 金达兰 鄞婉瑜 | +研讨问题 | 1.结合《指南》健康领域“动作发展”目标与幼儿“爬”的核心经验,活动中哪些策略有效支持了不同年龄段幼儿“爬”动作的进阶发展?又该如何 +化调整,让活动更贴合幼儿个体差异,助力其核心经验的深度习得? 2.如何通过数据赋能、优化体育活动设计与组织,避免运动量不足或过度,同时助助幼儿“爬”动作技能稳步提升? | +活动目标 | 1.初步掌握匍匐爬的动作,能手脚协调配合、身体平稳向前移动。 2.提升对身体的控制能力和动作的协调性、灵敏性。 3.体验不怕困难、 勇于挑战的精神,萌发对中国人民解放军的崇敬之情与民族自豪感。 | +活动准备 | 1.经验准备:具备手膝爬、手脚爬、投掷、跳跃、平衡等相关动作经验;班级开展《我爱解放军》主题探究活动,幼儿了解解放军事迹,并 +萌发崇敬之情。 2.物质准备:爬行垫、椅子、沙包、口哨、铃铛等若干。 3.场地设置: ![anchim 图片 2](84bb75bb08135772a5547fe0bf68a328)![anchim 图片 3](909e8a8978ad9a0262cc5f26b5cde329) ![anchim 图片 4](a16d232ae054a5c3589b39ce7839894d) | +活动实录 | 评价与建议 | +一、开始部分 1.情景导入 引导语:小朋友们,80多年前,中国解放军没有先进的武器,靠着小米加步枪和勇敢无畏的精神打败了侵略者。今天,我们也 +当一回勇敢的小士兵,你们有没有信心”! 2.热身运动 行进中听信号快速卧倒(2-3次),强调原地卧倒时“贴地”的动作要求,并提醒幼儿卧倒动作需保 护好膝盖的安全要点。 | | +二、基本部分 (一)游戏一:自主探索,学习基础动作 1.问题导入:如何贴地移动,才能又轻又快? 2.幼儿自主探索俯身匍匐爬动作,教师巡回观察 +。 3.分享交流,梳理问题 提问:你们觉得哪种方式比较不容易被敌人发现?为什么? 鼓励幼儿大胆表达自己的发现和感受,梳理出“身体贴地、手脚协 +发力”是关键。 4.动作示范与练习: (1)示范讲解:小肚子贴地面,膝盖内侧轻轻贴。双手交替向前撑,双脚蹬地使劲冲。小屁股左右扭,不抬头来来 +不勾。 (2)动作练习: ①个别幼儿示范:关注双手握拳、贴紧地面、脚蹬地、左右扭等动作要领。 ②分组循环练习:四路纵队依次通过平坦爬行区域域 +教师重点关注动作规范性,对能力弱的幼儿进行辅助。 ③提出上次游戏问题再次练习。 (二)游戏二:穿越火线,突破低矮障碍 1.情境升级: 引导 +语:可恶的敌人不仅布置了红外线,而且还在上面点起浓烟,我们需要低姿匍匐爬,趴低一点才能呼吸到新鲜空气。 2.再次练习: 幼儿分组通过低矮障 +,教师在障碍旁实时指导,并提醒幼儿注意与同伴保持距离,避免碰撞。 (三)游戏三:士兵出击,完成终极任务 1.任务布置:敌人在前面设置了重重 +障碍,我们需要爬过浓烟区、翻过敌人的围墙,登上敌人的岛屿,拿起炸弹,轰炸敌人的碉堡。 2.游戏开展: (1)明确规则:投掷完炸弹回到出发的 +地方继续出发。 (2)幼儿在音乐中开展游戏,教师关注动作质量、安全秩序,鼓励幼儿坚持完成任务。 三、结束部分: 1.舒缓放松:师幼随轻柔音乐做 +全身拉伸、拍打放松、呼吸调节”,帮助幼儿缓解运动疲劳。 2.分享与迁移:除了作战、军事训练经常运用匍匐爬行,生活中还有什么时候会用到? 3..整理环节:指导幼儿合作整理器材。 | | + diff --git a/test/b.md b/test/b.md new file mode 100644 index 0000000..599e456 --- /dev/null +++ b/test/b.md @@ -0,0 +1,25 @@ +泉州市丰泽区幼儿园园际教研组(健康领域)听课记录表 + +## 表格 +| 活动时间 | 2026年1月16日(上午) | 活动地点 | 丰泽区 东海实验幼儿园 | 班级 | 小四班 | +| --- | --- | --- | --- | --- | --- | +活动内容 | 小班健康活动 《蚂蚁运粮》(动作发展) | 执教者 | 丰泽区泉秀实验 幼儿园 吴思莹 | 指导者 | 陈心雅 郭慧兰 | +研讨问题 | 1.结合《指南》健康领域“动作发展”目标与幼儿“爬”的核心经验,活动中哪些策略有效支持了不同年龄段幼儿“爬”动作的进阶发展?又该如何 +足或过度,同时助 助幼儿"爬"动作技能稳步提升? | +活动目标 | 1.掌握手膝着地爬的基本动作要领。 2.乐于尝试倒退爬及携带 “粮食” 道具爬行,增强身体控制能力。 3.喜欢参与 “蚂蚁运粮” 游戏,感 受集体游戏的快乐。 | +活动准备 | 1.材料准备:蚂蚁头饰若干,特制粘扣衣服(幼儿每人一件),彩色毛球(作为“粮食”),彩虹伞、彩虹网、爬行隧道、地垫,轻快的背景 +音乐。 2.环境准备:铺设地垫,划分“蚂蚁洞穴”(爬行隧道)、“障碍小路”、“粮仓”(毛球放置区)三个游戏区域。 活动场地: ![inlnim 图片 1](c0 +4c04f6c8503dc23bcb0e04caec40c7) 3.经验准备 (1)幼儿已初步了解蚂蚁爬行的特点,知道蚂蚁会群体搬运食物的生活习性。 (2)幼儿玩过游戏《快慢爬》、《棕熊来了》,使用过特制粘扣衣服。 | +活动实录 | 评价与建议 | +(一)激趣引入,热身准备 1.教师扮演蚂蚁妈妈,带领蚂蚁宝贝四散站立。 引导语:我是蚂蚁妈妈,可爱的小蚂蚁们,今天妈妈带着大家一起玩!我们 +先跟着音乐热身起来吧! 2.热身运动。 带领幼儿做热身操:头部→肩部→侧身→膝盖→体转→放松,充分活动关节。 (二)自主探索,学习动作。 1.经验 +唤醒,自由爬行。 引导语:蚂蚁宝贝们,我们一起在彩虹伞中爬一爬、玩一玩吧! 2.分享交流,梳理经验。 请“小蚂蚁”示范爬行玩法,教师梳理示范 +手膝着地爬动作要领。 小结:小蚂蚁们学会的这种爬的方式,有个好听的名字叫——手膝着地爬,爬的时候,要注意头抬起,两只小手要放好,两只小腿 +要跟上。 3.练习本领一:“快慢爬”,听鼓声集体练习手膝着地快慢爬。 4.练习本领二:“倒退爬”,以“棕熊来了”游戏引导幼儿尝试手膝着地倒退爬。 +(三)游戏学习,巩固提升 1.介绍“蚂蚁运粮”游戏玩法。 玩法:小蚂蚁们从“蚂蚁洞穴”出发,快速爬过隧道“穿草丛”,手膝着地爬过“障碍小路”,到“ +粮仓”粘1个毛球在身上,中途听到棕熊来了的声音,用倒退爬的方式给棕熊让路,等棕熊走了再继续向前爬行将粮食运回洞穴。 2.请个别幼儿示范玩法 +。 教师扮演“蚂蚁妈妈”,提醒幼儿遵守规则,看到棕熊时倒退爬,时刻关注幼儿安全,鼓励胆小的幼儿大胆尝试。 3.提出要求,让幼儿进一步明确游戏 +则。 (1)游戏过程要注意手膝爬的动作要领和安全。 (2)每只蚂蚁每次只取一粒粮食粘在身上运回。 (3)遵守规则:前一只蚂蚁爬过,下一只蚂蚂 +再出发。 4.幼儿分组游戏。 第一次游戏:教师引导幼儿按颜色分成3组进行游戏。 教师重点观察、指导幼儿手膝着地向前爬、后退爬的动作,督促幼幼 +遵守游戏规则。 第二次游戏:创设3组不同难度的障碍,引导幼儿自主参与。 重点鼓励幼儿灵活运用不同的爬行方式穿越障碍物夺取粮食。 小结:恭恭 +小蚂蚁们勇敢迎接挑战,成功运回所有粮食。 (四)放松身心,调节情绪 教师带领幼儿进行放松:四散站在场地上,轻轻拍打胳膊、腿部,按摩膝盖盖手腕,放松身心。 | |