feat(local): thumbnail token bucket smooth migration (#7425)

* feat(local): allow to migrate static token buckets

* improve(local): token bucket migration boundary handling
pull/7453/head
Mmx 2024-11-01 20:58:53 +08:00 committed by GitHub
parent 4955d8cec8
commit 34a148c83d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 37 additions and 3 deletions

View File

@ -76,7 +76,7 @@ func (d *Local) Init(ctx context.Context) error {
if d.thumbConcurrency == 0 {
d.thumbTokenBucket = NewNopTokenBucket()
} else {
d.thumbTokenBucket = NewStaticTokenBucket(d.thumbConcurrency)
d.thumbTokenBucket = NewStaticTokenBucketWithMigration(d.thumbTokenBucket, d.thumbConcurrency)
}
return nil
}

View File

@ -23,6 +23,38 @@ func NewStaticTokenBucket(size int) StaticTokenBucket {
return StaticTokenBucket{bucket: bucket}
}
func NewStaticTokenBucketWithMigration(oldBucket TokenBucket, size int) StaticTokenBucket {
if oldBucket != nil {
oldStaticBucket, ok := oldBucket.(StaticTokenBucket)
if ok {
oldSize := cap(oldStaticBucket.bucket)
migrateSize := oldSize
if size < migrateSize {
migrateSize = size
}
bucket := make(chan struct{}, size)
for range size - migrateSize {
bucket <- struct{}{}
}
if migrateSize != 0 {
go func() {
for range migrateSize {
<-oldStaticBucket.bucket
bucket <- struct{}{}
}
close(oldStaticBucket.bucket)
}()
}
return StaticTokenBucket{bucket: bucket}
}
}
return NewStaticTokenBucket(size)
}
// Take channel maybe closed when local driver is modified.
// don't call Put method after the channel is closed.
func (b StaticTokenBucket) Take() <-chan struct{} {
return b.bucket
}
@ -35,9 +67,11 @@ func (b StaticTokenBucket) Do(ctx context.Context, f func() error) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-b.bucket:
case _, ok := <-b.Take():
if ok {
defer b.Put()
}
}
return f()
}