You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
EasyDarwin/rtsp/pusher.go

141 lines
3.3 KiB

6 years ago
package rtsp
import (
"log"
"strings"
"sync"
"github.com/penggy/EasyGoLib/utils"
)
type Pusher struct {
*Session
players map[string]*Player //SessionID <-> Player
playersLock sync.RWMutex
gopCacheEnable bool
gopCache []*RTPPack
gopCacheLock sync.RWMutex
UDPServer *UDPServer
cond *sync.Cond
queue []*RTPPack
}
func NewPusher(session *Session) (pusher *Pusher) {
pusher = &Pusher{
Session: session,
players: make(map[string]*Player),
gopCacheEnable: utils.Conf().Section("rtsp").Key("gop_cache_enable").MustBool(true),
gopCache: make([]*RTPPack, 0),
cond: sync.NewCond(&sync.Mutex{}),
queue: make([]*RTPPack, 0),
}
session.RTPHandles = append(session.RTPHandles, func(pack *RTPPack) {
pusher.QueueRTP(pack)
})
session.StopHandles = append(session.StopHandles, func() {
pusher.Server.RemovePusher(pusher)
pusher.cond.Broadcast()
if pusher.UDPServer != nil {
pusher.UDPServer.Stop()
pusher.UDPServer = nil
}
})
return
}
func (pusher *Pusher) QueueRTP(pack *RTPPack) *Pusher {
pusher.cond.L.Lock()
pusher.queue = append(pusher.queue, pack)
pusher.cond.Signal()
pusher.cond.L.Unlock()
return pusher
}
func (pusher *Pusher) Start() {
for !pusher.Stoped {
var pack *RTPPack
pusher.cond.L.Lock()
if len(pusher.queue) == 0 {
pusher.cond.Wait()
}
if len(pusher.queue) > 0 {
pack = pusher.queue[0]
pusher.queue = pusher.queue[1:]
}
pusher.cond.L.Unlock()
if pack == nil {
if !pusher.Stoped {
log.Printf("pusher not stoped, but queue take out nil pack")
}
continue
}
if pusher.gopCacheEnable {
pusher.gopCacheLock.Lock()
if strings.EqualFold(pusher.VCodec, "h264") {
if rtp := ParseRTP(pack.Buffer.Bytes()); rtp != nil && rtp.IsKeyframeStart() {
pusher.gopCache = make([]*RTPPack, 0)
}
pusher.gopCache = append(pusher.gopCache, pack)
} else if strings.EqualFold(pusher.VCodec, "h265") {
if rtp := ParseRTP(pack.Buffer.Bytes()); rtp != nil && rtp.IsKeyframeStartH265() {
pusher.gopCache = make([]*RTPPack, 0)
}
pusher.gopCache = append(pusher.gopCache, pack)
}
pusher.gopCacheLock.Unlock()
}
pusher.BroadcastRTP(pack)
}
}
func (pusher *Pusher) BroadcastRTP(pack *RTPPack) *Pusher {
for _, player := range pusher.GetPlayers() {
player.QueueRTP(pack)
pusher.OutBytes += pack.Buffer.Len()
}
return pusher
}
func (pusher *Pusher) GetPlayers() (players map[string]*Player) {
players = make(map[string]*Player)
pusher.playersLock.RLock()
for k, v := range pusher.players {
players[k] = v
}
pusher.playersLock.RUnlock()
return
}
func (pusher *Pusher) AddPlayer(player *Player) *Pusher {
if pusher.gopCacheEnable {
pusher.gopCacheLock.RLock()
for _, pack := range pusher.gopCache {
player.QueueRTP(pack)
pusher.OutBytes += pack.Buffer.Len()
}
pusher.gopCacheLock.RUnlock()
}
pusher.playersLock.Lock()
if _, ok := pusher.players[player.ID]; !ok {
pusher.players[player.ID] = player
go player.Start()
log.Printf("%v start, now player size[%d]", player, len(pusher.players))
}
pusher.playersLock.Unlock()
return pusher
}
func (pusher *Pusher) RemovePlayer(player *Player) *Pusher {
pusher.playersLock.Lock()
delete(pusher.players, player.ID)
log.Printf("%v end, now player size[%d]\n", player, len(pusher.players))
pusher.playersLock.Unlock()
return pusher
}