// mirror of https://github.com/Xhofe/alist

package search

import (
	"context"
	"path"
	"path/filepath"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/alist-org/alist/v3/internal/conf"
	"github.com/alist-org/alist/v3/internal/errs"
	"github.com/alist-org/alist/v3/internal/fs"
	"github.com/alist-org/alist/v3/internal/model"
	"github.com/alist-org/alist/v3/internal/op"
	"github.com/alist-org/alist/v3/internal/search/searcher"
	"github.com/alist-org/alist/v3/internal/setting"
	"github.com/alist-org/alist/v3/pkg/mq"
	"github.com/alist-org/alist/v3/pkg/utils"
	mapset "github.com/deckarep/golang-set/v2"
	log "github.com/sirupsen/logrus"
)
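
// Quit holds a pointer to the quit channel of the in-flight BuildIndex run;
// it is nil while no build is running.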
var (
	Quit = atomic.Pointer[chan struct{}]{}
)
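
// Running reports whether an index build is currently in progress.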
func Running() bool {
	return Quit.Load() != nil
}
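
// BuildIndex walks every path in indexPaths up to maxDepth, skips any path
// prefixed by an entry of ignorePaths, and publishes each discovered object to
// an in-memory queue that a background goroutine flushes into the search index
// in batches. Only one build may run at a time; a concurrent call returns
// errs.BuildIndexIsRunning. When count is true, progress is persisted via
// WriteProgress.
//
// An illustrative call (the paths and depth here are hypothetical, not
// defaults taken from this file):
//
//	err := BuildIndex(context.Background(), []string{"/local"}, []string{"/local/tmp"}, 20, true)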
func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth int, count bool) error {
	var (
		err      error
		objCount uint64 = 0
		fi       model.Obj
	)
	log.Infof("build index for: %+v", indexPaths)
	log.Infof("ignore paths: %+v", ignorePaths)
	quit := make(chan struct{}, 1)
	if !Quit.CompareAndSwap(nil, &quit) {
		// another index build is already running
		return errs.BuildIndexIsRunning
	}
	var (
		indexMQ = mq.NewInMemoryMQ[ObjWithParent]()
		running = atomic.Bool{} // current goroutine running
		wg      = &sync.WaitGroup{}
	)
	running.Store(true)
	wg.Add(1)
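
	// Consumer: once per second, flush the queue into the index when at least
	// 1000 objects are pending, or on every fifth tick regardless; a final
	// flush happens when the quit channel fires.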
	go func() {
		ticker := time.NewTicker(time.Second)
		defer func() {
			Quit.Store(nil)
			wg.Done()
			// notify walk to exit when StopIndex api called
			running.Store(false)
			ticker.Stop()
		}()
		tickCount := 0
		for {
			select {
			case <-ticker.C:
				tickCount += 1
				if indexMQ.Len() < 1000 && tickCount != 5 {
					continue
				} else if tickCount >= 5 {
					tickCount = 0
				}
				log.Infof("index obj count: %d", objCount)
				indexMQ.ConsumeAll(func(messages []mq.Message[ObjWithParent]) {
					if len(messages) != 0 {
						log.Debugf("current index: %s", messages[len(messages)-1].Content.Parent)
					}
					if err = BatchIndex(ctx, utils.MustSliceConvert(messages,
						func(src mq.Message[ObjWithParent]) ObjWithParent {
							return src.Content
						})); err != nil {
						log.Errorf("build index in batch error: %+v", err)
					} else {
						objCount = objCount + uint64(len(messages))
					}
					if count {
						WriteProgress(&model.IndexProgress{
							ObjCount:     objCount,
							IsDone:       false,
							LastDoneTime: nil,
						})
					}
				})
			case <-quit:
				log.Debugf("build index for %+v received quit", indexPaths)
				eMsg := ""
				now := time.Now()
				originErr := err
				indexMQ.ConsumeAll(func(messages []mq.Message[ObjWithParent]) {
					if err = BatchIndex(ctx, utils.MustSliceConvert(messages,
						func(src mq.Message[ObjWithParent]) ObjWithParent {
							return src.Content
						})); err != nil {
						log.Errorf("build index in batch error: %+v", err)
					} else {
						objCount = objCount + uint64(len(messages))
					}
					if originErr != nil {
						log.Errorf("build index error: %+v", originErr)
						eMsg = originErr.Error()
					} else {
						log.Infof("success build index, count: %d", objCount)
					}
					if count {
						WriteProgress(&model.IndexProgress{
							ObjCount:     objCount,
							IsDone:       true,
							LastDoneTime: &now,
							Error:        eMsg,
						})
					}
				})
				log.Debugf("build index for %+v quit success", indexPaths)
				return
			}
		}
	}()
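
	// When BuildIndex returns, signal the consumer goroutine to do its final
	// flush and wait for it to exit, unless the build was already stopped by
	// StopIndex.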
	defer func() {
		if !running.Load() || Quit.Load() != &quit {
			log.Debugf("build index for %+v stopped by StopIndex", indexPaths)
			return
		}
		select {
		// avoid goroutine leak
		case quit <- struct{}{}:
		default:
		}
		wg.Wait()
	}()
	admin, err := op.GetAdmin()
	if err != nil {
		return err
	}
	if count {
		WriteProgress(&model.IndexProgress{
			ObjCount: 0,
			IsDone:   false,
		})
	}
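
	// Producer: walk each index path and publish every object except the root
	// to the queue; skip ignored prefixes and storages with indexing disabled,
	// and abort early once running has been cleared.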
	for _, indexPath := range indexPaths {
		walkFn := func(indexPath string, info model.Obj) error {
			if !running.Load() {
				return filepath.SkipDir
			}
			for _, avoidPath := range ignorePaths {
				if strings.HasPrefix(indexPath, avoidPath) {
					return filepath.SkipDir
				}
			}
			if storage, _, err := op.GetStorageAndActualPath(indexPath); err == nil {
				if storage.GetStorage().DisableIndex {
					return filepath.SkipDir
				}
			}
			// ignore root
			if indexPath == "/" {
				return nil
			}
			indexMQ.Publish(mq.Message[ObjWithParent]{
				Content: ObjWithParent{
					Obj:    info,
					Parent: path.Dir(indexPath),
				},
			})
			return nil
		}
		fi, err = fs.Get(ctx, indexPath, &fs.GetArgs{})
		if err != nil {
			return err
		}
		// TODO: run walkFS concurrently
		err = fs.WalkFS(context.WithValue(ctx, "user", admin), maxDepth, indexPath, fi, walkFn)
		if err != nil {
			return err
		}
	}
	return nil
}
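
// Del removes index entries under prefix from the active searcher.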
func Del(ctx context.Context, prefix string) error {
	return instance.Del(ctx, prefix)
}
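
// Clear drops all entries from the active searcher.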
func Clear(ctx context.Context) error {
	return instance.Clear(ctx)
}
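
// Config returns the active searcher's configuration.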
func Config(ctx context.Context) searcher.Config {
	return instance.Config()
}
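
// Update is registered as an objects-update hook (see init below). After a
// directory listing changes, it diffs the listed names against the indexed
// children of parent, deletes entries that no longer exist, indexes new files,
// and builds the index for new folders. It returns immediately when no
// searcher is configured or it does not support auto-update, when auto-update
// is disabled in settings, while a full build is running, when parent is an
// ignored path, or before a full index has completed.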
func Update(parent string, objs []model.Obj) {
	if instance == nil || !instance.Config().AutoUpdate || !setting.GetBool(conf.AutoUpdateIndex) || Running() {
		return
	}
	if isIgnorePath(parent) {
		return
	}
	ctx := context.Background()
	// only update when the index has been built
	progress, err := Progress()
	if err != nil {
		log.Errorf("update search index error while get progress: %+v", err)
		return
	}
	if !progress.IsDone {
		return
	}
	nodes, err := instance.Get(ctx, parent)
	if err != nil {
		log.Errorf("update search index error while get nodes: %+v", err)
		return
	}
	now := mapset.NewSet[string]()
	for i := range objs {
		now.Add(objs[i].GetName())
	}
	old := mapset.NewSet[string]()
	for i := range nodes {
		old.Add(nodes[i].Name)
	}
	// delete data that no longer exists
	toDelete := old.Difference(now)
	toAdd := now.Difference(old)
	for i := range nodes {
		if toDelete.Contains(nodes[i].Name) && !op.HasStorage(path.Join(parent, nodes[i].Name)) {
			log.Debugf("delete index: %s", path.Join(parent, nodes[i].Name))
			err = instance.Del(ctx, path.Join(parent, nodes[i].Name))
			if err != nil {
				log.Errorf("update search index error while del old node: %+v", err)
				return
			}
		}
	}
	for i := range objs {
		if toAdd.Contains(objs[i].GetName()) {
			if !objs[i].IsDir() {
				log.Debugf("add index: %s", path.Join(parent, objs[i].GetName()))
				err = Index(ctx, parent, objs[i])
				if err != nil {
					log.Errorf("update search index error while index new node: %+v", err)
					return
				}
			} else {
				// build index if it's a folder, limiting depth to what remains
				// of the configured maximum at this level
				dir := path.Join(parent, objs[i].GetName())
				err = BuildIndex(ctx,
					[]string{dir},
					conf.SlicesMap[conf.IgnorePaths],
					setting.GetInt(conf.MaxIndexDepth, 20)-strings.Count(dir, "/"), false)
				if err != nil {
					log.Errorf("update search index error while build index: %+v", err)
					return
				}
			}
		}
	}
}

func init() {
	op.RegisterObjsUpdateHook(Update)
}