From af2c2b152aba62957f7a17c925eabdd9e06377b8 Mon Sep 17 00:00:00 2001 From: penggy Date: Sat, 24 Nov 2018 10:46:37 +0800 Subject: [PATCH] pull & push ui --- routers/routers.go | 3 + routers/stats.go | 4 +- routers/streams.go | 41 ++ rtsp/pusher.go | 19 +- rtsp/rtsp-client.go | 5 +- vendor/github.com/pixelbender/go-sdp/LICENSE | 21 + .../pixelbender/go-sdp/sdp/attribute.go | 54 ++ .../pixelbender/go-sdp/sdp/decoder.go | 479 ++++++++++++++++++ .../pixelbender/go-sdp/sdp/encoder.go | 318 ++++++++++++ .../github.com/pixelbender/go-sdp/sdp/sdp.go | 192 +++++++ vendor/github.com/reactivex/rxgo/LICENSE | 22 + .../reactivex/rxgo/observable/observable.go | 415 +++++++++++++++ vendor/vendor.json | 14 +- web_src/components/PullRTSPDlg.vue | 2 +- web_src/components/PusherList.vue | 6 +- www/apidoc/api_project.js | 4 +- www/apidoc/api_project.json | 4 +- www/css/index.8178b6a2.css | 1 - www/css/index.eba7eb98.css | 1 + www/css/login.729a9cc8.css | 1 - www/css/login.cf365b39.css | 1 + www/images/boxed-bg.7799dece.jpg | Bin 123770 -> 0 bytes www/index.html | 4 +- www/js/index.8178b6a2.js | 1 - www/js/index.eba7eb98.js | 1 + www/js/login.729a9cc8.js | 1 - www/js/login.cf365b39.js | 1 + www/js/pushers.56479087.js | 1 + www/js/pushers.57d2daae.js | 1 - www/login.html | 4 +- 30 files changed, 1598 insertions(+), 23 deletions(-) create mode 100644 routers/streams.go create mode 100644 vendor/github.com/pixelbender/go-sdp/LICENSE create mode 100644 vendor/github.com/pixelbender/go-sdp/sdp/attribute.go create mode 100644 vendor/github.com/pixelbender/go-sdp/sdp/decoder.go create mode 100644 vendor/github.com/pixelbender/go-sdp/sdp/encoder.go create mode 100644 vendor/github.com/pixelbender/go-sdp/sdp/sdp.go create mode 100644 vendor/github.com/reactivex/rxgo/LICENSE create mode 100644 vendor/github.com/reactivex/rxgo/observable/observable.go delete mode 100644 www/css/index.8178b6a2.css create mode 100644 www/css/index.eba7eb98.css delete mode 100644 www/css/login.729a9cc8.css create mode 100644 www/css/login.cf365b39.css delete mode 100644 www/images/boxed-bg.7799dece.jpg delete mode 100644 www/js/index.8178b6a2.js create mode 100644 www/js/index.eba7eb98.js delete mode 100644 www/js/login.729a9cc8.js create mode 100644 www/js/login.cf365b39.js create mode 100644 www/js/pushers.56479087.js delete mode 100644 www/js/pushers.57d2daae.js diff --git a/routers/routers.go b/routers/routers.go index 2873e7a4..508cb24a 100644 --- a/routers/routers.go +++ b/routers/routers.go @@ -135,6 +135,9 @@ func Init() (err error) { api.GET("/pushers", API.Pushers) api.GET("/players", API.Players) + + api.GET("/stream/start", API.StreamStart) + api.GET("/stream/stop", API.StreamStop) } return diff --git a/routers/stats.go b/routers/stats.go index 39f4032a..77503ae2 100644 --- a/routers/stats.go +++ b/routers/stats.go @@ -61,10 +61,10 @@ func (h *APIHandler) Pushers(c *gin.Context) { pushers = append(pushers, map[string]interface{}{ "id": pusher.ID(), "path": rtsp, - "transType": pusher.TransType.String(), + "transType": pusher.TransType(), "inBytes": pusher.InBytes(), "outBytes": pusher.OutBytes(), - "startAt": utils.DateTime(pusher.StartAt), + "startAt": utils.DateTime(pusher.StartAt()), "onlines": len(pusher.GetPlayers()), }) } diff --git a/routers/streams.go b/routers/streams.go new file mode 100644 index 00000000..b1676de7 --- /dev/null +++ b/routers/streams.go @@ -0,0 +1,41 @@ +package routers + +import ( + "log" + + "github.com/EasyDarwin/EasyDarwin/rtsp" + "github.com/gin-gonic/gin" + "github.com/reactivex/rxgo/handlers" + "github.com/reactivex/rxgo/observer" +) + +func (h *APIHandler) StreamStart(c *gin.Context) { + type Form struct { + URL string `form:"url" binding:"required"` + IdleTimeout int `form:"idleTimeout"` + } + var form Form + err := c.Bind(&form) + if err != nil { + return + } + client := rtsp.NewRTSPClient(rtsp.GetServer(), form.URL, int64(form.IdleTimeout)*1000) + pusher := rtsp.NewClientPusher(client) + rtsp.GetServer().AddPusher(pusher) + onNext := handlers.NextFunc(func(item interface{}) { + log.Printf("CLIENT:RTSP拉流成功:%v", item) + }) + onDone := handlers.DoneFunc(func() { + log.Println("CLIENT done") + }) + onError := handlers.ErrFunc(func(err error) { + log.Println("CLIENT Error :", err.Error()) + }) + watcher := observer.New(onNext, onDone, onError) + client.Start().Subscribe(watcher) + c.IndentedJSON(200, "OK") +} + +func (h *APIHandler) StreamStop(c *gin.Context) { + +} diff --git a/rtsp/pusher.go b/rtsp/pusher.go index 992e5fd5..47e71df1 100644 --- a/rtsp/pusher.go +++ b/rtsp/pusher.go @@ -4,6 +4,7 @@ import ( "log" "strings" "sync" + "time" "github.com/penggy/EasyGoLib/utils" ) @@ -121,6 +122,20 @@ func (pusher *Pusher) OutBytes() int { return pusher.RTSPClient.OutBytes } +func (pusher *Pusher) TransType() string { + if pusher.Session != nil { + return pusher.Session.TransType.String() + } + return pusher.RTSPClient.TransType.String() +} + +func (pusher *Pusher) StartAt() time.Time { + if pusher.Session != nil { + return pusher.Session.StartAt + } + return pusher.RTSPClient.StartAt +} + func NewClientPusher(client *RTSPClient) (pusher *Pusher) { pusher = &Pusher{ RTSPClient: client, @@ -270,14 +285,14 @@ func (pusher *Pusher) ClearPlayer() { // copy a new map to avoid deadlock players := make(map[string]*Player) pusher.playersLock.Lock() - for k,v := range pusher.players { + for k, v := range pusher.players { //v.Stop() players[k] = v } pusher.players = make(map[string]*Player) pusher.playersLock.Unlock() - for _,v:=range players{ + for _, v := range players { v.Stop() } } diff --git a/rtsp/rtsp-client.go b/rtsp/rtsp-client.go index aeb8d8d5..0cb1b500 100644 --- a/rtsp/rtsp-client.go +++ b/rtsp/rtsp-client.go @@ -5,7 +5,6 @@ import ( "bytes" "encoding/binary" "fmt" - "github.com/penggy/EasyGoLib/utils" "io" "log" "net" @@ -14,6 +13,8 @@ import ( "strings" "time" + "github.com/penggy/EasyGoLib/utils" + "github.com/pixelbender/go-sdp/sdp" "github.com/reactivex/rxgo/observable" ) @@ -32,6 +33,8 @@ type RTSPClient struct { connRW *bufio.ReadWriter InBytes int OutBytes int + TransType TransType + StartAt time.Time Sdp *sdp.Session AControl string VControl string diff --git a/vendor/github.com/pixelbender/go-sdp/LICENSE b/vendor/github.com/pixelbender/go-sdp/LICENSE new file mode 100644 index 00000000..12558c04 --- /dev/null +++ b/vendor/github.com/pixelbender/go-sdp/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 Vasily Vasilyev + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/pixelbender/go-sdp/sdp/attribute.go b/vendor/github.com/pixelbender/go-sdp/sdp/attribute.go new file mode 100644 index 00000000..6e7b0663 --- /dev/null +++ b/vendor/github.com/pixelbender/go-sdp/sdp/attribute.go @@ -0,0 +1,54 @@ +package sdp + +// Attributes represent a list of SDP attributes. +type Attributes []*Attr + +// Has returns presence of attribute by name. +func (a Attributes) Has(name string) bool { + for _, it := range a { + if it.Name == name { + return true + } + } + return false +} + +// Get returns first attribute value by name. +func (a Attributes) Get(name string) string { + for _, it := range a { + if it.Name == name { + return it.Value + } + } + return "" +} + +// Attr represents session or media attribute. +type Attr struct { + Name, Value string +} + +// NewAttr returns a=: attribute. +func NewAttr(attr, value string) *Attr { + return &Attr{attr, value} +} + +// NewAttrFlag returns a= attribute. +func NewAttrFlag(flag string) *Attr { + return &Attr{flag, ""} +} + +func (a *Attr) String() string { + if a.Value == "" { + return a.Name + } + return a.Name + ":" + a.Value +} + +// Session or media attribute values for indication of a streaming mode. +const ( + ModeSendRecv = "sendrecv" + ModeRecvOnly = "recvonly" + ModeSendOnly = "sendonly" + ModeInactive = "inactive" +) diff --git a/vendor/github.com/pixelbender/go-sdp/sdp/decoder.go b/vendor/github.com/pixelbender/go-sdp/sdp/decoder.go new file mode 100644 index 00000000..45348158 --- /dev/null +++ b/vendor/github.com/pixelbender/go-sdp/sdp/decoder.go @@ -0,0 +1,479 @@ +package sdp + +import ( + "bufio" + "errors" + "fmt" + "io" + "strconv" + "time" +) + +// Parse reads session description from the buffer. +func Parse(b []byte) (*Session, error) { + return ParseString(string(b)) +} + +// ParseString reads session description from the string. +func ParseString(s string) (*Session, error) { + return NewDecoderString(s).Decode() +} + +// A Decoder reads a session description from a stream. +type Decoder struct { + r lineReader + p []string +} + +// NewDecoder returns new decoder that reads from r. +func NewDecoder(r io.Reader) *Decoder { + return &Decoder{r: &reader{b: bufio.NewReaderSize(r, maxLineSize)}} +} + +// NewDecoderString returns new decoder that reads from s. +func NewDecoderString(s string) *Decoder { + return &Decoder{r: &stringReader{s: s}} +} + +// Decode encodes the session description. +func (d *Decoder) Decode() (*Session, error) { + line := 0 + sess := new(Session) + var media *Media + + for { + line++ + s, err := d.r.ReadLine() + if err != nil { + if err == io.EOF && sess.Origin != nil { + break + } + return nil, err + } + if len(s) == 0 && sess.Origin != nil { + break + } + if len(s) < 2 || s[1] != '=' { + return nil, &errDecode{errFormat, line, s} + } + f, v := s[0], s[2:] + if f == 'm' { + media = new(Media) + err = d.media(media, f, v) + if err == nil { + sess.Media = append(sess.Media, media) + } + } else if media == nil { + err = d.session(sess, f, v) + } else { + err = d.media(media, f, v) + } + if err != nil { + return nil, &errDecode{err, line, s} + } + } + return sess, nil +} + +func (d *Decoder) session(s *Session, f byte, v string) error { + var err error + switch f { + case 'v': + s.Version, err = strconv.Atoi(v) + case 'o': + if s.Origin != nil { + return errUnexpectedField + } + s.Origin, err = d.origin(v) + case 's': + s.Name = v + case 'i': + s.Information = v + case 'u': + s.URI = v + case 'e': + s.Email = append(s.Email, v) + case 'p': + s.Phone = append(s.Phone, v) + case 'c': + if s.Connection != nil { + return errUnexpectedField + } + s.Connection, err = d.connection(v) + case 'b': + if s.Bandwidth == nil { + s.Bandwidth = make(Bandwidth) + } + err = d.bandwidth(s.Bandwidth, v) + case 'z': + s.TimeZone, err = d.timezone(v) + case 'k': + s.Key = append(s.Key, d.key(v)) + case 'a': + a := d.attr(v) + switch a.Name { + case ModeInactive, ModeRecvOnly, ModeSendOnly, ModeSendRecv: + s.Mode = a.Name + default: + s.Attributes = append(s.Attributes, a) + } + case 't': + s.Timing, err = d.timing(v) + case 'r': + r, err := d.repeat(v) + if err != nil { + return err + } + s.Repeat = append(s.Repeat, r) + default: + return errUnexpectedField + } + return err +} + +func (d *Decoder) media(m *Media, f byte, v string) error { + var err error + switch f { + case 'm': + err = d.proto(m, v) + case 'i': + m.Information = v + case 'c': + conn, err := d.connection(v) + if err != nil { + return err + } + m.Connection = append(m.Connection, conn) + case 'b': + if m.Bandwidth == nil { + m.Bandwidth = make(Bandwidth) + } + err = d.bandwidth(m.Bandwidth, v) + case 'k': + m.Key = append(m.Key, d.key(v)) + case 'a': + a := d.attr(v) + switch a.Name { + case ModeInactive, ModeRecvOnly, ModeSendOnly, ModeSendRecv: + m.Mode = a.Name + case "rtpmap", "rtcp-fb", "fmtp": + err = d.format(m, a) + default: + m.Attributes = append(m.Attributes, a) + } + default: + return errUnexpectedField + } + return err +} + +func (d *Decoder) format(m *Media, a *Attr) error { + p, ok := d.fields(a.Value, 2) + if !ok { + return errFormat + } + pt, err := strconv.Atoi(p[0]) + if err != nil { + return err + } + f, v := m.Format(pt), p[1] + if f == nil { + return nil + } + switch a.Name { + case "rtpmap": + err = d.rtpmap(f, v) + case "rtcp-fb": + f.Feedback = append(f.Feedback, v) + case "fmtp": + f.Params = append(f.Params, v) + } + return err +} + +func (d *Decoder) rtpmap(f *Format, v string) error { + p, ok := d.split(v, '/', 3) + if len(p) < 2 { + return errFormat + } + f.Name = p[0] + var err error + if ok { + if f.Channels, err = strconv.Atoi(p[2]); err != nil { + return err + } + } + if f.ClockRate, err = strconv.Atoi(p[1]); err != nil { + return err + } + return nil +} + +func (d *Decoder) proto(m *Media, v string) error { + p, ok := d.fields(v, 4) + if !ok { + return errFormat + } + formats := p[3] + m.Type, m.Proto = p[0], p[2] + p, ok = d.split(p[1], '/', 2) + var err error + if ok { + if m.PortNum, err = strconv.Atoi(p[1]); err != nil { + return err + } + } + if m.Port, err = strconv.Atoi(p[0]); err != nil { + return err + } + p, _ = d.fields(formats, maxLineSize) + for _, it := range p { + if it == "*" { + continue + } + pt, err := strconv.Atoi(it) + if err != nil { + return err + } + m.Formats = append(m.Formats, &Format{Payload: pt}) + } + return nil +} + +func (d *Decoder) origin(v string) (*Origin, error) { + p, ok := d.fields(v, 6) + if !ok { + return nil, errFormat + } + o := new(Origin) + o.Username, o.Network, o.Type, o.Address = p[0], p[3], p[4], p[5] + var err error + if o.SessionID, err = d.int(p[1]); err != nil { + return nil, err + } + if o.SessionVersion, err = d.int(p[2]); err != nil { + return nil, err + } + return o, nil +} + +func (d *Decoder) connection(v string) (*Connection, error) { + p, ok := d.fields(v, 3) + if !ok { + return nil, errFormat + } + c := new(Connection) + c.Network, c.Type, c.Address = p[0], p[1], p[2] + p, ok = d.split(c.Address, '/', 3) + if ok { + ttl, err := d.int(p[1]) + if err != nil { + return nil, err + } + c.TTL = int(ttl) + p = p[1:] + } + if len(p) > 1 { + num, err := d.int(p[1]) + if err != nil { + return nil, err + } + c.Address, c.AddressNum = p[0], int(num) + } + return c, nil +} + +func (d *Decoder) bandwidth(b Bandwidth, v string) error { + p, ok := d.split(v, ':', 2) + if !ok { + return errFormat + } + val, err := d.int(p[1]) + if err != nil { + return err + } + b[p[0]] = int(val) + return nil +} + +func (d *Decoder) timezone(v string) ([]*TimeZone, error) { + p, _ := d.fields(v, 40) + zone := make([]*TimeZone, 0, 1) + var err error + for len(p) > 1 { + it := new(TimeZone) + if it.Time, err = d.time(p[0]); err != nil { + return nil, err + } + if it.Offset, err = d.duration(p[1]); err != nil { + return nil, err + } + zone = append(zone, it) + p = p[2:] + } + return zone, nil +} + +func (d *Decoder) key(v string) *Key { + if p, ok := d.split(v, ':', 2); ok { + return &Key{p[0], p[1]} + } + return &Key{v, ""} +} + +func (d *Decoder) attr(v string) *Attr { + if p, ok := d.split(v, ':', 2); ok { + return &Attr{p[0], p[1]} + } + return &Attr{v, ""} +} + +func (d *Decoder) timing(v string) (*Timing, error) { + p, ok := d.fields(v, 2) + if !ok { + return nil, errFormat + } + start, err := d.time(p[0]) + if err != nil { + return nil, err + } + stop, err := d.time(p[1]) + if err != nil { + return nil, err + } + return &Timing{start, stop}, nil +} + +func (d *Decoder) repeat(v string) (*Repeat, error) { + p, _ := d.fields(v, maxLineSize) + if len(p) < 2 { + return nil, errFormat + } + r := new(Repeat) + var err error + if r.Interval, err = d.duration(p[0]); err != nil { + return nil, err + } + if r.Duration, err = d.duration(p[1]); err != nil { + return nil, err + } + for _, it := range p[2:] { + off, err := d.duration(it) + if err != nil { + return nil, err + } + r.Offsets = append(r.Offsets, off) + } + return r, nil +} + +func (d *Decoder) time(v string) (time.Time, error) { + sec, err := d.int(v) + if err != nil || sec == 0 { + return time.Time{}, err + } + return epoch.Add(time.Second * time.Duration(sec)), nil +} + +func (d *Decoder) duration(v string) (time.Duration, error) { + m := int64(1) + if n := len(v) - 1; n >= 0 { + switch v[n] { + case 'd': + m, v = 86400, v[:n] + case 'h': + m, v = 3600, v[:n] + case 'm': + m, v = 60, v[:n] + case 's': + v = v[:n] + } + } + sec, err := d.int(v) + if err != nil { + return 0, err + } + return time.Duration(sec*m) * time.Second, nil +} + +func (d *Decoder) int(v string) (int64, error) { + return strconv.ParseInt(v, 10, 64) +} + +func (d *Decoder) fields(s string, n int) ([]string, bool) { + return d.split(s, ' ', n) +} + +func (d *Decoder) split(s string, sep rune, n int) ([]string, bool) { + p, pos := d.p[:0], 0 + for i, c := range s { + if c != sep { + continue + } + p = append(p, s[pos:i]) + pos = i + 1 + if len(p) >= n-1 { + break + } + } + p = append(p, s[pos:]) + d.p = p[:0] + return p, len(p) == n +} + +const maxLineSize = 1024 + +type lineReader interface { + ReadLine() (string, error) +} + +type stringReader struct { + s string +} + +func (r *stringReader) ReadLine() (string, error) { + s, n := r.s, len(r.s) + if n == 0 { + return "", io.EOF + } + for i, ch := range s { + if ch == '\n' { + r.s = s[i+1:] + for i > 0 && s[i-1] == '\r' { + i-- + } + return s[:i], nil + } + } + r.s = "" + return s, nil +} + +type reader struct { + b *bufio.Reader +} + +func (r *reader) ReadLine() (string, error) { + b, prefix, err := r.b.ReadLine() + if prefix && err == nil { + err = errLineTooLong + } + if err != nil { + return "", err + } + return string(b), nil +} + +var errLineTooLong = errors.New("sdp: line is too long") +var errUnexpectedField = errors.New("unexpected field") +var errFormat = errors.New("format error") + +type errDecode struct { + err error + line int + text string +} + +func (e *errDecode) Error() string { + return fmt.Sprintf("sdp: %s on line %d '%s'", e.err.Error(), e.line, e.text) +} diff --git a/vendor/github.com/pixelbender/go-sdp/sdp/encoder.go b/vendor/github.com/pixelbender/go-sdp/sdp/encoder.go new file mode 100644 index 00000000..2ae52055 --- /dev/null +++ b/vendor/github.com/pixelbender/go-sdp/sdp/encoder.go @@ -0,0 +1,318 @@ +package sdp + +import ( + "io" + "strconv" + "time" +) + +// An Encoder writes a session description to a buffer. +type Encoder struct { + w io.Writer + buf []byte + pos int + newline bool +} + +// NewEncoder returns a new encoder that writes to w. +func NewEncoder(w io.Writer) *Encoder { + return &Encoder{w: w} +} + +// Encode encodes the session description. +func (e *Encoder) Encode(s *Session) error { + e.Reset() + e.session(s) + if e.w != nil { + _, err := e.w.Write(e.Bytes()) + if err != nil { + return err + } + } + return nil +} + +// Reset resets encoder state to be empty. +func (e *Encoder) Reset() { + e.pos, e.newline = 0, false +} + +func (e *Encoder) session(s *Session) *Encoder { + e.add('v').int(int64(s.Version)) + if s.Origin != nil { + e.add('o').origin(s.Origin) + } + e.add('s').str(s.Name) + if s.Information != "" { + e.add('i').str(s.Information) + } + if s.URI != "" { + e.add('u').str(s.URI) + } + for _, it := range s.Email { + e.add('e').str(it) + } + for _, it := range s.Phone { + e.add('p').str(it) + } + if s.Connection != nil { + e.add('c').connection(s.Connection) + } + for t, v := range s.Bandwidth { + e.add('b').bandwidth(t, v) + } + if len(s.TimeZone) > 0 { + e.add('z').timezone(s.TimeZone) + } + for _, it := range s.Key { + e.add('k').key(it) + } + e.add('t').timing(s.Timing) + for _, it := range s.Repeat { + e.add('r').repeat(it) + } + if s.Mode != "" { + e.add('a').str(s.Mode) + } + for _, it := range s.Attributes { + e.add('a').attr(it) + } + for _, it := range s.Media { + e.media(it) + } + return e +} + +func (e *Encoder) media(m *Media) *Encoder { + e.add('m').str(m.Type).sp().int(int64(m.Port)) + if m.PortNum > 0 { + e.char('/').int(int64(m.PortNum)) + } + e.sp().str(m.Proto) + for _, it := range m.Formats { + e.sp().int(int64(it.Payload)) + } + if len(m.Formats) == 0 { + e.sp().char('*') + } + if m.Information != "" { + e.add('i').str(m.Information) + } + for _, it := range m.Connection { + e.add('c').connection(it) + } + for t, v := range m.Bandwidth { + e.add('b').bandwidth(t, v) + } + for _, it := range m.Key { + e.add('k').key(it) + } + for _, it := range m.Formats { + e.format(it) + } + if m.Mode != "" { + e.add('a').str(m.Mode) + } + for _, it := range m.Attributes { + e.add('a').attr(it) + } + return e +} + +func (e *Encoder) format(f *Format) *Encoder { + p := int64(f.Payload) + if f.Name != "" { + e.add('a').str("rtpmap:").int(p).sp().str(f.Name).char('/').int(int64(f.ClockRate)) + if f.Channels > 0 { + e.char('/').int(int64(f.Channels)) + } + } + for _, it := range f.Feedback { + e.add('a').str("rtcp-fb:").int(p).sp().str(it) + } + for _, it := range f.Params { + e.add('a').str("fmtp:").int(p).sp().str(it) + } + return e +} + +func (e *Encoder) attr(a *Attr) *Encoder { + if a.Value == "" { + return e.str(a.Name) + } + return e.str(a.Name).char(':').str(a.Value) +} + +func (e *Encoder) timezone(z []*TimeZone) *Encoder { + for i, it := range z { + if i > 0 { + e.char(' ') + } + e.time(it.Time).sp().duration(it.Offset) + } + return e +} + +func (e *Encoder) timing(t *Timing) *Encoder { + if t == nil { + return e.str("0 0") + } + return e.time(t.Start).sp().time(t.Stop) +} + +func (e *Encoder) repeat(r *Repeat) *Encoder { + e.duration(r.Interval).sp().duration(r.Duration) + for _, it := range r.Offsets { + e.sp().duration(it) + } + return e +} + +func (e *Encoder) time(t time.Time) *Encoder { + if t.IsZero() { + return e.char('0') + } + return e.int(int64(t.Sub(epoch).Seconds())) +} + +func (e *Encoder) duration(d time.Duration) *Encoder { + v := int64(d.Seconds()) + switch { + case v == 0: + return e.char('0') + case v%86400 == 0: + return e.int(v / 86400).char('d') + case v%3600 == 0: + return e.int(v / 3600).char('h') + case v%60 == 0: + return e.int(v / 60).char('m') + default: + return e.int(v) + } +} + +func (e *Encoder) bandwidth(m string, v int) *Encoder { + return e.str(m).char(':').int(int64(v)) +} + +func (e *Encoder) key(k *Key) *Encoder { + if k.Value == "" { + return e.str(k.Method) + } + return e.str(k.Method).char(':').str(k.Value) +} + +func (e *Encoder) origin(o *Origin) *Encoder { + return e.str(strd(o.Username, "-")).sp().int(o.SessionID).sp().int(o.SessionVersion).sp().transport(o.Network, o.Type, o.Address) +} + +func (e *Encoder) connection(c *Connection) *Encoder { + e.transport(c.Network, c.Type, c.Address) + if c.TTL > 0 { + e.char('/').int(int64(c.TTL)) + } + if c.AddressNum > 1 { + e.char('/').int(int64(c.AddressNum)) + } + return e +} + +func (e *Encoder) transport(network, typ, addr string) *Encoder { + return e.fields(strd(network, "IN"), strd(typ, "IP4"), strd(addr, "127.0.0.1")) +} + +func strd(v, def string) string { + if v == "" { + return def + } + return v +} + +func (e *Encoder) str(v string) *Encoder { + if v == "" { + return e.char('-') + } + copy(e.next(len(v)), v) + return e +} + +func (e *Encoder) fields(v ...string) *Encoder { + n := len(v) - 1 + for _, it := range v { + n += len(it) + } + p, b := 0, e.next(n) + for _, it := range v { + if p > 0 { + b[p] = ' ' + p++ + } + p += copy(b[p:], it) + } + return e +} + +func (e *Encoder) sp() *Encoder { + return e.char(' ') +} + +func (e *Encoder) char(v byte) *Encoder { + e.next(1)[0] = v + return e +} + +func (e *Encoder) int(v int64) *Encoder { + b := e.next(20) + e.pos += len(strconv.AppendInt(b[:0], v, 10)) - len(b) + return e +} + +func (e *Encoder) add(n byte) *Encoder { + if e.newline { + b := e.next(4) + b[0], b[1], b[2], b[3] = '\r', '\n', n, '=' + } else { + b := e.next(2) + b[0], b[1] = n, '=' + e.newline = true + } + return e +} + +func (e *Encoder) next(n int) (b []byte) { + p := e.pos + n + if len(e.buf) < p { + e.grow(p) + } + b, e.pos = e.buf[e.pos:p], p + return +} + +func (e *Encoder) grow(p int) { + if p < 1024 { + p = 1024 + } else if s := len(e.buf) << 1; p < s { + p = s + } + b := make([]byte, p) + if e.pos > 0 { + copy(b, e.buf[:e.pos]) + } + e.buf = b +} + +// Bytes returns encoded bytes of the last session description. +// The bytes stop being valid at the next encoder call. +func (e *Encoder) Bytes() []byte { + if e.newline { + b := e.next(2) + b[0], b[1] = '\r', '\n' + e.newline = false + } + return e.buf[:e.pos] +} + +// Bytes returns the encoded session description as str. +func (e *Encoder) String() string { + return string(e.Bytes()) +} diff --git a/vendor/github.com/pixelbender/go-sdp/sdp/sdp.go b/vendor/github.com/pixelbender/go-sdp/sdp/sdp.go new file mode 100644 index 00000000..42d7b9e8 --- /dev/null +++ b/vendor/github.com/pixelbender/go-sdp/sdp/sdp.go @@ -0,0 +1,192 @@ +package sdp + +import ( + "time" +) + +// ContentType is the media type for an SDP session description. +const ContentType = "application/sdp" + +// Session represents an SDP session description. +type Session struct { + Version int // Protocol Version ("v=") + Origin *Origin // Origin ("o=") + Name string // Session Name ("s=") + Information string // Session Information ("i=") + URI string // URI ("u=") + Email []string // Email Address ("e=") + Phone []string // Phone Number ("p=") + Connection *Connection // Connection Data ("c=") + Bandwidth Bandwidth // Bandwidth ("b=") + TimeZone []*TimeZone // TimeZone ("z=") + Key []*Key // Encryption Keys ("k=") + Timing *Timing // Timing ("t=") + Repeat []*Repeat // Repeat Times ("r=") + Attributes // Session Attributes ("a=") + Media []*Media // Media Descriptions ("m=") + + Mode string // Streaming mode ("sendrecv", "recvonly", "sendonly", or "inactive") +} + +// String returns the encoded session description as string. +func (s *Session) String() string { + return string(s.Bytes()) +} + +// Bytes returns the encoded session description as buffer. +func (s *Session) Bytes() []byte { + return new(Encoder).session(s).Bytes() +} + +// Origin represents an originator of the session. +type Origin struct { + Username string + SessionID int64 + SessionVersion int64 + Network string + Type string + Address string +} + +// Connection contains connection data. +type Connection struct { + Network string + Type string + Address string + TTL int + AddressNum int +} + +// Bandwidth contains session or media bandwidth information. +type Bandwidth map[string]int + +// TimeZone represents a time zones change information for a repeated session. +type TimeZone struct { + Time time.Time + Offset time.Duration +} + +// Key contains a key exchange information. +// Deprecated: Not recommended, supported for compatibility with older implementations. +type Key struct { + Method, Value string +} + +// Timing specifies start and stop times for a session. +type Timing struct { + Start time.Time + Stop time.Time +} + +// Repeat specifies repeat times for a session. +type Repeat struct { + Interval time.Duration + Duration time.Duration + Offsets []time.Duration +} + +// Media contains media description. +type Media struct { + Type string + Port int + PortNum int + Proto string + + Information string // Media Information ("i=") + Connection []*Connection // Connection Data ("c=") + Bandwidth Bandwidth // Bandwidth ("b=") + Key []*Key // Encryption Keys ("k=") + Attributes // Attributes ("a=") + + Mode string // Streaming mode ("sendrecv", "recvonly", "sendonly", or "inactive") + Formats []*Format // Media Formats ("rtpmap") +} + +// Streaming modes. +const ( + SendRecv = "sendrecv" + SendOnly = "sendonly" + RecvOnly = "recvonly" + Inactive = "inactive" +) + +// NegotiateMode negotiates streaming mode. +func NegotiateMode(local, remote string) string { + switch local { + case SendRecv: + switch remote { + case RecvOnly: + return SendOnly + case SendOnly: + return RecvOnly + default: + return remote + } + case SendOnly: + switch remote { + case SendRecv, RecvOnly: + return SendOnly + } + case RecvOnly: + switch remote { + case SendRecv, SendOnly: + return RecvOnly + } + } + return Inactive +} + +// DeleteAttr removes all elements with name from attrs. +func DeleteAttr(attrs Attributes, name ... string) Attributes { + n := 0 +loop: + for _, it := range attrs { + for _, v := range name { + if it.Name == v { + continue loop + } + } + attrs[n] = it + n++ + } + return attrs[:n] +} + +// Format returns format description by payload type. +func (m *Media) Format(pt int) *Format { + for _, f := range m.Formats { + if f.Payload == pt { + return f + } + } + return nil +} + +// Format is a media format description represented by "rtpmap" attributes. +type Format struct { + Payload int + Name string + ClockRate int + Channels int + Feedback []string // "rtcp-fb" attributes + Params []string // "fmtp" attributes +} + +var epoch = time.Date(1900, time.January, 1, 0, 0, 0, 0, time.UTC) + +// GetAttribute returns session or first determined media attribute. +func (sess *Session) GetAttribute(name string) string { + for _, it := range sess.Attributes { + if it.Name == name { + return it.Value + } + } + for _, media := range sess.Media { + for _, it := range media.Attributes { + if it.Name == name { + return it.Value + } + } + } + return "" +} diff --git a/vendor/github.com/reactivex/rxgo/LICENSE b/vendor/github.com/reactivex/rxgo/LICENSE new file mode 100644 index 00000000..0e13817f --- /dev/null +++ b/vendor/github.com/reactivex/rxgo/LICENSE @@ -0,0 +1,22 @@ +MIT License + +Copyright (c) 2016 Joe Chasinga + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/reactivex/rxgo/observable/observable.go b/vendor/github.com/reactivex/rxgo/observable/observable.go new file mode 100644 index 00000000..40d950c0 --- /dev/null +++ b/vendor/github.com/reactivex/rxgo/observable/observable.go @@ -0,0 +1,415 @@ +package observable + +import ( + "sync" + "time" + + "github.com/reactivex/rxgo" + "github.com/reactivex/rxgo/errors" + "github.com/reactivex/rxgo/fx" + "github.com/reactivex/rxgo/handlers" + "github.com/reactivex/rxgo/observer" + "github.com/reactivex/rxgo/subscription" +) + +// Observable is a basic observable channel +type Observable <-chan interface{} + +var DefaultObservable = make(Observable) + +// New creates an Observable +func New(buffer uint) Observable { + return make(Observable, int(buffer)) +} + +// CheckHandler checks the underlying type of an EventHandler. +func CheckEventHandler(handler rx.EventHandler) observer.Observer { + ob := observer.DefaultObserver + switch handler := handler.(type) { + case handlers.NextFunc: + ob.NextHandler = handler + case handlers.ErrFunc: + ob.ErrHandler = handler + case handlers.DoneFunc: + ob.DoneHandler = handler + case observer.Observer: + ob = handler + } + return ob +} + +// Next returns the next item on the Observable. +func (o Observable) Next() (interface{}, error) { + if next, ok := <-o; ok { + return next, nil + } + return nil, errors.New(errors.EndOfIteratorError) +} + +// Subscribe subscribes an EventHandler and returns a Subscription channel. +func (o Observable) Subscribe(handler rx.EventHandler) <-chan subscription.Subscription { + done := make(chan subscription.Subscription) + sub := subscription.New().Subscribe() + + ob := CheckEventHandler(handler) + + go func() { + OuterLoop: + for item := range o { + switch item := item.(type) { + case error: + ob.OnError(item) + + // Record the error and break the loop. + sub.Error = item + break OuterLoop + default: + ob.OnNext(item) + } + } + + // OnDone only gets executed if there's no error. + if sub.Error == nil { + ob.OnDone() + } + + done <- sub.Unsubscribe() + return + }() + + return done +} + +/* +func (o Observable) Unsubscribe() subscription.Subscription { + // Stub: to be implemented + return subscription.New() +} +*/ + +// Map maps a MappableFunc predicate to each item in Observable and +// returns a new Observable with applied items. +func (o Observable) Map(apply fx.MappableFunc) Observable { + out := make(chan interface{}) + go func() { + for item := range o { + out <- apply(item) + } + close(out) + }() + return Observable(out) +} + +// Take takes first n items in the original Obserable and returns +// a new Observable with the taken items. +func (o Observable) Take(nth uint) Observable { + out := make(chan interface{}) + go func() { + takeCount := 0 + for item := range o { + if (takeCount < int(nth)) { + takeCount += 1 + out <- item + continue + } + break + } + close(out) + }() + return Observable(out) +} + +// TakeLast takes last n items in the original Observable and returns +// a new Observable with the taken items. +func (o Observable) TakeLast(nth uint) Observable { + out := make(chan interface{}) + go func() { + buf := make([]interface{}, nth) + for item := range o { + if (len(buf) >= int(nth)) { + buf = buf[1:] + } + buf = append(buf, item) + } + for _, takenItem := range buf { + out <- takenItem + } + close(out) + }() + return Observable(out) +} + +// Filter filters items in the original Observable and returns +// a new Observable with the filtered items. +func (o Observable) Filter(apply fx.FilterableFunc) Observable { + out := make(chan interface{}) + go func() { + for item := range o { + if apply(item) { + out <- item + } + } + close(out) + }() + return Observable(out) +} + +// First returns new Observable which emit only first item. +func (o Observable) First() Observable { + out := make(chan interface{}) + go func() { + for item := range o { + out <- item + break + } + close(out) + }() + return Observable(out) +} + +// Last returns a new Observable which emit only last item. +func (o Observable) Last() Observable { + out := make(chan interface{}) + go func() { + var last interface{} + for item := range o { + last = item + } + out <- last + close(out) + }() + return Observable(out) +} + +// Distinct suppresses duplicate items in the original Observable and returns +// a new Observable. +func (o Observable) Distinct(apply fx.KeySelectorFunc) Observable { + out := make(chan interface{}) + go func() { + keysets := make(map[interface{}]struct{}) + for item := range o { + key := apply(item) + _, ok := keysets[key] + if !ok { + out <- item + } + keysets[key] = struct{}{} + } + close(out) + }() + return Observable(out) +} + +// DistinctUntilChanged suppresses consecutive duplicate items in the original +// Observable and returns a new Observable. +func (o Observable) DistinctUntilChanged(apply fx.KeySelectorFunc) Observable { + out := make(chan interface{}) + go func() { + var current interface{} + for item := range o { + key := apply(item) + if current != key { + out <- item + current = key + } + } + close(out) + }() + return Observable(out) +} + +// Skip suppresses the first n items in the original Observable and +// returns a new Observable with the rest items. +func (o Observable) Skip(nth uint) Observable { + out := make(chan interface{}) + go func() { + skipCount := 0 + for item := range o { + if (skipCount < int(nth)) { + skipCount += 1 + continue + } + out <- item + } + close(out) + }() + return Observable(out) +} + +// SkipLast suppresses the last n items in the original Observable and +// returns a new Observable with the rest items. +func (o Observable) SkipLast(nth uint) Observable { + out := make(chan interface{}) + go func() { + buf := make(chan interface{}, nth) + for item := range o { + select { + case buf <- item: + default: + out <- (<- buf) + buf <- item + } + } + close(buf) + close(out) + }() + return Observable(out) +} + + +// Scan applies ScannableFunc predicate to each item in the original +// Observable sequentially and emits each successive value on a new Observable. +func (o Observable) Scan(apply fx.ScannableFunc) Observable { + out := make(chan interface{}) + + go func() { + var current interface{} + for item := range o { + out <- apply(current, item) + current = apply(current, item) + } + close(out) + }() + return Observable(out) +} + +// From creates a new Observable from an Iterator. +func From(it rx.Iterator) Observable { + source := make(chan interface{}) + go func() { + for { + val, err := it.Next() + if err != nil { + break + } + source <- val + } + close(source) + }() + return Observable(source) +} + +// Empty creates an Observable with no item and terminate immediately. +func Empty() Observable { + source := make(chan interface{}) + go func() { + close(source) + }() + return Observable(source) +} + +// Interval creates an Observable emitting incremental integers infinitely between +// each given time interval. +func Interval(term chan struct{}, interval time.Duration) Observable { + source := make(chan interface{}) + go func(term chan struct{}) { + i := 0 + OuterLoop: + for { + select { + case <-term: + break OuterLoop + case <-time.After(interval): + source <- i + } + i++ + } + close(source) + }(term) + return Observable(source) +} + +// Repeat creates an Observable emitting a given item repeatedly +func Repeat(item interface{}, ntimes ...int) Observable { + source := make(chan interface{}) + + // this is the infinity case no ntime parameter is given + if len(ntimes) == 0 { + go func() { + for { + source <- item + } + close(source) + }() + return Observable(source) + } + + // this repeat the item ntime + if len(ntimes) > 0 { + count := ntimes[0] + if count <= 0 { + return Empty() + } + go func() { + for i := 0; i < count; i++ { + source <- item + } + close(source) + }() + return Observable(source) + } + + return Empty() +} + +// Range creates an Observable that emits a particular range of sequential integers. +func Range(start, end int) Observable { + source := make(chan interface{}) + go func() { + i := start + for i < end { + source <- i + i++ + } + close(source) + }() + return Observable(source) +} + +// Just creates an Observable with the provided item(s). +func Just(item interface{}, items ...interface{}) Observable { + source := make(chan interface{}) + if len(items) > 0 { + items = append([]interface{}{item}, items...) + } else { + items = []interface{}{item} + } + + go func() { + for _, item := range items { + source <- item + } + close(source) + }() + + return Observable(source) +} + +// Start creates an Observable from one or more directive-like EmittableFunc +// and emits the result of each operation asynchronously on a new Observable. +func Start(f fx.EmittableFunc, fs ...fx.EmittableFunc) Observable { + if len(fs) > 0 { + fs = append([]fx.EmittableFunc{f}, fs...) + } else { + fs = []fx.EmittableFunc{f} + } + + source := make(chan interface{}) + + var wg sync.WaitGroup + for _, f := range fs { + wg.Add(1) + go func(f fx.EmittableFunc) { + source <- f() + wg.Done() + }(f) + } + + // Wait in another goroutine to not block + go func() { + wg.Wait() + close(source) + }() + + return Observable(source) +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 8488a374..a81d3ae1 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -272,6 +272,12 @@ "revision": "c3d6048651cd9420b049fc7ddfc232002d36e800", "revisionTime": "2018-05-29T03:02:45Z" }, + { + "checksumSHA1": "tQsy6xGhp1I9d+quwDQZWMtSVeQ=", + "path": "github.com/pixelbender/go-sdp/sdp", + "revision": "100bc9371a0caf8b1797c82bcc9c36854b059b51", + "revisionTime": "2018-11-23T09:41:52Z" + }, { "checksumSHA1": "d6BycwPpKXW09I/tXMqcItE8SA4=", "origin": "github.com/penggy/EasyGoLib/vendor/github.com/pkg/errors", @@ -279,6 +285,12 @@ "revision": "d0926230dda8c9e4e61040cb7825a026dee7d2d3", "revisionTime": "2018-09-07T02:33:35Z" }, + { + "checksumSHA1": "wGV+EeSd5YGVLiYL36qT65GWahg=", + "path": "github.com/reactivex/rxgo/observable", + "revision": "e715dd83f030be66a2cbef90b842fc3caedfcc69", + "revisionTime": "2018-10-31T19:04:19Z" + }, { "checksumSHA1": "q14d3C3xvWevU3dSv4P5K0+OSD0=", "path": "github.com/shirou/gopsutil/cpu", @@ -362,5 +374,5 @@ "revisionTime": "2018-03-28T19:50:20Z" } ], - "rootPath": "github.com/EasyDarwin" + "rootPath": "github.com/EasyDarwin/EasyDarwin" } diff --git a/web_src/components/PullRTSPDlg.vue b/web_src/components/PullRTSPDlg.vue index 7a80b82c..1d59d4e5 100644 --- a/web_src/components/PullRTSPDlg.vue +++ b/web_src/components/PullRTSPDlg.vue @@ -58,7 +58,7 @@ export default { return; } this.bLoading = true; - $.get('/stream/start', this.form).then(data => { + $.get('/api/v1/stream/start', this.form).then(data => { this.$refs['dlg'].hide(); this.$emit('submit'); }).always(() => { diff --git a/web_src/components/PusherList.vue b/web_src/components/PusherList.vue index 3dda5e01..06410984 100644 --- a/web_src/components/PusherList.vue +++ b/web_src/components/PusherList.vue @@ -14,7 +14,7 @@

推流列表

-
+
@@ -51,13 +51,13 @@ - +