mirror of https://github.com/EasyDarwin/EasyDarwin
Merge pull request #154 from zhuohekeji/zhdev
support PAUSE method, dropping packets when paused or the length of q…pull/172/head
commit
8637f73e52
|
@ -3,6 +3,7 @@ package rtsp
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
"github.com/penggy/EasyGoLib/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Player struct {
|
type Player struct {
|
||||||
|
@ -10,14 +11,22 @@ type Player struct {
|
||||||
Pusher *Pusher
|
Pusher *Pusher
|
||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
queue []*RTPPack
|
queue []*RTPPack
|
||||||
|
queueLimit int
|
||||||
|
dropPacketWhenPaused bool
|
||||||
|
paused bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPlayer(session *Session, pusher *Pusher) (player *Player) {
|
func NewPlayer(session *Session, pusher *Pusher) (player *Player) {
|
||||||
|
queueLimit := utils.Conf().Section("rtsp").Key("player_queue_limit").MustInt(0)
|
||||||
|
dropPacketWhenPaused := utils.Conf().Section("rtsp").Key("drop_packet_when_paused").MustInt(0)
|
||||||
player = &Player{
|
player = &Player{
|
||||||
Session: session,
|
Session: session,
|
||||||
Pusher: pusher,
|
Pusher: pusher,
|
||||||
cond: sync.NewCond(&sync.Mutex{}),
|
cond: sync.NewCond(&sync.Mutex{}),
|
||||||
queue: make([]*RTPPack, 0),
|
queue: make([]*RTPPack, 0),
|
||||||
|
queueLimit: queueLimit,
|
||||||
|
dropPacketWhenPaused: dropPacketWhenPaused != 0,
|
||||||
|
paused: false,
|
||||||
}
|
}
|
||||||
session.StopHandles = append(session.StopHandles, func() {
|
session.StopHandles = append(session.StopHandles, func() {
|
||||||
pusher.RemovePlayer(player)
|
pusher.RemovePlayer(player)
|
||||||
|
@ -32,8 +41,18 @@ func (player *Player) QueueRTP(pack *RTPPack) *Player {
|
||||||
logger.Printf("player queue enter nil pack, drop it")
|
logger.Printf("player queue enter nil pack, drop it")
|
||||||
return player
|
return player
|
||||||
}
|
}
|
||||||
|
if player.paused && player.dropPacketWhenPaused {
|
||||||
|
return player
|
||||||
|
}
|
||||||
player.cond.L.Lock()
|
player.cond.L.Lock()
|
||||||
player.queue = append(player.queue, pack)
|
player.queue = append(player.queue, pack)
|
||||||
|
if oldLen := len(player.queue); player.queueLimit > 0 && oldLen > player.queueLimit {
|
||||||
|
player.queue = player.queue[1:]
|
||||||
|
if player.debugLogEnable {
|
||||||
|
len := len(player.queue)
|
||||||
|
logger.Printf("Player %s, QueueRTP, exceeds limit(%d), drop %d old packets, current queue.len=%d\n", player.String(), player.queueLimit, oldLen - len, len)
|
||||||
|
}
|
||||||
|
}
|
||||||
player.cond.Signal()
|
player.cond.Signal()
|
||||||
player.cond.L.Unlock()
|
player.cond.L.Unlock()
|
||||||
return player
|
return player
|
||||||
|
@ -52,7 +71,11 @@ func (player *Player) Start() {
|
||||||
pack = player.queue[0]
|
pack = player.queue[0]
|
||||||
player.queue = player.queue[1:]
|
player.queue = player.queue[1:]
|
||||||
}
|
}
|
||||||
|
queueLen := len(player.queue)
|
||||||
player.cond.L.Unlock()
|
player.cond.L.Unlock()
|
||||||
|
if player.paused {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if pack == nil {
|
if pack == nil {
|
||||||
if !player.Stoped {
|
if !player.Stoped {
|
||||||
logger.Printf("player not stoped, but queue take out nil pack")
|
logger.Printf("player not stoped, but queue take out nil pack")
|
||||||
|
@ -63,9 +86,23 @@ func (player *Player) Start() {
|
||||||
logger.Println(err)
|
logger.Println(err)
|
||||||
}
|
}
|
||||||
elapsed := time.Now().Sub(timer)
|
elapsed := time.Now().Sub(timer)
|
||||||
if elapsed >= 30*time.Second {
|
if player.debugLogEnable && elapsed >= 30*time.Second {
|
||||||
logger.Printf("Send a package.type:%d\n", pack.Type)
|
logger.Printf("Player %s, Send a package.type:%d, queue.len=%d\n", player.String(), pack.Type, queueLen)
|
||||||
timer = time.Now()
|
timer = time.Now()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (player *Player) Pause(paused bool) {
|
||||||
|
if paused {
|
||||||
|
player.logger.Printf("Player %s, Pause\n", player.String())
|
||||||
|
} else {
|
||||||
|
player.logger.Printf("Player %s, Play\n", player.String())
|
||||||
|
}
|
||||||
|
player.cond.L.Lock()
|
||||||
|
if paused && player.dropPacketWhenPaused && len(player.queue) > 0 {
|
||||||
|
player.queue = make([]*RTPPack, 0)
|
||||||
|
}
|
||||||
|
player.paused = paused
|
||||||
|
player.cond.L.Unlock()
|
||||||
|
}
|
|
@ -311,6 +311,13 @@ func (pusher *Pusher) GetPlayers() (players map[string]*Player) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pusher *Pusher) HasPlayer(player *Player) bool {
|
||||||
|
pusher.playersLock.Lock()
|
||||||
|
_, ok := pusher.players[player.ID]
|
||||||
|
pusher.playersLock.Unlock()
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
func (pusher *Pusher) AddPlayer(player *Player) *Pusher {
|
func (pusher *Pusher) AddPlayer(player *Player) *Pusher {
|
||||||
logger := pusher.Logger()
|
logger := pusher.Logger()
|
||||||
if pusher.gopCacheEnable {
|
if pusher.gopCacheEnable {
|
||||||
|
|
|
@ -48,6 +48,9 @@ type RTSPClient struct {
|
||||||
OptionIntervalMillis int64
|
OptionIntervalMillis int64
|
||||||
SDPRaw string
|
SDPRaw string
|
||||||
|
|
||||||
|
debugLogEnable bool
|
||||||
|
lastRtpSN uint16
|
||||||
|
|
||||||
Agent string
|
Agent string
|
||||||
authLine string
|
authLine string
|
||||||
|
|
||||||
|
@ -71,6 +74,7 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64, agent
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
debugLogEnable := utils.Conf().Section("rtsp").Key("debug_log_enable").MustInt(0)
|
||||||
client = &RTSPClient{
|
client = &RTSPClient{
|
||||||
Server: server,
|
Server: server,
|
||||||
Stoped: false,
|
Stoped: false,
|
||||||
|
@ -85,6 +89,7 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64, agent
|
||||||
OptionIntervalMillis: sendOptionMillis,
|
OptionIntervalMillis: sendOptionMillis,
|
||||||
StartAt: time.Now(),
|
StartAt: time.Now(),
|
||||||
Agent: agent,
|
Agent: agent,
|
||||||
|
debugLogEnable: debugLogEnable != 0,
|
||||||
}
|
}
|
||||||
client.logger = log.New(os.Stdout, fmt.Sprintf("[%s]", client.ID), log.LstdFlags|log.Lshortfile)
|
client.logger = log.New(os.Stdout, fmt.Sprintf("[%s]", client.ID), log.LstdFlags|log.Lshortfile)
|
||||||
if !utils.Debug {
|
if !utils.Debug {
|
||||||
|
@ -417,11 +422,24 @@ func (client *RTSPClient) startStream() {
|
||||||
client.logger.Printf("session tcp got nil rtp pack")
|
client.logger.Printf("session tcp got nil rtp pack")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
elapsed := time.Now().Sub(loggerTime)
|
|
||||||
if elapsed >= 10*time.Second {
|
if client.debugLogEnable {
|
||||||
client.logger.Printf("%v read rtp frame.", client)
|
rtp := ParseRTP(pack.Buffer.Bytes())
|
||||||
loggerTime = time.Now()
|
if rtp != nil {
|
||||||
|
rtpSN := uint16(rtp.SequenceNumber)
|
||||||
|
if client.lastRtpSN != 0 && client.lastRtpSN + 1 != rtpSN {
|
||||||
|
client.logger.Printf("%s, %d packets lost, current SN=%d, last SN=%d\n", client.String(), rtpSN - client.lastRtpSN, rtpSN, client.lastRtpSN)
|
||||||
|
}
|
||||||
|
client.lastRtpSN = rtpSN
|
||||||
|
}
|
||||||
|
|
||||||
|
elapsed := time.Now().Sub(loggerTime)
|
||||||
|
if elapsed >= 30*time.Second {
|
||||||
|
client.logger.Printf("%v read rtp frame.", client)
|
||||||
|
loggerTime = time.Now()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
client.InBytes += int(length + 4)
|
client.InBytes += int(length + 4)
|
||||||
for _, h := range client.RTPHandles {
|
for _, h := range client.RTPHandles {
|
||||||
h(pack)
|
h(pack)
|
||||||
|
|
|
@ -105,6 +105,7 @@ type Session struct {
|
||||||
authorizationEnable bool
|
authorizationEnable bool
|
||||||
nonce string
|
nonce string
|
||||||
closeOld bool
|
closeOld bool
|
||||||
|
debugLogEnable bool
|
||||||
|
|
||||||
AControl string
|
AControl string
|
||||||
VControl string
|
VControl string
|
||||||
|
@ -133,7 +134,7 @@ type Session struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (session *Session) String() string {
|
func (session *Session) String() string {
|
||||||
return fmt.Sprintf("session[%v][%v][%s][%s]", session.Type, session.TransType, session.Path, session.ID)
|
return fmt.Sprintf("session[%v][%v][%s][%s][%s]", session.Type, session.TransType, session.Path, session.ID, session.Conn.RemoteAddr().String())
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSession(server *Server, conn net.Conn) *Session {
|
func NewSession(server *Server, conn net.Conn) *Session {
|
||||||
|
@ -142,6 +143,7 @@ func NewSession(server *Server, conn net.Conn) *Session {
|
||||||
timeoutTCPConn := &RichConn{conn, time.Duration(timeoutMillis) * time.Millisecond}
|
timeoutTCPConn := &RichConn{conn, time.Duration(timeoutMillis) * time.Millisecond}
|
||||||
authorizationEnable := utils.Conf().Section("rtsp").Key("authorization_enable").MustInt(0)
|
authorizationEnable := utils.Conf().Section("rtsp").Key("authorization_enable").MustInt(0)
|
||||||
close_old := utils.Conf().Section("rtsp").Key("close_old").MustInt(0)
|
close_old := utils.Conf().Section("rtsp").Key("close_old").MustInt(0)
|
||||||
|
debugLogEnable := utils.Conf().Section("rtsp").Key("debug_log_enable").MustInt(0)
|
||||||
session := &Session{
|
session := &Session{
|
||||||
ID: shortid.MustGenerate(),
|
ID: shortid.MustGenerate(),
|
||||||
Server: server,
|
Server: server,
|
||||||
|
@ -150,6 +152,7 @@ func NewSession(server *Server, conn net.Conn) *Session {
|
||||||
StartAt: time.Now(),
|
StartAt: time.Now(),
|
||||||
Timeout: utils.Conf().Section("rtsp").Key("timeout").MustInt(0),
|
Timeout: utils.Conf().Section("rtsp").Key("timeout").MustInt(0),
|
||||||
authorizationEnable: authorizationEnable != 0,
|
authorizationEnable: authorizationEnable != 0,
|
||||||
|
debugLogEnable: debugLogEnable != 0,
|
||||||
RTPHandles: make([]func(*RTPPack), 0),
|
RTPHandles: make([]func(*RTPPack), 0),
|
||||||
StopHandles: make([]func(), 0),
|
StopHandles: make([]func(), 0),
|
||||||
vRTPChannel: -1,
|
vRTPChannel: -1,
|
||||||
|
@ -383,7 +386,11 @@ func (session *Session) handleRequest(req *Request) {
|
||||||
case "PLAY", "RECORD":
|
case "PLAY", "RECORD":
|
||||||
switch session.Type {
|
switch session.Type {
|
||||||
case SESSEION_TYPE_PLAYER:
|
case SESSEION_TYPE_PLAYER:
|
||||||
session.Pusher.AddPlayer(session.Player)
|
if session.Pusher.HasPlayer(session.Player) {
|
||||||
|
session.Player.Pause(false)
|
||||||
|
} else {
|
||||||
|
session.Pusher.AddPlayer(session.Player)
|
||||||
|
}
|
||||||
// case SESSION_TYPE_PUSHER:
|
// case SESSION_TYPE_PUSHER:
|
||||||
// session.Server.AddPusher(session.Pusher)
|
// session.Server.AddPusher(session.Pusher)
|
||||||
}
|
}
|
||||||
|
@ -683,6 +690,13 @@ func (session *Session) handleRequest(req *Request) {
|
||||||
res.Status = "Error Status"
|
res.Status = "Error Status"
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
case "PAUSE":
|
||||||
|
if session.Player == nil {
|
||||||
|
res.StatusCode = 500
|
||||||
|
res.Status = "Error Status"
|
||||||
|
return
|
||||||
|
}
|
||||||
|
session.Player.Pause(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue