alist/pkg/mq/mq.go

64 lines
1.0 KiB
Go
Raw Normal View History

2022-12-05 07:46:34 +00:00
package mq
import (
"sync"
"github.com/alist-org/alist/v3/pkg/generic"
)
type Message[T any] struct {
Content T
}
type BasicConsumer[T any] func(Message[T])
type AllConsumer[T any] func([]Message[T])
type MQ[T any] interface {
Publish(Message[T])
Consume(BasicConsumer[T])
ConsumeAll(AllConsumer[T])
Clear()
Len() int
2022-12-05 07:46:34 +00:00
}
type inMemoryMQ[T any] struct {
queue generic.Queue[Message[T]]
sync.Mutex
}
func NewInMemoryMQ[T any]() MQ[T] {
return &inMemoryMQ[T]{queue: *generic.NewQueue[Message[T]]()}
}
func (mq *inMemoryMQ[T]) Publish(msg Message[T]) {
mq.Lock()
defer mq.Unlock()
mq.queue.Push(msg)
}
func (mq *inMemoryMQ[T]) Consume(consumer BasicConsumer[T]) {
mq.Lock()
defer mq.Unlock()
for !mq.queue.IsEmpty() {
consumer(mq.queue.Pop())
}
}
func (mq *inMemoryMQ[T]) ConsumeAll(consumer AllConsumer[T]) {
mq.Lock()
defer mq.Unlock()
consumer(mq.queue.PopAll())
}
func (mq *inMemoryMQ[T]) Clear() {
mq.Lock()
defer mq.Unlock()
mq.queue.Clear()
}
func (mq *inMemoryMQ[T]) Len() int {
mq.Lock()
defer mq.Unlock()
return mq.queue.Len()
}