mirror of https://github.com/portainer/portainer
fix(tags): remove a data race EE-4310 (#7862)
parent
8f1ac38963
commit
367f3dd6d4
|
@ -24,6 +24,7 @@ type Connection interface {
|
||||||
SetServiceName(bucketName string) error
|
SetServiceName(bucketName string) error
|
||||||
GetObject(bucketName string, key []byte, object interface{}) error
|
GetObject(bucketName string, key []byte, object interface{}) error
|
||||||
UpdateObject(bucketName string, key []byte, object interface{}) error
|
UpdateObject(bucketName string, key []byte, object interface{}) error
|
||||||
|
UpdateObjectFunc(bucketName string, key []byte, object any, updateFn func()) error
|
||||||
DeleteObject(bucketName string, key []byte) error
|
DeleteObject(bucketName string, key []byte) error
|
||||||
DeleteAllObjects(bucketName string, matching func(o interface{}) (id int, ok bool)) error
|
DeleteAllObjects(bucketName string, matching func(o interface{}) (id int, ok bool)) error
|
||||||
GetNextIdentifier(bucketName string) int
|
GetNextIdentifier(bucketName string) int
|
||||||
|
|
|
@ -179,7 +179,7 @@ func (connection *DbConnection) ConvertToKey(v int) []byte {
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateBucket is a generic function used to create a bucket inside a database database.
|
// CreateBucket is a generic function used to create a bucket inside a database.
|
||||||
func (connection *DbConnection) SetServiceName(bucketName string) error {
|
func (connection *DbConnection) SetServiceName(bucketName string) error {
|
||||||
return connection.Batch(func(tx *bolt.Tx) error {
|
return connection.Batch(func(tx *bolt.Tx) error {
|
||||||
_, err := tx.CreateBucketIfNotExists([]byte(bucketName))
|
_, err := tx.CreateBucketIfNotExists([]byte(bucketName))
|
||||||
|
@ -187,7 +187,7 @@ func (connection *DbConnection) SetServiceName(bucketName string) error {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObject is a generic function used to retrieve an unmarshalled object from a database database.
|
// GetObject is a generic function used to retrieve an unmarshalled object from a database.
|
||||||
func (connection *DbConnection) GetObject(bucketName string, key []byte, object interface{}) error {
|
func (connection *DbConnection) GetObject(bucketName string, key []byte, object interface{}) error {
|
||||||
var data []byte
|
var data []byte
|
||||||
|
|
||||||
|
@ -219,7 +219,7 @@ func (connection *DbConnection) getEncryptionKey() []byte {
|
||||||
return connection.EncryptionKey
|
return connection.EncryptionKey
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateObject is a generic function used to update an object inside a database database.
|
// UpdateObject is a generic function used to update an object inside a database.
|
||||||
func (connection *DbConnection) UpdateObject(bucketName string, key []byte, object interface{}) error {
|
func (connection *DbConnection) UpdateObject(bucketName string, key []byte, object interface{}) error {
|
||||||
data, err := connection.MarshalObject(object)
|
data, err := connection.MarshalObject(object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -232,7 +232,33 @@ func (connection *DbConnection) UpdateObject(bucketName string, key []byte, obje
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteObject is a generic function used to delete an object inside a database database.
|
// UpdateObjectFunc is a generic function used to update an object safely without race conditions.
|
||||||
|
func (connection *DbConnection) UpdateObjectFunc(bucketName string, key []byte, object any, updateFn func()) error {
|
||||||
|
return connection.Batch(func(tx *bolt.Tx) error {
|
||||||
|
bucket := tx.Bucket([]byte(bucketName))
|
||||||
|
|
||||||
|
data := bucket.Get(key)
|
||||||
|
if data == nil {
|
||||||
|
return dserrors.ErrObjectNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
err := connection.UnmarshalObjectWithJsoniter(data, object)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
updateFn()
|
||||||
|
|
||||||
|
data, err = connection.MarshalObject(object)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bucket.Put(key, data)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteObject is a generic function used to delete an object inside a database.
|
||||||
func (connection *DbConnection) DeleteObject(bucketName string, key []byte) error {
|
func (connection *DbConnection) DeleteObject(bucketName string, key []byte) error {
|
||||||
return connection.Batch(func(tx *bolt.Tx) error {
|
return connection.Batch(func(tx *bolt.Tx) error {
|
||||||
bucket := tx.Bucket([]byte(bucketName))
|
bucket := tx.Bucket([]byte(bucketName))
|
||||||
|
|
|
@ -251,6 +251,7 @@ type (
|
||||||
Tag(ID portainer.TagID) (*portainer.Tag, error)
|
Tag(ID portainer.TagID) (*portainer.Tag, error)
|
||||||
Create(tag *portainer.Tag) error
|
Create(tag *portainer.Tag) error
|
||||||
UpdateTag(ID portainer.TagID, tag *portainer.Tag) error
|
UpdateTag(ID portainer.TagID, tag *portainer.Tag) error
|
||||||
|
UpdateTagFunc(ID portainer.TagID, updateFunc func(tag *portainer.Tag)) error
|
||||||
DeleteTag(ID portainer.TagID) error
|
DeleteTag(ID portainer.TagID) error
|
||||||
BucketName() string
|
BucketName() string
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,12 +80,24 @@ func (service *Service) Create(tag *portainer.Tag) error {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateTag updates a tag.
|
// Deprecated: Use UpdateTagFunc instead.
|
||||||
func (service *Service) UpdateTag(ID portainer.TagID, tag *portainer.Tag) error {
|
func (service *Service) UpdateTag(ID portainer.TagID, tag *portainer.Tag) error {
|
||||||
identifier := service.connection.ConvertToKey(int(ID))
|
identifier := service.connection.ConvertToKey(int(ID))
|
||||||
return service.connection.UpdateObject(BucketName, identifier, tag)
|
return service.connection.UpdateObject(BucketName, identifier, tag)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateTagFunc updates a tag inside a transaction avoiding data races.
|
||||||
|
func (service *Service) UpdateTagFunc(ID portainer.TagID, updateFunc func(tag *portainer.Tag)) error {
|
||||||
|
id := service.connection.ConvertToKey(int(ID))
|
||||||
|
tag := &portainer.Tag{}
|
||||||
|
|
||||||
|
service.connection.UpdateObjectFunc(BucketName, id, tag, func() {
|
||||||
|
updateFunc(tag)
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// DeleteTag deletes a tag.
|
// DeleteTag deletes a tag.
|
||||||
func (service *Service) DeleteTag(ID portainer.TagID) error {
|
func (service *Service) DeleteTag(ID portainer.TagID) error {
|
||||||
identifier := service.connection.ConvertToKey(int(ID))
|
identifier := service.connection.ConvertToKey(int(ID))
|
||||||
|
|
|
@ -91,15 +91,13 @@ func (handler *Handler) endpointGroupCreate(w http.ResponseWriter, r *http.Reque
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tagID := range endpointGroup.TagIDs {
|
for _, tagID := range endpointGroup.TagIDs {
|
||||||
tag, err := handler.DataStore.Tag().Tag(tagID)
|
handler.DataStore.Tag().UpdateTagFunc(tagID, func(tag *portainer.Tag) {
|
||||||
if err != nil {
|
tag.EndpointGroups[endpointGroup.ID] = true
|
||||||
return httperror.InternalServerError("Unable to retrieve tag from the database", err)
|
})
|
||||||
}
|
|
||||||
|
|
||||||
tag.EndpointGroups[endpointGroup.ID] = true
|
if handler.DataStore.IsErrObjectNotFound(err) {
|
||||||
|
return httperror.InternalServerError("Unable to find a tag inside the database", err)
|
||||||
err = handler.DataStore.Tag().UpdateTag(tagID, tag)
|
} else if err != nil {
|
||||||
if err != nil {
|
|
||||||
return httperror.InternalServerError("Unable to persist tag changes inside the database", err)
|
return httperror.InternalServerError("Unable to persist tag changes inside the database", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,15 +66,13 @@ func (handler *Handler) endpointGroupDelete(w http.ResponseWriter, r *http.Reque
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tagID := range endpointGroup.TagIDs {
|
for _, tagID := range endpointGroup.TagIDs {
|
||||||
tag, err := handler.DataStore.Tag().Tag(tagID)
|
handler.DataStore.Tag().UpdateTagFunc(tagID, func(tag *portainer.Tag) {
|
||||||
if err != nil {
|
delete(tag.EndpointGroups, endpointGroup.ID)
|
||||||
return httperror.InternalServerError("Unable to retrieve tag from the database", err)
|
})
|
||||||
}
|
|
||||||
|
|
||||||
delete(tag.EndpointGroups, endpointGroup.ID)
|
if handler.DataStore.IsErrObjectNotFound(err) {
|
||||||
|
return httperror.InternalServerError("Unable to find a tag inside the database", err)
|
||||||
err = handler.DataStore.Tag().UpdateTag(tagID, tag)
|
} else if err != nil {
|
||||||
if err != nil {
|
|
||||||
return httperror.InternalServerError("Unable to persist tag changes inside the database", err)
|
return httperror.InternalServerError("Unable to persist tag changes inside the database", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,28 +81,26 @@ func (handler *Handler) endpointGroupUpdate(w http.ResponseWriter, r *http.Reque
|
||||||
removeTags := tag.Difference(endpointGroupTagSet, payloadTagSet)
|
removeTags := tag.Difference(endpointGroupTagSet, payloadTagSet)
|
||||||
|
|
||||||
for tagID := range removeTags {
|
for tagID := range removeTags {
|
||||||
tag, err := handler.DataStore.Tag().Tag(tagID)
|
handler.DataStore.Tag().UpdateTagFunc(tagID, func(tag *portainer.Tag) {
|
||||||
if err != nil {
|
delete(tag.EndpointGroups, endpointGroup.ID)
|
||||||
|
})
|
||||||
|
|
||||||
|
if handler.DataStore.IsErrObjectNotFound(err) {
|
||||||
return httperror.InternalServerError("Unable to find a tag inside the database", err)
|
return httperror.InternalServerError("Unable to find a tag inside the database", err)
|
||||||
}
|
} else if err != nil {
|
||||||
delete(tag.EndpointGroups, endpointGroup.ID)
|
|
||||||
err = handler.DataStore.Tag().UpdateTag(tag.ID, tag)
|
|
||||||
if err != nil {
|
|
||||||
return httperror.InternalServerError("Unable to persist tag changes inside the database", err)
|
return httperror.InternalServerError("Unable to persist tag changes inside the database", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
endpointGroup.TagIDs = payload.TagIDs
|
endpointGroup.TagIDs = payload.TagIDs
|
||||||
for _, tagID := range payload.TagIDs {
|
for _, tagID := range payload.TagIDs {
|
||||||
tag, err := handler.DataStore.Tag().Tag(tagID)
|
handler.DataStore.Tag().UpdateTagFunc(tagID, func(tag *portainer.Tag) {
|
||||||
if err != nil {
|
tag.EndpointGroups[endpointGroup.ID] = true
|
||||||
|
})
|
||||||
|
|
||||||
|
if handler.DataStore.IsErrObjectNotFound(err) {
|
||||||
return httperror.InternalServerError("Unable to find a tag inside the database", err)
|
return httperror.InternalServerError("Unable to find a tag inside the database", err)
|
||||||
}
|
} else if err != nil {
|
||||||
|
|
||||||
tag.EndpointGroups[endpointGroup.ID] = true
|
|
||||||
|
|
||||||
err = handler.DataStore.Tag().UpdateTag(tag.ID, tag)
|
|
||||||
if err != nil {
|
|
||||||
return httperror.InternalServerError("Unable to persist tag changes inside the database", err)
|
return httperror.InternalServerError("Unable to persist tag changes inside the database", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -530,14 +530,9 @@ func (handler *Handler) saveEndpointAndUpdateAuthorizations(endpoint *portainer.
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tagID := range endpoint.TagIDs {
|
for _, tagID := range endpoint.TagIDs {
|
||||||
tag, err := handler.DataStore.Tag().Tag(tagID)
|
err = handler.DataStore.Tag().UpdateTagFunc(tagID, func(tag *portainer.Tag) {
|
||||||
if err != nil {
|
tag.Endpoints[endpoint.ID] = true
|
||||||
return err
|
})
|
||||||
}
|
|
||||||
|
|
||||||
tag.Endpoints[endpoint.ID] = true
|
|
||||||
|
|
||||||
err = handler.DataStore.Tag().UpdateTag(tagID, tag)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,15 +62,13 @@ func (handler *Handler) endpointDelete(w http.ResponseWriter, r *http.Request) *
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tagID := range endpoint.TagIDs {
|
for _, tagID := range endpoint.TagIDs {
|
||||||
tag, err := handler.DataStore.Tag().Tag(tagID)
|
err = handler.DataStore.Tag().UpdateTagFunc(tagID, func(tag *portainer.Tag) {
|
||||||
if err != nil {
|
delete(tag.Endpoints, endpoint.ID)
|
||||||
|
})
|
||||||
|
|
||||||
|
if handler.DataStore.IsErrObjectNotFound(err) {
|
||||||
return httperror.NotFound("Unable to find tag inside the database", err)
|
return httperror.NotFound("Unable to find tag inside the database", err)
|
||||||
}
|
} else if err != nil {
|
||||||
|
|
||||||
delete(tag.Endpoints, endpoint.ID)
|
|
||||||
|
|
||||||
err = handler.DataStore.Tag().UpdateTag(tagID, tag)
|
|
||||||
if err != nil {
|
|
||||||
return httperror.InternalServerError("Unable to persist tag relation inside the database", err)
|
return httperror.InternalServerError("Unable to persist tag relation inside the database", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,29 +139,26 @@ func (handler *Handler) endpointUpdate(w http.ResponseWriter, r *http.Request) *
|
||||||
removeTags := tag.Difference(endpointTagSet, payloadTagSet)
|
removeTags := tag.Difference(endpointTagSet, payloadTagSet)
|
||||||
|
|
||||||
for tagID := range removeTags {
|
for tagID := range removeTags {
|
||||||
tag, err := handler.DataStore.Tag().Tag(tagID)
|
err = handler.DataStore.Tag().UpdateTagFunc(tagID, func(tag *portainer.Tag) {
|
||||||
if err != nil {
|
delete(tag.Endpoints, endpoint.ID)
|
||||||
return httperror.InternalServerError("Unable to find a tag inside the database", err)
|
})
|
||||||
}
|
|
||||||
|
|
||||||
delete(tag.Endpoints, endpoint.ID)
|
if handler.DataStore.IsErrObjectNotFound(err) {
|
||||||
err = handler.DataStore.Tag().UpdateTag(tag.ID, tag)
|
return httperror.InternalServerError("Unable to find a tag inside the database", err)
|
||||||
if err != nil {
|
} else if err != nil {
|
||||||
return httperror.InternalServerError("Unable to persist tag changes inside the database", err)
|
return httperror.InternalServerError("Unable to persist tag changes inside the database", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
endpoint.TagIDs = payload.TagIDs
|
endpoint.TagIDs = payload.TagIDs
|
||||||
for _, tagID := range payload.TagIDs {
|
for _, tagID := range payload.TagIDs {
|
||||||
tag, err := handler.DataStore.Tag().Tag(tagID)
|
err = handler.DataStore.Tag().UpdateTagFunc(tagID, func(tag *portainer.Tag) {
|
||||||
if err != nil {
|
tag.Endpoints[endpoint.ID] = true
|
||||||
|
})
|
||||||
|
|
||||||
|
if handler.DataStore.IsErrObjectNotFound(err) {
|
||||||
return httperror.InternalServerError("Unable to find a tag inside the database", err)
|
return httperror.InternalServerError("Unable to find a tag inside the database", err)
|
||||||
}
|
} else if err != nil {
|
||||||
|
|
||||||
tag.Endpoints[endpoint.ID] = true
|
|
||||||
|
|
||||||
err = handler.DataStore.Tag().UpdateTag(tag.ID, tag)
|
|
||||||
if err != nil {
|
|
||||||
return httperror.InternalServerError("Unable to persist tag changes inside the database", err)
|
return httperror.InternalServerError("Unable to persist tag changes inside the database", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue