diff --git a/easydarwin.ini b/easydarwin.ini index ecc5f2ed..0ed7b516 100644 --- a/easydarwin.ini +++ b/easydarwin.ini @@ -12,9 +12,14 @@ timeout=28800 ; 是否使能gop cache。如果使能,服务器会缓存最后一个I帧以及其后的非I帧,以提高播放速度。但是可能在高并发的情况下带来内存压力。 gop_cache_enable=1 -; 新的session连接时,如果已有同一个PATH的session在推流,则关闭之前的。 +; 新的推流器连接时,如果已有同一个推流器(PATH相同)在推流,是否关闭老的推流器。 +; 如果为0,则不会关闭老的推流器,新的推流器会被响应406错误,否则会关闭老的推流器,新的推流器会响应成功。 close_old=0 +; 当close_old为1时,是否保留被关闭的推流器对应的播放器。 +; 如果为0,则原推流器对应的播放器会被断开。否则会被保留下来。注意,如果该选项为1,可能某些播放器会有异常,因为RTP序列可能不一致了。 +keep_players=0 + ; 是否使能向服务器推流或者从服务器播放时验证用户名密码. [注意] 因为服务器端并不保存明文密码,所以推送或者播放时,客户端应该输入密码的md5后的值。 ; password should be the hex of md5(original password) authorization_enable=0 diff --git a/main.go b/main.go index 55ddfa81..0f861195 100644 --- a/main.go +++ b/main.go @@ -155,7 +155,7 @@ func (p *program) Start(s service.Service) (err error) { log.Printf("Pull stream err :%v", err) continue } - rtsp.GetServer().AddPusher(pusher, false) + rtsp.GetServer().AddPusher(pusher) //streams = streams[0:i] //streams = append(streams[:i], streams[i+1:]...) } diff --git a/routers/streams.go b/routers/streams.go index 143c8e4e..b1ce7046 100644 --- a/routers/streams.go +++ b/routers/streams.go @@ -77,7 +77,7 @@ func (h *APIHandler) StreamStart(c *gin.Context) { return } log.Printf("Pull to push %v success ", form) - rtsp.GetServer().AddPusher(pusher, false) + rtsp.GetServer().AddPusher(pusher) // save to db. var stream = models.Stream{ URL: form.URL, diff --git a/rtsp/pusher.go b/rtsp/pusher.go index 2763c24f..2aa98e90 100644 --- a/rtsp/pusher.go +++ b/rtsp/pusher.go @@ -12,13 +12,12 @@ import ( type Pusher struct { *Session *RTSPClient - players map[string]*Player //SessionID <-> Player - playersLock sync.RWMutex - gopCacheEnable bool - gopCache []*RTPPack - gopCacheLock sync.RWMutex - UDPServer *UDPServer - + players map[string]*Player //SessionID <-> Player + playersLock sync.RWMutex + gopCacheEnable bool + gopCache []*RTPPack + gopCacheLock sync.RWMutex + UDPServer *UDPServer spsppsInSTAPaPack bool cond *sync.Cond queue []*RTPPack @@ -187,10 +186,24 @@ func NewPusher(session *Session) (pusher *Pusher) { cond: sync.NewCond(&sync.Mutex{}), queue: make([]*RTPPack, 0), } + pusher.bindSession(session) + return +} + +func (pusher *Pusher) bindSession(session *Session) { + pusher.Session = session session.RTPHandles = append(session.RTPHandles, func(pack *RTPPack) { + if session != pusher.Session { + session.logger.Printf("Session recv rtp to pusher.but pusher got a new session[%v].", pusher.Session.ID) + return + } pusher.QueueRTP(pack) }) session.StopHandles = append(session.StopHandles, func() { + if session != pusher.Session { + session.logger.Printf("Session stop to release pusher.but pusher got a new session[%v].", pusher.Session.ID) + return + } pusher.ClearPlayer() pusher.Server().RemovePusher(pusher) pusher.cond.Broadcast() @@ -199,7 +212,37 @@ func NewPusher(session *Session) (pusher *Pusher) { pusher.UDPServer = nil } }) - return +} + +func (pusher *Pusher) RebindSession(session *Session) bool { + if pusher.RTSPClient != nil { + pusher.Logger().Printf("call RebindSession[%s] to a Client-Pusher. got false", session.ID) + return false + } + sess := pusher.Session + pusher.bindSession(session) + session.Pusher = pusher + + pusher.gopCacheLock.Lock() + pusher.gopCache = make([]*RTPPack, 0) + pusher.gopCacheLock.Unlock() + if sess != nil { + sess.Stop() + } + return true +} + +func (pusher *Pusher) RebindClient(client *RTSPClient) bool { + if pusher.Session != nil { + pusher.Logger().Printf("call RebindClient[%s] to a Session-Pusher. got false", client.ID) + return false + } + sess := pusher.RTSPClient + pusher.RTSPClient = client + if sess != nil { + sess.Stop() + } + return true } func (pusher *Pusher) QueueRTP(pack *RTPPack) *Pusher { @@ -312,10 +355,11 @@ func (pusher *Pusher) ClearPlayer() { } pusher.players = make(map[string]*Player) pusher.playersLock.Unlock() - - for _, v := range players { - v.Stop() - } + go func() { // do not block + for _, v := range players { + v.Stop() + } + }() } func (pusher *Pusher) shouldSequenceStart(rtp *RTPInfo) bool { diff --git a/rtsp/rtsp-client.go b/rtsp/rtsp-client.go index 29711419..ab080101 100644 --- a/rtsp/rtsp-client.go +++ b/rtsp/rtsp-client.go @@ -501,7 +501,7 @@ func (client *RTSPClient) Stop() { client.Conn.Close() client.Conn = nil } - if client.UDPServer != nil{ + if client.UDPServer != nil { client.UDPServer.Stop() client.UDPServer = nil } diff --git a/rtsp/rtsp-server.go b/rtsp/rtsp-server.go index 4ee6211a..59bc2ad8 100644 --- a/rtsp/rtsp-server.go +++ b/rtsp/rtsp-server.go @@ -187,28 +187,19 @@ func (server *Server) Stop() { close(server.removePusherCh) } -func (server *Server) AddPusher(pusher *Pusher, closeOld bool) bool { +func (server *Server) AddPusher(pusher *Pusher) bool { logger := server.logger added := false server.pushersLock.Lock() - old, ok := server.pushers[pusher.Path()] + _, ok := server.pushers[pusher.Path()] if !ok { server.pushers[pusher.Path()] = pusher logger.Printf("%v start, now pusher size[%d]", pusher, len(server.pushers)) added = true } else { - if closeOld { - server.pushers[pusher.Path()] = pusher - logger.Printf("%v start, replace old pusher", pusher) - added = true - } + added = false } server.pushersLock.Unlock() - if ok && closeOld { - logger.Printf("old pusher %v stoped", pusher) - old.Stop() - server.removePusherCh <- old - } if added { go pusher.Start() server.addPusherCh <- pusher @@ -216,6 +207,23 @@ func (server *Server) AddPusher(pusher *Pusher, closeOld bool) bool { return added } +func (server *Server) TryAttachToPusher(session *Session) (int, *Pusher) { + server.pushersLock.Lock() + attached := 0 + var pusher *Pusher = nil + if _pusher, ok := server.pushers[session.Path]; ok { + if _pusher.RebindSession(session) { + session.logger.Printf("Attached to a pusher") + attached = 1 + pusher = _pusher + } else { + attached = -1 + } + } + server.pushersLock.Unlock() + return attached, pusher +} + func (server *Server) RemovePusher(pusher *Pusher) { logger := server.logger removed := false diff --git a/rtsp/rtsp-session.go b/rtsp/rtsp-session.go index 09773c2b..5183c0e8 100644 --- a/rtsp/rtsp-session.go +++ b/rtsp/rtsp-session.go @@ -368,7 +368,7 @@ func (session *Session) handleRequest(req *Request) { res := NewResponse(200, "OK", req.Header["CSeq"], session.ID, "") defer func() { if p := recover(); p != nil { - logger.Printf("handleRequest err ocurs:%v",p) + logger.Printf("handleRequest err ocurs:%v", p) res.StatusCode = 500 res.Status = fmt.Sprintf("Inner Server Error, %v", p) } @@ -449,13 +449,46 @@ func (session *Session) handleRequest(req *Request) { session.VCodec = sdp.Codec logger.Printf("video codec[%s]\n", session.VCodec) } - session.Pusher = NewPusher(session) - - addedToServer := session.Server.AddPusher(session.Pusher, session.closeOld) - if !addedToServer { - logger.Printf("reject pusher.") - res.StatusCode = 406 - res.Status = "Not Acceptable" + addPusher := false + if session.closeOld { + r, _ := session.Server.TryAttachToPusher(session) + if r < -1 { + logger.Printf("reject pusher.") + res.StatusCode = 406 + res.Status = "Not Acceptable" + } else if r == 0 { + addPusher = true + } else { + logger.Printf("Attached to old pusher") + // 尝试发给客户端ANNOUCE + // players := pusher.GetPlayers() + // for _, v := range players { + // sess := v.Session + + // hearers := make(map[string]string) + // hearers["Content-Type"] = "application/sdp" + // hearers["Session"] = sess.ID + // hearers["Content-Length"] = strconv.Itoa(len(v.SDPRaw)) + // var req = Request{Method: ANNOUNCE, URL: v.URL, Version: "1.0", Header: hearers, Body: pusher.SDPRaw()} + // sess.connWLock.Lock() + // logger.Println(req.String()) + // outBytes := []byte(req.String()) + // sess.connRW.Write(outBytes) + // sess.connRW.Flush() + // sess.connWLock.Unlock() + // } + } + } else { + addPusher = true + } + if addPusher { + session.Pusher = NewPusher(session) + addedToServer := session.Server.AddPusher(session.Pusher) + if !addedToServer { + logger.Printf("reject pusher.") + res.StatusCode = 406 + res.Status = "Not Acceptable" + } } case "DESCRIBE": session.Type = SESSEION_TYPE_PLAYER