From 34e5ec416cbe60dc936e3253b8b4a5a7a01fd348 Mon Sep 17 00:00:00 2001 From: "milkywayrivers@gmail.com" Date: Fri, 1 Mar 2019 21:12:11 +0800 Subject: [PATCH] add close old option --- .gitignore | 1 + easydarwin.ini | 3 +++ main.go | 2 +- routers/streams.go | 2 +- rtsp/pusher.go | 4 ++-- rtsp/rtsp-server.go | 19 ++++++++++++++++--- rtsp/rtsp-session.go | 25 ++++++++++++++----------- 7 files changed, 38 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 50ed1896..3d9d4fb9 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ easydarwin_debug.ini Unnamed dev.ini debug +debug.ini diff --git a/easydarwin.ini b/easydarwin.ini index ccd6800f..0b41fbfe 100644 --- a/easydarwin.ini +++ b/easydarwin.ini @@ -12,6 +12,9 @@ timeout=28800 ; 是否使能gop cache。如果使能,服务器会缓存最后一个I帧以及其后的非I帧,以提高播放速度。但是可能在高并发的情况下带来内存压力。 gop_cache_enable=1 +; 新的session连接时,如果已有同一个PATH的session在推流,则关闭之前的。 +close_old=1 + ; 是否使能向服务器推流或者从服务器播放时验证用户名密码. [注意] 因为服务器端并不保存明文密码,所以推送或者播放时,客户端应该输入密码的md5后的值。 ; password should be the hex of md5(original password) authorization_enable=0 diff --git a/main.go b/main.go index 0f861195..55ddfa81 100644 --- a/main.go +++ b/main.go @@ -155,7 +155,7 @@ func (p *program) Start(s service.Service) (err error) { log.Printf("Pull stream err :%v", err) continue } - rtsp.GetServer().AddPusher(pusher) + rtsp.GetServer().AddPusher(pusher, false) //streams = streams[0:i] //streams = append(streams[:i], streams[i+1:]...) } diff --git a/routers/streams.go b/routers/streams.go index b1ce7046..143c8e4e 100644 --- a/routers/streams.go +++ b/routers/streams.go @@ -77,7 +77,7 @@ func (h *APIHandler) StreamStart(c *gin.Context) { return } log.Printf("Pull to push %v success ", form) - rtsp.GetServer().AddPusher(pusher) + rtsp.GetServer().AddPusher(pusher, false) // save to db. var stream = models.Stream{ URL: form.URL, diff --git a/rtsp/pusher.go b/rtsp/pusher.go index 5c87d964..2763c24f 100644 --- a/rtsp/pusher.go +++ b/rtsp/pusher.go @@ -232,7 +232,7 @@ func (pusher *Pusher) Start() { if pusher.gopCacheEnable && pack.Type == RTP_TYPE_VIDEO { pusher.gopCacheLock.Lock() - if rtp := ParseRTP(pack.Buffer.Bytes()); rtp != nil && pusher.shouldSequeceStart(rtp) { + if rtp := ParseRTP(pack.Buffer.Bytes()); rtp != nil && pusher.shouldSequenceStart(rtp) { pusher.gopCache = make([]*RTPPack, 0) } pusher.gopCache = append(pusher.gopCache, pack) @@ -318,7 +318,7 @@ func (pusher *Pusher) ClearPlayer() { } } -func (pusher *Pusher) shouldSequeceStart(rtp *RTPInfo) bool { +func (pusher *Pusher) shouldSequenceStart(rtp *RTPInfo) bool { if strings.EqualFold(pusher.VCodec(), "h264") { var realNALU uint8 payloadHeader := rtp.Payload[0] //https://tools.ietf.org/html/rfc6184#section-5.2 diff --git a/rtsp/rtsp-server.go b/rtsp/rtsp-server.go index ff185ce1..4ee6211a 100644 --- a/rtsp/rtsp-server.go +++ b/rtsp/rtsp-server.go @@ -187,20 +187,33 @@ func (server *Server) Stop() { close(server.removePusherCh) } -func (server *Server) AddPusher(pusher *Pusher) { +func (server *Server) AddPusher(pusher *Pusher, closeOld bool) bool { logger := server.logger added := false server.pushersLock.Lock() - if _, ok := server.pushers[pusher.Path()]; !ok { + old, ok := server.pushers[pusher.Path()] + if !ok { server.pushers[pusher.Path()] = pusher - go pusher.Start() logger.Printf("%v start, now pusher size[%d]", pusher, len(server.pushers)) added = true + } else { + if closeOld { + server.pushers[pusher.Path()] = pusher + logger.Printf("%v start, replace old pusher", pusher) + added = true + } } server.pushersLock.Unlock() + if ok && closeOld { + logger.Printf("old pusher %v stoped", pusher) + old.Stop() + server.removePusherCh <- old + } if added { + go pusher.Start() server.addPusherCh <- pusher } + return added } func (server *Server) RemovePusher(pusher *Pusher) { diff --git a/rtsp/rtsp-session.go b/rtsp/rtsp-session.go index 746f8ae0..01105189 100644 --- a/rtsp/rtsp-session.go +++ b/rtsp/rtsp-session.go @@ -104,6 +104,7 @@ type Session struct { authorizationEnable bool nonce string + closeOld bool AControl string VControl string @@ -140,6 +141,7 @@ func NewSession(server *Server, conn net.Conn) *Session { timeoutMillis := utils.Conf().Section("rtsp").Key("timeout").MustInt(0) timeoutTCPConn := &RichConn{conn, time.Duration(timeoutMillis) * time.Millisecond} authorizationEnable := utils.Conf().Section("rtsp").Key("authorization_enable").MustInt(0) + close_old := utils.Conf().Section("rtsp").Key("close_old").MustInt(0) session := &Session{ ID: shortid.MustGenerate(), Server: server, @@ -154,6 +156,7 @@ func NewSession(server *Server, conn net.Conn) *Session { vRTPControlChannel: -1, aRTPChannel: -1, aRTPControlChannel: -1, + closeOld: close_old != 0, } session.logger = log.New(os.Stdout, fmt.Sprintf("[%s]", session.ID), log.LstdFlags|log.Lshortfile) @@ -365,13 +368,13 @@ func (session *Session) handleRequest(req *Request) { session.connWLock.Unlock() session.OutBytes += len(outBytes) switch req.Method { - case "PLAY", "RECORD": - switch session.Type { - case SESSEION_TYPE_PLAYER: - session.Pusher.AddPlayer(session.Player) - case SESSION_TYPE_PUSHER: - session.Server.AddPusher(session.Pusher) - } + // case "PLAY", "RECORD": + // switch session.Type { + // case SESSEION_TYPE_PLAYER: + // session.Pusher.AddPlayer(session.Player) + // case SESSION_TYPE_PUSHER: + // session.Server.AddPusher(session.Pusher) + // } case "TEARDOWN": { session.Stop() @@ -435,12 +438,12 @@ func (session *Session) handleRequest(req *Request) { logger.Printf("video codec[%s]\n", session.VCodec) } session.Pusher = NewPusher(session) - if session.Server.GetPusher(session.Path) == nil { - session.Server.AddPusher(session.Pusher) - } else { + + addedToServer := session.Server.AddPusher(session.Pusher, session.closeOld) + if !addedToServer { + logger.Printf("reject pusher.") res.StatusCode = 406 res.Status = "Not Acceptable" - return } case "DESCRIBE": session.Type = SESSEION_TYPE_PLAYER