diff --git a/internal/search/build.go b/internal/search/build.go index 922438b8..868c6ae3 100644 --- a/internal/search/build.go +++ b/internal/search/build.go @@ -4,16 +4,20 @@ import ( "context" "path" "path/filepath" + "strings" + "sync/atomic" "time" "github.com/alist-org/alist/v3/internal/db" "github.com/alist-org/alist/v3/internal/fs" "github.com/alist-org/alist/v3/internal/model" + "github.com/alist-org/alist/v3/pkg/mq" + "github.com/alist-org/alist/v3/pkg/utils" log "github.com/sirupsen/logrus" ) var ( - Running = false + Running = atomic.Bool{} Quit chan struct{} ) @@ -22,8 +26,9 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth if err != nil { return err } + var skipDrivers = []string{"AList V2", "AList V3"} for _, storage := range storages { - if storage.Driver == "AList V2" || storage.Driver == "AList V3" { + if utils.SliceContains(skipDrivers, storage.Driver) { // TODO: request for indexing permission ignorePaths = append(ignorePaths, storage.MountPath) } @@ -32,66 +37,72 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth objCount uint64 = 0 fi model.Obj ) - Running = true + Running.Store(true) Quit = make(chan struct{}, 1) - parents := []string{} - infos := []model.Obj{} + indexMQ := mq.NewInMemoryMQ[ObjWithParent]() go func() { ticker := time.NewTicker(5 * time.Second) for { select { case <-ticker.C: log.Infof("index obj count: %d", objCount) - if len(parents) != 0 { - log.Debugf("current index: %s", parents[len(parents)-1]) - } - if err = BatchIndex(ctx, parents, infos); err != nil { - log.Errorf("build index in batch error: %+v", err) - } else { - objCount = objCount + uint64(len(parents)) - } - if count { - WriteProgress(&model.IndexProgress{ - ObjCount: objCount, - IsDone: false, - LastDoneTime: nil, - }) - } - parents = nil - infos = nil + indexMQ.ConsumeAll(func(messages []mq.Message[ObjWithParent]) { + if len(messages) != 0 { + log.Debugf("current index: %s", messages[len(messages)-1].Content.Parent) + } + if err = BatchIndex(ctx, utils.MustSliceConvert(messages, + func(src mq.Message[ObjWithParent]) ObjWithParent { + return src.Content + })); err != nil { + log.Errorf("build index in batch error: %+v", err) + } else { + objCount = objCount + uint64(len(messages)) + } + if count { + WriteProgress(&model.IndexProgress{ + ObjCount: objCount, + IsDone: false, + LastDoneTime: nil, + }) + } + }) + case <-Quit: - Running = false + Running.Store(false) ticker.Stop() eMsg := "" now := time.Now() originErr := err - if err = BatchIndex(ctx, parents, infos); err != nil { - log.Errorf("build index in batch error: %+v", err) - } else { - objCount = objCount + uint64(len(parents)) - } - parents = nil - infos = nil - if originErr != nil { - log.Errorf("build index error: %+v", err) - eMsg = err.Error() - } else { - log.Infof("success build index, count: %d", objCount) - } - if count { - WriteProgress(&model.IndexProgress{ - ObjCount: objCount, - IsDone: originErr == nil, - LastDoneTime: &now, - Error: eMsg, - }) - } + indexMQ.ConsumeAll(func(messages []mq.Message[ObjWithParent]) { + if err = BatchIndex(ctx, utils.MustSliceConvert(messages, + func(src mq.Message[ObjWithParent]) ObjWithParent { + return src.Content + })); err != nil { + log.Errorf("build index in batch error: %+v", err) + } else { + objCount = objCount + uint64(len(messages)) + } + if originErr != nil { + log.Errorf("build index error: %+v", err) + eMsg = err.Error() + } else { + log.Infof("success build index, count: %d", objCount) + } + if count { + WriteProgress(&model.IndexProgress{ + ObjCount: objCount, + IsDone: true, + LastDoneTime: &now, + Error: eMsg, + }) + } + }) return } } }() defer func() { - if Running { + if Running.Load() { Quit <- struct{}{} } }() @@ -107,8 +118,11 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth } for _, indexPath := range indexPaths { walkFn := func(indexPath string, info model.Obj) error { + if !Running.Load() { + return filepath.SkipDir + } for _, avoidPath := range ignorePaths { - if indexPath == avoidPath { + if strings.HasPrefix(indexPath, avoidPath) { return filepath.SkipDir } } @@ -116,8 +130,12 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth if indexPath == "/" { return nil } - parents = append(parents, path.Dir(indexPath)) - infos = append(infos, info) + indexMQ.Publish(mq.Message[ObjWithParent]{ + Content: ObjWithParent{ + Obj: info, + Parent: path.Dir(indexPath), + }, + }) return nil } fi, err = fs.Get(ctx, indexPath) diff --git a/internal/search/search.go b/internal/search/search.go index 9b84cf2b..d9c208d0 100644 --- a/internal/search/search.go +++ b/internal/search/search.go @@ -17,13 +17,17 @@ var instance searcher.Searcher = nil // Init or reset index func Init(mode string) error { if instance != nil { + // unchanged, do nothing + if instance.Config().Name == mode { + return nil + } err := instance.Release(context.Background()) if err != nil { log.Errorf("release instance err: %+v", err) } instance = nil } - if Running { + if Running.Load() { return fmt.Errorf("index is running") } if mode == "none" { @@ -59,17 +63,22 @@ func Index(ctx context.Context, parent string, obj model.Obj) error { }) } -func BatchIndex(ctx context.Context, parents []string, objs []model.Obj) error { +type ObjWithParent struct { + Parent string + model.Obj +} + +func BatchIndex(ctx context.Context, objs []ObjWithParent) error { if instance == nil { return errs.SearchNotAvailable } - if len(parents) == 0 { + if len(objs) == 0 { return nil } - searchNodes := []model.SearchNode{} - for i := range parents { + var searchNodes []model.SearchNode + for i := range objs { searchNodes = append(searchNodes, model.SearchNode{ - Parent: parents[i], + Parent: objs[i].Parent, Name: objs[i].GetName(), IsDir: objs[i].IsDir(), Size: objs[i].GetSize(), diff --git a/internal/search/update.go b/internal/search/update.go index e512a9f9..47f982ca 100644 --- a/internal/search/update.go +++ b/internal/search/update.go @@ -11,7 +11,7 @@ import ( ) func Update(parent string, objs []model.Obj) { - if instance != nil && !instance.Config().AutoUpdate { + if instance == nil || !instance.Config().AutoUpdate || Running.Load() { return } ctx := context.Background() diff --git a/pkg/generic/queue.go b/pkg/generic/queue.go new file mode 100644 index 00000000..0ccc4bd9 --- /dev/null +++ b/pkg/generic/queue.go @@ -0,0 +1,75 @@ +package generic + +type Queue[T any] struct { + queue []T +} + +func NewQueue[T any]() *Queue[T] { + return &Queue[T]{queue: make([]T, 0)} +} + +func (q *Queue[T]) Push(v T) { + q.queue = append(q.queue, v) +} + +func (q *Queue[T]) Pop() T { + v := q.queue[0] + q.queue = q.queue[1:] + return v +} + +func (q *Queue[T]) Len() int { + return len(q.queue) +} + +func (q *Queue[T]) IsEmpty() bool { + return len(q.queue) == 0 +} + +func (q *Queue[T]) Clear() { + q.queue = nil +} + +func (q *Queue[T]) Peek() T { + return q.queue[0] +} + +func (q *Queue[T]) PeekN(n int) []T { + return q.queue[:n] +} + +func (q *Queue[T]) PopN(n int) []T { + v := q.queue[:n] + q.queue = q.queue[n:] + return v +} + +func (q *Queue[T]) PopAll() []T { + v := q.queue + q.queue = nil + return v +} + +func (q *Queue[T]) PopWhile(f func(T) bool) []T { + var i int + for i = 0; i < len(q.queue); i++ { + if !f(q.queue[i]) { + break + } + } + v := q.queue[:i] + q.queue = q.queue[i:] + return v +} + +func (q *Queue[T]) PopUntil(f func(T) bool) []T { + var i int + for i = 0; i < len(q.queue); i++ { + if f(q.queue[i]) { + break + } + } + v := q.queue[:i] + q.queue = q.queue[i:] + return v +} diff --git a/pkg/mq/mq.go b/pkg/mq/mq.go new file mode 100644 index 00000000..4c2176ae --- /dev/null +++ b/pkg/mq/mq.go @@ -0,0 +1,56 @@ +package mq + +import ( + "sync" + + "github.com/alist-org/alist/v3/pkg/generic" +) + +type Message[T any] struct { + Content T +} + +type BasicConsumer[T any] func(Message[T]) +type AllConsumer[T any] func([]Message[T]) + +type MQ[T any] interface { + Publish(Message[T]) + Consume(BasicConsumer[T]) + ConsumeAll(AllConsumer[T]) + Clear() +} + +type inMemoryMQ[T any] struct { + queue generic.Queue[Message[T]] + sync.Mutex +} + +func NewInMemoryMQ[T any]() MQ[T] { + return &inMemoryMQ[T]{queue: *generic.NewQueue[Message[T]]()} +} + +func (mq *inMemoryMQ[T]) Publish(msg Message[T]) { + mq.Lock() + defer mq.Unlock() + mq.queue.Push(msg) +} + +func (mq *inMemoryMQ[T]) Consume(consumer BasicConsumer[T]) { + mq.Lock() + defer mq.Unlock() + for !mq.queue.IsEmpty() { + consumer(mq.queue.Pop()) + } +} + +func (mq *inMemoryMQ[T]) ConsumeAll(consumer AllConsumer[T]) { + mq.Lock() + defer mq.Unlock() + consumer(mq.queue.PopAll()) +} + +func (mq *inMemoryMQ[T]) Clear() { + mq.Lock() + defer mq.Unlock() + mq.queue.Clear() +} diff --git a/server/handles/index.go b/server/handles/index.go index 46f7b3aa..3739da7e 100644 --- a/server/handles/index.go +++ b/server/handles/index.go @@ -21,7 +21,7 @@ func BuildIndex(c *gin.Context) { common.ErrorResp(c, err, 400) return } - if search.Running { + if search.Running.Load() { common.ErrorStrResp(c, "index is running", 400) return } @@ -41,7 +41,7 @@ func BuildIndex(c *gin.Context) { } func StopIndex(c *gin.Context) { - if !search.Running { + if !search.Running.Load() { common.ErrorStrResp(c, "index is not running", 400) return }