mirror of https://github.com/portainer/portainer
fix(endpointrelation): optimize updateEdgeStacksAfterRelationChange() BE-12092 (#941)
parent
5e271fd4a4
commit
a46db61c4c
|
@ -0,0 +1,50 @@
|
|||
package edgestack
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
"github.com/portainer/portainer/api/database/boltdb"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestUpdate(t *testing.T) {
|
||||
var conn portainer.Connection = &boltdb.DbConnection{Path: t.TempDir()}
|
||||
err := conn.Open()
|
||||
require.NoError(t, err)
|
||||
|
||||
defer conn.Close()
|
||||
|
||||
service, err := NewService(conn, func(portainer.Transaction, portainer.EdgeStackID) {})
|
||||
require.NoError(t, err)
|
||||
|
||||
const edgeStackID = 1
|
||||
edgeStack := &portainer.EdgeStack{
|
||||
ID: edgeStackID,
|
||||
Name: "Test Stack",
|
||||
}
|
||||
|
||||
err = service.Create(edgeStackID, edgeStack)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = service.UpdateEdgeStackFunc(edgeStackID, func(edgeStack *portainer.EdgeStack) {
|
||||
edgeStack.Name = "Updated Stack"
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
updatedStack, err := service.EdgeStack(edgeStackID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "Updated Stack", updatedStack.Name)
|
||||
|
||||
err = conn.UpdateTx(func(tx portainer.Transaction) error {
|
||||
return service.UpdateEdgeStackFuncTx(tx, edgeStackID, func(edgeStack *portainer.EdgeStack) {
|
||||
edgeStack.Name = "Updated Stack Again"
|
||||
})
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
updatedStack, err = service.EdgeStack(edgeStackID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "Updated Stack Again", updatedStack.Name)
|
||||
}
|
|
@ -6,8 +6,6 @@ import (
|
|||
portainer "github.com/portainer/portainer/api"
|
||||
"github.com/portainer/portainer/api/dataservices"
|
||||
"github.com/portainer/portainer/api/internal/edge/cache"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// BucketName represents the name of the bucket where this service stores data.
|
||||
|
@ -16,7 +14,6 @@ const BucketName = "endpoint_relations"
|
|||
// Service represents a service for managing environment(endpoint) relation data.
|
||||
type Service struct {
|
||||
connection portainer.Connection
|
||||
updateStackFn func(ID portainer.EdgeStackID, updateFunc func(edgeStack *portainer.EdgeStack)) error
|
||||
updateStackFnTx func(tx portainer.Transaction, ID portainer.EdgeStackID, updateFunc func(edgeStack *portainer.EdgeStack)) error
|
||||
endpointRelationsCache []portainer.EndpointRelation
|
||||
mu sync.Mutex
|
||||
|
@ -29,10 +26,8 @@ func (service *Service) BucketName() string {
|
|||
}
|
||||
|
||||
func (service *Service) RegisterUpdateStackFunction(
|
||||
updateFunc func(portainer.EdgeStackID, func(*portainer.EdgeStack)) error,
|
||||
updateFuncTx func(portainer.Transaction, portainer.EdgeStackID, func(*portainer.EdgeStack)) error,
|
||||
) {
|
||||
service.updateStackFn = updateFunc
|
||||
service.updateStackFnTx = updateFuncTx
|
||||
}
|
||||
|
||||
|
@ -91,24 +86,9 @@ func (service *Service) Create(endpointRelation *portainer.EndpointRelation) err
|
|||
|
||||
// UpdateEndpointRelation updates an Environment(Endpoint) relation object
|
||||
func (service *Service) UpdateEndpointRelation(endpointID portainer.EndpointID, endpointRelation *portainer.EndpointRelation) error {
|
||||
previousRelationState, _ := service.EndpointRelation(endpointID)
|
||||
|
||||
identifier := service.connection.ConvertToKey(int(endpointID))
|
||||
err := service.connection.UpdateObject(BucketName, identifier, endpointRelation)
|
||||
cache.Del(endpointID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
updatedRelationState, _ := service.EndpointRelation(endpointID)
|
||||
|
||||
service.mu.Lock()
|
||||
service.endpointRelationsCache = nil
|
||||
service.mu.Unlock()
|
||||
|
||||
service.updateEdgeStacksAfterRelationChange(previousRelationState, updatedRelationState)
|
||||
|
||||
return nil
|
||||
return service.connection.UpdateTx(func(tx portainer.Transaction) error {
|
||||
return service.Tx(tx).UpdateEndpointRelation(endpointID, endpointRelation)
|
||||
})
|
||||
}
|
||||
|
||||
func (service *Service) AddEndpointRelationsForEdgeStack(endpointIDs []portainer.EndpointID, edgeStackID portainer.EdgeStackID) error {
|
||||
|
@ -125,72 +105,7 @@ func (service *Service) RemoveEndpointRelationsForEdgeStack(endpointIDs []portai
|
|||
|
||||
// DeleteEndpointRelation deletes an Environment(Endpoint) relation object
|
||||
func (service *Service) DeleteEndpointRelation(endpointID portainer.EndpointID) error {
|
||||
deletedRelation, _ := service.EndpointRelation(endpointID)
|
||||
|
||||
identifier := service.connection.ConvertToKey(int(endpointID))
|
||||
err := service.connection.DeleteObject(BucketName, identifier)
|
||||
cache.Del(endpointID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
service.mu.Lock()
|
||||
service.endpointRelationsCache = nil
|
||||
service.mu.Unlock()
|
||||
|
||||
service.updateEdgeStacksAfterRelationChange(deletedRelation, nil)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (service *Service) updateEdgeStacksAfterRelationChange(previousRelationState *portainer.EndpointRelation, updatedRelationState *portainer.EndpointRelation) {
|
||||
relations, _ := service.EndpointRelations()
|
||||
|
||||
stacksToUpdate := map[portainer.EdgeStackID]bool{}
|
||||
|
||||
if previousRelationState != nil {
|
||||
for stackId, enabled := range previousRelationState.EdgeStacks {
|
||||
// flag stack for update if stack is not in the updated relation state
|
||||
// = stack has been removed for this relation
|
||||
// or this relation has been deleted
|
||||
if enabled && (updatedRelationState == nil || !updatedRelationState.EdgeStacks[stackId]) {
|
||||
stacksToUpdate[stackId] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if updatedRelationState != nil {
|
||||
for stackId, enabled := range updatedRelationState.EdgeStacks {
|
||||
// flag stack for update if stack is not in the previous relation state
|
||||
// = stack has been added for this relation
|
||||
if enabled && (previousRelationState == nil || !previousRelationState.EdgeStacks[stackId]) {
|
||||
stacksToUpdate[stackId] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// for each stack referenced by the updated relation
|
||||
// list how many time this stack is referenced in all relations
|
||||
// in order to update the stack deployments count
|
||||
for refStackId, refStackEnabled := range stacksToUpdate {
|
||||
if !refStackEnabled {
|
||||
continue
|
||||
}
|
||||
|
||||
numDeployments := 0
|
||||
|
||||
for _, r := range relations {
|
||||
for sId, enabled := range r.EdgeStacks {
|
||||
if enabled && sId == refStackId {
|
||||
numDeployments += 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := service.updateStackFn(refStackId, func(edgeStack *portainer.EdgeStack) {
|
||||
edgeStack.NumDeployments = numDeployments
|
||||
}); err != nil {
|
||||
log.Error().Err(err).Msg("could not update the number of deployments")
|
||||
}
|
||||
}
|
||||
return service.connection.UpdateTx(func(tx portainer.Transaction) error {
|
||||
return service.Tx(tx).DeleteEndpointRelation(endpointID)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
package endpointrelation
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
"github.com/portainer/portainer/api/database/boltdb"
|
||||
"github.com/portainer/portainer/api/internal/edge/cache"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestUpdateRelation(t *testing.T) {
|
||||
const endpointID = 1
|
||||
const edgeStackID1 = 1
|
||||
const edgeStackID2 = 2
|
||||
|
||||
var conn portainer.Connection = &boltdb.DbConnection{Path: t.TempDir()}
|
||||
err := conn.Open()
|
||||
require.NoError(t, err)
|
||||
|
||||
defer conn.Close()
|
||||
|
||||
service, err := NewService(conn)
|
||||
require.NoError(t, err)
|
||||
|
||||
updateStackFnTxCalled := false
|
||||
|
||||
edgeStacks := make(map[portainer.EdgeStackID]portainer.EdgeStack)
|
||||
edgeStacks[edgeStackID1] = portainer.EdgeStack{ID: edgeStackID1}
|
||||
edgeStacks[edgeStackID2] = portainer.EdgeStack{ID: edgeStackID2}
|
||||
|
||||
service.RegisterUpdateStackFunction(func(tx portainer.Transaction, ID portainer.EdgeStackID, updateFunc func(edgeStack *portainer.EdgeStack)) error {
|
||||
updateStackFnTxCalled = true
|
||||
|
||||
s, ok := edgeStacks[ID]
|
||||
require.True(t, ok)
|
||||
|
||||
updateFunc(&s)
|
||||
edgeStacks[ID] = s
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
// Nil relation
|
||||
|
||||
cache.Set(endpointID, []byte("value"))
|
||||
|
||||
err = service.UpdateEndpointRelation(endpointID, nil)
|
||||
_, cacheKeyExists := cache.Get(endpointID)
|
||||
require.NoError(t, err)
|
||||
require.False(t, updateStackFnTxCalled)
|
||||
require.False(t, cacheKeyExists)
|
||||
|
||||
// Add a relation to two edge stacks
|
||||
|
||||
cache.Set(endpointID, []byte("value"))
|
||||
|
||||
err = service.UpdateEndpointRelation(endpointID, &portainer.EndpointRelation{
|
||||
EndpointID: endpointID,
|
||||
EdgeStacks: map[portainer.EdgeStackID]bool{
|
||||
edgeStackID1: true,
|
||||
edgeStackID2: true,
|
||||
},
|
||||
})
|
||||
_, cacheKeyExists = cache.Get(endpointID)
|
||||
require.NoError(t, err)
|
||||
require.True(t, updateStackFnTxCalled)
|
||||
require.False(t, cacheKeyExists)
|
||||
require.Equal(t, 1, edgeStacks[edgeStackID1].NumDeployments)
|
||||
require.Equal(t, 1, edgeStacks[edgeStackID2].NumDeployments)
|
||||
|
||||
// Remove a relation to one edge stack
|
||||
|
||||
updateStackFnTxCalled = false
|
||||
cache.Set(endpointID, []byte("value"))
|
||||
|
||||
err = service.UpdateEndpointRelation(endpointID, &portainer.EndpointRelation{
|
||||
EndpointID: endpointID,
|
||||
EdgeStacks: map[portainer.EdgeStackID]bool{
|
||||
2: true,
|
||||
},
|
||||
})
|
||||
_, cacheKeyExists = cache.Get(endpointID)
|
||||
require.NoError(t, err)
|
||||
require.True(t, updateStackFnTxCalled)
|
||||
require.False(t, cacheKeyExists)
|
||||
require.Equal(t, 0, edgeStacks[edgeStackID1].NumDeployments)
|
||||
require.Equal(t, 1, edgeStacks[edgeStackID2].NumDeployments)
|
||||
|
||||
// Delete the relation
|
||||
|
||||
updateStackFnTxCalled = false
|
||||
cache.Set(endpointID, []byte("value"))
|
||||
|
||||
err = service.DeleteEndpointRelation(endpointID)
|
||||
|
||||
_, cacheKeyExists = cache.Get(endpointID)
|
||||
require.NoError(t, err)
|
||||
require.True(t, updateStackFnTxCalled)
|
||||
require.False(t, cacheKeyExists)
|
||||
require.Equal(t, 0, edgeStacks[edgeStackID1].NumDeployments)
|
||||
require.Equal(t, 0, edgeStacks[edgeStackID2].NumDeployments)
|
||||
}
|
|
@ -186,53 +186,49 @@ func (service ServiceTx) cachedEndpointRelations() ([]portainer.EndpointRelation
|
|||
}
|
||||
|
||||
func (service ServiceTx) updateEdgeStacksAfterRelationChange(previousRelationState *portainer.EndpointRelation, updatedRelationState *portainer.EndpointRelation) {
|
||||
relations, _ := service.EndpointRelations()
|
||||
|
||||
stacksToUpdate := map[portainer.EdgeStackID]bool{}
|
||||
|
||||
if previousRelationState != nil {
|
||||
for stackId, enabled := range previousRelationState.EdgeStacks {
|
||||
// flag stack for update if stack is not in the updated relation state
|
||||
// = stack has been removed for this relation
|
||||
// or this relation has been deleted
|
||||
if enabled && (updatedRelationState == nil || !updatedRelationState.EdgeStacks[stackId]) {
|
||||
stacksToUpdate[stackId] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := service.service.updateStackFnTx(service.tx, stackId, func(edgeStack *portainer.EdgeStack) {
|
||||
// Sanity check
|
||||
if edgeStack.NumDeployments <= 0 {
|
||||
log.Error().
|
||||
Int("edgestack_id", int(edgeStack.ID)).
|
||||
Int("endpoint_id", int(previousRelationState.EndpointID)).
|
||||
Int("num_deployments", edgeStack.NumDeployments).
|
||||
Msg("cannot decrement the number of deployments for an edge stack with zero deployments")
|
||||
|
||||
if updatedRelationState != nil {
|
||||
for stackId, enabled := range updatedRelationState.EdgeStacks {
|
||||
// flag stack for update if stack is not in the previous relation state
|
||||
// = stack has been added for this relation
|
||||
if enabled && (previousRelationState == nil || !previousRelationState.EdgeStacks[stackId]) {
|
||||
stacksToUpdate[stackId] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// for each stack referenced by the updated relation
|
||||
// list how many time this stack is referenced in all relations
|
||||
// in order to update the stack deployments count
|
||||
for refStackId, refStackEnabled := range stacksToUpdate {
|
||||
if !refStackEnabled {
|
||||
continue
|
||||
}
|
||||
|
||||
numDeployments := 0
|
||||
|
||||
for _, r := range relations {
|
||||
for sId, enabled := range r.EdgeStacks {
|
||||
if enabled && sId == refStackId {
|
||||
numDeployments += 1
|
||||
edgeStack.NumDeployments--
|
||||
}); err != nil {
|
||||
log.Error().Err(err).Msg("could not update the number of deployments")
|
||||
}
|
||||
|
||||
cache.Del(previousRelationState.EndpointID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := service.service.updateStackFnTx(service.tx, refStackId, func(edgeStack *portainer.EdgeStack) {
|
||||
edgeStack.NumDeployments = numDeployments
|
||||
}); err != nil {
|
||||
log.Error().Err(err).Msg("could not update the number of deployments")
|
||||
if updatedRelationState == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for stackId, enabled := range updatedRelationState.EdgeStacks {
|
||||
// flag stack for update if stack is not in the previous relation state
|
||||
// = stack has been added for this relation
|
||||
if enabled && (previousRelationState == nil || !previousRelationState.EdgeStacks[stackId]) {
|
||||
if err := service.service.updateStackFnTx(service.tx, stackId, func(edgeStack *portainer.EdgeStack) {
|
||||
edgeStack.NumDeployments++
|
||||
}); err != nil {
|
||||
log.Error().Err(err).Msg("could not update the number of deployments")
|
||||
}
|
||||
|
||||
cache.Del(updatedRelationState.EndpointID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -111,7 +111,7 @@ func (store *Store) initServices() error {
|
|||
return err
|
||||
}
|
||||
store.EdgeStackService = edgeStackService
|
||||
endpointRelationService.RegisterUpdateStackFunction(edgeStackService.UpdateEdgeStackFunc, edgeStackService.UpdateEdgeStackFuncTx)
|
||||
endpointRelationService.RegisterUpdateStackFunction(edgeStackService.UpdateEdgeStackFuncTx)
|
||||
|
||||
edgeStackStatusService, err := edgestackstatus.NewService(store.connection)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue