diff --git a/rtsp/player.go b/rtsp/player.go index 0216936b..14ce0f4b 100644 --- a/rtsp/player.go +++ b/rtsp/player.go @@ -3,6 +3,7 @@ package rtsp import ( "sync" "time" + "github.com/penggy/EasyGoLib/utils" ) type Player struct { @@ -10,14 +11,22 @@ type Player struct { Pusher *Pusher cond *sync.Cond queue []*RTPPack + queueLimit int + dropPacketWhenPaused bool + paused bool } func NewPlayer(session *Session, pusher *Pusher) (player *Player) { + queueLimit := utils.Conf().Section("rtsp").Key("player_queue_limit").MustInt(0) + dropPacketWhenPaused := utils.Conf().Section("rtsp").Key("drop_packet_when_paused").MustInt(0) player = &Player{ Session: session, Pusher: pusher, cond: sync.NewCond(&sync.Mutex{}), queue: make([]*RTPPack, 0), + queueLimit: queueLimit, + dropPacketWhenPaused: dropPacketWhenPaused != 0, + paused: false, } session.StopHandles = append(session.StopHandles, func() { pusher.RemovePlayer(player) @@ -32,8 +41,18 @@ func (player *Player) QueueRTP(pack *RTPPack) *Player { logger.Printf("player queue enter nil pack, drop it") return player } + if player.paused && player.dropPacketWhenPaused { + return player + } player.cond.L.Lock() player.queue = append(player.queue, pack) + if oldLen := len(player.queue); player.queueLimit > 0 && oldLen > player.queueLimit { + player.queue = player.queue[1:] + if player.debugLogEnable { + len := len(player.queue) + logger.Printf("Player %s, QueueRTP, exceeds limit(%d), drop %d old packets, current queue.len=%d\n", player.String(), player.queueLimit, oldLen - len, len) + } + } player.cond.Signal() player.cond.L.Unlock() return player @@ -52,7 +71,11 @@ func (player *Player) Start() { pack = player.queue[0] player.queue = player.queue[1:] } + queueLen := len(player.queue) player.cond.L.Unlock() + if player.paused { + continue + } if pack == nil { if !player.Stoped { logger.Printf("player not stoped, but queue take out nil pack") @@ -63,9 +86,23 @@ func (player *Player) Start() { logger.Println(err) } elapsed := time.Now().Sub(timer) - if elapsed >= 30*time.Second { - logger.Printf("Send a package.type:%d\n", pack.Type) + if player.debugLogEnable && elapsed >= 30*time.Second { + logger.Printf("Player %s, Send a package.type:%d, queue.len=%d\n", player.String(), pack.Type, queueLen) timer = time.Now() } } } + +func (player *Player) Pause(paused bool) { + if paused { + player.logger.Printf("Player %s, Pause\n", player.String()) + } else { + player.logger.Printf("Player %s, Play\n", player.String()) + } + player.cond.L.Lock() + if paused && player.dropPacketWhenPaused && len(player.queue) > 0 { + player.queue = make([]*RTPPack, 0) + } + player.paused = paused + player.cond.L.Unlock() +} \ No newline at end of file diff --git a/rtsp/pusher.go b/rtsp/pusher.go index 2aa98e90..962e8512 100644 --- a/rtsp/pusher.go +++ b/rtsp/pusher.go @@ -311,6 +311,13 @@ func (pusher *Pusher) GetPlayers() (players map[string]*Player) { return } +func (pusher *Pusher) HasPlayer(player *Player) bool { + pusher.playersLock.Lock() + _, ok := pusher.players[player.ID] + pusher.playersLock.Unlock() + return ok +} + func (pusher *Pusher) AddPlayer(player *Player) *Pusher { logger := pusher.Logger() if pusher.gopCacheEnable { diff --git a/rtsp/rtsp-client.go b/rtsp/rtsp-client.go index ab080101..37b22182 100644 --- a/rtsp/rtsp-client.go +++ b/rtsp/rtsp-client.go @@ -48,6 +48,9 @@ type RTSPClient struct { OptionIntervalMillis int64 SDPRaw string + debugLogEnable bool + lastRtpSN uint16 + Agent string authLine string @@ -71,6 +74,7 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64, agent if err != nil { return } + debugLogEnable := utils.Conf().Section("rtsp").Key("debug_log_enable").MustInt(0) client = &RTSPClient{ Server: server, Stoped: false, @@ -85,6 +89,7 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64, agent OptionIntervalMillis: sendOptionMillis, StartAt: time.Now(), Agent: agent, + debugLogEnable: debugLogEnable != 0, } client.logger = log.New(os.Stdout, fmt.Sprintf("[%s]", client.ID), log.LstdFlags|log.Lshortfile) if !utils.Debug { @@ -417,11 +422,24 @@ func (client *RTSPClient) startStream() { client.logger.Printf("session tcp got nil rtp pack") continue } - elapsed := time.Now().Sub(loggerTime) - if elapsed >= 10*time.Second { - client.logger.Printf("%v read rtp frame.", client) - loggerTime = time.Now() + + if client.debugLogEnable { + rtp := ParseRTP(pack.Buffer.Bytes()) + if rtp != nil { + rtpSN := uint16(rtp.SequenceNumber) + if client.lastRtpSN != 0 && client.lastRtpSN + 1 != rtpSN { + client.logger.Printf("%s, %d packets lost, current SN=%d, last SN=%d\n", client.String(), rtpSN - client.lastRtpSN, rtpSN, client.lastRtpSN) + } + client.lastRtpSN = rtpSN + } + + elapsed := time.Now().Sub(loggerTime) + if elapsed >= 30*time.Second { + client.logger.Printf("%v read rtp frame.", client) + loggerTime = time.Now() + } } + client.InBytes += int(length + 4) for _, h := range client.RTPHandles { h(pack) diff --git a/rtsp/rtsp-session.go b/rtsp/rtsp-session.go index 5183c0e8..ed9b077c 100644 --- a/rtsp/rtsp-session.go +++ b/rtsp/rtsp-session.go @@ -105,6 +105,7 @@ type Session struct { authorizationEnable bool nonce string closeOld bool + debugLogEnable bool AControl string VControl string @@ -133,7 +134,7 @@ type Session struct { } func (session *Session) String() string { - return fmt.Sprintf("session[%v][%v][%s][%s]", session.Type, session.TransType, session.Path, session.ID) + return fmt.Sprintf("session[%v][%v][%s][%s][%s]", session.Type, session.TransType, session.Path, session.ID, session.Conn.RemoteAddr().String()) } func NewSession(server *Server, conn net.Conn) *Session { @@ -142,6 +143,7 @@ func NewSession(server *Server, conn net.Conn) *Session { timeoutTCPConn := &RichConn{conn, time.Duration(timeoutMillis) * time.Millisecond} authorizationEnable := utils.Conf().Section("rtsp").Key("authorization_enable").MustInt(0) close_old := utils.Conf().Section("rtsp").Key("close_old").MustInt(0) + debugLogEnable := utils.Conf().Section("rtsp").Key("debug_log_enable").MustInt(0) session := &Session{ ID: shortid.MustGenerate(), Server: server, @@ -150,6 +152,7 @@ func NewSession(server *Server, conn net.Conn) *Session { StartAt: time.Now(), Timeout: utils.Conf().Section("rtsp").Key("timeout").MustInt(0), authorizationEnable: authorizationEnable != 0, + debugLogEnable: debugLogEnable != 0, RTPHandles: make([]func(*RTPPack), 0), StopHandles: make([]func(), 0), vRTPChannel: -1, @@ -383,7 +386,11 @@ func (session *Session) handleRequest(req *Request) { case "PLAY", "RECORD": switch session.Type { case SESSEION_TYPE_PLAYER: - session.Pusher.AddPlayer(session.Player) + if session.Pusher.HasPlayer(session.Player) { + session.Player.Pause(false) + } else { + session.Pusher.AddPlayer(session.Player) + } // case SESSION_TYPE_PUSHER: // session.Server.AddPusher(session.Pusher) } @@ -683,6 +690,13 @@ func (session *Session) handleRequest(req *Request) { res.Status = "Error Status" return } + case "PAUSE": + if session.Player == nil { + res.StatusCode = 500 + res.Status = "Error Status" + return + } + session.Player.Pause(true) } }