feat: optimize index build

pull/2587/head
Noah Hsu 2022-12-05 15:46:34 +08:00
parent bc6baf1be0
commit bd33c200dc
6 changed files with 215 additions and 57 deletions

View File

@ -4,16 +4,20 @@ import (
"context" "context"
"path" "path"
"path/filepath" "path/filepath"
"strings"
"sync/atomic"
"time" "time"
"github.com/alist-org/alist/v3/internal/db" "github.com/alist-org/alist/v3/internal/db"
"github.com/alist-org/alist/v3/internal/fs" "github.com/alist-org/alist/v3/internal/fs"
"github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/pkg/mq"
"github.com/alist-org/alist/v3/pkg/utils"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
var ( var (
Running = false Running = atomic.Bool{}
Quit chan struct{} Quit chan struct{}
) )
@ -22,8 +26,9 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth
if err != nil { if err != nil {
return err return err
} }
var skipDrivers = []string{"AList V2", "AList V3"}
for _, storage := range storages { for _, storage := range storages {
if storage.Driver == "AList V2" || storage.Driver == "AList V3" { if utils.SliceContains(skipDrivers, storage.Driver) {
// TODO: request for indexing permission // TODO: request for indexing permission
ignorePaths = append(ignorePaths, storage.MountPath) ignorePaths = append(ignorePaths, storage.MountPath)
} }
@ -32,66 +37,72 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth
objCount uint64 = 0 objCount uint64 = 0
fi model.Obj fi model.Obj
) )
Running = true Running.Store(true)
Quit = make(chan struct{}, 1) Quit = make(chan struct{}, 1)
parents := []string{} indexMQ := mq.NewInMemoryMQ[ObjWithParent]()
infos := []model.Obj{}
go func() { go func() {
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(5 * time.Second)
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
log.Infof("index obj count: %d", objCount) log.Infof("index obj count: %d", objCount)
if len(parents) != 0 { indexMQ.ConsumeAll(func(messages []mq.Message[ObjWithParent]) {
log.Debugf("current index: %s", parents[len(parents)-1]) if len(messages) != 0 {
} log.Debugf("current index: %s", messages[len(messages)-1].Content.Parent)
if err = BatchIndex(ctx, parents, infos); err != nil { }
log.Errorf("build index in batch error: %+v", err) if err = BatchIndex(ctx, utils.MustSliceConvert(messages,
} else { func(src mq.Message[ObjWithParent]) ObjWithParent {
objCount = objCount + uint64(len(parents)) return src.Content
} })); err != nil {
if count { log.Errorf("build index in batch error: %+v", err)
WriteProgress(&model.IndexProgress{ } else {
ObjCount: objCount, objCount = objCount + uint64(len(messages))
IsDone: false, }
LastDoneTime: nil, if count {
}) WriteProgress(&model.IndexProgress{
} ObjCount: objCount,
parents = nil IsDone: false,
infos = nil LastDoneTime: nil,
})
}
})
case <-Quit: case <-Quit:
Running = false Running.Store(false)
ticker.Stop() ticker.Stop()
eMsg := "" eMsg := ""
now := time.Now() now := time.Now()
originErr := err originErr := err
if err = BatchIndex(ctx, parents, infos); err != nil { indexMQ.ConsumeAll(func(messages []mq.Message[ObjWithParent]) {
log.Errorf("build index in batch error: %+v", err) if err = BatchIndex(ctx, utils.MustSliceConvert(messages,
} else { func(src mq.Message[ObjWithParent]) ObjWithParent {
objCount = objCount + uint64(len(parents)) return src.Content
} })); err != nil {
parents = nil log.Errorf("build index in batch error: %+v", err)
infos = nil } else {
if originErr != nil { objCount = objCount + uint64(len(messages))
log.Errorf("build index error: %+v", err) }
eMsg = err.Error() if originErr != nil {
} else { log.Errorf("build index error: %+v", err)
log.Infof("success build index, count: %d", objCount) eMsg = err.Error()
} } else {
if count { log.Infof("success build index, count: %d", objCount)
WriteProgress(&model.IndexProgress{ }
ObjCount: objCount, if count {
IsDone: originErr == nil, WriteProgress(&model.IndexProgress{
LastDoneTime: &now, ObjCount: objCount,
Error: eMsg, IsDone: true,
}) LastDoneTime: &now,
} Error: eMsg,
})
}
})
return return
} }
} }
}() }()
defer func() { defer func() {
if Running { if Running.Load() {
Quit <- struct{}{} Quit <- struct{}{}
} }
}() }()
@ -107,8 +118,11 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth
} }
for _, indexPath := range indexPaths { for _, indexPath := range indexPaths {
walkFn := func(indexPath string, info model.Obj) error { walkFn := func(indexPath string, info model.Obj) error {
if !Running.Load() {
return filepath.SkipDir
}
for _, avoidPath := range ignorePaths { for _, avoidPath := range ignorePaths {
if indexPath == avoidPath { if strings.HasPrefix(indexPath, avoidPath) {
return filepath.SkipDir return filepath.SkipDir
} }
} }
@ -116,8 +130,12 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth
if indexPath == "/" { if indexPath == "/" {
return nil return nil
} }
parents = append(parents, path.Dir(indexPath)) indexMQ.Publish(mq.Message[ObjWithParent]{
infos = append(infos, info) Content: ObjWithParent{
Obj: info,
Parent: path.Dir(indexPath),
},
})
return nil return nil
} }
fi, err = fs.Get(ctx, indexPath) fi, err = fs.Get(ctx, indexPath)

View File

@ -17,13 +17,17 @@ var instance searcher.Searcher = nil
// Init or reset index // Init or reset index
func Init(mode string) error { func Init(mode string) error {
if instance != nil { if instance != nil {
// unchanged, do nothing
if instance.Config().Name == mode {
return nil
}
err := instance.Release(context.Background()) err := instance.Release(context.Background())
if err != nil { if err != nil {
log.Errorf("release instance err: %+v", err) log.Errorf("release instance err: %+v", err)
} }
instance = nil instance = nil
} }
if Running { if Running.Load() {
return fmt.Errorf("index is running") return fmt.Errorf("index is running")
} }
if mode == "none" { if mode == "none" {
@ -59,17 +63,22 @@ func Index(ctx context.Context, parent string, obj model.Obj) error {
}) })
} }
func BatchIndex(ctx context.Context, parents []string, objs []model.Obj) error { type ObjWithParent struct {
Parent string
model.Obj
}
func BatchIndex(ctx context.Context, objs []ObjWithParent) error {
if instance == nil { if instance == nil {
return errs.SearchNotAvailable return errs.SearchNotAvailable
} }
if len(parents) == 0 { if len(objs) == 0 {
return nil return nil
} }
searchNodes := []model.SearchNode{} var searchNodes []model.SearchNode
for i := range parents { for i := range objs {
searchNodes = append(searchNodes, model.SearchNode{ searchNodes = append(searchNodes, model.SearchNode{
Parent: parents[i], Parent: objs[i].Parent,
Name: objs[i].GetName(), Name: objs[i].GetName(),
IsDir: objs[i].IsDir(), IsDir: objs[i].IsDir(),
Size: objs[i].GetSize(), Size: objs[i].GetSize(),

View File

@ -11,7 +11,7 @@ import (
) )
func Update(parent string, objs []model.Obj) { func Update(parent string, objs []model.Obj) {
if instance != nil && !instance.Config().AutoUpdate { if instance == nil || !instance.Config().AutoUpdate || Running.Load() {
return return
} }
ctx := context.Background() ctx := context.Background()

75
pkg/generic/queue.go Normal file
View File

@ -0,0 +1,75 @@
package generic
type Queue[T any] struct {
queue []T
}
func NewQueue[T any]() *Queue[T] {
return &Queue[T]{queue: make([]T, 0)}
}
func (q *Queue[T]) Push(v T) {
q.queue = append(q.queue, v)
}
func (q *Queue[T]) Pop() T {
v := q.queue[0]
q.queue = q.queue[1:]
return v
}
func (q *Queue[T]) Len() int {
return len(q.queue)
}
func (q *Queue[T]) IsEmpty() bool {
return len(q.queue) == 0
}
func (q *Queue[T]) Clear() {
q.queue = nil
}
func (q *Queue[T]) Peek() T {
return q.queue[0]
}
func (q *Queue[T]) PeekN(n int) []T {
return q.queue[:n]
}
func (q *Queue[T]) PopN(n int) []T {
v := q.queue[:n]
q.queue = q.queue[n:]
return v
}
func (q *Queue[T]) PopAll() []T {
v := q.queue
q.queue = nil
return v
}
func (q *Queue[T]) PopWhile(f func(T) bool) []T {
var i int
for i = 0; i < len(q.queue); i++ {
if !f(q.queue[i]) {
break
}
}
v := q.queue[:i]
q.queue = q.queue[i:]
return v
}
func (q *Queue[T]) PopUntil(f func(T) bool) []T {
var i int
for i = 0; i < len(q.queue); i++ {
if f(q.queue[i]) {
break
}
}
v := q.queue[:i]
q.queue = q.queue[i:]
return v
}

56
pkg/mq/mq.go Normal file
View File

@ -0,0 +1,56 @@
package mq
import (
"sync"
"github.com/alist-org/alist/v3/pkg/generic"
)
type Message[T any] struct {
Content T
}
type BasicConsumer[T any] func(Message[T])
type AllConsumer[T any] func([]Message[T])
type MQ[T any] interface {
Publish(Message[T])
Consume(BasicConsumer[T])
ConsumeAll(AllConsumer[T])
Clear()
}
type inMemoryMQ[T any] struct {
queue generic.Queue[Message[T]]
sync.Mutex
}
func NewInMemoryMQ[T any]() MQ[T] {
return &inMemoryMQ[T]{queue: *generic.NewQueue[Message[T]]()}
}
func (mq *inMemoryMQ[T]) Publish(msg Message[T]) {
mq.Lock()
defer mq.Unlock()
mq.queue.Push(msg)
}
func (mq *inMemoryMQ[T]) Consume(consumer BasicConsumer[T]) {
mq.Lock()
defer mq.Unlock()
for !mq.queue.IsEmpty() {
consumer(mq.queue.Pop())
}
}
func (mq *inMemoryMQ[T]) ConsumeAll(consumer AllConsumer[T]) {
mq.Lock()
defer mq.Unlock()
consumer(mq.queue.PopAll())
}
func (mq *inMemoryMQ[T]) Clear() {
mq.Lock()
defer mq.Unlock()
mq.queue.Clear()
}

View File

@ -21,7 +21,7 @@ func BuildIndex(c *gin.Context) {
common.ErrorResp(c, err, 400) common.ErrorResp(c, err, 400)
return return
} }
if search.Running { if search.Running.Load() {
common.ErrorStrResp(c, "index is running", 400) common.ErrorStrResp(c, "index is running", 400)
return return
} }
@ -41,7 +41,7 @@ func BuildIndex(c *gin.Context) {
} }
func StopIndex(c *gin.Context) { func StopIndex(c *gin.Context) {
if !search.Running { if !search.Running.Load() {
common.ErrorStrResp(c, "index is not running", 400) common.ErrorStrResp(c, "index is not running", 400)
return return
} }