update log
This commit is contained in:
committed by
Haibo Chen(陈海博)
parent
b55a9d9a82
commit
5b4dbca9da
@ -1,12 +1,11 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"github.com/ossrs/srs-sip/pkg/models"
|
||||
)
|
||||
|
||||
@ -98,7 +97,10 @@ func (dm *deviceManager) checkHeartbeats() {
|
||||
device.SourceAddr = ""
|
||||
device.Online = false
|
||||
dm.devices.Store(key, device)
|
||||
logger.Wf(context.Background(), "Device %s is offline due to heartbeat timeout, HeartBeatInterval: %v", device.DeviceID, device.HeartBeatInterval)
|
||||
slog.Warn("Device is offline due to heartbeat timeout",
|
||||
"device_id", device.DeviceID,
|
||||
"heartbeat_interval", device.HeartBeatInterval,
|
||||
"heartbeat_count", device.HeartBeatCount)
|
||||
}
|
||||
}
|
||||
return true
|
||||
|
||||
@ -3,12 +3,12 @@ package service
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/xml"
|
||||
"log/slog"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/emiago/sipgo/sip"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"github.com/ossrs/srs-sip/pkg/models"
|
||||
"github.com/ossrs/srs-sip/pkg/service/stack"
|
||||
"golang.org/x/net/html/charset"
|
||||
@ -31,7 +31,7 @@ func (s *UAS) isSameIP(addr1, addr2 string) bool {
|
||||
func (s *UAS) onRegister(req *sip.Request, tx sip.ServerTransaction) {
|
||||
id := req.From().Address.User
|
||||
if len(id) != GB28181_ID_LENGTH {
|
||||
logger.E(s.ctx, "invalid device ID")
|
||||
slog.Error("invalid device ID")
|
||||
return
|
||||
}
|
||||
|
||||
@ -50,7 +50,7 @@ func (s *UAS) onRegister(req *sip.Request, tx sip.ServerTransaction) {
|
||||
// Validate Authorization
|
||||
authInfo := ParseAuthorization(authHeader[0].Value())
|
||||
if !ValidateAuth(authInfo, s.conf.GB28181.Auth.Password) {
|
||||
logger.Ef(s.ctx, "%s auth failed, source: %s", id, req.Source())
|
||||
slog.Error("auth failed", "device_id", id, "source", req.Source())
|
||||
s.respondRegister(req, http.StatusForbidden, "Auth Failed", tx)
|
||||
return
|
||||
}
|
||||
@ -61,20 +61,20 @@ func (s *UAS) onRegister(req *sip.Request, tx sip.ServerTransaction) {
|
||||
exp := exps[0]
|
||||
expSec, err := strconv.ParseInt(exp.Value(), 10, 32)
|
||||
if err != nil {
|
||||
logger.Ef(s.ctx, "parse expires header error: %s", err.Error())
|
||||
slog.Error("parse expires header error", "error", err.Error())
|
||||
return
|
||||
}
|
||||
if expSec == 0 {
|
||||
isUnregister = true
|
||||
}
|
||||
} else {
|
||||
logger.E(s.ctx, "empty expires header")
|
||||
slog.Error("empty expires header")
|
||||
return
|
||||
}
|
||||
|
||||
if isUnregister {
|
||||
DM.RemoveDevice(id)
|
||||
logger.Wf(s.ctx, "Device %s unregistered", id)
|
||||
slog.Warn("Device unregistered", "device_id", id)
|
||||
return
|
||||
} else {
|
||||
if d, ok := DM.GetDevice(id); !ok {
|
||||
@ -84,13 +84,13 @@ func (s *UAS) onRegister(req *sip.Request, tx sip.ServerTransaction) {
|
||||
NetworkType: req.Transport(),
|
||||
})
|
||||
s.respondRegister(req, http.StatusOK, "OK", tx)
|
||||
logger.Tf(s.ctx, "%s Register success, source:%s, req: %s", id, req.Source(), req.String())
|
||||
slog.Info("Register success", "device_id", id, "source", req.Source(), "request", req.String())
|
||||
|
||||
go s.ConfigDownload(id)
|
||||
go s.Catalog(id)
|
||||
} else {
|
||||
if d.SourceAddr != "" && !s.isSameIP(d.SourceAddr, req.Source()) {
|
||||
logger.Ef(s.ctx, "Device %s[%s] already registered, please change another ID.", id, d.SourceAddr, req.Source())
|
||||
slog.Error("Device already registered", "device_id", id, "old_source", d.SourceAddr, "new_source", req.Source())
|
||||
// TODO: 如果ID重复,应采用虚拟ID
|
||||
s.respondRegister(req, http.StatusBadRequest, "Conflict Device ID", tx)
|
||||
} else {
|
||||
@ -99,7 +99,7 @@ func (s *UAS) onRegister(req *sip.Request, tx sip.ServerTransaction) {
|
||||
DM.UpdateDevice(id, d)
|
||||
s.respondRegister(req, http.StatusOK, "OK", tx)
|
||||
|
||||
logger.Tf(s.ctx, "%s Re-register success, source:%s, req: %s", id, req.Source(), req.String())
|
||||
slog.Info("Re-register success", "device_id", id, "source", req.Source(), "request", req.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -114,21 +114,21 @@ func (s *UAS) respondRegister(req *sip.Request, code sip.StatusCode, reason stri
|
||||
func (s *UAS) onMessage(req *sip.Request, tx sip.ServerTransaction) {
|
||||
id := req.From().Address.User
|
||||
if len(id) != 20 {
|
||||
logger.Ef(s.ctx, "invalid device ID %s", req.String())
|
||||
slog.Error("invalid device ID", "request", req.String())
|
||||
}
|
||||
|
||||
//logger.Tf(s.ctx, "Received MESSAGE: %s", req.String())
|
||||
slog.Debug("Received MESSAGE", "request", req.String())
|
||||
|
||||
temp := &models.XmlMessageInfo{}
|
||||
decoder := xml.NewDecoder(bytes.NewReader([]byte(req.Body())))
|
||||
decoder.CharsetReader = charset.NewReaderLabel
|
||||
if err := decoder.Decode(temp); err != nil {
|
||||
logger.Ef(s.ctx, "decode message error: %s\n message:%s", err.Error(), req.Body())
|
||||
slog.Error("decode message error", "error", err.Error(), "message", req.Body())
|
||||
}
|
||||
var body string
|
||||
switch temp.CmdType {
|
||||
case "Keepalive":
|
||||
logger.T(s.ctx, "Keepalive")
|
||||
slog.Debug("Keepalive")
|
||||
if d, ok := DM.GetDevice(temp.DeviceID); ok && d.Online {
|
||||
// 更新设备心跳时间
|
||||
DM.UpdateDeviceHeartbeat(temp.DeviceID)
|
||||
@ -138,16 +138,16 @@ func (s *UAS) onMessage(req *sip.Request, tx sip.ServerTransaction) {
|
||||
}
|
||||
case "SensorCatalog": // 兼容宇视,非国标
|
||||
case "Catalog":
|
||||
logger.T(s.ctx, "Catalog")
|
||||
slog.Debug("Catalog")
|
||||
DM.UpdateChannels(temp.DeviceID, temp.DeviceList...)
|
||||
//go s.AutoInvite(temp.DeviceID, temp.DeviceList...)
|
||||
case "ConfigDownload":
|
||||
logger.T(s.ctx, "ConfigDownload")
|
||||
slog.Debug("ConfigDownload")
|
||||
DM.UpdateDeviceConfig(temp.DeviceID, &temp.BasicParam)
|
||||
case "Alarm":
|
||||
logger.T(s.ctx, "Alarm")
|
||||
slog.Debug("Alarm")
|
||||
case "RecordInfo":
|
||||
logger.T(s.ctx, "RecordInfo")
|
||||
slog.Debug("RecordInfo")
|
||||
// 从 recordQueryResults 中获取对应通道的结果通道
|
||||
if ch, ok := s.recordQueryResults.Load(temp.DeviceID); ok {
|
||||
// 发送查询结果
|
||||
@ -155,7 +155,7 @@ func (s *UAS) onMessage(req *sip.Request, tx sip.ServerTransaction) {
|
||||
resultChan <- temp
|
||||
}
|
||||
default:
|
||||
logger.Wf(s.ctx, "Not supported CmdType: %s", temp.CmdType)
|
||||
slog.Warn("Not supported CmdType", "cmd_type", temp.CmdType)
|
||||
response := sip.NewResponseFromRequest(req, http.StatusBadRequest, "", nil)
|
||||
tx.Respond(response)
|
||||
return
|
||||
@ -164,6 +164,6 @@ func (s *UAS) onMessage(req *sip.Request, tx sip.ServerTransaction) {
|
||||
}
|
||||
|
||||
func (s *UAS) onNotify(req *sip.Request, tx sip.ServerTransaction) {
|
||||
logger.T(s.ctx, "Received NOTIFY request")
|
||||
slog.Debug("Received NOTIFY request")
|
||||
tx.Respond(sip.NewResponseFromRequest(req, http.StatusOK, "OK", nil))
|
||||
}
|
||||
|
||||
@ -2,12 +2,12 @@ package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/emiago/sipgo/sip"
|
||||
"github.com/ossrs/go-oryx-lib/errors"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"github.com/ossrs/srs-sip/pkg/media"
|
||||
"github.com/ossrs/srs-sip/pkg/models"
|
||||
"github.com/ossrs/srs-sip/pkg/service/stack"
|
||||
@ -110,7 +110,7 @@ Scale: %.1f
|
||||
}
|
||||
|
||||
func (s *UAS) AddSession(key string, status Session) {
|
||||
logger.Tf(s.ctx, "AddSession: %s, %+v", key, status)
|
||||
slog.Info("AddSession", "key", key, "status", status)
|
||||
s.Streams.Store(key, status)
|
||||
}
|
||||
|
||||
@ -314,7 +314,7 @@ func (s *UAS) Bye(req models.ByeRequest) error {
|
||||
|
||||
defer func() {
|
||||
if err := s.media.Unpublish(session.Ssrc); err != nil {
|
||||
logger.Ef(s.ctx, "unpublish stream error: %s", err)
|
||||
slog.Error("unpublish stream error", "error", err, "stream_id", session.Ssrc)
|
||||
}
|
||||
s.RemoveSession(key)
|
||||
}()
|
||||
@ -523,7 +523,11 @@ func (s *UAS) QueryRecord(deviceID, channelID string, startTime, endTime int64)
|
||||
return nil, errors.Errorf("context done")
|
||||
case records := <-resultChan:
|
||||
allRecords = append(allRecords, records.RecordList...)
|
||||
logger.Tf(s.ctx, "[channel %s] 应收总数 %d, 实收总数 %d, 本次收到 %d", channelID, records.SumNum, len(allRecords), len(records.RecordList))
|
||||
slog.Info("Record query result",
|
||||
"channel", channelID,
|
||||
"expected_count", records.SumNum,
|
||||
"actual_count", len(allRecords),
|
||||
"batch_count", len(records.RecordList))
|
||||
|
||||
if len(allRecords) == records.SumNum {
|
||||
return allRecords, nil
|
||||
|
||||
@ -3,11 +3,11 @@ package service
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
|
||||
"github.com/emiago/sipgo"
|
||||
"github.com/emiago/sipgo/sip"
|
||||
"github.com/ossrs/go-oryx-lib/errors"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"github.com/ossrs/srs-sip/pkg/config"
|
||||
"github.com/ossrs/srs-sip/pkg/service/stack"
|
||||
)
|
||||
@ -26,7 +26,7 @@ type UAC struct {
|
||||
func NewUac() *UAC {
|
||||
ip, err := config.GetLocalIP()
|
||||
if err != nil {
|
||||
logger.E("get local ip failed")
|
||||
slog.Error("get local ip failed", "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -89,7 +89,7 @@ func (c *UAC) doRegister() error {
|
||||
}
|
||||
|
||||
rs, _ := c.getResponse(tx)
|
||||
logger.Tf(c.ctx, "register response: %s", rs.String())
|
||||
slog.Info("register response", "response", rs.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -101,15 +101,15 @@ func (c *UAC) OnRequest(req *sip.Request, tx sip.ServerTransaction) {
|
||||
}
|
||||
|
||||
func (c *UAC) onInvite(req *sip.Request, tx sip.ServerTransaction) {
|
||||
logger.T(c.ctx, "onInvite")
|
||||
slog.Debug("onInvite")
|
||||
}
|
||||
|
||||
func (c *UAC) onBye(req *sip.Request, tx sip.ServerTransaction) {
|
||||
logger.T(c.ctx, "onBye")
|
||||
slog.Debug("onBye")
|
||||
}
|
||||
|
||||
func (c *UAC) onMessage(req *sip.Request, tx sip.ServerTransaction) {
|
||||
logger.Tf(c.ctx, "onMessage %s", req.String())
|
||||
slog.Debug("onMessage", "request", req.String())
|
||||
}
|
||||
|
||||
func (c *UAC) getResponse(tx sip.ClientTransaction) (*sip.Response, error) {
|
||||
|
||||
@ -4,12 +4,12 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/emiago/sipgo"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"github.com/ossrs/srs-sip/pkg/config"
|
||||
"github.com/ossrs/srs-sip/pkg/db"
|
||||
"github.com/ossrs/srs-sip/pkg/media"
|
||||
@ -100,7 +100,7 @@ func (s *UAS) startUDP() error {
|
||||
return fmt.Errorf("cannot listen on the UDP signaling port %d: %w", s.conf.GB28181.Port, err)
|
||||
}
|
||||
s.sipConnUDP = lis
|
||||
logger.Tf(s.ctx, "sip signaling listening on UDP %s:%d", lis.LocalAddr().String(), s.conf.GB28181.Port)
|
||||
slog.Info("sip signaling listening on UDP", "address", lis.LocalAddr().String(), "port", s.conf.GB28181.Port)
|
||||
|
||||
go func() {
|
||||
if err := s.sipSvr.ServeUDP(lis); err != nil {
|
||||
@ -119,7 +119,7 @@ func (s *UAS) startTCP() error {
|
||||
return fmt.Errorf("cannot listen on the TCP signaling port %d: %w", s.conf.GB28181.Port, err)
|
||||
}
|
||||
s.sipConnTCP = lis
|
||||
logger.Tf(s.ctx, "sip signaling listening on TCP %s:%d", lis.Addr().String(), s.conf.GB28181.Port)
|
||||
slog.Info("sip signaling listening on TCP", "address", lis.Addr().String(), "port", s.conf.GB28181.Port)
|
||||
|
||||
go func() {
|
||||
if err := s.sipSvr.ServeTCP(lis); err != nil && !errors.Is(err, net.ErrClosed) {
|
||||
|
||||
Reference in New Issue
Block a user