mirror of https://github.com/portainer/portainer
163 lines
3.9 KiB
Go
163 lines
3.9 KiB
Go
package boltdb
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
|
|
dserrors "github.com/portainer/portainer/api/dataservices/errors"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
bolt "go.etcd.io/bbolt"
|
|
)
|
|
|
|
type DbTransaction struct {
|
|
conn *DbConnection
|
|
tx *bolt.Tx
|
|
}
|
|
|
|
func (tx *DbTransaction) SetServiceName(bucketName string) error {
|
|
_, err := tx.tx.CreateBucketIfNotExists([]byte(bucketName))
|
|
return err
|
|
}
|
|
|
|
func (tx *DbTransaction) GetObject(bucketName string, key []byte, object interface{}) error {
|
|
bucket := tx.tx.Bucket([]byte(bucketName))
|
|
|
|
value := bucket.Get(key)
|
|
if value == nil {
|
|
return fmt.Errorf("%w (bucket=%s, key=%s)", dserrors.ErrObjectNotFound, bucketName, keyToString(key))
|
|
}
|
|
|
|
return tx.conn.UnmarshalObject(value, object)
|
|
}
|
|
|
|
func (tx *DbTransaction) UpdateObject(bucketName string, key []byte, object interface{}) error {
|
|
data, err := tx.conn.MarshalObject(object)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
bucket := tx.tx.Bucket([]byte(bucketName))
|
|
return bucket.Put(key, data)
|
|
}
|
|
|
|
func (tx *DbTransaction) DeleteObject(bucketName string, key []byte) error {
|
|
bucket := tx.tx.Bucket([]byte(bucketName))
|
|
return bucket.Delete(key)
|
|
}
|
|
|
|
func (tx *DbTransaction) DeleteAllObjects(bucketName string, obj interface{}, matchingFn func(o interface{}) (id int, ok bool)) error {
|
|
var ids []int
|
|
|
|
bucket := tx.tx.Bucket([]byte(bucketName))
|
|
|
|
cursor := bucket.Cursor()
|
|
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
|
|
err := tx.conn.UnmarshalObject(v, &obj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if id, ok := matchingFn(obj); ok {
|
|
ids = append(ids, id)
|
|
}
|
|
}
|
|
|
|
for _, id := range ids {
|
|
if err := bucket.Delete(tx.conn.ConvertToKey(id)); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (tx *DbTransaction) GetNextIdentifier(bucketName string) int {
|
|
bucket := tx.tx.Bucket([]byte(bucketName))
|
|
id, err := bucket.NextSequence()
|
|
if err != nil {
|
|
log.Error().Err(err).Str("bucket", bucketName).Msg("failed to get the next identifer")
|
|
return 0
|
|
}
|
|
|
|
return int(id)
|
|
}
|
|
|
|
func (tx *DbTransaction) CreateObject(bucketName string, fn func(uint64) (int, interface{})) error {
|
|
bucket := tx.tx.Bucket([]byte(bucketName))
|
|
|
|
seqId, _ := bucket.NextSequence()
|
|
id, obj := fn(seqId)
|
|
|
|
data, err := tx.conn.MarshalObject(obj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return bucket.Put(tx.conn.ConvertToKey(id), data)
|
|
}
|
|
|
|
func (tx *DbTransaction) CreateObjectWithId(bucketName string, id int, obj interface{}) error {
|
|
bucket := tx.tx.Bucket([]byte(bucketName))
|
|
data, err := tx.conn.MarshalObject(obj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return bucket.Put(tx.conn.ConvertToKey(id), data)
|
|
}
|
|
|
|
func (tx *DbTransaction) CreateObjectWithStringId(bucketName string, id []byte, obj interface{}) error {
|
|
bucket := tx.tx.Bucket([]byte(bucketName))
|
|
data, err := tx.conn.MarshalObject(obj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return bucket.Put(id, data)
|
|
}
|
|
|
|
func (tx *DbTransaction) GetAll(bucketName string, obj interface{}, appendFn func(o interface{}) (interface{}, error)) error {
|
|
bucket := tx.tx.Bucket([]byte(bucketName))
|
|
|
|
return bucket.ForEach(func(k []byte, v []byte) error {
|
|
err := tx.conn.UnmarshalObject(v, obj)
|
|
if err == nil {
|
|
obj, err = appendFn(obj)
|
|
}
|
|
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (tx *DbTransaction) GetAllWithJsoniter(bucketName string, obj interface{}, appendFn func(o interface{}) (interface{}, error)) error {
|
|
bucket := tx.tx.Bucket([]byte(bucketName))
|
|
|
|
return bucket.ForEach(func(k []byte, v []byte) error {
|
|
err := tx.conn.UnmarshalObject(v, obj)
|
|
if err == nil {
|
|
obj, err = appendFn(obj)
|
|
}
|
|
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (tx *DbTransaction) GetAllWithKeyPrefix(bucketName string, keyPrefix []byte, obj interface{}, appendFn func(o interface{}) (interface{}, error)) error {
|
|
cursor := tx.tx.Bucket([]byte(bucketName)).Cursor()
|
|
|
|
for k, v := cursor.Seek(keyPrefix); k != nil && bytes.HasPrefix(k, keyPrefix); k, v = cursor.Next() {
|
|
err := tx.conn.UnmarshalObject(v, obj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
obj, err = appendFn(obj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|