A signaling server for GB28181
This commit is contained in:
137
pkg/api/api-controller.go
Normal file
137
pkg/api/api-controller.go
Normal file
@ -0,0 +1,137 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/ossrs/srs-sip/pkg/service"
|
||||
)
|
||||
|
||||
func (h *HttpApiServer) RegisterRoutes(router *mux.Router) {
|
||||
|
||||
apiV1Router := router.PathPrefix("/srs-sip/v1").Subrouter()
|
||||
|
||||
// Add Auth middleware
|
||||
//apiV1Router.Use(authMiddleware)
|
||||
|
||||
apiV1Router.HandleFunc("/devices", h.ApiListDevices).Methods(http.MethodGet)
|
||||
apiV1Router.HandleFunc("/devices/{id}/channels", h.ApiGetChannelByDeviceId).Methods(http.MethodGet)
|
||||
apiV1Router.HandleFunc("/channels", h.ApiGetAllChannels).Methods(http.MethodGet)
|
||||
|
||||
apiV1Router.HandleFunc("/invite", h.ApiInvite).Methods(http.MethodPost)
|
||||
apiV1Router.HandleFunc("/bye", h.ApiBye).Methods(http.MethodPost)
|
||||
apiV1Router.HandleFunc("/ptz", h.ApiPTZControl).Methods(http.MethodPost)
|
||||
|
||||
apiV1Router.HandleFunc("", h.GetAPIRoutes(apiV1Router)).Methods(http.MethodGet)
|
||||
|
||||
router.HandleFunc("/srs-sip", h.ApiGetAPIVersion).Methods(http.MethodGet)
|
||||
}
|
||||
|
||||
func (h *HttpApiServer) RespondWithJSON(w http.ResponseWriter, code int, data interface{}) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
wrapper := map[string]interface{}{
|
||||
"code": code,
|
||||
"data": data,
|
||||
}
|
||||
json.NewEncoder(w).Encode(wrapper)
|
||||
}
|
||||
|
||||
func (h *HttpApiServer) RespondWithJSONSimple(w http.ResponseWriter, jsonStr string) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(jsonStr))
|
||||
}
|
||||
|
||||
func (h *HttpApiServer) GetAPIRoutes(router *mux.Router) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
var routes []map[string]string
|
||||
|
||||
router.Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error {
|
||||
path, err := route.GetPathTemplate()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
methods, err := route.GetMethods()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, method := range methods {
|
||||
routes = append(routes, map[string]string{
|
||||
"method": method,
|
||||
"path": path,
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
h.RespondWithJSON(w, 0, routes)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *HttpApiServer) ApiGetAPIVersion(w http.ResponseWriter, r *http.Request) {
|
||||
h.RespondWithJSONSimple(w, `{"version": "v1"}`)
|
||||
}
|
||||
|
||||
func (h *HttpApiServer) ApiListDevices(w http.ResponseWriter, r *http.Request) {
|
||||
list := service.DM.GetDevices()
|
||||
h.RespondWithJSON(w, 0, list)
|
||||
}
|
||||
|
||||
func (h *HttpApiServer) ApiGetChannelByDeviceId(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
id := vars["id"]
|
||||
channels := service.DM.ApiGetChannelByDeviceId(id)
|
||||
h.RespondWithJSON(w, 0, channels)
|
||||
}
|
||||
|
||||
func (h *HttpApiServer) ApiGetAllChannels(w http.ResponseWriter, r *http.Request) {
|
||||
channels := service.DM.GetAllVideoChannels()
|
||||
h.RespondWithJSON(w, 0, channels)
|
||||
}
|
||||
|
||||
// request: {"device_id": "1", "channel_id": "1", "sub_stream": 0}
|
||||
// response: {"code": 0, "data": {"channel_id": "1", "url": "webrtc://"}}
|
||||
func (h *HttpApiServer) ApiInvite(w http.ResponseWriter, r *http.Request) {
|
||||
// Parse request
|
||||
var req map[string]string
|
||||
err := json.NewDecoder(r.Body).Decode(&req)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Get device and channel
|
||||
deviceID := req["device_id"]
|
||||
channelID := req["channel_id"]
|
||||
//subStream := req["sub_stream"]
|
||||
|
||||
code := 0
|
||||
url := ""
|
||||
|
||||
defer func() {
|
||||
data := map[string]string{
|
||||
"channel_id": channelID,
|
||||
"url": url,
|
||||
}
|
||||
h.RespondWithJSON(w, code, data)
|
||||
}()
|
||||
|
||||
if err := h.sipSvr.Uas.Invite(deviceID, channelID); err != nil {
|
||||
code = http.StatusInternalServerError
|
||||
return
|
||||
}
|
||||
c, ok := h.sipSvr.Uas.GetVideoChannelStatue(channelID)
|
||||
if !ok {
|
||||
code = http.StatusInternalServerError
|
||||
return
|
||||
}
|
||||
url = "webrtc://" + h.conf.MediaAddr + "/live/" + c.Ssrc
|
||||
}
|
||||
|
||||
func (h *HttpApiServer) ApiBye(w http.ResponseWriter, r *http.Request) {
|
||||
h.RespondWithJSONSimple(w, `{"msg":"Not implemented"}`)
|
||||
}
|
||||
|
||||
func (h *HttpApiServer) ApiPTZControl(w http.ResponseWriter, r *http.Request) {
|
||||
h.RespondWithJSONSimple(w, `{"msg":"Not implemented"}`)
|
||||
}
|
||||
45
pkg/api/api.go
Normal file
45
pkg/api/api.go
Normal file
@ -0,0 +1,45 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
|
||||
"github.com/gorilla/handlers"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/ossrs/srs-sip/pkg/config"
|
||||
"github.com/ossrs/srs-sip/pkg/service"
|
||||
)
|
||||
|
||||
type HttpApiServer struct {
|
||||
conf *config.MainConfig
|
||||
sipSvr *service.Service
|
||||
}
|
||||
|
||||
func NewHttpApiServer(r0 interface{}, svr *service.Service) (*HttpApiServer, error) {
|
||||
return &HttpApiServer{
|
||||
conf: r0.(*config.MainConfig),
|
||||
sipSvr: svr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (h *HttpApiServer) Start() {
|
||||
router := mux.NewRouter().StrictSlash(true)
|
||||
h.RegisterRoutes(router)
|
||||
|
||||
headers := handlers.AllowedHeaders([]string{"X-Requested-With", "Content-Type", "Authorization"})
|
||||
methods := handlers.AllowedMethods([]string{"GET", "POST", "PUT", "DELETE", "OPTIONS"})
|
||||
origins := handlers.AllowedOrigins([]string{"*"})
|
||||
|
||||
go func() {
|
||||
ctx := context.Background()
|
||||
addr := fmt.Sprintf(":%v", h.conf.APIPort)
|
||||
logger.Tf(ctx, "http api listen on %s", addr)
|
||||
err := http.ListenAndServe(addr, handlers.CORS(headers, methods, origins)(router))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
56
pkg/config/config.go
Normal file
56
pkg/config/config.go
Normal file
@ -0,0 +1,56 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
)
|
||||
|
||||
type MainConfig struct {
|
||||
Serial string `ymal:"serial"`
|
||||
Realm string `ymal:"realm"`
|
||||
SipHost string `ymal:"sip-host"`
|
||||
SipPort int `ymal:"sip-port"`
|
||||
MediaAddr string `ymal:"media-addr"`
|
||||
HttpServerPort int `ymal:"http-server-port"`
|
||||
APIPort int `ymal:"api-port"`
|
||||
}
|
||||
|
||||
func GetLocalIP() (string, error) {
|
||||
ifaces, err := net.Interfaces()
|
||||
if err != nil {
|
||||
return "", nil
|
||||
}
|
||||
type Iface struct {
|
||||
Name string
|
||||
Addr net.IP
|
||||
}
|
||||
var candidates []Iface
|
||||
for _, ifc := range ifaces {
|
||||
if ifc.Flags&net.FlagUp == 0 || ifc.Flags&net.FlagUp == 0 {
|
||||
continue
|
||||
}
|
||||
if ifc.Flags&(net.FlagPointToPoint|net.FlagLoopback) != 0 {
|
||||
continue
|
||||
}
|
||||
addrs, err := ifc.Addrs()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for _, addr := range addrs {
|
||||
ipnet, ok := addr.(*net.IPNet)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if ip4 := ipnet.IP.To4(); ip4 != nil {
|
||||
candidates = append(candidates, Iface{
|
||||
Name: ifc.Name, Addr: ip4,
|
||||
})
|
||||
//logger.Tf("considering interface", "iface", ifc.Name, "ip", ip4)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(candidates) == 0 {
|
||||
return "", fmt.Errorf("No local IP found")
|
||||
}
|
||||
return candidates[0].Addr.String(), nil
|
||||
}
|
||||
17
pkg/service/cascade.go
Normal file
17
pkg/service/cascade.go
Normal file
@ -0,0 +1,17 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/emiago/sipgo"
|
||||
"github.com/ossrs/srs-sip/pkg/config"
|
||||
)
|
||||
|
||||
type Cascade struct {
|
||||
ua *sipgo.UserAgent
|
||||
sipCli *sipgo.Client
|
||||
sipSvr *sipgo.Server
|
||||
|
||||
ctx context.Context
|
||||
conf *config.MainConfig
|
||||
}
|
||||
166
pkg/service/device.go
Normal file
166
pkg/service/device.go
Normal file
@ -0,0 +1,166 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/ossrs/srs-sip/pkg/utils"
|
||||
)
|
||||
|
||||
// <Item>
|
||||
// <DeviceID>34020000001320000002</DeviceID>
|
||||
// <Name>209</Name>
|
||||
// <Manufacturer>UNIVIEW</Manufacturer>
|
||||
// <Model>HIC6622-IR@X33-VF</Model>
|
||||
// <Owner>IPC-B2202.7.11.230222</Owner>
|
||||
// <CivilCode>CivilCode</CivilCode>
|
||||
// <Address>Address</Address>
|
||||
// <Parental>1</Parental>
|
||||
// <ParentID>75015310072008100002</ParentID>
|
||||
// <SafetyWay>0</SafetyWay>
|
||||
// <RegisterWay>1</RegisterWay>
|
||||
// <Secrecy>0</Secrecy>
|
||||
// <Status>ON</Status>
|
||||
// <Longitude>0.0000000</Longitude>
|
||||
// <Latitude>0.0000000</Latitude>
|
||||
// <Info>
|
||||
// <PTZType>1</PTZType>
|
||||
// <Resolution>6/4/2</Resolution>
|
||||
// <DownloadSpeed>0</DownloadSpeed>
|
||||
// </Info>
|
||||
// </Item>
|
||||
|
||||
type ChannelInfo struct {
|
||||
DeviceID string `json:"device_id"`
|
||||
ParentID string `json:"parent_id"`
|
||||
Name string `json:"name"`
|
||||
Manufacturer string `json:"manufacturer"`
|
||||
Model string `json:"model"`
|
||||
Owner string `json:"owner"`
|
||||
CivilCode string `json:"civil_code"`
|
||||
Address string `json:"address"`
|
||||
Port int `json:"port"`
|
||||
Parental int `json:"parental"`
|
||||
SafetyWay int `json:"safety_way"`
|
||||
RegisterWay int `json:"register_way"`
|
||||
Secrecy int `json:"secrecy"`
|
||||
IPAddress string `json:"ip_address"`
|
||||
Status ChannelStatus `json:"status"`
|
||||
Longitude float64 `json:"longitude"`
|
||||
Latitude float64 `json:"latitude"`
|
||||
Info struct {
|
||||
PTZType int `json:"ptz_type"`
|
||||
Resolution string `json:"resolution"`
|
||||
DownloadSpeed string `json:"download_speed"` // 1/2/4/8
|
||||
} `json:"info"`
|
||||
|
||||
// custom fields
|
||||
Ssrc string `json:"ssrc"`
|
||||
}
|
||||
|
||||
type ChannelStatus string
|
||||
|
||||
type DeviceInfo struct {
|
||||
DeviceID string `json:"device_id"`
|
||||
SourceAddr string `json:"source_addr"`
|
||||
NetworkType string `json:"network_type"`
|
||||
ChannelMap sync.Map `json:"-"`
|
||||
}
|
||||
|
||||
type deviceManager struct {
|
||||
devices sync.Map
|
||||
}
|
||||
|
||||
var instance *deviceManager
|
||||
var once sync.Once
|
||||
|
||||
func GetDeviceManager() *deviceManager {
|
||||
once.Do(func() {
|
||||
instance = &deviceManager{
|
||||
devices: sync.Map{},
|
||||
}
|
||||
})
|
||||
return instance
|
||||
}
|
||||
|
||||
func (dm *deviceManager) AddDevice(id string, info *DeviceInfo) {
|
||||
dm.devices.Store(id, info)
|
||||
}
|
||||
|
||||
func (dm *deviceManager) RemoveDevice(id string) {
|
||||
dm.devices.Delete(id)
|
||||
}
|
||||
|
||||
func (dm *deviceManager) GetDevices() []*DeviceInfo {
|
||||
list := make([]*DeviceInfo, 0)
|
||||
dm.devices.Range(func(key, value interface{}) bool {
|
||||
list = append(list, value.(*DeviceInfo))
|
||||
return true
|
||||
})
|
||||
return list
|
||||
}
|
||||
|
||||
func (dm *deviceManager) GetDevice(id string) (*DeviceInfo, bool) {
|
||||
v, ok := dm.devices.Load(id)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return v.(*DeviceInfo), true
|
||||
}
|
||||
|
||||
func (dm *deviceManager) UpdateChannels(deviceID string, list ...ChannelInfo) {
|
||||
device, ok := dm.GetDevice(deviceID)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
for _, channel := range list {
|
||||
device.ChannelMap.Store(channel.DeviceID, channel)
|
||||
}
|
||||
dm.devices.Store(deviceID, device)
|
||||
}
|
||||
|
||||
func (dm *deviceManager) ApiGetChannelByDeviceId(deviceID string) []ChannelInfo {
|
||||
device, ok := dm.GetDevice(deviceID)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
channels := make([]ChannelInfo, 0)
|
||||
device.ChannelMap.Range(func(key, value interface{}) bool {
|
||||
channels = append(channels, value.(ChannelInfo))
|
||||
return true
|
||||
})
|
||||
return channels
|
||||
}
|
||||
|
||||
func (dm *deviceManager) GetAllVideoChannels() []ChannelInfo {
|
||||
channels := make([]ChannelInfo, 0)
|
||||
dm.devices.Range(func(key, value interface{}) bool {
|
||||
device := value.(*DeviceInfo)
|
||||
device.ChannelMap.Range(func(key, value interface{}) bool {
|
||||
if utils.IsVideoChannel(value.(ChannelInfo).DeviceID) {
|
||||
channels = append(channels, value.(ChannelInfo))
|
||||
return true
|
||||
}
|
||||
return true
|
||||
})
|
||||
return true
|
||||
})
|
||||
return channels
|
||||
}
|
||||
|
||||
func (dm *deviceManager) GetDeviceInfoByChannel(channelID string) (*DeviceInfo, bool) {
|
||||
var device *DeviceInfo
|
||||
found := false
|
||||
dm.devices.Range(func(key, value interface{}) bool {
|
||||
d := value.(*DeviceInfo)
|
||||
_, ok := d.ChannelMap.Load(channelID)
|
||||
if ok {
|
||||
device = d
|
||||
found = true
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
return device, found
|
||||
}
|
||||
153
pkg/service/inbound.go
Normal file
153
pkg/service/inbound.go
Normal file
@ -0,0 +1,153 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/xml"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/emiago/sipgo/sip"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"github.com/ossrs/srs-sip/pkg/service/stack"
|
||||
"golang.org/x/net/html/charset"
|
||||
)
|
||||
|
||||
const GB28181_ID_LENGTH = 20
|
||||
|
||||
type VideoChannelStatus struct {
|
||||
ID string
|
||||
ParentID string
|
||||
MediaHost string
|
||||
MediaPort int
|
||||
Ssrc string
|
||||
Status string
|
||||
}
|
||||
|
||||
func (s *UAS) onRegister(req *sip.Request, tx sip.ServerTransaction) {
|
||||
id := req.From().Address.User
|
||||
if len(id) != GB28181_ID_LENGTH {
|
||||
logger.E(s.ctx, "invalid device ID")
|
||||
return
|
||||
}
|
||||
|
||||
isUnregister := false
|
||||
if exps := req.GetHeaders("Expires"); len(exps) > 0 {
|
||||
exp := exps[0]
|
||||
expSec, err := strconv.ParseInt(exp.Value(), 10, 32)
|
||||
if err != nil {
|
||||
logger.Ef(s.ctx, "parse expires header error: %s", err.Error())
|
||||
return
|
||||
}
|
||||
if expSec == 0 {
|
||||
isUnregister = true
|
||||
}
|
||||
} else {
|
||||
logger.E(s.ctx, "empty expires header")
|
||||
return
|
||||
}
|
||||
|
||||
if isUnregister {
|
||||
DM.RemoveDevice(id)
|
||||
logger.Wf(s.ctx, "Device %s unregistered", id)
|
||||
return
|
||||
} else {
|
||||
if d, ok := DM.GetDevice(id); !ok {
|
||||
DM.AddDevice(id, &DeviceInfo{
|
||||
DeviceID: id,
|
||||
SourceAddr: req.Source(),
|
||||
NetworkType: req.Transport(),
|
||||
})
|
||||
s.respondRegister(req, http.StatusOK, "OK", tx)
|
||||
logger.Tf(s.ctx, "%s Register success, source:%s, req: %s", id, req.Source(), req.String())
|
||||
|
||||
go s.Catalog(id)
|
||||
} else {
|
||||
if d.SourceAddr != req.Source() {
|
||||
logger.Ef(s.ctx, "Device %s[%s] already registered, %s is NOT allowed.", id, d.SourceAddr, req.Source())
|
||||
// TODO: 国标没有明确定义重复ID注册的处理方式,这里暂时返回冲突
|
||||
s.respondRegister(req, http.StatusConflict, "Conflict Device ID", tx)
|
||||
} else {
|
||||
// TODO: 刷新DM里面的设备信息
|
||||
s.respondRegister(req, http.StatusOK, "OK", tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *UAS) respondRegister(req *sip.Request, code sip.StatusCode, reason string, tx sip.ServerTransaction) {
|
||||
res := stack.NewRegisterResponse(req, code, reason)
|
||||
_ = tx.Respond(res)
|
||||
|
||||
}
|
||||
|
||||
func (s *UAS) onMessage(req *sip.Request, tx sip.ServerTransaction) {
|
||||
id := req.From().Address.User
|
||||
if len(id) != 20 {
|
||||
logger.Ef(s.ctx, "invalid device ID %s", req.String())
|
||||
}
|
||||
|
||||
//logger.Tf(s.ctx, "Received MESSAGE: %s", req.String())
|
||||
|
||||
temp := &struct {
|
||||
XMLName xml.Name
|
||||
CmdType string
|
||||
SN int // 请求序列号,一般用于对应 request 和 response
|
||||
DeviceID string
|
||||
DeviceName string
|
||||
Manufacturer string
|
||||
Model string
|
||||
Channel string
|
||||
DeviceList []ChannelInfo `xml:"DeviceList>Item"`
|
||||
// RecordList []*Record `xml:"RecordList>Item"`
|
||||
// SumNum int
|
||||
}{}
|
||||
decoder := xml.NewDecoder(bytes.NewReader([]byte(req.Body())))
|
||||
decoder.CharsetReader = charset.NewReaderLabel
|
||||
if err := decoder.Decode(temp); err != nil {
|
||||
logger.Ef(s.ctx, "decode message error: %s\n message:%s", err.Error(), req.Body())
|
||||
}
|
||||
var body string
|
||||
switch temp.CmdType {
|
||||
case "Keepalive":
|
||||
logger.T(s.ctx, "Keepalive")
|
||||
if _, ok := DM.GetDevice(temp.DeviceID); !ok {
|
||||
// unregister device
|
||||
tx.Respond(sip.NewResponseFromRequest(req, http.StatusBadRequest, "", nil))
|
||||
return
|
||||
}
|
||||
case "SensorCatalog": // 兼容宇视,非国标
|
||||
case "Catalog":
|
||||
logger.T(s.ctx, "Catalog")
|
||||
DM.UpdateChannels(temp.DeviceID, temp.DeviceList...)
|
||||
//go s.AutoInvite(temp.DeviceID, temp.DeviceList...)
|
||||
case "Alarm":
|
||||
logger.T(s.ctx, "Alarm")
|
||||
default:
|
||||
logger.Wf(s.ctx, "Not supported CmdType: %s", temp.CmdType)
|
||||
response := sip.NewResponseFromRequest(req, http.StatusBadRequest, "", nil)
|
||||
tx.Respond(response)
|
||||
return
|
||||
}
|
||||
tx.Respond(sip.NewResponseFromRequest(req, http.StatusOK, "OK", []byte(body)))
|
||||
}
|
||||
|
||||
func (s *UAS) onNotify(req *sip.Request, tx sip.ServerTransaction) {
|
||||
logger.T(s.ctx, "Received NOTIFY request")
|
||||
tx.Respond(sip.NewResponseFromRequest(req, http.StatusOK, "OK", nil))
|
||||
}
|
||||
|
||||
func (s *UAS) AddVideoChannelStatue(channelID string, status VideoChannelStatus) {
|
||||
s.channelsStatue.Store(channelID, status)
|
||||
}
|
||||
|
||||
func (s *UAS) GetVideoChannelStatue(channelID string) (VideoChannelStatus, bool) {
|
||||
v, ok := s.channelsStatue.Load(channelID)
|
||||
if !ok {
|
||||
return VideoChannelStatus{}, false
|
||||
}
|
||||
return v.(VideoChannelStatus), true
|
||||
}
|
||||
|
||||
func (s *UAS) RemoveVideoChannelStatue(channelID string) {
|
||||
s.channelsStatue.Delete(channelID)
|
||||
}
|
||||
169
pkg/service/outbound.go
Normal file
169
pkg/service/outbound.go
Normal file
@ -0,0 +1,169 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/emiago/sipgo/sip"
|
||||
"github.com/ossrs/go-oryx-lib/errors"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"github.com/ossrs/srs-sip/pkg/service/stack"
|
||||
"github.com/ossrs/srs-sip/pkg/utils"
|
||||
)
|
||||
|
||||
func (s *UAS) AutoInvite(deviceID string, list ...ChannelInfo) {
|
||||
for _, c := range list {
|
||||
if c.Status == "ON" && utils.IsVideoChannel(c.DeviceID) {
|
||||
if err := s.Invite(deviceID, c.DeviceID); err != nil {
|
||||
logger.Ef(s.ctx, "invite error: %s", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *UAS) Invite(deviceID, channelID string) error {
|
||||
if s.isPublishing(channelID) {
|
||||
return nil
|
||||
}
|
||||
|
||||
ssrc := utils.CreateSSRC(true)
|
||||
|
||||
mediaPort, err := s.signal.Publish(ssrc, ssrc)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "api gb publish request error")
|
||||
}
|
||||
|
||||
mediaHost := strings.Split(s.conf.MediaAddr, ":")[0]
|
||||
if mediaHost == "" {
|
||||
return errors.Errorf("media host is empty")
|
||||
}
|
||||
|
||||
sdpInfo := []string{
|
||||
"v=0",
|
||||
fmt.Sprintf("o=%s 0 0 IN IP4 %s", channelID, mediaHost),
|
||||
"s=" + "Play",
|
||||
"u=" + channelID + ":0",
|
||||
"c=IN IP4 " + mediaHost,
|
||||
"t=0 0", // start time and end time
|
||||
fmt.Sprintf("m=video %d TCP/RTP/AVP 96", mediaPort),
|
||||
"a=recvonly",
|
||||
"a=rtpmap:96 PS/90000",
|
||||
"y=" + ssrc,
|
||||
"\r\n",
|
||||
}
|
||||
if true { // support tcp only
|
||||
sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
|
||||
}
|
||||
|
||||
// TODO: 需要考虑不同设备,通道ID相同的情况
|
||||
d, ok := DM.GetDeviceInfoByChannel(channelID)
|
||||
if !ok {
|
||||
return errors.Errorf("device not found by %s", channelID)
|
||||
}
|
||||
|
||||
subject := fmt.Sprintf("%s:%s,%s:0", channelID, ssrc, s.conf.Serial)
|
||||
|
||||
req, err := stack.NewInviteRequest([]byte(strings.Join(sdpInfo, "\r\n")), subject, stack.OutboundConfig{
|
||||
Via: d.SourceAddr,
|
||||
To: d.DeviceID,
|
||||
From: s.conf.Serial,
|
||||
Transport: d.NetworkType,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "build invite request error")
|
||||
}
|
||||
tx, err := s.sipCli.TransactionRequest(s.ctx, req)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "transaction request error")
|
||||
}
|
||||
|
||||
res, err := s.waitAnswer(tx)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "wait answer error")
|
||||
}
|
||||
if res.StatusCode != 200 {
|
||||
return errors.Errorf("invite response error: %s", res.String())
|
||||
}
|
||||
|
||||
ack := sip.NewAckRequest(req, res, nil)
|
||||
s.sipCli.WriteRequest(ack)
|
||||
|
||||
s.AddVideoChannelStatue(channelID, VideoChannelStatus{
|
||||
ID: channelID,
|
||||
ParentID: deviceID,
|
||||
MediaHost: mediaHost,
|
||||
MediaPort: mediaPort,
|
||||
Ssrc: ssrc,
|
||||
Status: "ON",
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *UAS) isPublishing(channelID string) bool {
|
||||
c, err := s.GetVideoChannelStatue(channelID)
|
||||
if !err {
|
||||
return false
|
||||
}
|
||||
|
||||
if p, err := s.signal.GetStreamStatus(c.Ssrc); err != nil || !p {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *UAS) Bye() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *UAS) Catalog(deviceID string) error {
|
||||
var CatalogXML = `<?xml version="1.0"?><Query>
|
||||
<CmdType>Catalog</CmdType>
|
||||
<SN>%d</SN>
|
||||
<DeviceID>%s</DeviceID>
|
||||
</Query>
|
||||
`
|
||||
|
||||
d, ok := DM.GetDevice(deviceID)
|
||||
if !ok {
|
||||
return errors.Errorf("device %s not found", deviceID)
|
||||
}
|
||||
|
||||
body := fmt.Sprintf(CatalogXML, s.getSN(), deviceID)
|
||||
|
||||
req, err := stack.NewCatelogRequest([]byte(body), stack.OutboundConfig{
|
||||
Via: d.SourceAddr,
|
||||
To: d.DeviceID,
|
||||
From: s.conf.Serial,
|
||||
Transport: d.NetworkType,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "build catalog request error")
|
||||
}
|
||||
|
||||
tx, err := s.sipCli.TransactionRequest(s.ctx, req)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "transaction request error")
|
||||
}
|
||||
|
||||
res, err := s.waitAnswer(tx)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "wait answer error")
|
||||
}
|
||||
logger.Tf(s.ctx, "catalog response: %s", res.String())
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (s *UAS) waitAnswer(tx sip.ClientTransaction) (*sip.Response, error) {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return nil, errors.Errorf("context done")
|
||||
case res := <-tx.Responses():
|
||||
if res.StatusCode == 100 || res.StatusCode == 101 || res.StatusCode == 180 || res.StatusCode == 183 {
|
||||
return s.waitAnswer(tx)
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
}
|
||||
52
pkg/service/service.go
Normal file
52
pkg/service/service.go
Normal file
@ -0,0 +1,52 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/emiago/sipgo"
|
||||
"github.com/ossrs/srs-sip/pkg/config"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
ctx context.Context
|
||||
conf *config.MainConfig
|
||||
Uac *UAC
|
||||
Uas *UAS
|
||||
}
|
||||
|
||||
func NewService(ctx context.Context, r0 interface{}) (*Service, error) {
|
||||
s := &Service{
|
||||
ctx: ctx,
|
||||
conf: r0.(*config.MainConfig),
|
||||
}
|
||||
s.Uac = NewUac()
|
||||
s.Uas = NewUas()
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Service) Start() error {
|
||||
zerolog.SetGlobalLevel(zerolog.Disabled)
|
||||
|
||||
ua, err := sipgo.NewUA(
|
||||
sipgo.WithUserAgent(UserAgent),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.Uas.Start(ua, s.conf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if err := s.Uac.Start(ua, s.conf); err != nil {
|
||||
// return err
|
||||
// }
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) Stop() {
|
||||
s.Uac.Stop()
|
||||
s.Uas.Stop()
|
||||
}
|
||||
68
pkg/service/stack/request.go
Normal file
68
pkg/service/stack/request.go
Normal file
@ -0,0 +1,68 @@
|
||||
package stack
|
||||
|
||||
import (
|
||||
"github.com/emiago/sipgo/sip"
|
||||
"github.com/ossrs/go-oryx-lib/errors"
|
||||
)
|
||||
|
||||
type OutboundConfig struct {
|
||||
Transport string
|
||||
Via string
|
||||
From string
|
||||
To string
|
||||
}
|
||||
|
||||
func newRequest(method sip.RequestMethod, body []byte, conf OutboundConfig) (*sip.Request, error) {
|
||||
if len(conf.From) != 20 || len(conf.To) != 20 {
|
||||
return nil, errors.Errorf("From or To length is not 20")
|
||||
}
|
||||
|
||||
dest := conf.Via
|
||||
to := sip.Uri{User: conf.To, Host: conf.To[:10]}
|
||||
from := &sip.Uri{User: conf.From, Host: conf.From[:10]}
|
||||
|
||||
fromHeader := &sip.FromHeader{Address: *from, Params: sip.NewParams()}
|
||||
fromHeader.Params.Add("tag", sip.GenerateTagN(16))
|
||||
|
||||
req := sip.NewRequest(method, to)
|
||||
req.AppendHeader(fromHeader)
|
||||
req.AppendHeader(&sip.ToHeader{Address: to})
|
||||
req.AppendHeader(&sip.ContactHeader{Address: *from})
|
||||
req.AppendHeader(sip.NewHeader("Max-Forwards", "70"))
|
||||
req.SetBody(body)
|
||||
req.SetDestination(dest)
|
||||
req.SetTransport(conf.Transport)
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func NewRegisterRequest(conf OutboundConfig) (*sip.Request, error) {
|
||||
req, err := newRequest(sip.REGISTER, nil, conf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.AppendHeader(sip.NewHeader("Expires", "3600"))
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func NewInviteRequest(body []byte, subject string, conf OutboundConfig) (*sip.Request, error) {
|
||||
req, err := newRequest(sip.INVITE, body, conf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
|
||||
req.AppendHeader(sip.NewHeader("Subject", subject))
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func NewCatelogRequest(body []byte, conf OutboundConfig) (*sip.Request, error) {
|
||||
req, err := newRequest(sip.MESSAGE, body, conf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.AppendHeader(sip.NewHeader("Content-Type", "Application/MANSCDP+xml"))
|
||||
|
||||
return req, nil
|
||||
}
|
||||
25
pkg/service/stack/response.go
Normal file
25
pkg/service/stack/response.go
Normal file
@ -0,0 +1,25 @@
|
||||
package stack
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/emiago/sipgo/sip"
|
||||
)
|
||||
|
||||
const TIME_LAYOUT = "2024-01-01T00:00:00"
|
||||
const EXPIRES_TIME = 3600
|
||||
|
||||
func NewRegisterResponse(req *sip.Request, code sip.StatusCode, reason string) *sip.Response {
|
||||
resp := sip.NewResponseFromRequest(req, code, reason, nil)
|
||||
|
||||
newTo := &sip.ToHeader{Address: resp.To().Address, Params: sip.NewParams()}
|
||||
newTo.Params.Add("tag", sip.GenerateTagN(10))
|
||||
|
||||
resp.ReplaceHeader(newTo)
|
||||
resp.RemoveHeader("Allow")
|
||||
expires := sip.ExpiresHeader(EXPIRES_TIME)
|
||||
resp.AppendHeader(&expires)
|
||||
resp.AppendHeader(sip.NewHeader("Date", time.Now().Format(TIME_LAYOUT)))
|
||||
|
||||
return resp
|
||||
}
|
||||
122
pkg/service/uac.go
Normal file
122
pkg/service/uac.go
Normal file
@ -0,0 +1,122 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/emiago/sipgo"
|
||||
"github.com/emiago/sipgo/sip"
|
||||
"github.com/ossrs/go-oryx-lib/errors"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"github.com/ossrs/srs-sip/pkg/config"
|
||||
"github.com/ossrs/srs-sip/pkg/service/stack"
|
||||
)
|
||||
|
||||
const (
|
||||
UserAgent = "SRS-SIP/1.0"
|
||||
)
|
||||
|
||||
type UAC struct {
|
||||
*Cascade
|
||||
|
||||
SN uint32
|
||||
LocalIP string
|
||||
}
|
||||
|
||||
func NewUac() *UAC {
|
||||
ip, err := config.GetLocalIP()
|
||||
if err != nil {
|
||||
logger.E("get local ip failed")
|
||||
return nil
|
||||
}
|
||||
|
||||
c := &UAC{
|
||||
Cascade: &Cascade{},
|
||||
LocalIP: ip,
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *UAC) Start(agent *sipgo.UserAgent, r0 interface{}) error {
|
||||
var err error
|
||||
|
||||
c.ctx = context.Background()
|
||||
c.conf = r0.(*config.MainConfig)
|
||||
|
||||
if agent == nil {
|
||||
ua, err := sipgo.NewUA(sipgo.WithUserAgent(UserAgent))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
agent = ua
|
||||
}
|
||||
|
||||
c.sipCli, err = sipgo.NewClient(agent, sipgo.WithClientHostname(c.LocalIP))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.sipSvr, err = sipgo.NewServer(agent)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.sipSvr.OnInvite(c.onInvite)
|
||||
c.sipSvr.OnBye(c.onBye)
|
||||
c.sipSvr.OnMessage(c.onMessage)
|
||||
|
||||
go c.doRegister()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *UAC) Stop() {
|
||||
// TODO: 断开所有当前连接
|
||||
c.sipCli.Close()
|
||||
c.sipSvr.Close()
|
||||
}
|
||||
|
||||
func (c *UAC) doRegister() error {
|
||||
r, _ := stack.NewRegisterRequest(stack.OutboundConfig{
|
||||
From: "34020000001110000001",
|
||||
To: "34020000002000000001",
|
||||
Transport: "UDP",
|
||||
Via: fmt.Sprintf("%s:%d", c.LocalIP, c.conf.SipPort),
|
||||
})
|
||||
tx, err := c.sipCli.TransactionRequest(c.ctx, r)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "transaction request error")
|
||||
}
|
||||
|
||||
rs, _ := c.getResponse(tx)
|
||||
logger.Tf(c.ctx, "register response: %s", rs.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *UAC) OnRequest(req *sip.Request, tx sip.ServerTransaction) {
|
||||
switch req.Method {
|
||||
case "INVITE":
|
||||
c.onInvite(req, tx)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *UAC) onInvite(req *sip.Request, tx sip.ServerTransaction) {
|
||||
logger.T(c.ctx, "onInvite")
|
||||
}
|
||||
|
||||
func (c *UAC) onBye(req *sip.Request, tx sip.ServerTransaction) {
|
||||
logger.T(c.ctx, "onBye")
|
||||
}
|
||||
|
||||
func (c *UAC) onMessage(req *sip.Request, tx sip.ServerTransaction) {
|
||||
logger.Tf(c.ctx, "onMessage %s", req.String())
|
||||
}
|
||||
|
||||
func (c *UAC) getResponse(tx sip.ClientTransaction) (*sip.Response, error) {
|
||||
select {
|
||||
case <-tx.Done():
|
||||
return nil, fmt.Errorf("transaction died")
|
||||
case res := <-tx.Responses():
|
||||
return res, nil
|
||||
}
|
||||
}
|
||||
137
pkg/service/uas.go
Normal file
137
pkg/service/uas.go
Normal file
@ -0,0 +1,137 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/emiago/sipgo"
|
||||
"github.com/emiago/sipgo/sip"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"github.com/ossrs/srs-sip/pkg/config"
|
||||
"github.com/ossrs/srs-sip/pkg/signaling"
|
||||
)
|
||||
|
||||
type UAS struct {
|
||||
*Cascade
|
||||
|
||||
SN uint32
|
||||
channelsStatue sync.Map
|
||||
signal signaling.ISignaling
|
||||
|
||||
sipConnUDP *net.UDPConn
|
||||
sipConnTCP *net.TCPListener
|
||||
}
|
||||
|
||||
var DM = GetDeviceManager()
|
||||
|
||||
func NewUas() *UAS {
|
||||
return &UAS{
|
||||
Cascade: &Cascade{},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *UAS) Start(agent *sipgo.UserAgent, r0 interface{}) error {
|
||||
ctx := context.Background()
|
||||
conf := r0.(*config.MainConfig)
|
||||
sig := &signaling.Srs{
|
||||
Ctx: ctx,
|
||||
Addr: "http://" + conf.MediaAddr,
|
||||
}
|
||||
s.signal = sig
|
||||
s.startSipServer(agent, ctx, r0)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *UAS) Stop() {
|
||||
s.sipCli.Close()
|
||||
s.sipSvr.Close()
|
||||
}
|
||||
|
||||
func (s *UAS) startSipServer(agent *sipgo.UserAgent, ctx context.Context, r0 interface{}) error {
|
||||
conf := r0.(*config.MainConfig)
|
||||
s.ctx = ctx
|
||||
s.conf = conf
|
||||
|
||||
if agent == nil {
|
||||
ua, err := sipgo.NewUA(sipgo.WithUserAgent(UserAgent))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
agent = ua
|
||||
}
|
||||
|
||||
cli, err := sipgo.NewClient(agent)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.sipCli = cli
|
||||
|
||||
svr, err := sipgo.NewServer(agent)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.sipSvr = svr
|
||||
|
||||
s.sipSvr.OnRegister(s.onRegister)
|
||||
s.sipSvr.OnMessage(s.onMessage)
|
||||
s.sipSvr.OnNotify(s.onNotify)
|
||||
|
||||
if err := s.startUDP(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.startTCP(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *UAS) startUDP() error {
|
||||
lis, err := net.ListenUDP("udp", &net.UDPAddr{
|
||||
IP: net.IPv4(0, 0, 0, 0),
|
||||
Port: s.conf.SipPort,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot listen on the UDP signaling port %d: %w", s.conf.SipPort, err)
|
||||
}
|
||||
s.sipConnUDP = lis
|
||||
logger.Tf(s.ctx, "sip signaling listening on UDP %s:%d", lis.LocalAddr().String(), s.conf.SipPort)
|
||||
|
||||
go func() {
|
||||
if err := s.sipSvr.ServeUDP(lis); err != nil {
|
||||
panic(fmt.Errorf("SIP listen UDP error: %w", err))
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *UAS) startTCP() error {
|
||||
lis, err := net.ListenTCP("tcp", &net.TCPAddr{
|
||||
IP: net.IPv4(0, 0, 0, 0),
|
||||
Port: s.conf.SipPort,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot listen on the TCP signaling port %d: %w", s.conf.SipPort, err)
|
||||
}
|
||||
s.sipConnTCP = lis
|
||||
logger.Tf(s.ctx, "sip signaling listening on TCP %s:%d", lis.Addr().String(), s.conf.SipPort)
|
||||
|
||||
go func() {
|
||||
if err := s.sipSvr.ServeTCP(lis); err != nil && !errors.Is(err, net.ErrClosed) {
|
||||
panic(fmt.Errorf("SIP listen TCP error: %w", err))
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func sipErrorResponse(tx sip.ServerTransaction, req *sip.Request) {
|
||||
_ = tx.Respond(sip.NewResponseFromRequest(req, 400, "", nil))
|
||||
}
|
||||
|
||||
func (s *UAS) getSN() uint32 {
|
||||
s.SN++
|
||||
return s.SN
|
||||
}
|
||||
75
pkg/signaling/signaling.go
Normal file
75
pkg/signaling/signaling.go
Normal file
@ -0,0 +1,75 @@
|
||||
package signaling
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ossrs/go-oryx-lib/errors"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
)
|
||||
|
||||
type ISignaling interface {
|
||||
Publish(id, ssrc string) (int, error)
|
||||
Unpublish(id string) error
|
||||
GetStreamStatus(id string) (bool, error)
|
||||
}
|
||||
|
||||
// The r is HTTP API to request, like "http://localhost:1985/gb/v1/publish".
|
||||
// The req is the HTTP request body, will be marshal to JSON object. nil is no body
|
||||
// The res is the HTTP response body, already unmarshal to JSON object.
|
||||
func apiRequest(ctx context.Context, r string, req interface{}, res interface{}) error {
|
||||
var buf bytes.Buffer
|
||||
if req != nil {
|
||||
if err := json.NewEncoder(&buf).Encode(req); err != nil {
|
||||
return errors.Wrapf(err, "Marshal body %v", req)
|
||||
}
|
||||
}
|
||||
logger.Tf(ctx, "Request url api=%v with %v bytes", r, buf.Len())
|
||||
|
||||
method := "POST"
|
||||
if req == nil {
|
||||
method = "GET"
|
||||
}
|
||||
reqObj, err := http.NewRequest(method, r, &buf)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "HTTP request %v", buf.String())
|
||||
}
|
||||
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
resObj, err := client.Do(reqObj.WithContext(ctx))
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Do HTTP request %v", buf.String())
|
||||
}
|
||||
defer resObj.Body.Close()
|
||||
|
||||
if resObj.StatusCode != http.StatusOK {
|
||||
return errors.Errorf("Server returned status code=%v", resObj.StatusCode)
|
||||
}
|
||||
|
||||
b2, err := io.ReadAll(resObj.Body)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Read response for %v", buf.String())
|
||||
}
|
||||
logger.Tf(ctx, "Response from %v is %v bytes", r, len(b2))
|
||||
|
||||
errorCode := struct {
|
||||
Code int `json:"code"`
|
||||
}{}
|
||||
if err := json.Unmarshal(b2, &errorCode); err != nil {
|
||||
return errors.Wrapf(err, "Unmarshal %v", string(b2))
|
||||
}
|
||||
if errorCode.Code != 0 {
|
||||
return errors.Errorf("Server fail code=%v %v", errorCode.Code, string(b2))
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(b2, res); err != nil {
|
||||
return errors.Wrapf(err, "Unmarshal %v", string(b2))
|
||||
}
|
||||
logger.Tf(ctx, "Parse response to code=%v ok, %v", errorCode.Code, res)
|
||||
|
||||
return nil
|
||||
}
|
||||
95
pkg/signaling/srs.go
Normal file
95
pkg/signaling/srs.go
Normal file
@ -0,0 +1,95 @@
|
||||
package signaling
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ossrs/go-oryx-lib/errors"
|
||||
)
|
||||
|
||||
type Srs struct {
|
||||
Ctx context.Context
|
||||
Addr string // The address of SRS, eg: http://localhost:1985
|
||||
}
|
||||
|
||||
func (s *Srs) Publish(id, ssrc string) (int, error) {
|
||||
req := struct {
|
||||
Id string `json:"id"`
|
||||
SSRC string `json:"ssrc"`
|
||||
}{
|
||||
id, ssrc,
|
||||
}
|
||||
|
||||
res := struct {
|
||||
Code int `json:"code"`
|
||||
Port int `json:"port"`
|
||||
}{}
|
||||
|
||||
if err := apiRequest(s.Ctx, s.Addr+"/gb/v1/publish/", req, &res); err != nil {
|
||||
return 0, errors.Wrapf(err, "gb/v1/publish")
|
||||
}
|
||||
|
||||
return res.Port, nil
|
||||
}
|
||||
|
||||
func (s *Srs) Unpublish(id string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// {
|
||||
// "code": 0,
|
||||
// "server": "vid-y19n6nm",
|
||||
// "service": "382k456r",
|
||||
// "pid": "9495",
|
||||
// "streams": [{
|
||||
// "id": "vid-9y0ozy0",
|
||||
// "name": "0551954854",
|
||||
// "vhost": "vid-v2ws53u",
|
||||
// "app": "live",
|
||||
// "tcUrl": "webrtc://127.0.0.1:1985/live",
|
||||
// "url": "/live/0551954854",
|
||||
// "live_ms": 1720428680003,
|
||||
// "clients": 1,
|
||||
// "frames": 8431,
|
||||
// "send_bytes": 66463941,
|
||||
// "recv_bytes": 89323998,
|
||||
// "kbps": {
|
||||
// "recv_30s": 0,
|
||||
// "send_30s": 0
|
||||
// },
|
||||
// "publish": {
|
||||
// "active": false,
|
||||
// "cid": "b3op069g"
|
||||
// },
|
||||
// "video": null,
|
||||
// "audio": null
|
||||
// }]
|
||||
// }
|
||||
func (s *Srs) GetStreamStatus(id string) (bool, error) {
|
||||
type Stream struct {
|
||||
Id string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Publish struct {
|
||||
Active bool `json:"active"`
|
||||
Cid string `json:"cid"`
|
||||
} `json:"publish"`
|
||||
}
|
||||
res := struct {
|
||||
Code int `json:"code"`
|
||||
Streams []Stream `json:"streams"`
|
||||
}{}
|
||||
|
||||
if err := apiRequest(s.Ctx, s.Addr+"/api/v1/streams?count=99", nil, &res); err != nil {
|
||||
return false, errors.Wrapf(err, "api/v1/stream")
|
||||
}
|
||||
|
||||
if len(res.Streams) == 0 {
|
||||
return false, nil
|
||||
} else {
|
||||
for _, v := range res.Streams {
|
||||
if v.Name == id {
|
||||
return v.Publish.Active, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
66
pkg/utils/utils.go
Normal file
66
pkg/utils/utils.go
Normal file
@ -0,0 +1,66 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"flag"
|
||||
"math/big"
|
||||
"os"
|
||||
|
||||
"github.com/ossrs/srs-sip/pkg/config"
|
||||
)
|
||||
|
||||
func Parse(ctx context.Context) interface{} {
|
||||
fl := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||
|
||||
var conf config.MainConfig
|
||||
fl.StringVar(&conf.Serial, "serial", "34020000002000000001", "The serial number")
|
||||
fl.StringVar(&conf.Realm, "realm", "3402000000", "The realm")
|
||||
fl.StringVar(&conf.SipHost, "sip-host", "0.0.0.0", "The SIP host")
|
||||
fl.IntVar(&conf.SipPort, "sip-port", 5060, "The SIP port")
|
||||
fl.StringVar(&conf.MediaAddr, "media-addr", "127.0.0.1:1985", "The api address of media server. like: 127.0.0.1:1985")
|
||||
fl.IntVar(&conf.HttpServerPort, "http-server-port", 8888, "The port of http server")
|
||||
fl.IntVar(&conf.APIPort, "api-port", 2020, "The port of http api server")
|
||||
|
||||
fl.Usage = func() {
|
||||
fl.PrintDefaults()
|
||||
}
|
||||
|
||||
if err := fl.Parse(os.Args[1:]); err == flag.ErrHelp {
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
showHelp := conf.MediaAddr == ""
|
||||
if showHelp {
|
||||
fl.Usage()
|
||||
os.Exit(-1)
|
||||
}
|
||||
|
||||
return &conf
|
||||
}
|
||||
|
||||
func GenRandomNumber(n int) string {
|
||||
var result string
|
||||
for i := 0; i < n; i++ {
|
||||
randomDigit, _ := rand.Int(rand.Reader, big.NewInt(10))
|
||||
result += randomDigit.String()
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func CreateSSRC(isLive bool) string {
|
||||
ssrc := make([]byte, 10)
|
||||
if isLive {
|
||||
ssrc[0] = '0'
|
||||
} else {
|
||||
ssrc[0] = '1'
|
||||
}
|
||||
copy(ssrc[1:], GenRandomNumber(9))
|
||||
return string(ssrc)
|
||||
}
|
||||
|
||||
// @see GB/T28181—2016 附录D 统一编码规则
|
||||
func IsVideoChannel(channelID string) bool {
|
||||
deviceType := channelID[10:13]
|
||||
return deviceType == "131" || deviceType == "132"
|
||||
}
|
||||
Reference in New Issue
Block a user