add close old option

pull/154/head
milkywayrivers@gmail.com 6 years ago
parent 0e3adbd5f7
commit 34e5ec416c

1
.gitignore vendored

@ -17,3 +17,4 @@ easydarwin_debug.ini
Unnamed Unnamed
dev.ini dev.ini
debug debug
debug.ini

@ -12,6 +12,9 @@ timeout=28800
; 是否使能gop cache。如果使能服务器会缓存最后一个I帧以及其后的非I帧以提高播放速度。但是可能在高并发的情况下带来内存压力。 ; 是否使能gop cache。如果使能服务器会缓存最后一个I帧以及其后的非I帧以提高播放速度。但是可能在高并发的情况下带来内存压力。
gop_cache_enable=1 gop_cache_enable=1
; 新的session连接时如果已有同一个PATH的session在推流则关闭之前的。
close_old=1
; 是否使能向服务器推流或者从服务器播放时验证用户名密码. [注意] 因为服务器端并不保存明文密码所以推送或者播放时客户端应该输入密码的md5后的值。 ; 是否使能向服务器推流或者从服务器播放时验证用户名密码. [注意] 因为服务器端并不保存明文密码所以推送或者播放时客户端应该输入密码的md5后的值。
; password should be the hex of md5(original password) ; password should be the hex of md5(original password)
authorization_enable=0 authorization_enable=0

@ -155,7 +155,7 @@ func (p *program) Start(s service.Service) (err error) {
log.Printf("Pull stream err :%v", err) log.Printf("Pull stream err :%v", err)
continue continue
} }
rtsp.GetServer().AddPusher(pusher) rtsp.GetServer().AddPusher(pusher, false)
//streams = streams[0:i] //streams = streams[0:i]
//streams = append(streams[:i], streams[i+1:]...) //streams = append(streams[:i], streams[i+1:]...)
} }

@ -77,7 +77,7 @@ func (h *APIHandler) StreamStart(c *gin.Context) {
return return
} }
log.Printf("Pull to push %v success ", form) log.Printf("Pull to push %v success ", form)
rtsp.GetServer().AddPusher(pusher) rtsp.GetServer().AddPusher(pusher, false)
// save to db. // save to db.
var stream = models.Stream{ var stream = models.Stream{
URL: form.URL, URL: form.URL,

@ -232,7 +232,7 @@ func (pusher *Pusher) Start() {
if pusher.gopCacheEnable && pack.Type == RTP_TYPE_VIDEO { if pusher.gopCacheEnable && pack.Type == RTP_TYPE_VIDEO {
pusher.gopCacheLock.Lock() pusher.gopCacheLock.Lock()
if rtp := ParseRTP(pack.Buffer.Bytes()); rtp != nil && pusher.shouldSequeceStart(rtp) { if rtp := ParseRTP(pack.Buffer.Bytes()); rtp != nil && pusher.shouldSequenceStart(rtp) {
pusher.gopCache = make([]*RTPPack, 0) pusher.gopCache = make([]*RTPPack, 0)
} }
pusher.gopCache = append(pusher.gopCache, pack) pusher.gopCache = append(pusher.gopCache, pack)
@ -318,7 +318,7 @@ func (pusher *Pusher) ClearPlayer() {
} }
} }
func (pusher *Pusher) shouldSequeceStart(rtp *RTPInfo) bool { func (pusher *Pusher) shouldSequenceStart(rtp *RTPInfo) bool {
if strings.EqualFold(pusher.VCodec(), "h264") { if strings.EqualFold(pusher.VCodec(), "h264") {
var realNALU uint8 var realNALU uint8
payloadHeader := rtp.Payload[0] //https://tools.ietf.org/html/rfc6184#section-5.2 payloadHeader := rtp.Payload[0] //https://tools.ietf.org/html/rfc6184#section-5.2

@ -187,20 +187,33 @@ func (server *Server) Stop() {
close(server.removePusherCh) close(server.removePusherCh)
} }
func (server *Server) AddPusher(pusher *Pusher) { func (server *Server) AddPusher(pusher *Pusher, closeOld bool) bool {
logger := server.logger logger := server.logger
added := false added := false
server.pushersLock.Lock() server.pushersLock.Lock()
if _, ok := server.pushers[pusher.Path()]; !ok { old, ok := server.pushers[pusher.Path()]
if !ok {
server.pushers[pusher.Path()] = pusher server.pushers[pusher.Path()] = pusher
go pusher.Start()
logger.Printf("%v start, now pusher size[%d]", pusher, len(server.pushers)) logger.Printf("%v start, now pusher size[%d]", pusher, len(server.pushers))
added = true added = true
} else {
if closeOld {
server.pushers[pusher.Path()] = pusher
logger.Printf("%v start, replace old pusher", pusher)
added = true
}
} }
server.pushersLock.Unlock() server.pushersLock.Unlock()
if ok && closeOld {
logger.Printf("old pusher %v stoped", pusher)
old.Stop()
server.removePusherCh <- old
}
if added { if added {
go pusher.Start()
server.addPusherCh <- pusher server.addPusherCh <- pusher
} }
return added
} }
func (server *Server) RemovePusher(pusher *Pusher) { func (server *Server) RemovePusher(pusher *Pusher) {

@ -104,6 +104,7 @@ type Session struct {
authorizationEnable bool authorizationEnable bool
nonce string nonce string
closeOld bool
AControl string AControl string
VControl string VControl string
@ -140,6 +141,7 @@ func NewSession(server *Server, conn net.Conn) *Session {
timeoutMillis := utils.Conf().Section("rtsp").Key("timeout").MustInt(0) timeoutMillis := utils.Conf().Section("rtsp").Key("timeout").MustInt(0)
timeoutTCPConn := &RichConn{conn, time.Duration(timeoutMillis) * time.Millisecond} timeoutTCPConn := &RichConn{conn, time.Duration(timeoutMillis) * time.Millisecond}
authorizationEnable := utils.Conf().Section("rtsp").Key("authorization_enable").MustInt(0) authorizationEnable := utils.Conf().Section("rtsp").Key("authorization_enable").MustInt(0)
close_old := utils.Conf().Section("rtsp").Key("close_old").MustInt(0)
session := &Session{ session := &Session{
ID: shortid.MustGenerate(), ID: shortid.MustGenerate(),
Server: server, Server: server,
@ -154,6 +156,7 @@ func NewSession(server *Server, conn net.Conn) *Session {
vRTPControlChannel: -1, vRTPControlChannel: -1,
aRTPChannel: -1, aRTPChannel: -1,
aRTPControlChannel: -1, aRTPControlChannel: -1,
closeOld: close_old != 0,
} }
session.logger = log.New(os.Stdout, fmt.Sprintf("[%s]", session.ID), log.LstdFlags|log.Lshortfile) session.logger = log.New(os.Stdout, fmt.Sprintf("[%s]", session.ID), log.LstdFlags|log.Lshortfile)
@ -365,13 +368,13 @@ func (session *Session) handleRequest(req *Request) {
session.connWLock.Unlock() session.connWLock.Unlock()
session.OutBytes += len(outBytes) session.OutBytes += len(outBytes)
switch req.Method { switch req.Method {
case "PLAY", "RECORD": // case "PLAY", "RECORD":
switch session.Type { // switch session.Type {
case SESSEION_TYPE_PLAYER: // case SESSEION_TYPE_PLAYER:
session.Pusher.AddPlayer(session.Player) // session.Pusher.AddPlayer(session.Player)
case SESSION_TYPE_PUSHER: // case SESSION_TYPE_PUSHER:
session.Server.AddPusher(session.Pusher) // session.Server.AddPusher(session.Pusher)
} // }
case "TEARDOWN": case "TEARDOWN":
{ {
session.Stop() session.Stop()
@ -435,12 +438,12 @@ func (session *Session) handleRequest(req *Request) {
logger.Printf("video codec[%s]\n", session.VCodec) logger.Printf("video codec[%s]\n", session.VCodec)
} }
session.Pusher = NewPusher(session) session.Pusher = NewPusher(session)
if session.Server.GetPusher(session.Path) == nil {
session.Server.AddPusher(session.Pusher) addedToServer := session.Server.AddPusher(session.Pusher, session.closeOld)
} else { if !addedToServer {
logger.Printf("reject pusher.")
res.StatusCode = 406 res.StatusCode = 406
res.Status = "Not Acceptable" res.Status = "Not Acceptable"
return
} }
case "DESCRIBE": case "DESCRIBE":
session.Type = SESSEION_TYPE_PLAYER session.Type = SESSEION_TYPE_PLAYER

Loading…
Cancel
Save