mirror of https://github.com/portainer/portainer
fix(edgestacks): fix edge stacks cache invalidation EE-4909 (#8399)
parent
42ca1287df
commit
5f3dd0a64f
|
@ -5,7 +5,6 @@ import (
|
|||
"sync"
|
||||
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
"github.com/portainer/portainer/api/internal/edge/cache"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
@ -17,9 +16,10 @@ const (
|
|||
|
||||
// Service represents a service for managing Edge stack data.
|
||||
type Service struct {
|
||||
connection portainer.Connection
|
||||
idxVersion map[portainer.EdgeStackID]int
|
||||
mu sync.RWMutex
|
||||
connection portainer.Connection
|
||||
idxVersion map[portainer.EdgeStackID]int
|
||||
mu sync.RWMutex
|
||||
cacheInvalidationFn func(portainer.EdgeStackID)
|
||||
}
|
||||
|
||||
func (service *Service) BucketName() string {
|
||||
|
@ -27,15 +27,20 @@ func (service *Service) BucketName() string {
|
|||
}
|
||||
|
||||
// NewService creates a new instance of a service.
|
||||
func NewService(connection portainer.Connection) (*Service, error) {
|
||||
func NewService(connection portainer.Connection, cacheInvalidationFn func(portainer.EdgeStackID)) (*Service, error) {
|
||||
err := connection.SetServiceName(BucketName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s := &Service{
|
||||
connection: connection,
|
||||
idxVersion: make(map[portainer.EdgeStackID]int),
|
||||
connection: connection,
|
||||
idxVersion: make(map[portainer.EdgeStackID]int),
|
||||
cacheInvalidationFn: cacheInvalidationFn,
|
||||
}
|
||||
|
||||
if s.cacheInvalidationFn == nil {
|
||||
s.cacheInvalidationFn = func(portainer.EdgeStackID) {}
|
||||
}
|
||||
|
||||
es, err := s.EdgeStacks()
|
||||
|
@ -109,12 +114,9 @@ func (service *Service) Create(id portainer.EdgeStackID, edgeStack *portainer.Ed
|
|||
|
||||
service.mu.Lock()
|
||||
service.idxVersion[id] = edgeStack.Version
|
||||
service.cacheInvalidationFn(id)
|
||||
service.mu.Unlock()
|
||||
|
||||
for endpointID := range edgeStack.Status {
|
||||
cache.Del(endpointID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -123,37 +125,15 @@ func (service *Service) UpdateEdgeStack(ID portainer.EdgeStackID, edgeStack *por
|
|||
service.mu.Lock()
|
||||
defer service.mu.Unlock()
|
||||
|
||||
prevEdgeStack, err := service.EdgeStack(ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
identifier := service.connection.ConvertToKey(int(ID))
|
||||
|
||||
err = service.connection.UpdateObject(BucketName, identifier, edgeStack)
|
||||
err := service.connection.UpdateObject(BucketName, identifier, edgeStack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
service.idxVersion[ID] = edgeStack.Version
|
||||
|
||||
// Invalidate cache for removed environments
|
||||
for endpointID := range prevEdgeStack.Status {
|
||||
if _, ok := edgeStack.Status[endpointID]; !ok {
|
||||
cache.Del(endpointID)
|
||||
}
|
||||
}
|
||||
|
||||
// Invalidate cache when version changes and for added environments
|
||||
for endpointID := range edgeStack.Status {
|
||||
if prevEdgeStack.Version == edgeStack.Version {
|
||||
if _, ok := prevEdgeStack.Status[endpointID]; ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
cache.Del(endpointID)
|
||||
}
|
||||
service.cacheInvalidationFn(ID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -167,35 +147,10 @@ func (service *Service) UpdateEdgeStackFunc(ID portainer.EdgeStackID, updateFunc
|
|||
defer service.mu.Unlock()
|
||||
|
||||
return service.connection.UpdateObjectFunc(BucketName, id, edgeStack, func() {
|
||||
prevEndpoints := make(map[portainer.EndpointID]struct{}, len(edgeStack.Status))
|
||||
for endpointID := range edgeStack.Status {
|
||||
if _, ok := edgeStack.Status[endpointID]; !ok {
|
||||
prevEndpoints[endpointID] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
updateFunc(edgeStack)
|
||||
|
||||
prevVersion := service.idxVersion[ID]
|
||||
service.idxVersion[ID] = edgeStack.Version
|
||||
|
||||
// Invalidate cache for removed environments
|
||||
for endpointID := range prevEndpoints {
|
||||
if _, ok := edgeStack.Status[endpointID]; !ok {
|
||||
cache.Del(endpointID)
|
||||
}
|
||||
}
|
||||
|
||||
// Invalidate cache when version changes and for added environments
|
||||
for endpointID := range edgeStack.Status {
|
||||
if prevVersion == edgeStack.Version {
|
||||
if _, ok := prevEndpoints[endpointID]; ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
cache.Del(endpointID)
|
||||
}
|
||||
service.cacheInvalidationFn(ID)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -204,23 +159,16 @@ func (service *Service) DeleteEdgeStack(ID portainer.EdgeStackID) error {
|
|||
service.mu.Lock()
|
||||
defer service.mu.Unlock()
|
||||
|
||||
edgeStack, err := service.EdgeStack(ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
identifier := service.connection.ConvertToKey(int(ID))
|
||||
|
||||
err = service.connection.DeleteObject(BucketName, identifier)
|
||||
err := service.connection.DeleteObject(BucketName, identifier)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
delete(service.idxVersion, ID)
|
||||
|
||||
for endpointID := range edgeStack.Status {
|
||||
cache.Del(endpointID)
|
||||
}
|
||||
service.cacheInvalidationFn(ID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -95,3 +95,19 @@ func (service *Service) DeleteEndpointRelation(endpointID portainer.EndpointID)
|
|||
|
||||
return err
|
||||
}
|
||||
|
||||
func (service *Service) InvalidateEdgeCacheForEdgeStack(edgeStackID portainer.EdgeStackID) {
|
||||
rels, err := service.EndpointRelations()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("cannot retrieve endpoint relations")
|
||||
return
|
||||
}
|
||||
|
||||
for _, rel := range rels {
|
||||
for id := range rel.EdgeStacks {
|
||||
if edgeStackID == id {
|
||||
cache.Del(rel.EndpointID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,7 +93,13 @@ func (store *Store) initServices() error {
|
|||
}
|
||||
store.DockerHubService = dockerhubService
|
||||
|
||||
edgeStackService, err := edgestack.NewService(store.connection)
|
||||
endpointRelationService, err := endpointrelation.NewService(store.connection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
store.EndpointRelationService = endpointRelationService
|
||||
|
||||
edgeStackService, err := edgestack.NewService(store.connection, endpointRelationService.InvalidateEdgeCacheForEdgeStack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -123,12 +129,6 @@ func (store *Store) initServices() error {
|
|||
}
|
||||
store.EndpointService = endpointService
|
||||
|
||||
endpointRelationService, err := endpointrelation.NewService(store.connection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
store.EndpointRelationService = endpointRelationService
|
||||
|
||||
extensionService, err := extension.NewService(store.connection)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue