mirror of https://github.com/Xhofe/alist
				
				
				
			
		
			
				
	
	
		
			176 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			176 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Go
		
	
	
| package qbittorrent
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/alist-org/alist/v3/internal/model"
 | |
| 	"github.com/alist-org/alist/v3/internal/op"
 | |
| 	"github.com/alist-org/alist/v3/pkg/task"
 | |
| 	"github.com/alist-org/alist/v3/pkg/utils"
 | |
| 	"github.com/pkg/errors"
 | |
| 	log "github.com/sirupsen/logrus"
 | |
| )
 | |
| 
 | |
| type Monitor struct {
 | |
| 	tsk        *task.Task[string]
 | |
| 	tempDir    string
 | |
| 	dstDirPath string
 | |
| 	seedtime   int
 | |
| 	finish     chan struct{}
 | |
| }
 | |
| 
 | |
| func (m *Monitor) Loop() error {
 | |
| 	var (
 | |
| 		err       error
 | |
| 		completed bool
 | |
| 	)
 | |
| 	m.finish = make(chan struct{})
 | |
| 
 | |
| 	// wait for qbittorrent to parse torrent and create task
 | |
| 	m.tsk.SetStatus("waiting for qbittorrent to parse torrent and create task")
 | |
| 	waitCount := 0
 | |
| 	for {
 | |
| 		_, err := qbclient.GetInfo(m.tsk.ID)
 | |
| 		if err == nil {
 | |
| 			break
 | |
| 		}
 | |
| 		switch err.(type) {
 | |
| 		case InfoNotFoundError:
 | |
| 			break
 | |
| 		default:
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		waitCount += 1
 | |
| 		if waitCount >= 60 {
 | |
| 			return errors.New("torrent parse timeout")
 | |
| 		}
 | |
| 		timer := time.NewTimer(time.Second)
 | |
| 		<-timer.C
 | |
| 	}
 | |
| 
 | |
| outer:
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-m.tsk.Ctx.Done():
 | |
| 			// delete qbittorrent task and downloaded files when the task exits with error
 | |
| 			return qbclient.Delete(m.tsk.ID, true)
 | |
| 		case <-time.After(time.Second * 2):
 | |
| 			completed, err = m.update()
 | |
| 			if completed {
 | |
| 				break outer
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	m.tsk.SetStatus("qbittorrent download completed, transferring")
 | |
| 	<-m.finish
 | |
| 	m.tsk.SetStatus("completed")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *Monitor) update() (bool, error) {
 | |
| 	info, err := qbclient.GetInfo(m.tsk.ID)
 | |
| 	if err != nil {
 | |
| 		m.tsk.SetStatus("qbittorrent " + string(info.State))
 | |
| 		return true, err
 | |
| 	}
 | |
| 
 | |
| 	progress := float64(info.Completed) / float64(info.Size) * 100
 | |
| 	m.tsk.SetProgress(int(progress))
 | |
| 	switch info.State {
 | |
| 	case UPLOADING, PAUSEDUP, QUEUEDUP, STALLEDUP, FORCEDUP, CHECKINGUP:
 | |
| 		err = m.complete()
 | |
| 		return true, errors.WithMessage(err, "failed to transfer file")
 | |
| 	case ALLOCATING, DOWNLOADING, METADL, PAUSEDDL, QUEUEDDL, STALLEDDL, CHECKINGDL, FORCEDDL, CHECKINGRESUMEDATA, MOVING:
 | |
| 		m.tsk.SetStatus("qbittorrent downloading")
 | |
| 		return false, nil
 | |
| 	case ERROR, MISSINGFILES, UNKNOWN:
 | |
| 		return true, errors.Errorf("failed to download %s, error: %s", m.tsk.ID, info.State)
 | |
| 	}
 | |
| 	return true, errors.New("unknown error occurred downloading qbittorrent") // should never happen
 | |
| }
 | |
| 
 | |
| var TransferTaskManager = task.NewTaskManager(3, func(k *uint64) {
 | |
| 	atomic.AddUint64(k, 1)
 | |
| })
 | |
| 
 | |
| func (m *Monitor) complete() error {
 | |
| 	// check dstDir again
 | |
| 	storage, dstBaseDir, err := op.GetStorageAndActualPath(m.dstDirPath)
 | |
| 	if err != nil {
 | |
| 		return errors.WithMessage(err, "failed get storage")
 | |
| 	}
 | |
| 	// get files
 | |
| 	files, err := qbclient.GetFiles(m.tsk.ID)
 | |
| 	if err != nil {
 | |
| 		return errors.Wrapf(err, "failed to get files of %s", m.tsk.ID)
 | |
| 	}
 | |
| 	log.Debugf("files len: %d", len(files))
 | |
| 	// delete qbittorrent task but do not delete the files before transferring to avoid qbittorrent
 | |
| 	// accessing downloaded files and throw `cannot access the file because it is being used by another process` error
 | |
| 	// err = qbclient.Delete(m.tsk.ID, false)
 | |
| 	// if err != nil {
 | |
| 	// 	return err
 | |
| 	// }
 | |
| 	// upload files
 | |
| 	var wg sync.WaitGroup
 | |
| 	wg.Add(len(files))
 | |
| 	go func() {
 | |
| 		wg.Wait()
 | |
| 		m.finish <- struct{}{}
 | |
| 		if m.seedtime < 0 {
 | |
| 			log.Debugf("do not delete qb task %s", m.tsk.ID)
 | |
| 			return
 | |
| 		}
 | |
| 		log.Debugf("delete qb task %s after %d minutes", m.tsk.ID, m.seedtime)
 | |
| 		<-time.After(time.Duration(m.seedtime) * time.Minute)
 | |
| 		err := qbclient.Delete(m.tsk.ID, true)
 | |
| 		if err != nil {
 | |
| 			log.Errorln(err.Error())
 | |
| 		}
 | |
| 		err = os.RemoveAll(m.tempDir)
 | |
| 		if err != nil {
 | |
| 			log.Errorf("failed to remove qbittorrent temp dir: %+v", err.Error())
 | |
| 		}
 | |
| 	}()
 | |
| 	for _, file := range files {
 | |
| 		tempPath := filepath.Join(m.tempDir, file.Name)
 | |
| 		dstPath := filepath.Join(dstBaseDir, file.Name)
 | |
| 		dstDir := filepath.Dir(dstPath)
 | |
| 		fileName := filepath.Base(dstPath)
 | |
| 		TransferTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{
 | |
| 			Name: fmt.Sprintf("transfer %s to [%s](%s)", tempPath, storage.GetStorage().MountPath, dstPath),
 | |
| 			Func: func(tsk *task.Task[uint64]) error {
 | |
| 				defer wg.Done()
 | |
| 				size := file.Size
 | |
| 				mimetype := utils.GetMimeType(tempPath)
 | |
| 				f, err := os.Open(tempPath)
 | |
| 				if err != nil {
 | |
| 					return errors.Wrapf(err, "failed to open file %s", tempPath)
 | |
| 				}
 | |
| 				stream := &model.FileStream{
 | |
| 					Obj: &model.Object{
 | |
| 						Name:     fileName,
 | |
| 						Size:     size,
 | |
| 						Modified: time.Now(),
 | |
| 						IsFolder: false,
 | |
| 					},
 | |
| 					ReadCloser: struct{ io.ReadSeekCloser }{f},
 | |
| 					Mimetype:   mimetype,
 | |
| 				}
 | |
| 				return op.Put(tsk.Ctx, storage, dstDir, stream, tsk.SetProgress)
 | |
| 			},
 | |
| 		}))
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |