persist pull streams

add pull stream demon
pull/132/head
macbookpro 2019-01-06 19:12:17 +08:00
parent f156df5683
commit 3de6502dda
7 changed files with 90 additions and 15 deletions

52
main.go
View File

@ -9,6 +9,8 @@ import (
"strings"
"time"
"github.com/penggy/EasyGoLib/db"
"github.com/EasyDarwin/EasyDarwin/models"
"github.com/EasyDarwin/EasyDarwin/routers"
"github.com/EasyDarwin/EasyDarwin/rtsp"
@ -108,6 +110,11 @@ func (p *program) Start(s service.Service) (err error) {
}
p.StartRTSP()
p.StartHTTP()
if !utils.Debug {
log.Println("log files -->", utils.LogDir())
log.SetOutput(utils.GetLogWriter())
}
go func() {
for range routers.API.RestartChan {
p.StopHTTP()
@ -117,6 +124,44 @@ func (p *program) Start(s service.Service) (err error) {
p.StartHTTP()
}
}()
go func() {
log.Printf("demon pull streams")
for {
var streams []models.Stream
db.SQLite.Find(&streams)
if err := db.SQLite.Find(&streams).Error; err != nil {
log.Printf("find stream err:%v", err)
return
}
for i := len(streams) - 1; i > -1; i-- {
v := streams[i]
agent := fmt.Sprintf("EasyDarwinGo/%s", routers.BuildVersion)
if routers.BuildDateTime != "" {
agent = fmt.Sprintf("%s(%s)", agent, routers.BuildDateTime)
}
client, err := rtsp.NewRTSPClient(rtsp.GetServer(), v.URL, int64(v.HeartbeatInterval)*1000, agent)
if err != nil {
continue
}
client.CustomPath = v.CustomPath
pusher := rtsp.NewClientPusher(client)
if rtsp.GetServer().GetPusher(pusher.Path()) != nil {
continue
}
err = client.Start(time.Duration(v.IdleTimeout) * time.Second)
if err != nil {
log.Printf("Pull stream err :%v", err)
continue
}
rtsp.GetServer().AddPusher(pusher)
//streams = streams[0:i]
//streams = append(streams[:i], streams[i+1:]...)
}
time.Sleep(10 * time.Second)
}
}()
return
}
@ -137,12 +182,7 @@ func main() {
// log
log.SetPrefix("[EasyDarwin] ")
log.SetFlags(log.Lshortfile | log.LstdFlags)
if !utils.Debug {
log.Println("log files -->", utils.LogDir())
log.Printf("git commit code:%s", gitCommitCode)
log.Printf("build date:%s", buildDateTime)
log.SetOutput(utils.GetLogWriter())
}
log.Printf("git commit code:%s", gitCommitCode)
log.Printf("build date:%s", buildDateTime)
routers.BuildVersion = fmt.Sprintf("%s.%s", routers.BuildVersion, gitCommitCode)

View File

@ -10,7 +10,7 @@ func Init() (err error) {
if err != nil {
return
}
db.SQLite.AutoMigrate(User{})
db.SQLite.AutoMigrate(User{}, Stream{})
count := 0
sec := utils.Conf().Section("http")
defUser := sec.Key("default_username").MustString("admin")

8
models/stream.go Normal file
View File

@ -0,0 +1,8 @@
package models
type Stream struct {
URL string `gorm:"type:varchar(256);primary_key;unique"`
CustomPath string `gorm:"type:varchar(256)"`
IdleTimeout int
HeartbeatInterval int
}

View File

@ -139,7 +139,7 @@ func (h *APIHandler) RecordFiles(c *gin.Context) {
if info.Name() == ".DS_Store" {
return nil
}
if !strings.HasSuffix(info.Name(), ".m3u8") {
if !strings.HasSuffix(strings.ToLower(info.Name()), ".m3u8") && !strings.HasSuffix(strings.ToLower(info.Name()), ".ts") {
return nil
}
cmd := exec.Command(ffprobe, "-i", path)

View File

@ -2,6 +2,8 @@ package routers
import (
"fmt"
"github.com/EasyDarwin/EasyDarwin/models"
"github.com/penggy/EasyGoLib/db"
"log"
"net/http"
"strings"
@ -65,6 +67,18 @@ func (h *APIHandler) StreamStart(c *gin.Context) {
}
log.Printf("Pull to push %v success ", form)
rtsp.GetServer().AddPusher(pusher)
// save to db.
var stream = models.Stream{
URL: form.URL,
CustomPath: form.CustomPath,
IdleTimeout: form.IdleTimeout,
HeartbeatInterval: form.HeartbeatInterval,
}
if db.SQLite.Where(&models.Stream{URL: form.URL}).First(&models.Stream{}).RecordNotFound() {
db.SQLite.Create(&stream)
} else {
db.SQLite.Save(&stream)
}
c.IndentedJSON(200, pusher.ID())
}
@ -91,6 +105,11 @@ func (h *APIHandler) StreamStop(c *gin.Context) {
v.Stop()
c.IndentedJSON(200, "OK")
log.Printf("Stop %v success ", v)
if v.RTSPClient != nil {
var stream models.Stream
stream.URL = v.RTSPClient.URL
db.SQLite.Delete(stream)
}
return
}
}

View File

@ -77,7 +77,7 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64, agent
URL: rawUrl,
ID: shortid.MustGenerate(),
Path: url.Path,
TransType: TRANS_TYPE_UDP,
TransType: TRANS_TYPE_TCP,
vRTPChannel: 0,
vRTPControlChannel: 1,
aRTPChannel: 2,
@ -295,7 +295,7 @@ func (client *RTSPClient) Start(timeout time.Duration) error {
return err
}
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;client_port=%d-%d", client.UDPServer.VPort, client.UDPServer.VControlPort)
client.Conn.timeout = 0 // UDP ignore timeout
client.Conn.timeout = 0 // UDP ignore timeout
}
if Session != "" {
headers["Session"] = Session
@ -328,7 +328,7 @@ func (client *RTSPClient) Start(timeout time.Duration) error {
return err
}
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;client_port=%d-%d", client.UDPServer.APort, client.UDPServer.AControlPort)
client.Conn.timeout = 0 // UDP ignore timeout
client.Conn.timeout = 0 // UDP ignore timeout
}
if Session != "" {
headers["Session"] = Session

View File

@ -8,6 +8,7 @@ import (
"os/exec"
"path"
"strconv"
"strings"
"sync"
"syscall"
"time"
@ -56,7 +57,7 @@ func (server *Server) Start() (err error) {
localRecord := utils.Conf().Section("rtsp").Key("save_stream_to_local").MustInt(0)
ffmpeg := utils.Conf().Section("rtsp").Key("ffmpeg_path").MustString("")
m3u8_dir_path := utils.Conf().Section("rtsp").Key("m3u8_dir_path").MustString("")
ts_duration_second := utils.Conf().Section("rtsp").Key("ts_duration_second").MustInt(10 * 60)
ts_duration_second := utils.Conf().Section("rtsp").Key("ts_duration_second").MustInt(6)
SaveStreamToLocal := false
if (len(ffmpeg) > 0) && localRecord > 0 && len(m3u8_dir_path) > 0 {
err := utils.EnsureDir(m3u8_dir_path)
@ -80,16 +81,23 @@ func (server *Server) Start() (err error) {
case pusher, addChnOk = <-server.addPusherCh:
if SaveStreamToLocal {
if addChnOk {
dir := path.Join(m3u8_dir_path, pusher.Path(), time.Now().Format("20060102150405"))
dir := path.Join(m3u8_dir_path, pusher.Path(), time.Now().Format("20060102"))
err := utils.EnsureDir(dir)
if err != nil {
logger.Printf("EnsureDir:[%s] err:%v.", dir, err)
continue
}
m3u8path := path.Join(dir, fmt.Sprintf("out.m3u8"))
port := pusher.Server().TCPPort
rtsp := fmt.Sprintf("rtsp://localhost:%d%s", port, pusher.Path())
paramStr := utils.Conf().Section("rtsp").Key(pusher.Path()).MustString("-c:v copy -c:a aac")
params := []string{"-fflags", "genpts", "-rtsp_transport", "tcp", "-i", rtsp, "-hls_time", strconv.Itoa(ts_duration_second), "-hls_list_size", "0", m3u8path}
if paramStr != "default" {
paramsOfThisPath := strings.Split(paramStr, " ")
params = append(params[:6], append(paramsOfThisPath, params[6:]...)...)
}
// ffmpeg -i ~/Downloads/720p.mp4 -s 640x360 -g 15 -c:a aac -hls_time 5 -hls_list_size 0 record.m3u8
cmd := exec.Command(ffmpeg, "-fflags", "genpts", "-rtsp_transport", "tcp", "-i", pusher.URL(), "-c:v", "copy", "-hls_time", strconv.Itoa(ts_duration_second), "-hls_list_size", "0", m3u8path)
cmd := exec.Command(ffmpeg, params...)
f, err := os.OpenFile(path.Join(dir, fmt.Sprintf("log.txt")), os.O_RDWR|os.O_CREATE, 0755)
if err == nil {
cmd.Stdout = f