mirror of https://github.com/EasyDarwin/EasyDarwin
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
217 lines
5.6 KiB
217 lines
5.6 KiB
package rtsp |
|
|
|
import ( |
|
"fmt" |
|
"log" |
|
"net" |
|
"os" |
|
"os/exec" |
|
"path" |
|
"sync" |
|
"syscall" |
|
"time" |
|
|
|
"github.com/penggy/EasyGoLib/utils" |
|
) |
|
|
|
type Server struct { |
|
TCPListener *net.TCPListener |
|
TCPPort int |
|
Stoped bool |
|
pushers map[string]*Pusher // Path <-> Pusher |
|
pushersLock sync.RWMutex |
|
addPusherCh chan *Pusher |
|
removePusherCh chan *Pusher |
|
} |
|
|
|
var Instance *Server = nil |
|
|
|
func GetServer() *Server { |
|
if Instance == nil { |
|
Instance = &Server{ |
|
Stoped: true, |
|
TCPPort: utils.Conf().Section("rtsp").Key("port").MustInt(554), |
|
pushers: make(map[string]*Pusher), |
|
addPusherCh: make(chan *Pusher), |
|
removePusherCh: make(chan *Pusher), |
|
} |
|
} |
|
return Instance |
|
} |
|
|
|
func (server *Server) Start() (err error) { |
|
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf(":%d", server.TCPPort)) |
|
if err != nil { |
|
return |
|
} |
|
listener, err := net.ListenTCP("tcp", addr) |
|
if err != nil { |
|
return |
|
} |
|
|
|
localRecord := utils.Conf().Section("rtsp").Key("save_stream_to_mp4").MustInt(0) |
|
ffmpeg := utils.Conf().Section("rtsp").Key("ffmpeg_path").MustString("") |
|
mp4Path := utils.Conf().Section("rtsp").Key("mp4_dir_path").MustString("") |
|
SaveStreamToLocal := false |
|
if (len(ffmpeg) > 0) && localRecord > 0 && len(mp4Path) > 0 { |
|
err := utils.EnsureDir(mp4Path) |
|
if err != nil { |
|
log.Printf("Create mp4_dir_path[%s] err:%v.", mp4Path, err) |
|
} else { |
|
SaveStreamToLocal = true |
|
} |
|
} |
|
go func() { // save to local. |
|
pusher2ffmpegMap := make(map[*Pusher]*exec.Cmd) |
|
if SaveStreamToLocal { |
|
log.Printf("Prepare to save stream to local....") |
|
defer log.Printf("End save stream to local....") |
|
} |
|
var pusher *Pusher |
|
addChnOk := true |
|
removeChnOk := true |
|
for addChnOk || removeChnOk { |
|
select { |
|
case pusher, addChnOk = <-server.addPusherCh: |
|
if SaveStreamToLocal { |
|
if addChnOk { |
|
dir := path.Join(mp4Path, pusher.Path()) |
|
err := utils.EnsureDir(dir) |
|
if err != nil { |
|
log.Printf("EnsureDir:[%s] err:%v.", dir, err) |
|
continue |
|
} |
|
path := path.Join(dir, fmt.Sprintf("%s.mp4", time.Now().Format("20060102150405"))) |
|
cmd := exec.Command(ffmpeg, "-i", pusher.URL(), "-c:v", "copy", "-c:a", "copy", path) |
|
cmd.Stdout = os.Stdout |
|
cmd.Stderr = os.Stderr |
|
err = cmd.Start() |
|
if err != nil { |
|
log.Printf("Start ffmpeg err:%v", err) |
|
} |
|
pusher2ffmpegMap[pusher] = cmd |
|
log.Printf("add ffmpeg to pull stream from pusher[%v]", pusher) |
|
} else { |
|
log.Printf("addPusherChan closed") |
|
} |
|
} |
|
case pusher, removeChnOk = <-server.removePusherCh: |
|
if SaveStreamToLocal { |
|
if removeChnOk { |
|
cmd := pusher2ffmpegMap[pusher] |
|
proc := cmd.Process |
|
if proc != nil { |
|
log.Printf("prepare to SIGTERM to process:%v", proc) |
|
proc.Signal(syscall.SIGTERM) |
|
// proc.Kill() |
|
} |
|
delete(pusher2ffmpegMap, pusher) |
|
log.Printf("delete ffmpeg from pull stream from pusher[%v]", pusher) |
|
} else { |
|
for _, cmd := range pusher2ffmpegMap { |
|
proc := cmd.Process |
|
if proc != nil { |
|
log.Printf("prepare to SIGTERM to process:%v", proc) |
|
proc.Signal(syscall.SIGTERM) |
|
} |
|
} |
|
pusher2ffmpegMap = make(map[*Pusher]*exec.Cmd) |
|
log.Printf("removePusherChan closed") |
|
} |
|
} |
|
} |
|
} |
|
}() |
|
|
|
server.Stoped = false |
|
server.TCPListener = listener |
|
log.Println("rtsp server start on", server.TCPPort) |
|
networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576) |
|
for !server.Stoped { |
|
conn, err := server.TCPListener.Accept() |
|
if err != nil { |
|
log.Println(err) |
|
continue |
|
} |
|
if tcpConn, ok := conn.(*net.TCPConn); ok { |
|
if err := tcpConn.SetReadBuffer(networkBuffer); err != nil { |
|
log.Printf("rtsp server conn set read buffer error, %v", err) |
|
} |
|
if err := tcpConn.SetWriteBuffer(networkBuffer); err != nil { |
|
log.Printf("rtsp server conn set write buffer error, %v", err) |
|
} |
|
} |
|
|
|
session := NewSession(server, conn) |
|
go session.Start() |
|
} |
|
return |
|
} |
|
|
|
func (server *Server) Stop() { |
|
log.Println("rtsp server stop on", server.TCPPort) |
|
server.Stoped = true |
|
if server.TCPListener != nil { |
|
server.TCPListener.Close() |
|
server.TCPListener = nil |
|
} |
|
server.pushersLock.Lock() |
|
server.pushers = make(map[string]*Pusher) |
|
server.pushersLock.Unlock() |
|
|
|
close(server.addPusherCh) |
|
close(server.removePusherCh) |
|
} |
|
|
|
func (server *Server) AddPusher(pusher *Pusher) { |
|
added := false |
|
server.pushersLock.Lock() |
|
if _, ok := server.pushers[pusher.Path()]; !ok { |
|
server.pushers[pusher.Path()] = pusher |
|
go pusher.Start() |
|
log.Printf("%v start, now pusher size[%d]", pusher, len(server.pushers)) |
|
added = true |
|
} |
|
server.pushersLock.Unlock() |
|
if added { |
|
server.addPusherCh <- pusher |
|
} |
|
} |
|
|
|
func (server *Server) RemovePusher(pusher *Pusher) { |
|
removed := false |
|
server.pushersLock.Lock() |
|
if _pusher, ok := server.pushers[pusher.Path()]; ok && pusher.ID() == _pusher.ID() { |
|
delete(server.pushers, pusher.Path()) |
|
log.Printf("%v end, now pusher size[%d]\n", pusher, len(server.pushers)) |
|
removed = true |
|
} |
|
server.pushersLock.Unlock() |
|
if removed { |
|
server.removePusherCh <- pusher |
|
} |
|
} |
|
|
|
func (server *Server) GetPusher(path string) (pusher *Pusher) { |
|
server.pushersLock.RLock() |
|
pusher = server.pushers[path] |
|
server.pushersLock.RUnlock() |
|
return |
|
} |
|
|
|
func (server *Server) GetPushers() (pushers map[string]*Pusher) { |
|
pushers = make(map[string]*Pusher) |
|
server.pushersLock.RLock() |
|
for k, v := range server.pushers { |
|
pushers[k] = v |
|
} |
|
server.pushersLock.RUnlock() |
|
return |
|
} |
|
|
|
func (server *Server) GetPusherSize() (size int) { |
|
server.pushersLock.RLock() |
|
size = len(server.pushers) |
|
server.pushersLock.RUnlock() |
|
return |
|
}
|
|
|