mirror of https://github.com/portainer/portainer
fix(edgegroups): avoid a last-write-wins situation when updating edge groups concurrently EE-3732 (#8101)
parent
6dc1841c14
commit
e26a607d28
|
@ -31,7 +31,6 @@ func (service *Service) RemoveEdgeJob(edgeJobID portainer.EdgeJobID) {
|
|||
service.mu.Lock()
|
||||
|
||||
for _, tunnel := range service.tunnelDetailsMap {
|
||||
// Filter in-place
|
||||
n := 0
|
||||
for _, edgeJob := range tunnel.Jobs {
|
||||
if edgeJob.ID != edgeJobID {
|
||||
|
|
|
@ -8,10 +8,8 @@ import (
|
|||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
const (
|
||||
// BucketName represents the name of the bucket where this service stores data.
|
||||
BucketName = "edgegroups"
|
||||
)
|
||||
// BucketName represents the name of the bucket where this service stores data.
|
||||
const BucketName = "edgegroups"
|
||||
|
||||
// Service represents a service for managing Edge group data.
|
||||
type Service struct {
|
||||
|
@ -68,12 +66,22 @@ func (service *Service) EdgeGroup(ID portainer.EdgeGroupID) (*portainer.EdgeGrou
|
|||
return &group, nil
|
||||
}
|
||||
|
||||
// UpdateEdgeGroup updates an Edge group.
|
||||
// Deprecated: Use UpdateEdgeGroupFunc instead.
|
||||
func (service *Service) UpdateEdgeGroup(ID portainer.EdgeGroupID, group *portainer.EdgeGroup) error {
|
||||
identifier := service.connection.ConvertToKey(int(ID))
|
||||
return service.connection.UpdateObject(BucketName, identifier, group)
|
||||
}
|
||||
|
||||
// UpdateEdgeGroupFunc updates an edge group inside a transaction avoiding data races.
|
||||
func (service *Service) UpdateEdgeGroupFunc(ID portainer.EdgeGroupID, updateFunc func(edgeGroup *portainer.EdgeGroup)) error {
|
||||
id := service.connection.ConvertToKey(int(ID))
|
||||
edgeGroup := &portainer.EdgeGroup{}
|
||||
|
||||
return service.connection.UpdateObjectFunc(BucketName, id, edgeGroup, func() {
|
||||
updateFunc(edgeGroup)
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteEdgeGroup deletes an Edge group.
|
||||
func (service *Service) DeleteEdgeGroup(ID portainer.EdgeGroupID) error {
|
||||
identifier := service.connection.ConvertToKey(int(ID))
|
||||
|
|
|
@ -69,6 +69,7 @@ type (
|
|||
EdgeGroup(ID portainer.EdgeGroupID) (*portainer.EdgeGroup, error)
|
||||
Create(group *portainer.EdgeGroup) error
|
||||
UpdateEdgeGroup(ID portainer.EdgeGroupID, group *portainer.EdgeGroup) error
|
||||
UpdateEdgeGroupFunc(ID portainer.EdgeGroupID, updateFunc func(group *portainer.EdgeGroup)) error
|
||||
DeleteEdgeGroup(ID portainer.EdgeGroupID) error
|
||||
BucketName() string
|
||||
}
|
||||
|
|
|
@ -83,15 +83,12 @@ func (handler *Handler) endpointDelete(w http.ResponseWriter, r *http.Request) *
|
|||
return httperror.InternalServerError("Unable to retrieve edge groups from the database", err)
|
||||
}
|
||||
|
||||
for idx := range edgeGroups {
|
||||
edgeGroup := &edgeGroups[idx]
|
||||
endpointIdx := findEndpointIndex(edgeGroup.Endpoints, endpoint.ID)
|
||||
if endpointIdx != -1 {
|
||||
edgeGroup.Endpoints = removeElement(edgeGroup.Endpoints, endpointIdx)
|
||||
err = handler.DataStore.EdgeGroup().UpdateEdgeGroup(edgeGroup.ID, edgeGroup)
|
||||
if err != nil {
|
||||
return httperror.InternalServerError("Unable to update edge group", err)
|
||||
}
|
||||
for _, edgeGroup := range edgeGroups {
|
||||
err = handler.DataStore.EdgeGroup().UpdateEdgeGroupFunc(edgeGroup.ID, func(g *portainer.EdgeGroup) {
|
||||
g.Endpoints = removeElement(g.Endpoints, endpoint.ID)
|
||||
})
|
||||
if err != nil {
|
||||
return httperror.InternalServerError("Unable to update edge group", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -130,20 +127,14 @@ func (handler *Handler) endpointDelete(w http.ResponseWriter, r *http.Request) *
|
|||
return response.Empty(w)
|
||||
}
|
||||
|
||||
func findEndpointIndex(tags []portainer.EndpointID, searchEndpointID portainer.EndpointID) int {
|
||||
for idx, tagID := range tags {
|
||||
if searchEndpointID == tagID {
|
||||
return idx
|
||||
func removeElement(slice []portainer.EndpointID, elem portainer.EndpointID) []portainer.EndpointID {
|
||||
for i, id := range slice {
|
||||
if id == elem {
|
||||
slice[i] = slice[len(slice)-1]
|
||||
|
||||
return slice[:len(slice)-1]
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
func removeElement(arr []portainer.EndpointID, index int) []portainer.EndpointID {
|
||||
if index < 0 {
|
||||
return arr
|
||||
}
|
||||
lastTagIdx := len(arr) - 1
|
||||
arr[index] = arr[lastTagIdx]
|
||||
return arr[:lastTagIdx]
|
||||
return slice
|
||||
}
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
package endpoints
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
"github.com/portainer/portainer/api/apikey"
|
||||
"github.com/portainer/portainer/api/datastore"
|
||||
"github.com/portainer/portainer/api/demo"
|
||||
"github.com/portainer/portainer/api/http/proxy"
|
||||
"github.com/portainer/portainer/api/http/security"
|
||||
"github.com/portainer/portainer/api/jwt"
|
||||
)
|
||||
|
||||
func TestEndpointDeleteEdgeGroupsConcurrently(t *testing.T) {
|
||||
const endpointsCount = 100
|
||||
|
||||
_, store, teardown := datastore.MustNewTestStore(t, true, false)
|
||||
defer teardown()
|
||||
|
||||
user := &portainer.User{ID: 2, Username: "admin", Role: portainer.AdministratorRole}
|
||||
err := store.User().Create(user)
|
||||
if err != nil {
|
||||
t.Fatal("could not create admin user:", err)
|
||||
}
|
||||
|
||||
jwtService, err := jwt.NewService("1h", store)
|
||||
if err != nil {
|
||||
t.Fatal("could not initialize the JWT service:", err)
|
||||
}
|
||||
|
||||
apiKeyService := apikey.NewAPIKeyService(store.APIKeyRepository(), store.User())
|
||||
rawAPIKey, _, err := apiKeyService.GenerateApiKey(*user, "test")
|
||||
if err != nil {
|
||||
t.Fatal("could not generate API key:", err)
|
||||
}
|
||||
|
||||
bouncer := security.NewRequestBouncer(store, jwtService, apiKeyService)
|
||||
|
||||
handler := NewHandler(bouncer, demo.NewService())
|
||||
handler.DataStore = store
|
||||
handler.ProxyManager = proxy.NewManager(nil, nil, nil, nil, nil, nil, nil)
|
||||
|
||||
// Create all the environments and add them to the same edge group
|
||||
|
||||
var endpointIDs []portainer.EndpointID
|
||||
|
||||
for i := 0; i < endpointsCount; i++ {
|
||||
endpointID := portainer.EndpointID(i) + 1
|
||||
|
||||
err = store.Endpoint().Create(&portainer.Endpoint{
|
||||
ID: endpointID,
|
||||
Name: "env-" + strconv.Itoa(int(endpointID)),
|
||||
Type: portainer.EdgeAgentOnDockerEnvironment,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal("could not create endpoint:", err)
|
||||
}
|
||||
|
||||
endpointIDs = append(endpointIDs, endpointID)
|
||||
}
|
||||
|
||||
err = store.EdgeGroup().Create(&portainer.EdgeGroup{
|
||||
ID: 1,
|
||||
Name: "edgegroup-1",
|
||||
Endpoints: endpointIDs,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal("could not create edge group:", err)
|
||||
}
|
||||
|
||||
// Remove the environments concurrently
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(endpointIDs))
|
||||
|
||||
for _, endpointID := range endpointIDs {
|
||||
go func(ID portainer.EndpointID) {
|
||||
defer wg.Done()
|
||||
|
||||
req, err := http.NewRequest(http.MethodDelete, "/endpoints/"+strconv.Itoa(int(ID)), nil)
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
return
|
||||
}
|
||||
req.Header.Add("X-Api-Key", rawAPIKey)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
handler.ServeHTTP(rec, req)
|
||||
}(endpointID)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Check that the edge group is consistent
|
||||
|
||||
edgeGroup, err := handler.DataStore.EdgeGroup().EdgeGroup(1)
|
||||
if err != nil {
|
||||
t.Fatal("could not retrieve the edge group:", err)
|
||||
}
|
||||
|
||||
if len(edgeGroup.Endpoints) > 0 {
|
||||
t.Fatal("the edge group is not consistent")
|
||||
}
|
||||
}
|
|
@ -44,13 +44,10 @@ func (handler *Handler) tagDelete(w http.ResponseWriter, r *http.Request) *httpe
|
|||
return httperror.InternalServerError("Unable to retrieve environment from the database", err)
|
||||
}
|
||||
|
||||
tagIdx := findTagIndex(endpoint.TagIDs, tagID)
|
||||
if tagIdx != -1 {
|
||||
endpoint.TagIDs = removeElement(endpoint.TagIDs, tagIdx)
|
||||
err = handler.DataStore.Endpoint().UpdateEndpoint(endpoint.ID, endpoint)
|
||||
if err != nil {
|
||||
return httperror.InternalServerError("Unable to update environment", err)
|
||||
}
|
||||
endpoint.TagIDs = removeElement(endpoint.TagIDs, tagID)
|
||||
err = handler.DataStore.Endpoint().UpdateEndpoint(endpoint.ID, endpoint)
|
||||
if err != nil {
|
||||
return httperror.InternalServerError("Unable to update environment", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,13 +57,10 @@ func (handler *Handler) tagDelete(w http.ResponseWriter, r *http.Request) *httpe
|
|||
return httperror.InternalServerError("Unable to retrieve environment group from the database", err)
|
||||
}
|
||||
|
||||
tagIdx := findTagIndex(endpointGroup.TagIDs, tagID)
|
||||
if tagIdx != -1 {
|
||||
endpointGroup.TagIDs = removeElement(endpointGroup.TagIDs, tagIdx)
|
||||
err = handler.DataStore.EndpointGroup().UpdateEndpointGroup(endpointGroup.ID, endpointGroup)
|
||||
if err != nil {
|
||||
return httperror.InternalServerError("Unable to update environment group", err)
|
||||
}
|
||||
endpointGroup.TagIDs = removeElement(endpointGroup.TagIDs, tagID)
|
||||
err = handler.DataStore.EndpointGroup().UpdateEndpointGroup(endpointGroup.ID, endpointGroup)
|
||||
if err != nil {
|
||||
return httperror.InternalServerError("Unable to update environment group", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -94,15 +88,12 @@ func (handler *Handler) tagDelete(w http.ResponseWriter, r *http.Request) *httpe
|
|||
}
|
||||
}
|
||||
|
||||
for idx := range edgeGroups {
|
||||
edgeGroup := &edgeGroups[idx]
|
||||
tagIdx := findTagIndex(edgeGroup.TagIDs, tagID)
|
||||
if tagIdx != -1 {
|
||||
edgeGroup.TagIDs = removeElement(edgeGroup.TagIDs, tagIdx)
|
||||
err = handler.DataStore.EdgeGroup().UpdateEdgeGroup(edgeGroup.ID, edgeGroup)
|
||||
if err != nil {
|
||||
return httperror.InternalServerError("Unable to update environment group", err)
|
||||
}
|
||||
for _, edgeGroup := range edgeGroups {
|
||||
err = handler.DataStore.EdgeGroup().UpdateEdgeGroupFunc(edgeGroup.ID, func(g *portainer.EdgeGroup) {
|
||||
g.TagIDs = removeElement(g.TagIDs, tagID)
|
||||
})
|
||||
if err != nil {
|
||||
return httperror.InternalServerError("Unable to update edge group", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -135,20 +126,14 @@ func (handler *Handler) updateEndpointRelations(endpoint portainer.Endpoint, edg
|
|||
return handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpoint.ID, endpointRelation)
|
||||
}
|
||||
|
||||
func findTagIndex(tags []portainer.TagID, searchTagID portainer.TagID) int {
|
||||
for idx, tagID := range tags {
|
||||
if searchTagID == tagID {
|
||||
return idx
|
||||
func removeElement(slice []portainer.TagID, elem portainer.TagID) []portainer.TagID {
|
||||
for i, id := range slice {
|
||||
if id == elem {
|
||||
slice[i] = slice[len(slice)-1]
|
||||
|
||||
return slice[:len(slice)-1]
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
func removeElement(arr []portainer.TagID, index int) []portainer.TagID {
|
||||
if index < 0 {
|
||||
return arr
|
||||
}
|
||||
lastTagIdx := len(arr) - 1
|
||||
arr[index] = arr[lastTagIdx]
|
||||
return arr[:lastTagIdx]
|
||||
return slice
|
||||
}
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
package tags
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
"github.com/portainer/portainer/api/apikey"
|
||||
"github.com/portainer/portainer/api/datastore"
|
||||
"github.com/portainer/portainer/api/http/security"
|
||||
"github.com/portainer/portainer/api/jwt"
|
||||
)
|
||||
|
||||
func TestTagDeleteEdgeGroupsConcurrently(t *testing.T) {
|
||||
const tagsCount = 100
|
||||
|
||||
_, store, teardown := datastore.MustNewTestStore(t, true, false)
|
||||
defer teardown()
|
||||
|
||||
user := &portainer.User{ID: 2, Username: "admin", Role: portainer.AdministratorRole}
|
||||
err := store.User().Create(user)
|
||||
if err != nil {
|
||||
t.Fatal("could not create admin user:", err)
|
||||
}
|
||||
|
||||
jwtService, err := jwt.NewService("1h", store)
|
||||
if err != nil {
|
||||
t.Fatal("could not initialize the JWT service:", err)
|
||||
}
|
||||
|
||||
apiKeyService := apikey.NewAPIKeyService(store.APIKeyRepository(), store.User())
|
||||
rawAPIKey, _, err := apiKeyService.GenerateApiKey(*user, "test")
|
||||
if err != nil {
|
||||
t.Fatal("could not generate API key:", err)
|
||||
}
|
||||
|
||||
bouncer := security.NewRequestBouncer(store, jwtService, apiKeyService)
|
||||
|
||||
handler := NewHandler(bouncer)
|
||||
handler.DataStore = store
|
||||
|
||||
// Create all the tags and add them to the same edge group
|
||||
|
||||
var tagIDs []portainer.TagID
|
||||
|
||||
for i := 0; i < tagsCount; i++ {
|
||||
tagID := portainer.TagID(i) + 1
|
||||
|
||||
err = store.Tag().Create(&portainer.Tag{
|
||||
ID: tagID,
|
||||
Name: "tag-" + strconv.Itoa(int(tagID)),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal("could not create tag:", err)
|
||||
}
|
||||
|
||||
tagIDs = append(tagIDs, tagID)
|
||||
}
|
||||
|
||||
err = store.EdgeGroup().Create(&portainer.EdgeGroup{
|
||||
ID: 1,
|
||||
Name: "edgegroup-1",
|
||||
TagIDs: tagIDs,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal("could not create edge group:", err)
|
||||
}
|
||||
|
||||
// Remove the tags concurrently
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(tagIDs))
|
||||
|
||||
for _, tagID := range tagIDs {
|
||||
go func(ID portainer.TagID) {
|
||||
defer wg.Done()
|
||||
|
||||
req, err := http.NewRequest(http.MethodDelete, "/tags/"+strconv.Itoa(int(ID)), nil)
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
return
|
||||
}
|
||||
req.Header.Add("X-Api-Key", rawAPIKey)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
handler.ServeHTTP(rec, req)
|
||||
}(tagID)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Check that the edge group is consistent
|
||||
|
||||
edgeGroup, err := handler.DataStore.EdgeGroup().EdgeGroup(1)
|
||||
if err != nil {
|
||||
t.Fatal("could not retrieve the edge group:", err)
|
||||
}
|
||||
|
||||
if len(edgeGroup.TagIDs) > 0 {
|
||||
t.Fatal("the edge group is not consistent")
|
||||
}
|
||||
}
|
|
@ -66,7 +66,10 @@ func (manager *Manager) GetEndpointProxy(endpoint *portainer.Endpoint) http.Hand
|
|||
// is currently only called for edge connection clean up and when endpoint is updated
|
||||
func (manager *Manager) DeleteEndpointProxy(endpointID portainer.EndpointID) {
|
||||
manager.endpointProxies.Remove(fmt.Sprint(endpointID))
|
||||
manager.k8sClientFactory.RemoveKubeClient(endpointID)
|
||||
|
||||
if manager.k8sClientFactory != nil {
|
||||
manager.k8sClientFactory.RemoveKubeClient(endpointID)
|
||||
}
|
||||
}
|
||||
|
||||
// CreateGitlabProxy creates a new HTTP reverse proxy that can be used to send requests to the Gitlab API
|
||||
|
|
Loading…
Reference in New Issue