From 1a6cb3260aa111a5f13eeb78c2ae7c935152bfef Mon Sep 17 00:00:00 2001 From: macbookpro Date: Sat, 24 Nov 2018 16:54:07 +0800 Subject: [PATCH] =?UTF-8?q?query=20video=20files=20and=20playback=20?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=BD=95=E5=83=8F=E7=9A=84=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E5=92=8C=E5=9B=9E=E6=94=BE=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- routers/routers.go | 12 + routers/streams.go | 185 +++++++- rtsp/pusher.go | 8 + rtsp/rtsp-client.go | 72 +-- vendor/github.com/reactivex/rxgo/LICENSE | 22 - .../reactivex/rxgo/observable/observable.go | 415 ------------------ vendor/vendor.json | 6 - 8 files changed, 229 insertions(+), 493 deletions(-) delete mode 100644 vendor/github.com/reactivex/rxgo/LICENSE delete mode 100644 vendor/github.com/reactivex/rxgo/observable/observable.go diff --git a/README.md b/README.md index 298592d9..8fcc9ac9 100644 --- a/README.md +++ b/README.md @@ -79,7 +79,7 @@ go get -u github.com/kardianos/govendor go get -u github.com/caixw/gobuild - go get -u github.com/reactivex/rxgo + ### 编译命令 diff --git a/routers/routers.go b/routers/routers.go index 508cb24a..a6275b10 100644 --- a/routers/routers.go +++ b/routers/routers.go @@ -138,6 +138,18 @@ func Init() (err error) { api.GET("/stream/start", API.StreamStart) api.GET("/stream/stop", API.StreamStop) + + api.GET("/record/folders", API.RecordFolders) + api.GET("/record/files", API.RecordFiles) + } + + { + + mp4Path := utils.Conf().Section("rtsp").Key("mp4_dir_path").MustString("") + if len(mp4Path) != 0 { + Router.Use(static.Serve("/record", static.LocalFile(mp4Path, true))) + } + } return diff --git a/routers/streams.go b/routers/streams.go index b1676de7..38c82ec6 100644 --- a/routers/streams.go +++ b/routers/streams.go @@ -1,12 +1,21 @@ package routers import ( - "log" - + "bytes" + "fmt" "github.com/EasyDarwin/EasyDarwin/rtsp" "github.com/gin-gonic/gin" - "github.com/reactivex/rxgo/handlers" - "github.com/reactivex/rxgo/observer" + "github.com/penggy/EasyGoLib/utils" + "log" + "math" + "net/http" + "os" + "os/exec" + "path/filepath" + "regexp" + "strconv" + "strings" + "time" ) func (h *APIHandler) StreamStart(c *gin.Context) { @@ -17,25 +26,167 @@ func (h *APIHandler) StreamStart(c *gin.Context) { var form Form err := c.Bind(&form) if err != nil { + log.Printf("Pull to push err:%v", err) return } - client := rtsp.NewRTSPClient(rtsp.GetServer(), form.URL, int64(form.IdleTimeout)*1000) + client := rtsp.NewRTSPClient(rtsp.GetServer(), form.URL, 0) pusher := rtsp.NewClientPusher(client) + err = client.Start(time.Duration(form.IdleTimeout) * time.Second) + if err != nil { + log.Printf("Pull stream err :%v", err) + c.AbortWithStatusJSON(http.StatusBadRequest, fmt.Sprintf("Pull stream err: %v", err)) + return + } + log.Printf("Pull to push %v success ", form) rtsp.GetServer().AddPusher(pusher) - onNext := handlers.NextFunc(func(item interface{}) { - log.Printf("CLIENT:RTSP拉流成功:%v", item) - }) - onDone := handlers.DoneFunc(func() { - log.Println("CLIENT done") - }) - onError := handlers.ErrFunc(func(err error) { - log.Println("CLIENT Error :", err.Error()) - }) - watcher := observer.New(onNext, onDone, onError) - client.Start().Subscribe(watcher) - c.IndentedJSON(200, "OK") + c.IndentedJSON(200, pusher.ID()) } func (h *APIHandler) StreamStop(c *gin.Context) { + type Form struct { + ID string `form:"id" binding:"required"` + } + var form Form + err := c.Bind(&form) + if err != nil { + log.Printf("stop pull to push err:%v", err) + return + } + pushers := rtsp.GetServer().GetPushers() + for _, v := range pushers { + if v.ID() == form.ID { + v.Stop() + c.IndentedJSON(200, "OK") + + log.Printf("Stop %v success ", v) + return + } + } + c.AbortWithStatusJSON(http.StatusBadRequest, fmt.Sprintf("Pusher[%s] not found", form.ID)) +} + +func (h *APIHandler) RecordFolders(c *gin.Context) { + mp4Path := utils.Conf().Section("rtsp").Key("mp4_dir_path").MustString("") + form := utils.NewPageForm() + if err := c.Bind(form); err != nil { + log.Printf("record folder bind err:%v", err) + return + } + var files = make([]interface{}, 0) + if mp4Path != "" { + visit := func(files *[]interface{}) filepath.WalkFunc { + return func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if path == mp4Path { + return nil + } + if !info.IsDir() { + return nil + } + *files = append(*files, map[string]interface{}{"folder": info.Name()}) + return filepath.SkipDir + } + } + err := filepath.Walk(mp4Path, visit(&files)) + if err != nil { + log.Printf("Query RecordFolders err:%v", err) + } + } + pr := utils.NewPageResult(files) + if form.Sort != "" { + pr.Sort(form.Sort, form.Order) + } + pr.Slice(form.Start, form.Limit) + c.IndentedJSON(200, pr) } + +func (h *APIHandler) RecordFiles(c *gin.Context) { + type Form struct { + utils.PageForm + Folder string `form:"folder" binding:"required"` + StartAt int `form:"beginUTCSecond"` + StopAt int `form:"endUTCSecond"` + } + var form = Form{} + form.Limit = math.MaxUint32 + err := c.Bind(&form) + if err != nil { + log.Printf("record file bind err:%v", err) + return + } + + files := make([]interface{}, 0) + mp4Path := utils.Conf().Section("rtsp").Key("mp4_dir_path").MustString("") + if mp4Path != "" { + ffmpeg_path := utils.Conf().Section("rtsp").Key("ffmpeg_path").MustString("") + ffmpeg_folder, executable := filepath.Split(ffmpeg_path) + split := strings.Split(executable, ".") + suffix := "" + if len(split) > 1 { + suffix = split[1] + } + ffprobe := ffmpeg_folder + "ffprobe" + suffix + folder := filepath.Join(mp4Path, form.Folder) + visit := func(files *[]interface{}) filepath.WalkFunc { + return func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if path == folder { + return nil + } + if info.IsDir() { + return nil + } + if info.Size() == 0 { + return nil + } + if info.Name() == ".DS_Store" { + return nil + } + cmd := exec.Command(ffprobe, "-i", path) + cmdOutput := &bytes.Buffer{} + //cmd.Stdout = cmdOutput + cmd.Stderr = cmdOutput + err = cmd.Run() + bytes := cmdOutput.Bytes() + output := string(bytes) + //log.Printf("%v result:%v", cmd, output) + var average = regexp.MustCompile(`Duration: ((\d+):(\d+):(\d+).(\d+))`) + result := average.FindStringSubmatch(output) + duration := time.Duration(0) + durationStr := "" + if len(result) > 0 { + durationStr = result[1] + h, _ := strconv.Atoi(result[2]) + duration += time.Duration(h) * time.Hour + m, _ := strconv.Atoi(result[3]) + duration += time.Duration(m) * time.Minute + s, _ := strconv.Atoi(result[4]) + duration += time.Duration(s) * time.Second + millis, _ := strconv.Atoi(result[5]) + duration += time.Duration(millis) * time.Millisecond + } + *files = append(*files, map[string]interface{}{ + "name": info.Name(), + "durationMillis": duration / time.Millisecond, + "duration": durationStr}) + return nil + } + } + err = filepath.Walk(folder, visit(&files)) + if err != nil { + log.Printf("Query RecordFolders err:%v", err) + } + } + + pr := utils.NewPageResult(files) + if form.Sort != "" { + pr.Sort(form.Sort, form.Order) + } + pr.Slice(form.Start, form.Limit) + c.IndentedJSON(200, pr) +} diff --git a/rtsp/pusher.go b/rtsp/pusher.go index 47e71df1..a4a12d10 100644 --- a/rtsp/pusher.go +++ b/rtsp/pusher.go @@ -231,6 +231,14 @@ func (pusher *Pusher) Start() { } } +func (pusher *Pusher) Stop() { + if pusher.Session != nil { + pusher.Session.Stop() + return + } + pusher.RTSPClient.Stop() +} + func (pusher *Pusher) BroadcastRTP(pack *RTPPack) *Pusher { for _, player := range pusher.GetPlayers() { player.QueueRTP(pack) diff --git a/rtsp/rtsp-client.go b/rtsp/rtsp-client.go index 0cb1b500..20189ab4 100644 --- a/rtsp/rtsp-client.go +++ b/rtsp/rtsp-client.go @@ -16,7 +16,6 @@ import ( "github.com/penggy/EasyGoLib/utils" "github.com/pixelbender/go-sdp/sdp" - "github.com/reactivex/rxgo/observable" ) type RTSPClient struct { @@ -77,9 +76,14 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64) *RTSPC return client } -func (client *RTSPClient) Start() observable.Observable { - source := make(chan interface{}) - requestStream := func() interface{} { +func (client *RTSPClient) Start(timeout time.Duration) error { + //source := make(chan interface{}) + + if timeout == 0 { + timeoutMillis := utils.Conf().Section("rtsp").Key("timeout").MustInt(0) + timeout = time.Duration(timeoutMillis) * time.Millisecond + } + requestStream := func() error { l, err := url.Parse(client.URL) setStatus := func() { if err != nil { @@ -92,11 +96,19 @@ func (client *RTSPClient) Start() observable.Observable { if err != nil { return err } + if strings.ToLower(l.Scheme) != "rtsp" { + err = fmt.Errorf("RTSP url is invalid") + return err + } + if strings.ToLower(l.Hostname()) == "" { + err = fmt.Errorf("RTSP url is invalid") + return err + } port := l.Port() if len(port) == 0 { port = "554" } - conn, err := net.Dial("tcp", l.Hostname()+":"+port) + conn, err := net.DialTimeout("tcp", l.Hostname()+":"+port, timeout) if err != nil { // handle error return err @@ -104,11 +116,10 @@ func (client *RTSPClient) Start() observable.Observable { client.Conn = conn networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576) - timeoutMillis := utils.Conf().Section("rtsp").Key("timeout").MustInt(0) timeoutConn := RichConn{ conn, - time.Duration(timeoutMillis) * time.Millisecond, + timeout, } client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(&timeoutConn, networkBuffer), bufio.NewWriterSize(&timeoutConn, networkBuffer)) @@ -175,17 +186,14 @@ func (client *RTSPClient) Start() observable.Observable { if err != nil { return err } - return 0 + return nil } - stream := func(ch chan interface{}) { + + stream := func() { OptionIntervalMillis := client.OptionIntervalMillis startTime := time.Now() loggerTime := time.Now().Add(-10 * time.Second) - defer func() { - if client.Stoped { - close(ch) - } - }() + defer client.Stop() for !client.Stoped { if OptionIntervalMillis > 0 { elapse := time.Now().Sub(startTime) @@ -205,7 +213,6 @@ func (client *RTSPClient) Start() observable.Observable { if err != nil { if !client.Stoped { log.Printf("client.connRW.ReadByte err:%v", err) - ch <- err } return } @@ -217,7 +224,6 @@ func (client *RTSPClient) Start() observable.Observable { if err != nil { if !client.Stoped { - ch <- err log.Printf("io.ReadFull err:%v", err) } return @@ -228,7 +234,6 @@ func (client *RTSPClient) Start() observable.Observable { _, err = io.ReadFull(client.connRW, content) if err != nil { if !client.Stoped { - ch <- err log.Printf("io.ReadFull err:%v", err) } return @@ -267,7 +272,7 @@ func (client *RTSPClient) Start() observable.Observable { } elapsed := time.Now().Sub(loggerTime) if elapsed >= 10*time.Second { - log.Printf("client[%v]read rtp frame.", client) + log.Printf("%v read rtp frame.", client) loggerTime = time.Now() } client.InBytes += int(length + 4) @@ -283,7 +288,6 @@ func (client *RTSPClient) Start() observable.Observable { line, prefix, err := client.connRW.ReadLine() if err != nil { if !client.Stoped { - ch <- err log.Printf("client.connRW.ReadLine err:%v", err) } return @@ -295,7 +299,6 @@ func (client *RTSPClient) Start() observable.Observable { if err != nil { if !client.Stoped { err = fmt.Errorf("Read content err.ContentLength:%d", contentLen) - ch <- err } return } @@ -316,7 +319,6 @@ func (client *RTSPClient) Start() observable.Observable { contentLen, err = strconv.Atoi(strings.TrimSpace(splits[1])) if err != nil { if !client.Stoped { - ch <- err log.Printf("strconv.Atoi err:%v, str:%v", err, splits[1]) } return @@ -326,19 +328,25 @@ func (client *RTSPClient) Start() observable.Observable { } } } - go func() { - defer client.Stop() - r := requestStream() - source <- r - switch r.(type) { - case error: - return - } - stream(source) - }() - return observable.Observable(source) + //go func() { + // defer client.Stop() + // r := requestStream() + // source <- r + // switch r.(type) { + // case error: + // return + // } + // stream(source) + //}() + //return observable.Observable(source) //return observable.Just(1) + err := requestStream() + if err != nil { + return err + } + go stream() + return nil } func (client *RTSPClient) Stop() { diff --git a/vendor/github.com/reactivex/rxgo/LICENSE b/vendor/github.com/reactivex/rxgo/LICENSE deleted file mode 100644 index 0e13817f..00000000 --- a/vendor/github.com/reactivex/rxgo/LICENSE +++ /dev/null @@ -1,22 +0,0 @@ -MIT License - -Copyright (c) 2016 Joe Chasinga - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. - diff --git a/vendor/github.com/reactivex/rxgo/observable/observable.go b/vendor/github.com/reactivex/rxgo/observable/observable.go deleted file mode 100644 index 40d950c0..00000000 --- a/vendor/github.com/reactivex/rxgo/observable/observable.go +++ /dev/null @@ -1,415 +0,0 @@ -package observable - -import ( - "sync" - "time" - - "github.com/reactivex/rxgo" - "github.com/reactivex/rxgo/errors" - "github.com/reactivex/rxgo/fx" - "github.com/reactivex/rxgo/handlers" - "github.com/reactivex/rxgo/observer" - "github.com/reactivex/rxgo/subscription" -) - -// Observable is a basic observable channel -type Observable <-chan interface{} - -var DefaultObservable = make(Observable) - -// New creates an Observable -func New(buffer uint) Observable { - return make(Observable, int(buffer)) -} - -// CheckHandler checks the underlying type of an EventHandler. -func CheckEventHandler(handler rx.EventHandler) observer.Observer { - ob := observer.DefaultObserver - switch handler := handler.(type) { - case handlers.NextFunc: - ob.NextHandler = handler - case handlers.ErrFunc: - ob.ErrHandler = handler - case handlers.DoneFunc: - ob.DoneHandler = handler - case observer.Observer: - ob = handler - } - return ob -} - -// Next returns the next item on the Observable. -func (o Observable) Next() (interface{}, error) { - if next, ok := <-o; ok { - return next, nil - } - return nil, errors.New(errors.EndOfIteratorError) -} - -// Subscribe subscribes an EventHandler and returns a Subscription channel. -func (o Observable) Subscribe(handler rx.EventHandler) <-chan subscription.Subscription { - done := make(chan subscription.Subscription) - sub := subscription.New().Subscribe() - - ob := CheckEventHandler(handler) - - go func() { - OuterLoop: - for item := range o { - switch item := item.(type) { - case error: - ob.OnError(item) - - // Record the error and break the loop. - sub.Error = item - break OuterLoop - default: - ob.OnNext(item) - } - } - - // OnDone only gets executed if there's no error. - if sub.Error == nil { - ob.OnDone() - } - - done <- sub.Unsubscribe() - return - }() - - return done -} - -/* -func (o Observable) Unsubscribe() subscription.Subscription { - // Stub: to be implemented - return subscription.New() -} -*/ - -// Map maps a MappableFunc predicate to each item in Observable and -// returns a new Observable with applied items. -func (o Observable) Map(apply fx.MappableFunc) Observable { - out := make(chan interface{}) - go func() { - for item := range o { - out <- apply(item) - } - close(out) - }() - return Observable(out) -} - -// Take takes first n items in the original Obserable and returns -// a new Observable with the taken items. -func (o Observable) Take(nth uint) Observable { - out := make(chan interface{}) - go func() { - takeCount := 0 - for item := range o { - if (takeCount < int(nth)) { - takeCount += 1 - out <- item - continue - } - break - } - close(out) - }() - return Observable(out) -} - -// TakeLast takes last n items in the original Observable and returns -// a new Observable with the taken items. -func (o Observable) TakeLast(nth uint) Observable { - out := make(chan interface{}) - go func() { - buf := make([]interface{}, nth) - for item := range o { - if (len(buf) >= int(nth)) { - buf = buf[1:] - } - buf = append(buf, item) - } - for _, takenItem := range buf { - out <- takenItem - } - close(out) - }() - return Observable(out) -} - -// Filter filters items in the original Observable and returns -// a new Observable with the filtered items. -func (o Observable) Filter(apply fx.FilterableFunc) Observable { - out := make(chan interface{}) - go func() { - for item := range o { - if apply(item) { - out <- item - } - } - close(out) - }() - return Observable(out) -} - -// First returns new Observable which emit only first item. -func (o Observable) First() Observable { - out := make(chan interface{}) - go func() { - for item := range o { - out <- item - break - } - close(out) - }() - return Observable(out) -} - -// Last returns a new Observable which emit only last item. -func (o Observable) Last() Observable { - out := make(chan interface{}) - go func() { - var last interface{} - for item := range o { - last = item - } - out <- last - close(out) - }() - return Observable(out) -} - -// Distinct suppresses duplicate items in the original Observable and returns -// a new Observable. -func (o Observable) Distinct(apply fx.KeySelectorFunc) Observable { - out := make(chan interface{}) - go func() { - keysets := make(map[interface{}]struct{}) - for item := range o { - key := apply(item) - _, ok := keysets[key] - if !ok { - out <- item - } - keysets[key] = struct{}{} - } - close(out) - }() - return Observable(out) -} - -// DistinctUntilChanged suppresses consecutive duplicate items in the original -// Observable and returns a new Observable. -func (o Observable) DistinctUntilChanged(apply fx.KeySelectorFunc) Observable { - out := make(chan interface{}) - go func() { - var current interface{} - for item := range o { - key := apply(item) - if current != key { - out <- item - current = key - } - } - close(out) - }() - return Observable(out) -} - -// Skip suppresses the first n items in the original Observable and -// returns a new Observable with the rest items. -func (o Observable) Skip(nth uint) Observable { - out := make(chan interface{}) - go func() { - skipCount := 0 - for item := range o { - if (skipCount < int(nth)) { - skipCount += 1 - continue - } - out <- item - } - close(out) - }() - return Observable(out) -} - -// SkipLast suppresses the last n items in the original Observable and -// returns a new Observable with the rest items. -func (o Observable) SkipLast(nth uint) Observable { - out := make(chan interface{}) - go func() { - buf := make(chan interface{}, nth) - for item := range o { - select { - case buf <- item: - default: - out <- (<- buf) - buf <- item - } - } - close(buf) - close(out) - }() - return Observable(out) -} - - -// Scan applies ScannableFunc predicate to each item in the original -// Observable sequentially and emits each successive value on a new Observable. -func (o Observable) Scan(apply fx.ScannableFunc) Observable { - out := make(chan interface{}) - - go func() { - var current interface{} - for item := range o { - out <- apply(current, item) - current = apply(current, item) - } - close(out) - }() - return Observable(out) -} - -// From creates a new Observable from an Iterator. -func From(it rx.Iterator) Observable { - source := make(chan interface{}) - go func() { - for { - val, err := it.Next() - if err != nil { - break - } - source <- val - } - close(source) - }() - return Observable(source) -} - -// Empty creates an Observable with no item and terminate immediately. -func Empty() Observable { - source := make(chan interface{}) - go func() { - close(source) - }() - return Observable(source) -} - -// Interval creates an Observable emitting incremental integers infinitely between -// each given time interval. -func Interval(term chan struct{}, interval time.Duration) Observable { - source := make(chan interface{}) - go func(term chan struct{}) { - i := 0 - OuterLoop: - for { - select { - case <-term: - break OuterLoop - case <-time.After(interval): - source <- i - } - i++ - } - close(source) - }(term) - return Observable(source) -} - -// Repeat creates an Observable emitting a given item repeatedly -func Repeat(item interface{}, ntimes ...int) Observable { - source := make(chan interface{}) - - // this is the infinity case no ntime parameter is given - if len(ntimes) == 0 { - go func() { - for { - source <- item - } - close(source) - }() - return Observable(source) - } - - // this repeat the item ntime - if len(ntimes) > 0 { - count := ntimes[0] - if count <= 0 { - return Empty() - } - go func() { - for i := 0; i < count; i++ { - source <- item - } - close(source) - }() - return Observable(source) - } - - return Empty() -} - -// Range creates an Observable that emits a particular range of sequential integers. -func Range(start, end int) Observable { - source := make(chan interface{}) - go func() { - i := start - for i < end { - source <- i - i++ - } - close(source) - }() - return Observable(source) -} - -// Just creates an Observable with the provided item(s). -func Just(item interface{}, items ...interface{}) Observable { - source := make(chan interface{}) - if len(items) > 0 { - items = append([]interface{}{item}, items...) - } else { - items = []interface{}{item} - } - - go func() { - for _, item := range items { - source <- item - } - close(source) - }() - - return Observable(source) -} - -// Start creates an Observable from one or more directive-like EmittableFunc -// and emits the result of each operation asynchronously on a new Observable. -func Start(f fx.EmittableFunc, fs ...fx.EmittableFunc) Observable { - if len(fs) > 0 { - fs = append([]fx.EmittableFunc{f}, fs...) - } else { - fs = []fx.EmittableFunc{f} - } - - source := make(chan interface{}) - - var wg sync.WaitGroup - for _, f := range fs { - wg.Add(1) - go func(f fx.EmittableFunc) { - source <- f() - wg.Done() - }(f) - } - - // Wait in another goroutine to not block - go func() { - wg.Wait() - close(source) - }() - - return Observable(source) -} diff --git a/vendor/vendor.json b/vendor/vendor.json index a81d3ae1..d08f4839 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -285,12 +285,6 @@ "revision": "d0926230dda8c9e4e61040cb7825a026dee7d2d3", "revisionTime": "2018-09-07T02:33:35Z" }, - { - "checksumSHA1": "wGV+EeSd5YGVLiYL36qT65GWahg=", - "path": "github.com/reactivex/rxgo/observable", - "revision": "e715dd83f030be66a2cbef90b842fc3caedfcc69", - "revisionTime": "2018-10-31T19:04:19Z" - }, { "checksumSHA1": "q14d3C3xvWevU3dSv4P5K0+OSD0=", "path": "github.com/shirou/gopsutil/cpu",