diff --git a/routers/stats.go b/routers/stats.go index 77503ae2..a9189ff0 100644 --- a/routers/stats.go +++ b/routers/stats.go @@ -60,7 +60,9 @@ func (h *APIHandler) Pushers(c *gin.Context) { } pushers = append(pushers, map[string]interface{}{ "id": pusher.ID(), - "path": rtsp, + "url": rtsp, + "path": pusher.Path(), + "source": pusher.Source(), "transType": pusher.TransType(), "inBytes": pusher.InBytes(), "outBytes": pusher.OutBytes(), diff --git a/routers/streams.go b/routers/streams.go index 38c82ec6..9766a358 100644 --- a/routers/streams.go +++ b/routers/streams.go @@ -3,9 +3,6 @@ package routers import ( "bytes" "fmt" - "github.com/EasyDarwin/EasyDarwin/rtsp" - "github.com/gin-gonic/gin" - "github.com/penggy/EasyGoLib/utils" "log" "math" "net/http" @@ -16,12 +13,18 @@ import ( "strconv" "strings" "time" + + "github.com/EasyDarwin/EasyDarwin/rtsp" + "github.com/gin-gonic/gin" + "github.com/penggy/EasyGoLib/utils" ) func (h *APIHandler) StreamStart(c *gin.Context) { type Form struct { - URL string `form:"url" binding:"required"` - IdleTimeout int `form:"idleTimeout"` + URL string `form:"url" binding:"required"` + CustomPath string `form:"customPath"` + IdleTimeout int `form:"idleTimeout"` + HeartbeatInterval int `form:"heartbeatInterval"` } var form Form err := c.Bind(&form) @@ -29,8 +32,21 @@ func (h *APIHandler) StreamStart(c *gin.Context) { log.Printf("Pull to push err:%v", err) return } - client := rtsp.NewRTSPClient(rtsp.GetServer(), form.URL, 0) + client, err := rtsp.NewRTSPClient(rtsp.GetServer(), form.URL, int64(form.HeartbeatInterval)*1000) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, err.Error()) + return + } + if form.CustomPath != "" && !strings.HasPrefix(form.CustomPath, "/") { + form.CustomPath = "/" + form.CustomPath + } + client.CustomPath = form.CustomPath + pusher := rtsp.NewClientPusher(client) + if rtsp.GetServer().GetPusher(pusher.Path()) != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, fmt.Sprintf("Path %s already exists", client.Path)) + return + } err = client.Start(time.Duration(form.IdleTimeout) * time.Second) if err != nil { log.Printf("Pull stream err :%v", err) @@ -57,7 +73,6 @@ func (h *APIHandler) StreamStop(c *gin.Context) { if v.ID() == form.ID { v.Stop() c.IndentedJSON(200, "OK") - log.Printf("Stop %v success ", v) return } diff --git a/rtsp/pusher.go b/rtsp/pusher.go index a4a12d10..528047ba 100644 --- a/rtsp/pusher.go +++ b/rtsp/pusher.go @@ -55,6 +55,9 @@ func (pusher *Pusher) Path() string { if pusher.Session != nil { return pusher.Session.Path } + if pusher.RTSPClient.CustomPath != "" { + return pusher.RTSPClient.CustomPath + } return pusher.RTSPClient.Path } @@ -136,6 +139,13 @@ func (pusher *Pusher) StartAt() time.Time { return pusher.RTSPClient.StartAt } +func (pusher *Pusher) Source() string { + if pusher.Session != nil { + return pusher.Session.URL + } + return pusher.RTSPClient.URL +} + func NewClientPusher(client *RTSPClient) (pusher *Pusher) { pusher = &Pusher{ RTSPClient: client, diff --git a/rtsp/rtsp-client.go b/rtsp/rtsp-client.go index 20189ab4..fb86c951 100644 --- a/rtsp/rtsp-client.go +++ b/rtsp/rtsp-client.go @@ -13,6 +13,8 @@ import ( "strings" "time" + "github.com/teris-io/shortid" + "github.com/penggy/EasyGoLib/utils" "github.com/pixelbender/go-sdp/sdp" @@ -24,6 +26,7 @@ type RTSPClient struct { Status string URL string Path string + CustomPath string //custom path for pusher ID string Conn net.Conn AuthHeaders bool @@ -56,24 +59,25 @@ func (client *RTSPClient) String() string { return fmt.Sprintf("client[%s]", client.URL) } -func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64) *RTSPClient { +func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64) (client *RTSPClient, err error) { url, err := url.Parse(rawUrl) if err != nil { - return nil + return } - client := &RTSPClient{ + client = &RTSPClient{ Server: server, Stoped: false, URL: rawUrl, - ID: url.Path, + ID: shortid.MustGenerate(), Path: url.Path, vRTPChannel: 0, vRTPControlChannel: 1, aRTPChannel: 2, aRTPControlChannel: 3, OptionIntervalMillis: sendOptionMillis, + StartAt: time.Now(), } - return client + return } func (client *RTSPClient) Start(timeout time.Duration) error { @@ -197,7 +201,7 @@ func (client *RTSPClient) Start(timeout time.Duration) error { for !client.Stoped { if OptionIntervalMillis > 0 { elapse := time.Now().Sub(startTime) - if elapse > time.Duration(OptionIntervalMillis*int64(time.Millisecond)) { + if elapse > time.Duration(OptionIntervalMillis)*time.Millisecond { startTime = time.Now() headers := make(map[string]string) headers["Require"] = "implicit-play" diff --git a/web_src/components/PullRTSPDlg.vue b/web_src/components/PullRTSPDlg.vue index 1d59d4e5..44cd7c66 100644 --- a/web_src/components/PullRTSPDlg.vue +++ b/web_src/components/PullRTSPDlg.vue @@ -7,13 +7,27 @@ {{errors.first('url')}} -