mirror of https://github.com/EasyDarwin/EasyDarwin
parent
af2c2b152a
commit
1a6cb3260a
|
@ -79,7 +79,7 @@
|
|||
|
||||
go get -u github.com/kardianos/govendor
|
||||
go get -u github.com/caixw/gobuild
|
||||
go get -u github.com/reactivex/rxgo
|
||||
|
||||
|
||||
### 编译命令
|
||||
|
||||
|
|
|
@ -138,6 +138,18 @@ func Init() (err error) {
|
|||
|
||||
api.GET("/stream/start", API.StreamStart)
|
||||
api.GET("/stream/stop", API.StreamStop)
|
||||
|
||||
api.GET("/record/folders", API.RecordFolders)
|
||||
api.GET("/record/files", API.RecordFiles)
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
mp4Path := utils.Conf().Section("rtsp").Key("mp4_dir_path").MustString("")
|
||||
if len(mp4Path) != 0 {
|
||||
Router.Use(static.Serve("/record", static.LocalFile(mp4Path, true)))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return
|
||||
|
|
|
@ -1,12 +1,21 @@
|
|||
package routers
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/EasyDarwin/EasyDarwin/rtsp"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/reactivex/rxgo/handlers"
|
||||
"github.com/reactivex/rxgo/observer"
|
||||
"github.com/penggy/EasyGoLib/utils"
|
||||
"log"
|
||||
"math"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (h *APIHandler) StreamStart(c *gin.Context) {
|
||||
|
@ -17,25 +26,167 @@ func (h *APIHandler) StreamStart(c *gin.Context) {
|
|||
var form Form
|
||||
err := c.Bind(&form)
|
||||
if err != nil {
|
||||
log.Printf("Pull to push err:%v", err)
|
||||
return
|
||||
}
|
||||
client := rtsp.NewRTSPClient(rtsp.GetServer(), form.URL, int64(form.IdleTimeout)*1000)
|
||||
client := rtsp.NewRTSPClient(rtsp.GetServer(), form.URL, 0)
|
||||
pusher := rtsp.NewClientPusher(client)
|
||||
err = client.Start(time.Duration(form.IdleTimeout) * time.Second)
|
||||
if err != nil {
|
||||
log.Printf("Pull stream err :%v", err)
|
||||
c.AbortWithStatusJSON(http.StatusBadRequest, fmt.Sprintf("Pull stream err: %v", err))
|
||||
return
|
||||
}
|
||||
log.Printf("Pull to push %v success ", form)
|
||||
rtsp.GetServer().AddPusher(pusher)
|
||||
onNext := handlers.NextFunc(func(item interface{}) {
|
||||
log.Printf("CLIENT:RTSP拉流成功:%v", item)
|
||||
})
|
||||
onDone := handlers.DoneFunc(func() {
|
||||
log.Println("CLIENT done")
|
||||
})
|
||||
onError := handlers.ErrFunc(func(err error) {
|
||||
log.Println("CLIENT Error :", err.Error())
|
||||
})
|
||||
watcher := observer.New(onNext, onDone, onError)
|
||||
client.Start().Subscribe(watcher)
|
||||
c.IndentedJSON(200, "OK")
|
||||
c.IndentedJSON(200, pusher.ID())
|
||||
}
|
||||
|
||||
func (h *APIHandler) StreamStop(c *gin.Context) {
|
||||
type Form struct {
|
||||
ID string `form:"id" binding:"required"`
|
||||
}
|
||||
var form Form
|
||||
err := c.Bind(&form)
|
||||
if err != nil {
|
||||
log.Printf("stop pull to push err:%v", err)
|
||||
return
|
||||
}
|
||||
pushers := rtsp.GetServer().GetPushers()
|
||||
for _, v := range pushers {
|
||||
if v.ID() == form.ID {
|
||||
v.Stop()
|
||||
c.IndentedJSON(200, "OK")
|
||||
|
||||
log.Printf("Stop %v success ", v)
|
||||
return
|
||||
}
|
||||
}
|
||||
c.AbortWithStatusJSON(http.StatusBadRequest, fmt.Sprintf("Pusher[%s] not found", form.ID))
|
||||
}
|
||||
|
||||
func (h *APIHandler) RecordFolders(c *gin.Context) {
|
||||
mp4Path := utils.Conf().Section("rtsp").Key("mp4_dir_path").MustString("")
|
||||
form := utils.NewPageForm()
|
||||
if err := c.Bind(form); err != nil {
|
||||
log.Printf("record folder bind err:%v", err)
|
||||
return
|
||||
}
|
||||
var files = make([]interface{}, 0)
|
||||
if mp4Path != "" {
|
||||
visit := func(files *[]interface{}) filepath.WalkFunc {
|
||||
return func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if path == mp4Path {
|
||||
return nil
|
||||
}
|
||||
if !info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
*files = append(*files, map[string]interface{}{"folder": info.Name()})
|
||||
return filepath.SkipDir
|
||||
}
|
||||
}
|
||||
err := filepath.Walk(mp4Path, visit(&files))
|
||||
if err != nil {
|
||||
log.Printf("Query RecordFolders err:%v", err)
|
||||
}
|
||||
}
|
||||
pr := utils.NewPageResult(files)
|
||||
if form.Sort != "" {
|
||||
pr.Sort(form.Sort, form.Order)
|
||||
}
|
||||
pr.Slice(form.Start, form.Limit)
|
||||
c.IndentedJSON(200, pr)
|
||||
|
||||
}
|
||||
|
||||
func (h *APIHandler) RecordFiles(c *gin.Context) {
|
||||
type Form struct {
|
||||
utils.PageForm
|
||||
Folder string `form:"folder" binding:"required"`
|
||||
StartAt int `form:"beginUTCSecond"`
|
||||
StopAt int `form:"endUTCSecond"`
|
||||
}
|
||||
var form = Form{}
|
||||
form.Limit = math.MaxUint32
|
||||
err := c.Bind(&form)
|
||||
if err != nil {
|
||||
log.Printf("record file bind err:%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
files := make([]interface{}, 0)
|
||||
mp4Path := utils.Conf().Section("rtsp").Key("mp4_dir_path").MustString("")
|
||||
if mp4Path != "" {
|
||||
ffmpeg_path := utils.Conf().Section("rtsp").Key("ffmpeg_path").MustString("")
|
||||
ffmpeg_folder, executable := filepath.Split(ffmpeg_path)
|
||||
split := strings.Split(executable, ".")
|
||||
suffix := ""
|
||||
if len(split) > 1 {
|
||||
suffix = split[1]
|
||||
}
|
||||
ffprobe := ffmpeg_folder + "ffprobe" + suffix
|
||||
folder := filepath.Join(mp4Path, form.Folder)
|
||||
visit := func(files *[]interface{}) filepath.WalkFunc {
|
||||
return func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if path == folder {
|
||||
return nil
|
||||
}
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
if info.Size() == 0 {
|
||||
return nil
|
||||
}
|
||||
if info.Name() == ".DS_Store" {
|
||||
return nil
|
||||
}
|
||||
cmd := exec.Command(ffprobe, "-i", path)
|
||||
cmdOutput := &bytes.Buffer{}
|
||||
//cmd.Stdout = cmdOutput
|
||||
cmd.Stderr = cmdOutput
|
||||
err = cmd.Run()
|
||||
bytes := cmdOutput.Bytes()
|
||||
output := string(bytes)
|
||||
//log.Printf("%v result:%v", cmd, output)
|
||||
var average = regexp.MustCompile(`Duration: ((\d+):(\d+):(\d+).(\d+))`)
|
||||
result := average.FindStringSubmatch(output)
|
||||
duration := time.Duration(0)
|
||||
durationStr := ""
|
||||
if len(result) > 0 {
|
||||
durationStr = result[1]
|
||||
h, _ := strconv.Atoi(result[2])
|
||||
duration += time.Duration(h) * time.Hour
|
||||
m, _ := strconv.Atoi(result[3])
|
||||
duration += time.Duration(m) * time.Minute
|
||||
s, _ := strconv.Atoi(result[4])
|
||||
duration += time.Duration(s) * time.Second
|
||||
millis, _ := strconv.Atoi(result[5])
|
||||
duration += time.Duration(millis) * time.Millisecond
|
||||
}
|
||||
*files = append(*files, map[string]interface{}{
|
||||
"name": info.Name(),
|
||||
"durationMillis": duration / time.Millisecond,
|
||||
"duration": durationStr})
|
||||
return nil
|
||||
}
|
||||
}
|
||||
err = filepath.Walk(folder, visit(&files))
|
||||
if err != nil {
|
||||
log.Printf("Query RecordFolders err:%v", err)
|
||||
}
|
||||
}
|
||||
|
||||
pr := utils.NewPageResult(files)
|
||||
if form.Sort != "" {
|
||||
pr.Sort(form.Sort, form.Order)
|
||||
}
|
||||
pr.Slice(form.Start, form.Limit)
|
||||
c.IndentedJSON(200, pr)
|
||||
}
|
||||
|
|
|
@ -231,6 +231,14 @@ func (pusher *Pusher) Start() {
|
|||
}
|
||||
}
|
||||
|
||||
func (pusher *Pusher) Stop() {
|
||||
if pusher.Session != nil {
|
||||
pusher.Session.Stop()
|
||||
return
|
||||
}
|
||||
pusher.RTSPClient.Stop()
|
||||
}
|
||||
|
||||
func (pusher *Pusher) BroadcastRTP(pack *RTPPack) *Pusher {
|
||||
for _, player := range pusher.GetPlayers() {
|
||||
player.QueueRTP(pack)
|
||||
|
|
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/penggy/EasyGoLib/utils"
|
||||
|
||||
"github.com/pixelbender/go-sdp/sdp"
|
||||
"github.com/reactivex/rxgo/observable"
|
||||
)
|
||||
|
||||
type RTSPClient struct {
|
||||
|
@ -77,9 +76,14 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64) *RTSPC
|
|||
return client
|
||||
}
|
||||
|
||||
func (client *RTSPClient) Start() observable.Observable {
|
||||
source := make(chan interface{})
|
||||
requestStream := func() interface{} {
|
||||
func (client *RTSPClient) Start(timeout time.Duration) error {
|
||||
//source := make(chan interface{})
|
||||
|
||||
if timeout == 0 {
|
||||
timeoutMillis := utils.Conf().Section("rtsp").Key("timeout").MustInt(0)
|
||||
timeout = time.Duration(timeoutMillis) * time.Millisecond
|
||||
}
|
||||
requestStream := func() error {
|
||||
l, err := url.Parse(client.URL)
|
||||
setStatus := func() {
|
||||
if err != nil {
|
||||
|
@ -92,11 +96,19 @@ func (client *RTSPClient) Start() observable.Observable {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if strings.ToLower(l.Scheme) != "rtsp" {
|
||||
err = fmt.Errorf("RTSP url is invalid")
|
||||
return err
|
||||
}
|
||||
if strings.ToLower(l.Hostname()) == "" {
|
||||
err = fmt.Errorf("RTSP url is invalid")
|
||||
return err
|
||||
}
|
||||
port := l.Port()
|
||||
if len(port) == 0 {
|
||||
port = "554"
|
||||
}
|
||||
conn, err := net.Dial("tcp", l.Hostname()+":"+port)
|
||||
conn, err := net.DialTimeout("tcp", l.Hostname()+":"+port, timeout)
|
||||
if err != nil {
|
||||
// handle error
|
||||
return err
|
||||
|
@ -104,11 +116,10 @@ func (client *RTSPClient) Start() observable.Observable {
|
|||
client.Conn = conn
|
||||
|
||||
networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576)
|
||||
timeoutMillis := utils.Conf().Section("rtsp").Key("timeout").MustInt(0)
|
||||
|
||||
timeoutConn := RichConn{
|
||||
conn,
|
||||
time.Duration(timeoutMillis) * time.Millisecond,
|
||||
timeout,
|
||||
}
|
||||
client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(&timeoutConn, networkBuffer), bufio.NewWriterSize(&timeoutConn, networkBuffer))
|
||||
|
||||
|
@ -175,17 +186,14 @@ func (client *RTSPClient) Start() observable.Observable {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return 0
|
||||
return nil
|
||||
}
|
||||
stream := func(ch chan interface{}) {
|
||||
|
||||
stream := func() {
|
||||
OptionIntervalMillis := client.OptionIntervalMillis
|
||||
startTime := time.Now()
|
||||
loggerTime := time.Now().Add(-10 * time.Second)
|
||||
defer func() {
|
||||
if client.Stoped {
|
||||
close(ch)
|
||||
}
|
||||
}()
|
||||
defer client.Stop()
|
||||
for !client.Stoped {
|
||||
if OptionIntervalMillis > 0 {
|
||||
elapse := time.Now().Sub(startTime)
|
||||
|
@ -205,7 +213,6 @@ func (client *RTSPClient) Start() observable.Observable {
|
|||
if err != nil {
|
||||
if !client.Stoped {
|
||||
log.Printf("client.connRW.ReadByte err:%v", err)
|
||||
ch <- err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -217,7 +224,6 @@ func (client *RTSPClient) Start() observable.Observable {
|
|||
if err != nil {
|
||||
|
||||
if !client.Stoped {
|
||||
ch <- err
|
||||
log.Printf("io.ReadFull err:%v", err)
|
||||
}
|
||||
return
|
||||
|
@ -228,7 +234,6 @@ func (client *RTSPClient) Start() observable.Observable {
|
|||
_, err = io.ReadFull(client.connRW, content)
|
||||
if err != nil {
|
||||
if !client.Stoped {
|
||||
ch <- err
|
||||
log.Printf("io.ReadFull err:%v", err)
|
||||
}
|
||||
return
|
||||
|
@ -267,7 +272,7 @@ func (client *RTSPClient) Start() observable.Observable {
|
|||
}
|
||||
elapsed := time.Now().Sub(loggerTime)
|
||||
if elapsed >= 10*time.Second {
|
||||
log.Printf("client[%v]read rtp frame.", client)
|
||||
log.Printf("%v read rtp frame.", client)
|
||||
loggerTime = time.Now()
|
||||
}
|
||||
client.InBytes += int(length + 4)
|
||||
|
@ -283,7 +288,6 @@ func (client *RTSPClient) Start() observable.Observable {
|
|||
line, prefix, err := client.connRW.ReadLine()
|
||||
if err != nil {
|
||||
if !client.Stoped {
|
||||
ch <- err
|
||||
log.Printf("client.connRW.ReadLine err:%v", err)
|
||||
}
|
||||
return
|
||||
|
@ -295,7 +299,6 @@ func (client *RTSPClient) Start() observable.Observable {
|
|||
if err != nil {
|
||||
if !client.Stoped {
|
||||
err = fmt.Errorf("Read content err.ContentLength:%d", contentLen)
|
||||
ch <- err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -316,7 +319,6 @@ func (client *RTSPClient) Start() observable.Observable {
|
|||
contentLen, err = strconv.Atoi(strings.TrimSpace(splits[1]))
|
||||
if err != nil {
|
||||
if !client.Stoped {
|
||||
ch <- err
|
||||
log.Printf("strconv.Atoi err:%v, str:%v", err, splits[1])
|
||||
}
|
||||
return
|
||||
|
@ -326,19 +328,25 @@ func (client *RTSPClient) Start() observable.Observable {
|
|||
}
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
defer client.Stop()
|
||||
r := requestStream()
|
||||
source <- r
|
||||
switch r.(type) {
|
||||
case error:
|
||||
return
|
||||
}
|
||||
stream(source)
|
||||
}()
|
||||
return observable.Observable(source)
|
||||
//go func() {
|
||||
// defer client.Stop()
|
||||
// r := requestStream()
|
||||
// source <- r
|
||||
// switch r.(type) {
|
||||
// case error:
|
||||
// return
|
||||
// }
|
||||
// stream(source)
|
||||
//}()
|
||||
//return observable.Observable(source)
|
||||
|
||||
//return observable.Just(1)
|
||||
err := requestStream()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go stream()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (client *RTSPClient) Stop() {
|
||||
|
|
|
@ -1,22 +0,0 @@
|
|||
MIT License
|
||||
|
||||
Copyright (c) 2016 Joe Chasinga
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
|
|
@ -1,415 +0,0 @@
|
|||
package observable
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/reactivex/rxgo"
|
||||
"github.com/reactivex/rxgo/errors"
|
||||
"github.com/reactivex/rxgo/fx"
|
||||
"github.com/reactivex/rxgo/handlers"
|
||||
"github.com/reactivex/rxgo/observer"
|
||||
"github.com/reactivex/rxgo/subscription"
|
||||
)
|
||||
|
||||
// Observable is a basic observable channel
|
||||
type Observable <-chan interface{}
|
||||
|
||||
var DefaultObservable = make(Observable)
|
||||
|
||||
// New creates an Observable
|
||||
func New(buffer uint) Observable {
|
||||
return make(Observable, int(buffer))
|
||||
}
|
||||
|
||||
// CheckHandler checks the underlying type of an EventHandler.
|
||||
func CheckEventHandler(handler rx.EventHandler) observer.Observer {
|
||||
ob := observer.DefaultObserver
|
||||
switch handler := handler.(type) {
|
||||
case handlers.NextFunc:
|
||||
ob.NextHandler = handler
|
||||
case handlers.ErrFunc:
|
||||
ob.ErrHandler = handler
|
||||
case handlers.DoneFunc:
|
||||
ob.DoneHandler = handler
|
||||
case observer.Observer:
|
||||
ob = handler
|
||||
}
|
||||
return ob
|
||||
}
|
||||
|
||||
// Next returns the next item on the Observable.
|
||||
func (o Observable) Next() (interface{}, error) {
|
||||
if next, ok := <-o; ok {
|
||||
return next, nil
|
||||
}
|
||||
return nil, errors.New(errors.EndOfIteratorError)
|
||||
}
|
||||
|
||||
// Subscribe subscribes an EventHandler and returns a Subscription channel.
|
||||
func (o Observable) Subscribe(handler rx.EventHandler) <-chan subscription.Subscription {
|
||||
done := make(chan subscription.Subscription)
|
||||
sub := subscription.New().Subscribe()
|
||||
|
||||
ob := CheckEventHandler(handler)
|
||||
|
||||
go func() {
|
||||
OuterLoop:
|
||||
for item := range o {
|
||||
switch item := item.(type) {
|
||||
case error:
|
||||
ob.OnError(item)
|
||||
|
||||
// Record the error and break the loop.
|
||||
sub.Error = item
|
||||
break OuterLoop
|
||||
default:
|
||||
ob.OnNext(item)
|
||||
}
|
||||
}
|
||||
|
||||
// OnDone only gets executed if there's no error.
|
||||
if sub.Error == nil {
|
||||
ob.OnDone()
|
||||
}
|
||||
|
||||
done <- sub.Unsubscribe()
|
||||
return
|
||||
}()
|
||||
|
||||
return done
|
||||
}
|
||||
|
||||
/*
|
||||
func (o Observable) Unsubscribe() subscription.Subscription {
|
||||
// Stub: to be implemented
|
||||
return subscription.New()
|
||||
}
|
||||
*/
|
||||
|
||||
// Map maps a MappableFunc predicate to each item in Observable and
|
||||
// returns a new Observable with applied items.
|
||||
func (o Observable) Map(apply fx.MappableFunc) Observable {
|
||||
out := make(chan interface{})
|
||||
go func() {
|
||||
for item := range o {
|
||||
out <- apply(item)
|
||||
}
|
||||
close(out)
|
||||
}()
|
||||
return Observable(out)
|
||||
}
|
||||
|
||||
// Take takes first n items in the original Obserable and returns
|
||||
// a new Observable with the taken items.
|
||||
func (o Observable) Take(nth uint) Observable {
|
||||
out := make(chan interface{})
|
||||
go func() {
|
||||
takeCount := 0
|
||||
for item := range o {
|
||||
if (takeCount < int(nth)) {
|
||||
takeCount += 1
|
||||
out <- item
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
close(out)
|
||||
}()
|
||||
return Observable(out)
|
||||
}
|
||||
|
||||
// TakeLast takes last n items in the original Observable and returns
|
||||
// a new Observable with the taken items.
|
||||
func (o Observable) TakeLast(nth uint) Observable {
|
||||
out := make(chan interface{})
|
||||
go func() {
|
||||
buf := make([]interface{}, nth)
|
||||
for item := range o {
|
||||
if (len(buf) >= int(nth)) {
|
||||
buf = buf[1:]
|
||||
}
|
||||
buf = append(buf, item)
|
||||
}
|
||||
for _, takenItem := range buf {
|
||||
out <- takenItem
|
||||
}
|
||||
close(out)
|
||||
}()
|
||||
return Observable(out)
|
||||
}
|
||||
|
||||
// Filter filters items in the original Observable and returns
|
||||
// a new Observable with the filtered items.
|
||||
func (o Observable) Filter(apply fx.FilterableFunc) Observable {
|
||||
out := make(chan interface{})
|
||||
go func() {
|
||||
for item := range o {
|
||||
if apply(item) {
|
||||
out <- item
|
||||
}
|
||||
}
|
||||
close(out)
|
||||
}()
|
||||
return Observable(out)
|
||||
}
|
||||
|
||||
// First returns new Observable which emit only first item.
|
||||
func (o Observable) First() Observable {
|
||||
out := make(chan interface{})
|
||||
go func() {
|
||||
for item := range o {
|
||||
out <- item
|
||||
break
|
||||
}
|
||||
close(out)
|
||||
}()
|
||||
return Observable(out)
|
||||
}
|
||||
|
||||
// Last returns a new Observable which emit only last item.
|
||||
func (o Observable) Last() Observable {
|
||||
out := make(chan interface{})
|
||||
go func() {
|
||||
var last interface{}
|
||||
for item := range o {
|
||||
last = item
|
||||
}
|
||||
out <- last
|
||||
close(out)
|
||||
}()
|
||||
return Observable(out)
|
||||
}
|
||||
|
||||
// Distinct suppresses duplicate items in the original Observable and returns
|
||||
// a new Observable.
|
||||
func (o Observable) Distinct(apply fx.KeySelectorFunc) Observable {
|
||||
out := make(chan interface{})
|
||||
go func() {
|
||||
keysets := make(map[interface{}]struct{})
|
||||
for item := range o {
|
||||
key := apply(item)
|
||||
_, ok := keysets[key]
|
||||
if !ok {
|
||||
out <- item
|
||||
}
|
||||
keysets[key] = struct{}{}
|
||||
}
|
||||
close(out)
|
||||
}()
|
||||
return Observable(out)
|
||||
}
|
||||
|
||||
// DistinctUntilChanged suppresses consecutive duplicate items in the original
|
||||
// Observable and returns a new Observable.
|
||||
func (o Observable) DistinctUntilChanged(apply fx.KeySelectorFunc) Observable {
|
||||
out := make(chan interface{})
|
||||
go func() {
|
||||
var current interface{}
|
||||
for item := range o {
|
||||
key := apply(item)
|
||||
if current != key {
|
||||
out <- item
|
||||
current = key
|
||||
}
|
||||
}
|
||||
close(out)
|
||||
}()
|
||||
return Observable(out)
|
||||
}
|
||||
|
||||
// Skip suppresses the first n items in the original Observable and
|
||||
// returns a new Observable with the rest items.
|
||||
func (o Observable) Skip(nth uint) Observable {
|
||||
out := make(chan interface{})
|
||||
go func() {
|
||||
skipCount := 0
|
||||
for item := range o {
|
||||
if (skipCount < int(nth)) {
|
||||
skipCount += 1
|
||||
continue
|
||||
}
|
||||
out <- item
|
||||
}
|
||||
close(out)
|
||||
}()
|
||||
return Observable(out)
|
||||
}
|
||||
|
||||
// SkipLast suppresses the last n items in the original Observable and
|
||||
// returns a new Observable with the rest items.
|
||||
func (o Observable) SkipLast(nth uint) Observable {
|
||||
out := make(chan interface{})
|
||||
go func() {
|
||||
buf := make(chan interface{}, nth)
|
||||
for item := range o {
|
||||
select {
|
||||
case buf <- item:
|
||||
default:
|
||||
out <- (<- buf)
|
||||
buf <- item
|
||||
}
|
||||
}
|
||||
close(buf)
|
||||
close(out)
|
||||
}()
|
||||
return Observable(out)
|
||||
}
|
||||
|
||||
|
||||
// Scan applies ScannableFunc predicate to each item in the original
|
||||
// Observable sequentially and emits each successive value on a new Observable.
|
||||
func (o Observable) Scan(apply fx.ScannableFunc) Observable {
|
||||
out := make(chan interface{})
|
||||
|
||||
go func() {
|
||||
var current interface{}
|
||||
for item := range o {
|
||||
out <- apply(current, item)
|
||||
current = apply(current, item)
|
||||
}
|
||||
close(out)
|
||||
}()
|
||||
return Observable(out)
|
||||
}
|
||||
|
||||
// From creates a new Observable from an Iterator.
|
||||
func From(it rx.Iterator) Observable {
|
||||
source := make(chan interface{})
|
||||
go func() {
|
||||
for {
|
||||
val, err := it.Next()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
source <- val
|
||||
}
|
||||
close(source)
|
||||
}()
|
||||
return Observable(source)
|
||||
}
|
||||
|
||||
// Empty creates an Observable with no item and terminate immediately.
|
||||
func Empty() Observable {
|
||||
source := make(chan interface{})
|
||||
go func() {
|
||||
close(source)
|
||||
}()
|
||||
return Observable(source)
|
||||
}
|
||||
|
||||
// Interval creates an Observable emitting incremental integers infinitely between
|
||||
// each given time interval.
|
||||
func Interval(term chan struct{}, interval time.Duration) Observable {
|
||||
source := make(chan interface{})
|
||||
go func(term chan struct{}) {
|
||||
i := 0
|
||||
OuterLoop:
|
||||
for {
|
||||
select {
|
||||
case <-term:
|
||||
break OuterLoop
|
||||
case <-time.After(interval):
|
||||
source <- i
|
||||
}
|
||||
i++
|
||||
}
|
||||
close(source)
|
||||
}(term)
|
||||
return Observable(source)
|
||||
}
|
||||
|
||||
// Repeat creates an Observable emitting a given item repeatedly
|
||||
func Repeat(item interface{}, ntimes ...int) Observable {
|
||||
source := make(chan interface{})
|
||||
|
||||
// this is the infinity case no ntime parameter is given
|
||||
if len(ntimes) == 0 {
|
||||
go func() {
|
||||
for {
|
||||
source <- item
|
||||
}
|
||||
close(source)
|
||||
}()
|
||||
return Observable(source)
|
||||
}
|
||||
|
||||
// this repeat the item ntime
|
||||
if len(ntimes) > 0 {
|
||||
count := ntimes[0]
|
||||
if count <= 0 {
|
||||
return Empty()
|
||||
}
|
||||
go func() {
|
||||
for i := 0; i < count; i++ {
|
||||
source <- item
|
||||
}
|
||||
close(source)
|
||||
}()
|
||||
return Observable(source)
|
||||
}
|
||||
|
||||
return Empty()
|
||||
}
|
||||
|
||||
// Range creates an Observable that emits a particular range of sequential integers.
|
||||
func Range(start, end int) Observable {
|
||||
source := make(chan interface{})
|
||||
go func() {
|
||||
i := start
|
||||
for i < end {
|
||||
source <- i
|
||||
i++
|
||||
}
|
||||
close(source)
|
||||
}()
|
||||
return Observable(source)
|
||||
}
|
||||
|
||||
// Just creates an Observable with the provided item(s).
|
||||
func Just(item interface{}, items ...interface{}) Observable {
|
||||
source := make(chan interface{})
|
||||
if len(items) > 0 {
|
||||
items = append([]interface{}{item}, items...)
|
||||
} else {
|
||||
items = []interface{}{item}
|
||||
}
|
||||
|
||||
go func() {
|
||||
for _, item := range items {
|
||||
source <- item
|
||||
}
|
||||
close(source)
|
||||
}()
|
||||
|
||||
return Observable(source)
|
||||
}
|
||||
|
||||
// Start creates an Observable from one or more directive-like EmittableFunc
|
||||
// and emits the result of each operation asynchronously on a new Observable.
|
||||
func Start(f fx.EmittableFunc, fs ...fx.EmittableFunc) Observable {
|
||||
if len(fs) > 0 {
|
||||
fs = append([]fx.EmittableFunc{f}, fs...)
|
||||
} else {
|
||||
fs = []fx.EmittableFunc{f}
|
||||
}
|
||||
|
||||
source := make(chan interface{})
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, f := range fs {
|
||||
wg.Add(1)
|
||||
go func(f fx.EmittableFunc) {
|
||||
source <- f()
|
||||
wg.Done()
|
||||
}(f)
|
||||
}
|
||||
|
||||
// Wait in another goroutine to not block
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(source)
|
||||
}()
|
||||
|
||||
return Observable(source)
|
||||
}
|
|
@ -285,12 +285,6 @@
|
|||
"revision": "d0926230dda8c9e4e61040cb7825a026dee7d2d3",
|
||||
"revisionTime": "2018-09-07T02:33:35Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "wGV+EeSd5YGVLiYL36qT65GWahg=",
|
||||
"path": "github.com/reactivex/rxgo/observable",
|
||||
"revision": "e715dd83f030be66a2cbef90b842fc3caedfcc69",
|
||||
"revisionTime": "2018-10-31T19:04:19Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "q14d3C3xvWevU3dSv4P5K0+OSD0=",
|
||||
"path": "github.com/shirou/gopsutil/cpu",
|
||||
|
|
Loading…
Reference in New Issue