chore: add state for task

refactor/fs
Noah Hsu 2022-06-23 21:09:54 +08:00
parent aedcae840d
commit 6c61f1d261
4 changed files with 37 additions and 30 deletions

View File

@ -61,10 +61,10 @@ func TestDown(t *testing.T) {
for { for {
tsk := tasks[0] tsk := tasks[0]
t.Logf("task: %+v", tsk) t.Logf("task: %+v", tsk)
if tsk.Status == task.FINISHED { if tsk.GetState() == task.FINISHED {
break break
} }
if tsk.Status == task.ERRORED { if tsk.GetState() == task.ERRORED {
t.Fatalf("failed to download: %+v", tsk) t.Fatalf("failed to download: %+v", tsk)
} }
time.Sleep(time.Second) time.Sleep(time.Second)
@ -75,10 +75,10 @@ func TestDown(t *testing.T) {
} }
tsk := transferTaskManager.GetAll()[0] tsk := transferTaskManager.GetAll()[0]
t.Logf("task: %+v", tsk) t.Logf("task: %+v", tsk)
if tsk.Status == task.FINISHED { if tsk.GetState() == task.FINISHED {
break break
} }
if tsk.Status == task.ERRORED { if tsk.GetState() == task.ERRORED {
t.Fatalf("failed to download: %+v", tsk) t.Fatalf("failed to download: %+v", tsk)
} }
time.Sleep(time.Second) time.Sleep(time.Second)

View File

@ -2,6 +2,7 @@ package task
import ( import (
"github.com/alist-org/alist/v3/pkg/generic_sync" "github.com/alist-org/alist/v3/pkg/generic_sync"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -52,7 +53,7 @@ func (tm *Manager[K]) MustGet(tid K) *Task[K] {
func (tm *Manager[K]) Retry(tid K) error { func (tm *Manager[K]) Retry(tid K) error {
t, ok := tm.Get(tid) t, ok := tm.Get(tid)
if !ok { if !ok {
return ErrTaskNotFound return errors.WithStack(ErrTaskNotFound)
} }
tm.do(t) tm.do(t)
return nil return nil
@ -61,7 +62,7 @@ func (tm *Manager[K]) Retry(tid K) error {
func (tm *Manager[K]) Cancel(tid K) error { func (tm *Manager[K]) Cancel(tid K) error {
t, ok := tm.Get(tid) t, ok := tm.Get(tid)
if !ok { if !ok {
return ErrTaskNotFound return errors.WithStack(ErrTaskNotFound)
} }
t.Cancel() t.Cancel()
return nil return nil
@ -80,7 +81,7 @@ func (tm *Manager[K]) RemoveAll() {
func (tm *Manager[K]) RemoveFinished() { func (tm *Manager[K]) RemoveFinished() {
tasks := tm.GetAll() tasks := tm.GetAll()
for _, task := range tasks { for _, task := range tasks {
if task.Status == FINISHED { if task.state == FINISHED {
tm.Remove(task.ID) tm.Remove(task.ID)
} }
} }

View File

@ -22,32 +22,38 @@ type Callback[K comparable] func(task *Task[K])
type Task[K comparable] struct { type Task[K comparable] struct {
ID K ID K
Name string Name string
Status string state string // pending, running, finished, canceling, canceled, errored
status string
progress int
Error error Error error
Func Func[K] Func Func[K]
callback Callback[K] callback Callback[K]
Ctx context.Context Ctx context.Context
progress int
cancel context.CancelFunc cancel context.CancelFunc
} }
func (t *Task[K]) SetStatus(status string) { func (t *Task[K]) SetStatus(status string) {
t.Status = status t.status = status
} }
func (t *Task[K]) SetProgress(percentage int) { func (t *Task[K]) SetProgress(percentage int) {
t.progress = percentage t.progress = percentage
} }
func (t *Task[K]) GetState() string {
return t.state
}
func (t *Task[K]) run() { func (t *Task[K]) run() {
t.Status = RUNNING t.state = RUNNING
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
log.Errorf("error [%+v] while run task [%s]", err, t.Name) log.Errorf("error [%+v] while run task [%s]", err, t.Name)
t.Error = errors.Errorf("panic: %+v", err) t.Error = errors.Errorf("panic: %+v", err)
t.Status = ERRORED t.state = ERRORED
} }
}() }()
t.Error = t.Func(t) t.Error = t.Func(t)
@ -55,11 +61,11 @@ func (t *Task[K]) run() {
log.Errorf("error [%+v] while run task [%s]", t.Error, t.Name) log.Errorf("error [%+v] while run task [%s]", t.Error, t.Name)
} }
if errors.Is(t.Ctx.Err(), context.Canceled) { if errors.Is(t.Ctx.Err(), context.Canceled) {
t.Status = CANCELED t.state = CANCELED
} else if t.Error != nil { } else if t.Error != nil {
t.Status = ERRORED t.state = ERRORED
} else { } else {
t.Status = FINISHED t.state = FINISHED
if t.callback != nil { if t.callback != nil {
t.callback(t) t.callback(t)
} }
@ -71,20 +77,20 @@ func (t *Task[K]) retry() {
} }
func (t *Task[K]) Cancel() { func (t *Task[K]) Cancel() {
if t.Status == FINISHED || t.Status == CANCELED { if t.state == FINISHED || t.state == CANCELED {
return return
} }
if t.cancel != nil { if t.cancel != nil {
t.cancel() t.cancel()
} }
// maybe can't cancel // maybe can't cancel
t.Status = CANCELING t.state = CANCELING
} }
func WithCancelCtx[K comparable](task *Task[K]) *Task[K] { func WithCancelCtx[K comparable](task *Task[K]) *Task[K] {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
task.Ctx = ctx task.Ctx = ctx
task.cancel = cancel task.cancel = cancel
task.Status = PENDING task.state = PENDING
return task return task
} }

View File

@ -24,12 +24,12 @@ func TestTask_Manager(t *testing.T) {
t.Fatal("task not found") t.Fatal("task not found")
} }
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
if task.Status != RUNNING { if task.state != RUNNING {
t.Errorf("task status not running: %s", task.Status) t.Errorf("task status not running: %s", task.state)
} }
time.Sleep(time.Second) time.Sleep(time.Second)
if task.Status != FINISHED { if task.state != FINISHED {
t.Errorf("task status not finished: %s", task.Status) t.Errorf("task status not finished: %s", task.state)
} }
} }
@ -56,8 +56,8 @@ func TestTask_Cancel(t *testing.T) {
time.Sleep(time.Microsecond * 50) time.Sleep(time.Microsecond * 50)
task.Cancel() task.Cancel()
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
if task.Status != CANCELED { if task.state != CANCELED {
t.Errorf("task status not canceled: %s", task.Status) t.Errorf("task status not canceled: %s", task.state)
} }
} }
@ -82,7 +82,7 @@ func TestTask_Retry(t *testing.T) {
} }
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
if task.Error == nil { if task.Error == nil {
t.Error(task.Status) t.Error(task.state)
t.Fatal("task error is nil, but expected error") t.Fatal("task error is nil, but expected error")
} else { } else {
t.Logf("task error: %s", task.Error) t.Logf("task error: %s", task.Error)