Browse Source

support PAUSE method, dropping packets when paused or the length of queue exceeds limit

pull/154/head
xiaozhanzhong 6 years ago
parent
commit
dc3d123712
  1. 41
      rtsp/player.go
  2. 7
      rtsp/pusher.go
  3. 26
      rtsp/rtsp-client.go
  4. 18
      rtsp/rtsp-session.go

41
rtsp/player.go

@ -3,6 +3,7 @@ package rtsp
import (
"sync"
"time"
"github.com/penggy/EasyGoLib/utils"
)
type Player struct {
@ -10,14 +11,22 @@ type Player struct {
Pusher *Pusher
cond *sync.Cond
queue []*RTPPack
queueLimit int
dropPacketWhenPaused bool
paused bool
}
func NewPlayer(session *Session, pusher *Pusher) (player *Player) {
queueLimit := utils.Conf().Section("rtsp").Key("player_queue_limit").MustInt(0)
dropPacketWhenPaused := utils.Conf().Section("rtsp").Key("drop_packet_when_paused").MustInt(0)
player = &Player{
Session: session,
Pusher: pusher,
cond: sync.NewCond(&sync.Mutex{}),
queue: make([]*RTPPack, 0),
queueLimit: queueLimit,
dropPacketWhenPaused: dropPacketWhenPaused != 0,
paused: false,
}
session.StopHandles = append(session.StopHandles, func() {
pusher.RemovePlayer(player)
@ -32,8 +41,18 @@ func (player *Player) QueueRTP(pack *RTPPack) *Player {
logger.Printf("player queue enter nil pack, drop it")
return player
}
if player.paused && player.dropPacketWhenPaused {
return player
}
player.cond.L.Lock()
player.queue = append(player.queue, pack)
if oldLen := len(player.queue); player.queueLimit > 0 && oldLen > player.queueLimit {
player.queue = player.queue[1:]
if player.debugLogEnable {
len := len(player.queue)
logger.Printf("Player %s, QueueRTP, exceeds limit(%d), drop %d old packets, current queue.len=%d\n", player.String(), player.queueLimit, oldLen - len, len)
}
}
player.cond.Signal()
player.cond.L.Unlock()
return player
@ -52,7 +71,11 @@ func (player *Player) Start() {
pack = player.queue[0]
player.queue = player.queue[1:]
}
queueLen := len(player.queue)
player.cond.L.Unlock()
if player.paused {
continue
}
if pack == nil {
if !player.Stoped {
logger.Printf("player not stoped, but queue take out nil pack")
@ -63,9 +86,23 @@ func (player *Player) Start() {
logger.Println(err)
}
elapsed := time.Now().Sub(timer)
if elapsed >= 30*time.Second {
logger.Printf("Send a package.type:%d\n", pack.Type)
if player.debugLogEnable && elapsed >= 30*time.Second {
logger.Printf("Player %s, Send a package.type:%d, queue.len=%d\n", player.String(), pack.Type, queueLen)
timer = time.Now()
}
}
}
func (player *Player) Pause(paused bool) {
if paused {
player.logger.Printf("Player %s, Pause\n", player.String())
} else {
player.logger.Printf("Player %s, Play\n", player.String())
}
player.cond.L.Lock()
if paused && player.dropPacketWhenPaused && len(player.queue) > 0 {
player.queue = make([]*RTPPack, 0)
}
player.paused = paused
player.cond.L.Unlock()
}

7
rtsp/pusher.go

@ -311,6 +311,13 @@ func (pusher *Pusher) GetPlayers() (players map[string]*Player) {
return
}
func (pusher *Pusher) HasPlayer(player *Player) bool {
pusher.playersLock.Lock()
_, ok := pusher.players[player.ID]
pusher.playersLock.Unlock()
return ok
}
func (pusher *Pusher) AddPlayer(player *Player) *Pusher {
logger := pusher.Logger()
if pusher.gopCacheEnable {

26
rtsp/rtsp-client.go

@ -48,6 +48,9 @@ type RTSPClient struct {
OptionIntervalMillis int64
SDPRaw string
debugLogEnable bool
lastRtpSN uint16
Agent string
authLine string
@ -71,6 +74,7 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64, agent
if err != nil {
return
}
debugLogEnable := utils.Conf().Section("rtsp").Key("debug_log_enable").MustInt(0)
client = &RTSPClient{
Server: server,
Stoped: false,
@ -85,6 +89,7 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64, agent
OptionIntervalMillis: sendOptionMillis,
StartAt: time.Now(),
Agent: agent,
debugLogEnable: debugLogEnable != 0,
}
client.logger = log.New(os.Stdout, fmt.Sprintf("[%s]", client.ID), log.LstdFlags|log.Lshortfile)
if !utils.Debug {
@ -417,11 +422,24 @@ func (client *RTSPClient) startStream() {
client.logger.Printf("session tcp got nil rtp pack")
continue
}
elapsed := time.Now().Sub(loggerTime)
if elapsed >= 10*time.Second {
client.logger.Printf("%v read rtp frame.", client)
loggerTime = time.Now()
if client.debugLogEnable {
rtp := ParseRTP(pack.Buffer.Bytes())
if rtp != nil {
rtpSN := uint16(rtp.SequenceNumber)
if client.lastRtpSN != 0 && client.lastRtpSN + 1 != rtpSN {
client.logger.Printf("%s, %d packets lost, current SN=%d, last SN=%d\n", client.String(), rtpSN - client.lastRtpSN, rtpSN, client.lastRtpSN)
}
client.lastRtpSN = rtpSN
}
elapsed := time.Now().Sub(loggerTime)
if elapsed >= 30*time.Second {
client.logger.Printf("%v read rtp frame.", client)
loggerTime = time.Now()
}
}
client.InBytes += int(length + 4)
for _, h := range client.RTPHandles {
h(pack)

18
rtsp/rtsp-session.go

@ -105,6 +105,7 @@ type Session struct {
authorizationEnable bool
nonce string
closeOld bool
debugLogEnable bool
AControl string
VControl string
@ -133,7 +134,7 @@ type Session struct {
}
func (session *Session) String() string {
return fmt.Sprintf("session[%v][%v][%s][%s]", session.Type, session.TransType, session.Path, session.ID)
return fmt.Sprintf("session[%v][%v][%s][%s][%s]", session.Type, session.TransType, session.Path, session.ID, session.Conn.RemoteAddr().String())
}
func NewSession(server *Server, conn net.Conn) *Session {
@ -142,6 +143,7 @@ func NewSession(server *Server, conn net.Conn) *Session {
timeoutTCPConn := &RichConn{conn, time.Duration(timeoutMillis) * time.Millisecond}
authorizationEnable := utils.Conf().Section("rtsp").Key("authorization_enable").MustInt(0)
close_old := utils.Conf().Section("rtsp").Key("close_old").MustInt(0)
debugLogEnable := utils.Conf().Section("rtsp").Key("debug_log_enable").MustInt(0)
session := &Session{
ID: shortid.MustGenerate(),
Server: server,
@ -150,6 +152,7 @@ func NewSession(server *Server, conn net.Conn) *Session {
StartAt: time.Now(),
Timeout: utils.Conf().Section("rtsp").Key("timeout").MustInt(0),
authorizationEnable: authorizationEnable != 0,
debugLogEnable: debugLogEnable != 0,
RTPHandles: make([]func(*RTPPack), 0),
StopHandles: make([]func(), 0),
vRTPChannel: -1,
@ -383,7 +386,11 @@ func (session *Session) handleRequest(req *Request) {
case "PLAY", "RECORD":
switch session.Type {
case SESSEION_TYPE_PLAYER:
session.Pusher.AddPlayer(session.Player)
if session.Pusher.HasPlayer(session.Player) {
session.Player.Pause(false)
} else {
session.Pusher.AddPlayer(session.Player)
}
// case SESSION_TYPE_PUSHER:
// session.Server.AddPusher(session.Pusher)
}
@ -683,6 +690,13 @@ func (session *Session) handleRequest(req *Request) {
res.Status = "Error Status"
return
}
case "PAUSE":
if session.Player == nil {
res.StatusCode = 500
res.Status = "Error Status"
return
}
session.Player.Pause(true)
}
}

Loading…
Cancel
Save