From 1b3387ca1a7a057a7eb78717ce85fb1d75af03e6 Mon Sep 17 00:00:00 2001 From: Noah Hsu Date: Mon, 20 Jun 2022 22:29:52 +0800 Subject: [PATCH] chore: aria2 notifier --- drivers/local/driver.go | 2 +- go.mod | 1 + go.sum | 2 ++ internal/aria2/add.go | 15 ++++++-- internal/aria2/aria2.go | 3 +- internal/aria2/notify.go | 72 ++++++++++++++++++++++++++++----------- internal/aria2/task.go | 12 +++++++ internal/driver/driver.go | 2 +- internal/fs/put.go | 4 +-- internal/operations/fs.go | 8 +++-- 10 files changed, 92 insertions(+), 29 deletions(-) create mode 100644 internal/aria2/task.go diff --git a/drivers/local/driver.go b/drivers/local/driver.go index 0358c9d1..c235a326 100644 --- a/drivers/local/driver.go +++ b/drivers/local/driver.go @@ -78,7 +78,7 @@ func (d *Driver) Remove(ctx context.Context, obj model.Obj) error { panic("implement me") } -func (d *Driver) Put(ctx context.Context, parentDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error { +func (d *Driver) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error { //TODO implement me panic("implement me") } diff --git a/go.mod b/go.mod index a742e9e1..8564ffd4 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-playground/validator/v10 v10.11.0 // indirect github.com/goccy/go-json v0.9.7 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect diff --git a/go.sum b/go.sum index 568e0669..8e1ab040 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= diff --git a/internal/aria2/add.go b/internal/aria2/add.go index 802120dc..716ee576 100644 --- a/internal/aria2/add.go +++ b/internal/aria2/add.go @@ -2,13 +2,16 @@ package aria2 import ( "context" + "github.com/alist-org/alist/v3/conf" "github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/fs" "github.com/alist-org/alist/v3/internal/operations" + "github.com/google/uuid" "github.com/pkg/errors" + "path/filepath" ) -func AddURI(ctx context.Context, uri []string, dstPath string, parentPath string) error { +func AddURI(ctx context.Context, uri string, dstPath string, parentPath string) error { // check account account, actualParentPath, err := operations.GetAccountAndActualPath(parentPath) if err != nil { @@ -30,6 +33,14 @@ func AddURI(ctx context.Context, uri []string, dstPath string, parentPath string return errors.WithStack(fs.ErrNotFolder) } } - // add aria2 task + // call aria2 rpc + options := map[string]interface{}{ + "dir": filepath.Join(conf.Conf.TempDir, "aria2", uuid.NewString()), + } + gid, err := client.AddURI([]string{uri}, options) + if err != nil { + return errors.Wrapf(err, "failed to add uri %s", uri) + } + // TODO add to task manager return nil } diff --git a/internal/aria2/aria2.go b/internal/aria2/aria2.go index 2710e168..a07de404 100644 --- a/internal/aria2/aria2.go +++ b/internal/aria2/aria2.go @@ -9,10 +9,11 @@ import ( ) var Aria2TaskManager = task.NewTaskManager() +var notify = NewNotify() var client rpc.Client func InitAria2Client(uri string, secret string, timeout int) error { - c, err := rpc.New(context.Background(), uri, secret, time.Duration(timeout)*time.Second, &Notify{}) + c, err := rpc.New(context.Background(), uri, secret, time.Duration(timeout)*time.Second, notify) if err != nil { return errors.Wrap(err, "failed to init aria2 client") } diff --git a/internal/aria2/notify.go b/internal/aria2/notify.go index e03e9181..056fe514 100644 --- a/internal/aria2/notify.go +++ b/internal/aria2/notify.go @@ -1,36 +1,70 @@ package aria2 -import "github.com/alist-org/alist/v3/pkg/aria2/rpc" +import ( + "github.com/alist-org/alist/v3/pkg/aria2/rpc" + "github.com/alist-org/alist/v3/pkg/generic_sync" +) + +const ( + Downloading = iota + Paused + Stopped + Completed + Errored +) type Notify struct { + Signals generic_sync.MapOf[string, chan int] } -func (n Notify) OnDownloadStart(events []rpc.Event) { - //TODO update task status - panic("implement me") +func NewNotify() *Notify { + return &Notify{Signals: generic_sync.MapOf[string, chan int]{}} } -func (n Notify) OnDownloadPause(events []rpc.Event) { - //TODO update task status - panic("implement me") +func (n *Notify) OnDownloadStart(events []rpc.Event) { + for _, e := range events { + if signal, ok := n.Signals.Load(e.Gid); ok { + signal <- Downloading + } + } } -func (n Notify) OnDownloadStop(events []rpc.Event) { - //TODO update task status - panic("implement me") +func (n *Notify) OnDownloadPause(events []rpc.Event) { + for _, e := range events { + if signal, ok := n.Signals.Load(e.Gid); ok { + signal <- Paused + } + } } -func (n Notify) OnDownloadComplete(events []rpc.Event) { - //TODO get files and upload them - panic("implement me") +func (n *Notify) OnDownloadStop(events []rpc.Event) { + for _, e := range events { + if signal, ok := n.Signals.Load(e.Gid); ok { + signal <- Stopped + } + } } -func (n Notify) OnDownloadError(events []rpc.Event) { - //TODO update task status - panic("implement me") +func (n *Notify) OnDownloadComplete(events []rpc.Event) { + for _, e := range events { + if signal, ok := n.Signals.Load(e.Gid); ok { + signal <- Completed + } + } } -func (n Notify) OnBtDownloadComplete(events []rpc.Event) { - //TODO get files and upload them - panic("implement me") +func (n *Notify) OnDownloadError(events []rpc.Event) { + for _, e := range events { + if signal, ok := n.Signals.Load(e.Gid); ok { + signal <- Errored + } + } +} + +func (n *Notify) OnBtDownloadComplete(events []rpc.Event) { + for _, e := range events { + if signal, ok := n.Signals.Load(e.Gid); ok { + signal <- Completed + } + } } diff --git a/internal/aria2/task.go b/internal/aria2/task.go new file mode 100644 index 00000000..aed28fc9 --- /dev/null +++ b/internal/aria2/task.go @@ -0,0 +1,12 @@ +package aria2 + +import ( + "github.com/alist-org/alist/v3/internal/driver" + "github.com/alist-org/alist/v3/pkg/task" +) + +type Task struct { + Account driver.Driver + ParentDir string + T task.Task +} diff --git a/internal/driver/driver.go b/internal/driver/driver.go index f794c4ea..c3707d29 100644 --- a/internal/driver/driver.go +++ b/internal/driver/driver.go @@ -50,7 +50,7 @@ type Writer interface { // Remove remove `object` Remove(ctx context.Context, obj model.Obj) error // Put upload `stream` to `parentDir` - Put(ctx context.Context, parentDir model.Obj, stream model.FileStreamer, up UpdateProgress) error + Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up UpdateProgress) error } type UpdateProgress func(percentage int) diff --git a/internal/fs/put.go b/internal/fs/put.go index a6939c43..12123186 100644 --- a/internal/fs/put.go +++ b/internal/fs/put.go @@ -13,8 +13,8 @@ import ( var UploadTaskManager = task.NewTaskManager() // Put add as a put task -func Put(ctx context.Context, account driver.Driver, parentPath string, file model.FileStreamer) error { - account, actualParentPath, err := operations.GetAccountAndActualPath(parentPath) +func Put(ctx context.Context, account driver.Driver, dstDir string, file model.FileStreamer) error { + account, actualParentPath, err := operations.GetAccountAndActualPath(dstDir) if account.Config().NoUpload { return errors.WithStack(ErrUploadNotSupported) } diff --git a/internal/operations/fs.go b/internal/operations/fs.go index 7b9f48eb..21865616 100644 --- a/internal/operations/fs.go +++ b/internal/operations/fs.go @@ -20,7 +20,7 @@ var filesCache = cache.NewMemCache(cache.WithShards[[]model.Obj](64)) var filesG singleflight.Group[[]model.Obj] // List files in storage, not contains virtual file -func List(ctx context.Context, account driver.Driver, path string) ([]model.Obj, error) { +func List(ctx context.Context, account driver.Driver, path string, refresh ...bool) ([]model.Obj, error) { dir, err := Get(ctx, account, path) if err != nil { return nil, errors.WithMessage(err, "failed get dir") @@ -29,8 +29,10 @@ func List(ctx context.Context, account driver.Driver, path string) ([]model.Obj, return account.List(ctx, dir) } key := stdpath.Join(account.GetAccount().VirtualPath, path) - if files, ok := filesCache.Get(key); ok { - return files, nil + if len(refresh) == 0 || !refresh[0] { + if files, ok := filesCache.Get(key); ok { + return files, nil + } } files, err, _ := filesG.Do(key, func() ([]model.Obj, error) { files, err := account.List(ctx, dir)