Browse Source

keep players when pusher been replaced

pull/154/head
milkywayrivers@gmail.com 6 years ago
parent
commit
c875da4d94
  1. 7
      easydarwin.ini
  2. 2
      main.go
  3. 2
      routers/streams.go
  4. 50
      rtsp/pusher.go
  5. 2
      rtsp/rtsp-client.go
  6. 32
      rtsp/rtsp-server.go
  7. 39
      rtsp/rtsp-session.go

7
easydarwin.ini

@ -12,9 +12,14 @@ timeout=28800
; 是否使能gop cache。如果使能,服务器会缓存最后一个I帧以及其后的非I帧,以提高播放速度。但是可能在高并发的情况下带来内存压力。
gop_cache_enable=1
; 新的session连接时,如果已有同一个PATH的session在推流,则关闭之前的。
; 新的推流器连接时,如果已有同一个推流器(PATH相同)在推流,是否关闭老的推流器。
; 如果为0,则不会关闭老的推流器,新的推流器会被响应406错误,否则会关闭老的推流器,新的推流器会响应成功。
close_old=0
; 当close_old为1时,是否保留被关闭的推流器对应的播放器。
; 如果为0,则原推流器对应的播放器会被断开。否则会被保留下来。注意,如果该选项为1,可能某些播放器会有异常,因为RTP序列可能不一致了。
keep_players=0
; 是否使能向服务器推流或者从服务器播放时验证用户名密码. [注意] 因为服务器端并不保存明文密码,所以推送或者播放时,客户端应该输入密码的md5后的值。
; password should be the hex of md5(original password)
authorization_enable=0

2
main.go

@ -155,7 +155,7 @@ func (p *program) Start(s service.Service) (err error) {
log.Printf("Pull stream err :%v", err)
continue
}
rtsp.GetServer().AddPusher(pusher, false)
rtsp.GetServer().AddPusher(pusher)
//streams = streams[0:i]
//streams = append(streams[:i], streams[i+1:]...)
}

2
routers/streams.go

@ -77,7 +77,7 @@ func (h *APIHandler) StreamStart(c *gin.Context) {
return
}
log.Printf("Pull to push %v success ", form)
rtsp.GetServer().AddPusher(pusher, false)
rtsp.GetServer().AddPusher(pusher)
// save to db.
var stream = models.Stream{
URL: form.URL,

50
rtsp/pusher.go

@ -18,7 +18,6 @@ type Pusher struct {
gopCache []*RTPPack
gopCacheLock sync.RWMutex
UDPServer *UDPServer
spsppsInSTAPaPack bool
cond *sync.Cond
queue []*RTPPack
@ -187,10 +186,24 @@ func NewPusher(session *Session) (pusher *Pusher) {
cond: sync.NewCond(&sync.Mutex{}),
queue: make([]*RTPPack, 0),
}
pusher.bindSession(session)
return
}
func (pusher *Pusher) bindSession(session *Session) {
pusher.Session = session
session.RTPHandles = append(session.RTPHandles, func(pack *RTPPack) {
if session != pusher.Session {
session.logger.Printf("Session recv rtp to pusher.but pusher got a new session[%v].", pusher.Session.ID)
return
}
pusher.QueueRTP(pack)
})
session.StopHandles = append(session.StopHandles, func() {
if session != pusher.Session {
session.logger.Printf("Session stop to release pusher.but pusher got a new session[%v].", pusher.Session.ID)
return
}
pusher.ClearPlayer()
pusher.Server().RemovePusher(pusher)
pusher.cond.Broadcast()
@ -199,7 +212,37 @@ func NewPusher(session *Session) (pusher *Pusher) {
pusher.UDPServer = nil
}
})
return
}
func (pusher *Pusher) RebindSession(session *Session) bool {
if pusher.RTSPClient != nil {
pusher.Logger().Printf("call RebindSession[%s] to a Client-Pusher. got false", session.ID)
return false
}
sess := pusher.Session
pusher.bindSession(session)
session.Pusher = pusher
pusher.gopCacheLock.Lock()
pusher.gopCache = make([]*RTPPack, 0)
pusher.gopCacheLock.Unlock()
if sess != nil {
sess.Stop()
}
return true
}
func (pusher *Pusher) RebindClient(client *RTSPClient) bool {
if pusher.Session != nil {
pusher.Logger().Printf("call RebindClient[%s] to a Session-Pusher. got false", client.ID)
return false
}
sess := pusher.RTSPClient
pusher.RTSPClient = client
if sess != nil {
sess.Stop()
}
return true
}
func (pusher *Pusher) QueueRTP(pack *RTPPack) *Pusher {
@ -312,10 +355,11 @@ func (pusher *Pusher) ClearPlayer() {
}
pusher.players = make(map[string]*Player)
pusher.playersLock.Unlock()
go func() { // do not block
for _, v := range players {
v.Stop()
}
}()
}
func (pusher *Pusher) shouldSequenceStart(rtp *RTPInfo) bool {

2
rtsp/rtsp-client.go

@ -501,7 +501,7 @@ func (client *RTSPClient) Stop() {
client.Conn.Close()
client.Conn = nil
}
if client.UDPServer != nil{
if client.UDPServer != nil {
client.UDPServer.Stop()
client.UDPServer = nil
}

32
rtsp/rtsp-server.go

@ -187,28 +187,19 @@ func (server *Server) Stop() {
close(server.removePusherCh)
}
func (server *Server) AddPusher(pusher *Pusher, closeOld bool) bool {
func (server *Server) AddPusher(pusher *Pusher) bool {
logger := server.logger
added := false
server.pushersLock.Lock()
old, ok := server.pushers[pusher.Path()]
_, ok := server.pushers[pusher.Path()]
if !ok {
server.pushers[pusher.Path()] = pusher
logger.Printf("%v start, now pusher size[%d]", pusher, len(server.pushers))
added = true
} else {
if closeOld {
server.pushers[pusher.Path()] = pusher
logger.Printf("%v start, replace old pusher", pusher)
added = true
}
added = false
}
server.pushersLock.Unlock()
if ok && closeOld {
logger.Printf("old pusher %v stoped", pusher)
old.Stop()
server.removePusherCh <- old
}
if added {
go pusher.Start()
server.addPusherCh <- pusher
@ -216,6 +207,23 @@ func (server *Server) AddPusher(pusher *Pusher, closeOld bool) bool {
return added
}
func (server *Server) TryAttachToPusher(session *Session) (int, *Pusher) {
server.pushersLock.Lock()
attached := 0
var pusher *Pusher = nil
if _pusher, ok := server.pushers[session.Path]; ok {
if _pusher.RebindSession(session) {
session.logger.Printf("Attached to a pusher")
attached = 1
pusher = _pusher
} else {
attached = -1
}
}
server.pushersLock.Unlock()
return attached, pusher
}
func (server *Server) RemovePusher(pusher *Pusher) {
logger := server.logger
removed := false

39
rtsp/rtsp-session.go

@ -368,7 +368,7 @@ func (session *Session) handleRequest(req *Request) {
res := NewResponse(200, "OK", req.Header["CSeq"], session.ID, "")
defer func() {
if p := recover(); p != nil {
logger.Printf("handleRequest err ocurs:%v",p)
logger.Printf("handleRequest err ocurs:%v", p)
res.StatusCode = 500
res.Status = fmt.Sprintf("Inner Server Error, %v", p)
}
@ -449,14 +449,47 @@ func (session *Session) handleRequest(req *Request) {
session.VCodec = sdp.Codec
logger.Printf("video codec[%s]\n", session.VCodec)
}
addPusher := false
if session.closeOld {
r, _ := session.Server.TryAttachToPusher(session)
if r < -1 {
logger.Printf("reject pusher.")
res.StatusCode = 406
res.Status = "Not Acceptable"
} else if r == 0 {
addPusher = true
} else {
logger.Printf("Attached to old pusher")
// 尝试发给客户端ANNOUCE
// players := pusher.GetPlayers()
// for _, v := range players {
// sess := v.Session
// hearers := make(map[string]string)
// hearers["Content-Type"] = "application/sdp"
// hearers["Session"] = sess.ID
// hearers["Content-Length"] = strconv.Itoa(len(v.SDPRaw))
// var req = Request{Method: ANNOUNCE, URL: v.URL, Version: "1.0", Header: hearers, Body: pusher.SDPRaw()}
// sess.connWLock.Lock()
// logger.Println(req.String())
// outBytes := []byte(req.String())
// sess.connRW.Write(outBytes)
// sess.connRW.Flush()
// sess.connWLock.Unlock()
// }
}
} else {
addPusher = true
}
if addPusher {
session.Pusher = NewPusher(session)
addedToServer := session.Server.AddPusher(session.Pusher, session.closeOld)
addedToServer := session.Server.AddPusher(session.Pusher)
if !addedToServer {
logger.Printf("reject pusher.")
res.StatusCode = 406
res.Status = "Not Acceptable"
}
}
case "DESCRIBE":
session.Type = SESSEION_TYPE_PLAYER
session.URL = req.URL

Loading…
Cancel
Save