diff --git a/main.go b/main.go index 5cac80ad..0f861195 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,8 @@ import ( "strings" "time" + "github.com/penggy/EasyGoLib/db" + "github.com/EasyDarwin/EasyDarwin/models" "github.com/EasyDarwin/EasyDarwin/routers" "github.com/EasyDarwin/EasyDarwin/rtsp" @@ -108,6 +110,11 @@ func (p *program) Start(s service.Service) (err error) { } p.StartRTSP() p.StartHTTP() + + if !utils.Debug { + log.Println("log files -->", utils.LogDir()) + log.SetOutput(utils.GetLogWriter()) + } go func() { for range routers.API.RestartChan { p.StopHTTP() @@ -117,6 +124,44 @@ func (p *program) Start(s service.Service) (err error) { p.StartHTTP() } }() + + go func() { + log.Printf("demon pull streams") + for { + var streams []models.Stream + db.SQLite.Find(&streams) + if err := db.SQLite.Find(&streams).Error; err != nil { + log.Printf("find stream err:%v", err) + return + } + for i := len(streams) - 1; i > -1; i-- { + v := streams[i] + agent := fmt.Sprintf("EasyDarwinGo/%s", routers.BuildVersion) + if routers.BuildDateTime != "" { + agent = fmt.Sprintf("%s(%s)", agent, routers.BuildDateTime) + } + client, err := rtsp.NewRTSPClient(rtsp.GetServer(), v.URL, int64(v.HeartbeatInterval)*1000, agent) + if err != nil { + continue + } + client.CustomPath = v.CustomPath + + pusher := rtsp.NewClientPusher(client) + if rtsp.GetServer().GetPusher(pusher.Path()) != nil { + continue + } + err = client.Start(time.Duration(v.IdleTimeout) * time.Second) + if err != nil { + log.Printf("Pull stream err :%v", err) + continue + } + rtsp.GetServer().AddPusher(pusher) + //streams = streams[0:i] + //streams = append(streams[:i], streams[i+1:]...) + } + time.Sleep(10 * time.Second) + } + }() return } @@ -137,12 +182,7 @@ func main() { // log log.SetPrefix("[EasyDarwin] ") log.SetFlags(log.Lshortfile | log.LstdFlags) - if !utils.Debug { - log.Println("log files -->", utils.LogDir()) - log.Printf("git commit code:%s", gitCommitCode) - log.Printf("build date:%s", buildDateTime) - log.SetOutput(utils.GetLogWriter()) - } + log.Printf("git commit code:%s", gitCommitCode) log.Printf("build date:%s", buildDateTime) routers.BuildVersion = fmt.Sprintf("%s.%s", routers.BuildVersion, gitCommitCode) diff --git a/models/models.go b/models/models.go index 90d9c490..5290bb02 100644 --- a/models/models.go +++ b/models/models.go @@ -10,7 +10,7 @@ func Init() (err error) { if err != nil { return } - db.SQLite.AutoMigrate(User{}) + db.SQLite.AutoMigrate(User{}, Stream{}) count := 0 sec := utils.Conf().Section("http") defUser := sec.Key("default_username").MustString("admin") diff --git a/models/stream.go b/models/stream.go new file mode 100644 index 00000000..6011a4bf --- /dev/null +++ b/models/stream.go @@ -0,0 +1,8 @@ +package models + +type Stream struct { + URL string `gorm:"type:varchar(256);primary_key;unique"` + CustomPath string `gorm:"type:varchar(256)"` + IdleTimeout int + HeartbeatInterval int +} diff --git a/routers/record.go b/routers/record.go index b3f8594b..c35007ae 100644 --- a/routers/record.go +++ b/routers/record.go @@ -139,7 +139,7 @@ func (h *APIHandler) RecordFiles(c *gin.Context) { if info.Name() == ".DS_Store" { return nil } - if !strings.HasSuffix(info.Name(), ".m3u8") { + if !strings.HasSuffix(strings.ToLower(info.Name()), ".m3u8") && !strings.HasSuffix(strings.ToLower(info.Name()), ".ts") { return nil } cmd := exec.Command(ffprobe, "-i", path) diff --git a/routers/streams.go b/routers/streams.go index 8cc57a43..55d054ef 100644 --- a/routers/streams.go +++ b/routers/streams.go @@ -2,6 +2,8 @@ package routers import ( "fmt" + "github.com/EasyDarwin/EasyDarwin/models" + "github.com/penggy/EasyGoLib/db" "log" "net/http" "strings" @@ -65,6 +67,18 @@ func (h *APIHandler) StreamStart(c *gin.Context) { } log.Printf("Pull to push %v success ", form) rtsp.GetServer().AddPusher(pusher) + // save to db. + var stream = models.Stream{ + URL: form.URL, + CustomPath: form.CustomPath, + IdleTimeout: form.IdleTimeout, + HeartbeatInterval: form.HeartbeatInterval, + } + if db.SQLite.Where(&models.Stream{URL: form.URL}).First(&models.Stream{}).RecordNotFound() { + db.SQLite.Create(&stream) + } else { + db.SQLite.Save(&stream) + } c.IndentedJSON(200, pusher.ID()) } @@ -91,6 +105,11 @@ func (h *APIHandler) StreamStop(c *gin.Context) { v.Stop() c.IndentedJSON(200, "OK") log.Printf("Stop %v success ", v) + if v.RTSPClient != nil { + var stream models.Stream + stream.URL = v.RTSPClient.URL + db.SQLite.Delete(stream) + } return } } diff --git a/rtsp/rtsp-client.go b/rtsp/rtsp-client.go index e2f731c7..bfb499f4 100644 --- a/rtsp/rtsp-client.go +++ b/rtsp/rtsp-client.go @@ -77,7 +77,7 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64, agent URL: rawUrl, ID: shortid.MustGenerate(), Path: url.Path, - TransType: TRANS_TYPE_UDP, + TransType: TRANS_TYPE_TCP, vRTPChannel: 0, vRTPControlChannel: 1, aRTPChannel: 2, @@ -295,7 +295,7 @@ func (client *RTSPClient) Start(timeout time.Duration) error { return err } headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;client_port=%d-%d", client.UDPServer.VPort, client.UDPServer.VControlPort) - client.Conn.timeout = 0 // UDP ignore timeout + client.Conn.timeout = 0 // UDP ignore timeout } if Session != "" { headers["Session"] = Session @@ -328,7 +328,7 @@ func (client *RTSPClient) Start(timeout time.Duration) error { return err } headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;client_port=%d-%d", client.UDPServer.APort, client.UDPServer.AControlPort) - client.Conn.timeout = 0 // UDP ignore timeout + client.Conn.timeout = 0 // UDP ignore timeout } if Session != "" { headers["Session"] = Session diff --git a/rtsp/rtsp-server.go b/rtsp/rtsp-server.go index e9f07b95..486da230 100644 --- a/rtsp/rtsp-server.go +++ b/rtsp/rtsp-server.go @@ -8,6 +8,7 @@ import ( "os/exec" "path" "strconv" + "strings" "sync" "syscall" "time" @@ -56,7 +57,7 @@ func (server *Server) Start() (err error) { localRecord := utils.Conf().Section("rtsp").Key("save_stream_to_local").MustInt(0) ffmpeg := utils.Conf().Section("rtsp").Key("ffmpeg_path").MustString("") m3u8_dir_path := utils.Conf().Section("rtsp").Key("m3u8_dir_path").MustString("") - ts_duration_second := utils.Conf().Section("rtsp").Key("ts_duration_second").MustInt(10 * 60) + ts_duration_second := utils.Conf().Section("rtsp").Key("ts_duration_second").MustInt(6) SaveStreamToLocal := false if (len(ffmpeg) > 0) && localRecord > 0 && len(m3u8_dir_path) > 0 { err := utils.EnsureDir(m3u8_dir_path) @@ -80,16 +81,23 @@ func (server *Server) Start() (err error) { case pusher, addChnOk = <-server.addPusherCh: if SaveStreamToLocal { if addChnOk { - dir := path.Join(m3u8_dir_path, pusher.Path(), time.Now().Format("20060102150405")) + dir := path.Join(m3u8_dir_path, pusher.Path(), time.Now().Format("20060102")) err := utils.EnsureDir(dir) if err != nil { logger.Printf("EnsureDir:[%s] err:%v.", dir, err) continue } m3u8path := path.Join(dir, fmt.Sprintf("out.m3u8")) + port := pusher.Server().TCPPort + rtsp := fmt.Sprintf("rtsp://localhost:%d%s", port, pusher.Path()) + paramStr := utils.Conf().Section("rtsp").Key(pusher.Path()).MustString("-c:v copy -c:a aac") + params := []string{"-fflags", "genpts", "-rtsp_transport", "tcp", "-i", rtsp, "-hls_time", strconv.Itoa(ts_duration_second), "-hls_list_size", "0", m3u8path} + if paramStr != "default" { + paramsOfThisPath := strings.Split(paramStr, " ") + params = append(params[:6], append(paramsOfThisPath, params[6:]...)...) + } // ffmpeg -i ~/Downloads/720p.mp4 -s 640x360 -g 15 -c:a aac -hls_time 5 -hls_list_size 0 record.m3u8 - - cmd := exec.Command(ffmpeg, "-fflags", "genpts", "-rtsp_transport", "tcp", "-i", pusher.URL(), "-c:v", "copy", "-hls_time", strconv.Itoa(ts_duration_second), "-hls_list_size", "0", m3u8path) + cmd := exec.Command(ffmpeg, params...) f, err := os.OpenFile(path.Join(dir, fmt.Sprintf("log.txt")), os.O_RDWR|os.O_CREATE, 0755) if err == nil { cmd.Stdout = f