mirror of https://github.com/Xhofe/alist
311 lines
7.9 KiB
Go
311 lines
7.9 KiB
Go
package fs
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/alist-org/alist/v3/drivers/s3"
|
|
"github.com/alist-org/alist/v3/internal/driver"
|
|
"github.com/alist-org/alist/v3/internal/model"
|
|
"github.com/alist-org/alist/v3/internal/op"
|
|
"github.com/alist-org/alist/v3/internal/task"
|
|
"github.com/pkg/errors"
|
|
"github.com/xhofe/tache"
|
|
)
|
|
|
|
const s3TransitionPollInterval = 15 * time.Second
|
|
|
|
// S3TransitionTask represents an asynchronous S3 archive/thaw request that is
|
|
// tracked via the task manager so that clients can monitor the progress of the
|
|
// operation.
|
|
type S3TransitionTask struct {
|
|
task.TaskExtension
|
|
status string
|
|
|
|
StorageMountPath string `json:"storage_mount_path"`
|
|
ObjectPath string `json:"object_path"`
|
|
DisplayPath string `json:"display_path"`
|
|
ObjectName string `json:"object_name"`
|
|
Transition string `json:"transition"`
|
|
Payload json.RawMessage `json:"payload,omitempty"`
|
|
|
|
TargetStorageClass string `json:"target_storage_class,omitempty"`
|
|
RequestID string `json:"request_id,omitempty"`
|
|
VersionID string `json:"version_id,omitempty"`
|
|
|
|
storage driver.Driver `json:"-"`
|
|
}
|
|
|
|
// S3TransitionTaskManager holds asynchronous S3 archive/thaw tasks.
|
|
var S3TransitionTaskManager *tache.Manager[*S3TransitionTask]
|
|
|
|
var _ task.TaskExtensionInfo = (*S3TransitionTask)(nil)
|
|
|
|
func (t *S3TransitionTask) GetName() string {
|
|
action := strings.ToLower(t.Transition)
|
|
if action == "" {
|
|
action = "transition"
|
|
}
|
|
display := t.DisplayPath
|
|
if display == "" {
|
|
display = t.ObjectPath
|
|
}
|
|
if display == "" {
|
|
display = t.ObjectName
|
|
}
|
|
return fmt.Sprintf("s3 %s %s", action, display)
|
|
}
|
|
|
|
func (t *S3TransitionTask) GetStatus() string {
|
|
return t.status
|
|
}
|
|
|
|
func (t *S3TransitionTask) Run() error {
|
|
t.ReinitCtx()
|
|
t.ClearEndTime()
|
|
start := time.Now()
|
|
t.SetStartTime(start)
|
|
defer func() { t.SetEndTime(time.Now()) }()
|
|
|
|
if err := t.ensureStorage(); err != nil {
|
|
t.status = fmt.Sprintf("locate storage failed: %v", err)
|
|
return err
|
|
}
|
|
|
|
payload, err := t.decodePayload()
|
|
if err != nil {
|
|
t.status = fmt.Sprintf("decode payload failed: %v", err)
|
|
return err
|
|
}
|
|
|
|
method := strings.ToLower(strings.TrimSpace(t.Transition))
|
|
switch method {
|
|
case s3.OtherMethodArchive:
|
|
t.status = "submitting archive request"
|
|
t.SetProgress(0)
|
|
resp, err := op.Other(t.Ctx(), t.storage, model.FsOtherArgs{
|
|
Path: t.ObjectPath,
|
|
Method: s3.OtherMethodArchive,
|
|
Data: payload,
|
|
})
|
|
if err != nil {
|
|
t.status = fmt.Sprintf("archive request failed: %v", err)
|
|
return err
|
|
}
|
|
archiveResp, ok := toArchiveResponse(resp)
|
|
if ok {
|
|
if t.TargetStorageClass == "" {
|
|
t.TargetStorageClass = archiveResp.StorageClass
|
|
}
|
|
t.RequestID = archiveResp.RequestID
|
|
t.VersionID = archiveResp.VersionID
|
|
if archiveResp.StorageClass != "" {
|
|
t.status = fmt.Sprintf("archive requested, waiting for %s", archiveResp.StorageClass)
|
|
} else {
|
|
t.status = "archive requested"
|
|
}
|
|
} else if sc := t.extractTargetStorageClass(); sc != "" {
|
|
t.TargetStorageClass = sc
|
|
t.status = fmt.Sprintf("archive requested, waiting for %s", sc)
|
|
} else {
|
|
t.status = "archive requested"
|
|
}
|
|
if t.TargetStorageClass != "" {
|
|
t.TargetStorageClass = s3.NormalizeStorageClass(t.TargetStorageClass)
|
|
}
|
|
t.SetProgress(25)
|
|
return t.waitForArchive()
|
|
case s3.OtherMethodThaw:
|
|
t.status = "submitting thaw request"
|
|
t.SetProgress(0)
|
|
resp, err := op.Other(t.Ctx(), t.storage, model.FsOtherArgs{
|
|
Path: t.ObjectPath,
|
|
Method: s3.OtherMethodThaw,
|
|
Data: payload,
|
|
})
|
|
if err != nil {
|
|
t.status = fmt.Sprintf("thaw request failed: %v", err)
|
|
return err
|
|
}
|
|
thawResp, ok := toThawResponse(resp)
|
|
if ok {
|
|
t.RequestID = thawResp.RequestID
|
|
if thawResp.Status != nil && !thawResp.Status.Ongoing {
|
|
t.SetProgress(100)
|
|
t.status = thawCompletionMessage(thawResp.Status)
|
|
return nil
|
|
}
|
|
}
|
|
t.status = "thaw requested"
|
|
t.SetProgress(25)
|
|
return t.waitForThaw()
|
|
default:
|
|
return errors.Errorf("unsupported transition method: %s", t.Transition)
|
|
}
|
|
}
|
|
|
|
func (t *S3TransitionTask) ensureStorage() error {
|
|
if t.storage != nil {
|
|
return nil
|
|
}
|
|
storage, err := op.GetStorageByMountPath(t.StorageMountPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
t.storage = storage
|
|
return nil
|
|
}
|
|
|
|
func (t *S3TransitionTask) decodePayload() (interface{}, error) {
|
|
if len(t.Payload) == 0 {
|
|
return nil, nil
|
|
}
|
|
var payload interface{}
|
|
if err := json.Unmarshal(t.Payload, &payload); err != nil {
|
|
return nil, err
|
|
}
|
|
return payload, nil
|
|
}
|
|
|
|
func (t *S3TransitionTask) extractTargetStorageClass() string {
|
|
if len(t.Payload) == 0 {
|
|
return ""
|
|
}
|
|
var req s3.ArchiveRequest
|
|
if err := json.Unmarshal(t.Payload, &req); err != nil {
|
|
return ""
|
|
}
|
|
return s3.NormalizeStorageClass(req.StorageClass)
|
|
}
|
|
|
|
func (t *S3TransitionTask) waitForArchive() error {
|
|
ticker := time.NewTicker(s3TransitionPollInterval)
|
|
defer ticker.Stop()
|
|
|
|
ctx := t.Ctx()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
t.status = "archive canceled"
|
|
return ctx.Err()
|
|
case <-ticker.C:
|
|
resp, err := op.Other(ctx, t.storage, model.FsOtherArgs{
|
|
Path: t.ObjectPath,
|
|
Method: s3.OtherMethodArchiveStatus,
|
|
})
|
|
if err != nil {
|
|
t.status = fmt.Sprintf("archive status error: %v", err)
|
|
return err
|
|
}
|
|
archiveResp, ok := toArchiveResponse(resp)
|
|
if !ok {
|
|
t.status = fmt.Sprintf("unexpected archive status response: %T", resp)
|
|
return errors.Errorf("unexpected archive status response: %T", resp)
|
|
}
|
|
currentClass := strings.TrimSpace(archiveResp.StorageClass)
|
|
target := strings.TrimSpace(t.TargetStorageClass)
|
|
if target == "" {
|
|
target = currentClass
|
|
t.TargetStorageClass = currentClass
|
|
}
|
|
if currentClass == "" {
|
|
t.status = "waiting for storage class update"
|
|
t.SetProgress(50)
|
|
continue
|
|
}
|
|
if strings.EqualFold(currentClass, target) {
|
|
t.SetProgress(100)
|
|
t.status = fmt.Sprintf("archive complete (%s)", currentClass)
|
|
return nil
|
|
}
|
|
t.status = fmt.Sprintf("storage class %s (target %s)", currentClass, target)
|
|
t.SetProgress(75)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *S3TransitionTask) waitForThaw() error {
|
|
ticker := time.NewTicker(s3TransitionPollInterval)
|
|
defer ticker.Stop()
|
|
|
|
ctx := t.Ctx()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
t.status = "thaw canceled"
|
|
return ctx.Err()
|
|
case <-ticker.C:
|
|
resp, err := op.Other(ctx, t.storage, model.FsOtherArgs{
|
|
Path: t.ObjectPath,
|
|
Method: s3.OtherMethodThawStatus,
|
|
})
|
|
if err != nil {
|
|
t.status = fmt.Sprintf("thaw status error: %v", err)
|
|
return err
|
|
}
|
|
thawResp, ok := toThawResponse(resp)
|
|
if !ok {
|
|
t.status = fmt.Sprintf("unexpected thaw status response: %T", resp)
|
|
return errors.Errorf("unexpected thaw status response: %T", resp)
|
|
}
|
|
status := thawResp.Status
|
|
if status == nil {
|
|
t.status = "waiting for thaw status"
|
|
t.SetProgress(50)
|
|
continue
|
|
}
|
|
if status.Ongoing {
|
|
t.status = fmt.Sprintf("thaw in progress (%s)", status.Raw)
|
|
t.SetProgress(75)
|
|
continue
|
|
}
|
|
t.SetProgress(100)
|
|
t.status = thawCompletionMessage(status)
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func thawCompletionMessage(status *s3.RestoreStatus) string {
|
|
if status == nil {
|
|
return "thaw complete"
|
|
}
|
|
if status.Expiry != "" {
|
|
return fmt.Sprintf("thaw complete, expires %s", status.Expiry)
|
|
}
|
|
return "thaw complete"
|
|
}
|
|
|
|
func toArchiveResponse(v interface{}) (s3.ArchiveResponse, bool) {
|
|
switch resp := v.(type) {
|
|
case s3.ArchiveResponse:
|
|
return resp, true
|
|
case *s3.ArchiveResponse:
|
|
if resp != nil {
|
|
return *resp, true
|
|
}
|
|
}
|
|
return s3.ArchiveResponse{}, false
|
|
}
|
|
|
|
func toThawResponse(v interface{}) (s3.ThawResponse, bool) {
|
|
switch resp := v.(type) {
|
|
case s3.ThawResponse:
|
|
return resp, true
|
|
case *s3.ThawResponse:
|
|
if resp != nil {
|
|
return *resp, true
|
|
}
|
|
}
|
|
return s3.ThawResponse{}, false
|
|
}
|
|
|
|
// Ensure compatibility with persistence when tasks are restored.
|
|
func (t *S3TransitionTask) OnRestore() {
|
|
// The storage handle is not persisted intentionally; it will be lazily
|
|
// re-fetched on the next Run invocation.
|
|
t.storage = nil
|
|
}
|