Merge pull request #14 from duiniuluantanqin/main

support re-register
This commit is contained in:
Haibo Chen(陈海博)
2025-03-12 15:30:34 +08:00
committed by GitHub
9 changed files with 107226 additions and 37 deletions

18
.vscode/launch.json vendored
View File

@ -5,14 +5,28 @@
"version": "0.2.0", "version": "0.2.0",
"configurations": [ "configurations": [
{ {
"name": "Launch Binary", "name": "Launch Binary (Windows)",
"type": "go",
"request": "launch",
"mode": "exec",
"program": "${workspaceFolder}/objs/srs-sip.exe",
"cwd": "${workspaceFolder}/objs",
"env": {},
"args": [],
"preLaunchTask": "build-windows",
"windows": {}
},
{
"name": "Launch Binary (Linux)",
"type": "go", "type": "go",
"request": "launch", "request": "launch",
"mode": "exec", "mode": "exec",
"program": "${workspaceFolder}/objs/srs-sip", "program": "${workspaceFolder}/objs/srs-sip",
"cwd": "${workspaceFolder}/objs", "cwd": "${workspaceFolder}/objs",
"env": {}, "env": {},
"args": [] "args": [],
"preLaunchTask": "build-linux",
"linux": {}
} }
] ]
} }

25
.vscode/tasks.json vendored Normal file
View File

@ -0,0 +1,25 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "build-windows",
"type": "shell",
"command": ".\\build.bat build",
"group": "build",
"presentation": {
"reveal": "always"
},
"problemMatcher": []
},
{
"label": "build-linux",
"type": "shell",
"command": "./build.sh build",
"group": "build",
"presentation": {
"reveal": "always"
},
"problemMatcher": []
}
]
}

BIN
doc/GBT28181-2016.pdf Normal file

Binary file not shown.

File diff suppressed because one or more lines are too long

View File

@ -68,6 +68,28 @@ type ChannelInfo struct {
type ChannelStatus string type ChannelStatus string
// BasicParam
// <! -- 基本参数配置(可选)-->
// <elementname="BasicParam"minOccurs="0">
// <complexType>
// <sequence>
// <! -- 设备名称(可选)-->
// <elementname="Name"type="string" minOccurs="0"/>
// <! -- 注册过期时间(可选)-->
// <elementname="Expiration"type="integer" minOccurs="0"/>
// <! -- 心跳间隔时间(可选)-->
// <elementname="HeartBeatInterval"type="integer" minOccurs="0"/>
// <! -- 心跳超时次数(可选)-->
// <elementname="HeartBeatCount"type="integer" minOccurs="0"/>
// </sequence>
// </complexType>
type BasicParam struct {
Name string `xml:"Name"`
Expiration int `xml:"Expiration"`
HeartBeatInterval int `xml:"HeartBeatInterval"`
HeartBeatCount int `xml:"HeartBeatCount"`
}
type XmlMessageInfo struct { type XmlMessageInfo struct {
XMLName xml.Name XMLName xml.Name
CmdType string CmdType string
@ -79,5 +101,6 @@ type XmlMessageInfo struct {
Channel string Channel string
DeviceList []ChannelInfo `xml:"DeviceList>Item"` DeviceList []ChannelInfo `xml:"DeviceList>Item"`
RecordList []*Record `xml:"RecordList>Item"` RecordList []*Record `xml:"RecordList>Item"`
BasicParam BasicParam `xml:"BasicParam"`
SumNum int SumNum int
} }

View File

@ -1,9 +1,12 @@
package service package service
import ( import (
"context"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/ossrs/go-oryx-lib/logger"
"github.com/ossrs/srs-sip/pkg/models" "github.com/ossrs/srs-sip/pkg/models"
) )
@ -12,10 +15,20 @@ type DeviceInfo struct {
SourceAddr string `json:"source_addr"` SourceAddr string `json:"source_addr"`
NetworkType string `json:"network_type"` NetworkType string `json:"network_type"`
ChannelMap sync.Map `json:"-"` ChannelMap sync.Map `json:"-"`
Online bool `json:"online"`
HeartBeatInterval int `json:"heart_beat_interval"`
HeartBeatCount int `json:"heart_beat_count"`
lastHeartbeat time.Time `json:"-"`
} }
const (
DefaultHeartbeatInterval = 60 * time.Second // 心跳检查间隔时间
)
type deviceManager struct { type deviceManager struct {
devices sync.Map devices sync.Map
heartbeatChecker *time.Ticker // 心跳检查定时器
stopChan chan struct{} // 停止信号通道
} }
var instance *deviceManager var instance *deviceManager
@ -25,12 +38,96 @@ func GetDeviceManager() *deviceManager {
once.Do(func() { once.Do(func() {
instance = &deviceManager{ instance = &deviceManager{
devices: sync.Map{}, devices: sync.Map{},
stopChan: make(chan struct{}),
} }
// 启动心跳检查
instance.startHeartbeatChecker()
}) })
return instance return instance
} }
// 启动心跳检查器
func (dm *deviceManager) startHeartbeatChecker() {
dm.heartbeatChecker = time.NewTicker(3 * time.Second) // 每3秒检查一次
go func() {
for {
select {
case <-dm.heartbeatChecker.C:
dm.checkHeartbeats()
case <-dm.stopChan:
dm.heartbeatChecker.Stop()
return
}
}
}()
}
// 停止心跳检查器
func (dm *deviceManager) stopHeartbeatChecker() {
close(dm.stopChan)
}
// 检查所有设备的心跳状态
func (dm *deviceManager) checkHeartbeats() {
now := time.Now()
dm.devices.Range(func(key, value interface{}) bool {
device := value.(*DeviceInfo)
if device.HeartBeatInterval == 0 {
device.HeartBeatInterval = int(DefaultHeartbeatInterval)
}
// 如果最后心跳时间超过超时时间,则将设备所有通道状态设置为离线
if now.Sub(device.lastHeartbeat) > time.Duration(device.HeartBeatInterval*device.HeartBeatCount)*time.Second {
isOffline := false
device.ChannelMap.Range(func(key, value interface{}) bool {
channel := value.(models.ChannelInfo)
if channel.Status != models.ChannelStatus("OFF") {
isOffline = true
channel.Status = models.ChannelStatus("OFF")
device.ChannelMap.Store(key, channel)
}
return true
})
if isOffline {
device.SourceAddr = ""
device.Online = false
dm.devices.Store(key, device)
logger.Wf(context.Background(), "Device %s is offline due to heartbeat timeout, HeartBeatInterval: %v", device.DeviceID, device.HeartBeatInterval)
}
}
return true
})
}
func (dm *deviceManager) UpdateDeviceHeartbeat(id string) {
if device, ok := dm.GetDevice(id); ok {
device.lastHeartbeat = time.Now()
// 检查是否需要将通道状态设置为在线
isUpdated := false
device.ChannelMap.Range(func(key, value interface{}) bool {
channel := value.(models.ChannelInfo)
if channel.Status != models.ChannelStatus("ON") {
isUpdated = true
channel.Status = models.ChannelStatus("ON")
device.ChannelMap.Store(key, channel)
}
return true
})
if isUpdated {
device.Online = true
dm.devices.Store(id, device)
}
}
}
func (dm *deviceManager) AddDevice(id string, info *DeviceInfo) { func (dm *deviceManager) AddDevice(id string, info *DeviceInfo) {
// 设置初始心跳时间
info.lastHeartbeat = time.Now()
channel := models.ChannelInfo{ channel := models.ChannelInfo{
DeviceID: id, DeviceID: id,
ParentID: id, ParentID: id,
@ -62,6 +159,31 @@ func (dm *deviceManager) GetDevice(id string) (*DeviceInfo, bool) {
return v.(*DeviceInfo), true return v.(*DeviceInfo), true
} }
// UpdateDevice 更新设备信息
func (dm *deviceManager) UpdateDevice(id string, device *DeviceInfo) {
dm.devices.Store(id, device)
}
// UpdateDeviceConfig 更新设备配置信息
func (dm *deviceManager) UpdateDeviceConfig(deviceID string, basicParam *models.BasicParam) {
device, ok := dm.GetDevice(deviceID)
if !ok {
return
}
if basicParam != nil {
// 更新设备心跳相关配置
if basicParam.HeartBeatInterval > 0 {
device.HeartBeatInterval = basicParam.HeartBeatInterval
}
if basicParam.HeartBeatCount > 0 {
device.HeartBeatCount = basicParam.HeartBeatCount
}
dm.devices.Store(deviceID, device)
}
}
// ChannelParser defines interface for different manufacturer's channel parsing // ChannelParser defines interface for different manufacturer's channel parsing
type ChannelParser interface { type ChannelParser interface {
ParseChannels(list ...models.ChannelInfo) ([]models.ChannelInfo, error) ParseChannels(list ...models.ChannelInfo) ([]models.ChannelInfo, error)

View File

@ -73,15 +73,21 @@ func (s *UAS) onRegister(req *sip.Request, tx sip.ServerTransaction) {
s.respondRegister(req, http.StatusOK, "OK", tx) s.respondRegister(req, http.StatusOK, "OK", tx)
logger.Tf(s.ctx, "%s Register success, source:%s, req: %s", id, req.Source(), req.String()) logger.Tf(s.ctx, "%s Register success, source:%s, req: %s", id, req.Source(), req.String())
go s.ConfigDownload(id)
go s.Catalog(id) go s.Catalog(id)
} else { } else {
if d.SourceAddr != req.Source() { if d.SourceAddr != "" && d.SourceAddr != req.Source() {
logger.Ef(s.ctx, "Device %s[%s] already registered, %s is NOT allowed.", id, d.SourceAddr, req.Source()) logger.Ef(s.ctx, "Device %s[%s] already registered, %s is NOT allowed.", id, d.SourceAddr, req.Source())
// TODO: 国标没有明确定义重复ID注册的处理方式这里暂时返回冲突 // TODO: 如果ID重复应采用虚拟ID
s.respondRegister(req, http.StatusConflict, "Conflict Device ID", tx) s.respondRegister(req, http.StatusBadRequest, "Conflict Device ID", tx)
} else { } else {
// TODO: 刷新DM里面的设备信息 d.SourceAddr = req.Source()
d.NetworkType = req.Transport()
d.Online = true
DM.UpdateDevice(id, d)
s.respondRegister(req, http.StatusOK, "OK", tx) s.respondRegister(req, http.StatusOK, "OK", tx)
logger.Tf(s.ctx, "%s Re-register success, source:%s, req: %s", id, req.Source(), req.String())
} }
} }
} }
@ -111,8 +117,10 @@ func (s *UAS) onMessage(req *sip.Request, tx sip.ServerTransaction) {
switch temp.CmdType { switch temp.CmdType {
case "Keepalive": case "Keepalive":
logger.T(s.ctx, "Keepalive") logger.T(s.ctx, "Keepalive")
if _, ok := DM.GetDevice(temp.DeviceID); !ok { if d, ok := DM.GetDevice(temp.DeviceID); ok && d.Online {
// unregister device // 更新设备心跳时间
DM.UpdateDeviceHeartbeat(temp.DeviceID)
} else {
tx.Respond(sip.NewResponseFromRequest(req, http.StatusBadRequest, "", nil)) tx.Respond(sip.NewResponseFromRequest(req, http.StatusBadRequest, "", nil))
return return
} }
@ -121,6 +129,9 @@ func (s *UAS) onMessage(req *sip.Request, tx sip.ServerTransaction) {
logger.T(s.ctx, "Catalog") logger.T(s.ctx, "Catalog")
DM.UpdateChannels(temp.DeviceID, temp.DeviceList...) DM.UpdateChannels(temp.DeviceID, temp.DeviceList...)
//go s.AutoInvite(temp.DeviceID, temp.DeviceList...) //go s.AutoInvite(temp.DeviceID, temp.DeviceList...)
case "ConfigDownload":
logger.T(s.ctx, "ConfigDownload")
DM.UpdateDeviceConfig(temp.DeviceID, &temp.BasicParam)
case "Alarm": case "Alarm":
logger.T(s.ctx, "Alarm") logger.T(s.ctx, "Alarm")
case "RecordInfo": case "RecordInfo":

View File

@ -108,6 +108,7 @@ Scale: %.1f
speedRequest.AppendHeader(sip.NewHeader("Content-Type", "Application/MANSRTSP")) speedRequest.AppendHeader(sip.NewHeader("Content-Type", "Application/MANSRTSP"))
return speedRequest return speedRequest
} }
func (s *UAS) AddSession(key string, status Session) { func (s *UAS) AddSession(key string, status Session) {
logger.Tf(s.ctx, "AddSession: %s, %+v", key, status) logger.Tf(s.ctx, "AddSession: %s, %+v", key, status)
s.Streams.Store(key, status) s.Streams.Store(key, status)
@ -176,6 +177,37 @@ func (s *UAS) InitMediaServer(req models.InviteRequest) error {
return nil return nil
} }
func (s *UAS) handleSipTransaction(req *sip.Request) (*sip.Response, error) {
tx, err := s.sipCli.TransactionRequest(s.ctx, req)
if err != nil {
return nil, errors.Wrapf(err, "transaction request error")
}
res, err := s.waitAnswer(tx)
if err != nil {
return nil, errors.Wrapf(err, "wait answer error")
}
if res.StatusCode != 200 {
return nil, errors.Errorf("response error: %s", res.String())
}
return res, nil
}
func (s *UAS) isPublishing(key string) bool {
c, ok := s.GetSession(key)
if !ok {
return false
}
// Check if stream already exists
if p, err := s.media.GetStreamStatus(c.Ssrc); err != nil || !p {
return false
}
return true
}
func (s *UAS) Invite(req models.InviteRequest) (*Session, error) { func (s *UAS) Invite(req models.InviteRequest) (*Session, error) {
key := fmt.Sprintf("%d:%s:%s:%d:%d:%d:%d", req.MediaServerId, req.DeviceID, req.ChannelID, req.SubStream, req.PlayType, req.StartTime, req.EndTime) key := fmt.Sprintf("%d:%s:%s:%d:%d:%d:%d", req.MediaServerId, req.DeviceID, req.ChannelID, req.SubStream, req.PlayType, req.StartTime, req.EndTime)
@ -267,19 +299,6 @@ func (s *UAS) Invite(req models.InviteRequest) (*Session, error) {
return &session, nil return &session, nil
} }
func (s *UAS) isPublishing(key string) bool {
c, ok := s.GetSession(key)
if !ok {
return false
}
// Check if stream already exists
if p, err := s.media.GetStreamStatus(c.Ssrc); err != nil || !p {
return false
}
return true
}
func (s *UAS) Bye(req models.ByeRequest) error { func (s *UAS) Bye(req models.ByeRequest) error {
key, session := s.GetSessionByURL(req.URL) key, session := s.GetSessionByURL(req.URL)
@ -513,19 +532,41 @@ func (s *UAS) QueryRecord(deviceID, channelID string, startTime, endTime int64)
} }
} }
func (s *UAS) handleSipTransaction(req *sip.Request) (*sip.Response, error) { // ConfigDownload
tx, err := s.sipCli.TransactionRequest(s.ctx, req) // <?xml version="1.0"?>
if err != nil { // <Control>
return nil, errors.Wrapf(err, "transaction request error") // <CmdType>ConfigDownload</CmdType>
// <SN>474</SN>
// <DeviceID>33010602001310019325</DeviceID>
// </Control>
func (s *UAS) ConfigDownload(deviceID string) error {
var deviceConfigXML = `<?xml version="1.0"?>
<Control>
<CmdType>ConfigDownload</CmdType>
<SN>%d</SN>
<DeviceID>%s</DeviceID>
<ConfigType>BasicParam</ConfigType>
</Control>
`
d, ok := DM.GetDevice(deviceID)
if !ok {
return errors.Errorf("device %s not found", deviceID)
} }
res, err := s.waitAnswer(tx) body := fmt.Sprintf(deviceConfigXML, s.getSN(), deviceID)
req, err := stack.NewMessageRequest([]byte(body), stack.OutboundConfig{
Via: d.SourceAddr,
To: d.DeviceID,
From: s.conf.GB28181.Serial,
Transport: d.NetworkType,
})
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "wait answer error") return errors.Wrapf(err, "build device config request error")
}
if res.StatusCode != 200 {
return nil, errors.Errorf("response error: %s", res.String())
} }
return res, nil _, err = s.handleSipTransaction(req)
return err
} }

View File

@ -49,4 +49,6 @@ func (s *Service) Start() error {
func (s *Service) Stop() { func (s *Service) Stop() {
s.Uac.Stop() s.Uac.Stop()
s.Uas.Stop() s.Uas.Stop()
// 停止设备心跳检查器
GetDeviceManager().stopHeartbeatChecker()
} }