From 9a86737caa5cfa8b6c691041f940d255ea8e7d2c Mon Sep 17 00:00:00 2001
From: andres-portainer <91705312+andres-portainer@users.noreply.github.com>
Date: Wed, 22 Jan 2025 20:24:54 -0300
Subject: [PATCH] fix(edgestacks): add a status update coordinator to increase performance BE-11572 (#337)

---
 .../edgestacks/edgestack_status_update.go     |  50 ++----
 .../edgestack_status_update_coordinator.go    | 199 ++++++++++++++++++
 api/http/handler/edgestacks/handler.go        |   5 +
 3 files changed, 215 insertions(+), 39 deletions(-)
 create mode 100644 api/http/handler/edgestacks/edgestack_status_update_coordinator.go

diff --git a/api/http/handler/edgestacks/edgestack_status_update.go b/api/http/handler/edgestacks/edgestack_status_update.go
index ab6ffd8a3..f3f903385 100644
--- a/api/http/handler/edgestacks/edgestack_status_update.go
+++ b/api/http/handler/edgestacks/edgestack_status_update.go
@@ -8,7 +8,6 @@ import (
 	"time"
 
 	portainer "github.com/portainer/portainer/api"
-	"github.com/portainer/portainer/api/dataservices"
 	httperror "github.com/portainer/portainer/pkg/libhttp/error"
 	"github.com/portainer/portainer/pkg/libhttp/request"
 	"github.com/portainer/portainer/pkg/libhttp/response"
@@ -69,15 +68,17 @@ func (handler *Handler) edgeStackStatusUpdate(w http.ResponseWriter, r *http.Req
 		return httperror.BadRequest("Invalid request payload", fmt.Errorf("edge polling error: %w. Environment ID: %d", err, payload.EndpointID))
 	}
 
-	var stack *portainer.EdgeStack
-	if err := handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
-		if err := r.Context().Err(); err != nil {
-			return err
-		}
+	endpoint, err := handler.DataStore.Endpoint().Endpoint(payload.EndpointID)
+	if err != nil {
+		return handlerDBErr(fmt.Errorf("unable to find the environment from the database: %w. Environment ID: %d", err, payload.EndpointID), "unable to find the environment")
+	}
 
-		stack, err = handler.updateEdgeStackStatus(tx, r, portainer.EdgeStackID(stackID), payload)
-		return err
-	}); err != nil {
+	if err := handler.requestBouncer.AuthorizedEdgeEndpointOperation(r, endpoint); err != nil {
+		return httperror.Forbidden("Permission denied to access environment", fmt.Errorf("unauthorized edge endpoint operation: %w. Environment name: %s", err, endpoint.Name))
+	}
+
+	stack, err := handler.stackCoordinator.UpdateStatus(r, endpoint, portainer.EdgeStackID(stackID), payload)
+	if err != nil {
 		var httpErr *httperror.HandlerError
 		if errors.As(err, &httpErr) {
 			return httpErr
@@ -93,36 +94,11 @@ func (handler *Handler) edgeStackStatusUpdate(w http.ResponseWriter, r *http.Req
 	return response.JSON(w, stack)
 }
 
-func (handler *Handler) updateEdgeStackStatus(tx dataservices.DataStoreTx, r *http.Request, stackID portainer.EdgeStackID, payload updateStatusPayload) (*portainer.EdgeStack, error) {
-	stack, err := tx.EdgeStack().EdgeStack(stackID)
-	if err != nil {
-		if dataservices.IsErrObjectNotFound(err) {
-			// Skip error because agent tries to report on deleted stack
-			log.Debug().
-				Err(err).
-				Int("stackID", int(stackID)).
-				Int("status", int(*payload.Status)).
-				Msg("Unable to find a stack inside the database, skipping error")
-
-			return nil, nil
-		}
-
-		return nil, fmt.Errorf("unable to retrieve Edge stack from the database: %w. Environment ID: %d", err, payload.EndpointID)
-	}
-
+func (handler *Handler) updateEdgeStackStatus(stack *portainer.EdgeStack, endpoint *portainer.Endpoint, r *http.Request, stackID portainer.EdgeStackID, payload updateStatusPayload) (*portainer.EdgeStack, error) {
 	if payload.Version > 0 && payload.Version < stack.Version {
 		return stack, nil
 	}
 
-	endpoint, err := tx.Endpoint().Endpoint(payload.EndpointID)
-	if err != nil {
-		return nil, handlerDBErr(fmt.Errorf("unable to find the environment from the database: %w. Environment ID: %d", err, payload.EndpointID), "unable to find the environment")
-	}
-
-	if err := handler.requestBouncer.AuthorizedEdgeEndpointOperation(r, endpoint); err != nil {
-		return nil, httperror.Forbidden("Permission denied to access environment", fmt.Errorf("unauthorized edge endpoint operation: %w. Environment name: %s", err, endpoint.Name))
-	}
-
 	status := *payload.Status
 
 	log.Debug().
@@ -138,10 +114,6 @@ func (handler *Handler) updateEdgeStackStatus(tx dataservices.DataStoreTx, r *ht
 
 	updateEnvStatus(payload.EndpointID, stack, deploymentStatus)
 
-	if err := tx.EdgeStack().UpdateEdgeStack(stackID, stack); err != nil {
-		return nil, handlerDBErr(fmt.Errorf("unable to update Edge stack to the database: %w. Environment name: %s", err, endpoint.Name), "unable to update Edge stack")
-	}
-
 	return stack, nil
 }
 
diff --git a/api/http/handler/edgestacks/edgestack_status_update_coordinator.go b/api/http/handler/edgestacks/edgestack_status_update_coordinator.go
new file mode 100644
index 000000000..7e557e93f
--- /dev/null
+++ b/api/http/handler/edgestacks/edgestack_status_update_coordinator.go
@@ -0,0 +1,199 @@
+package edgestacks
+
+import (
+	"errors"
+	"fmt"
+	"net/http"
+
+	portainer "github.com/portainer/portainer/api"
+	"github.com/portainer/portainer/api/dataservices"
+
+	"github.com/rs/zerolog/log"
+)
+
+// statusRequest is one queued status update together with the channel on which
+// its outcome is delivered back to the waiting caller.
+type statusRequest struct {
+	r        *http.Request
+	respCh   chan statusResponse
+	endpoint *portainer.Endpoint
+	stackID  portainer.EdgeStackID
+	payload  updateStatusPayload
+}
+
+// statusResponse is the outcome sent back for a statusRequest.
+type statusResponse struct {
+	Stack *portainer.EdgeStack
+	Error error
+}
+
+// statusUpdateFn applies one status payload to an already loaded edge stack and
+// returns the resulting stack; persisting it is left to the caller.
+type statusUpdateFn func(stack *portainer.EdgeStack, endpoint *portainer.Endpoint, r *http.Request, stackID portainer.EdgeStackID, payload updateStatusPayload) (*portainer.EdgeStack, error)
+
+// EdgeStackStatusUpdateCoordinator funnels edge stack status updates through a
+// single channel so that pending updates for the same stack can share one
+// database transaction.
+type EdgeStackStatusUpdateCoordinator struct {
+	updateCh       chan statusRequest
+	dataStore      dataservices.DataStore
+	statusUpdateFn statusUpdateFn
+}
+
+// errAnotherStackUpdateInProgress is sent to requests for a different stack that
+// are received while a batch is being processed.
+var errAnotherStackUpdateInProgress = errors.New("another stack update is in progress")
+
+// NewEdgeStackStatusUpdateCoordinator returns a coordinator that loads and saves
+// stacks through dataStore and mutates them with statusUpdateFn. Requests are
+// only consumed while Start is running.
+func NewEdgeStackStatusUpdateCoordinator(
+	dataStore dataservices.DataStore,
+	statusUpdateFn statusUpdateFn,
+) *EdgeStackStatusUpdateCoordinator {
+	return &EdgeStackStatusUpdateCoordinator{
+		updateCh:       make(chan statusRequest),
+		dataStore:      dataStore,
+		statusUpdateFn: statusUpdateFn,
+	}
+}
+
+// Start processes update batches forever. It never returns, so callers run it
+// in its own goroutine.
+func (c *EdgeStackStatusUpdateCoordinator) Start() {
+	for {
+		c.loop()
+	}
+}
+
+// loop handles one batch: it waits for an update, applies it and every other
+// update already pending for the same stack inside a single transaction, then
+// sends the shared outcome to every request in the batch.
+func (c *EdgeStackStatusUpdateCoordinator) loop() {
+	u := <-c.updateCh
+
+	respChs := []chan statusResponse{u.respCh}
+
+	var stack *portainer.EdgeStack
+
+	err := c.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
+		// 1. Load the edge stack
+		var err error
+
+		stack, err = loadEdgeStack(tx, u.stackID)
+		if err != nil {
+			return err
+		}
+
+		// loadEdgeStack reports a deleted stack as (nil, nil). Stop here so the
+		// stack.ID reads below cannot dereference nil; the caller then receives
+		// a nil stack and a nil error.
+		if stack == nil {
+			return nil
+		}
+
+		// 2. Mutate the edge stack opportunistically until there are no more pending updates
+		for {
+			stack, err = c.statusUpdateFn(stack, u.endpoint, u.r, stack.ID, u.payload)
+			if err != nil {
+				return err
+			}
+
+			if m, ok := c.getNextUpdate(stack.ID); ok {
+				u = m
+			} else {
+				break
+			}
+
+			respChs = append(respChs, u.respCh)
+		}
+
+		// 3. Save the changes back to the database
+		if err := tx.EdgeStack().UpdateEdgeStack(stack.ID, stack); err != nil {
+			return handlerDBErr(fmt.Errorf("unable to update Edge stack: %w.", err), "Unable to persist the stack changes inside the database")
+		}
+
+		return nil
+	})
+
+	// 4. Send back the responses
+	for _, ch := range respChs {
+		ch <- statusResponse{Stack: stack, Error: err}
+	}
+}
+
+// loadEdgeStack fetches the edge stack with the given ID. A stack that does not
+// exist is logged at debug level and reported as (nil, nil), not as an error.
+func loadEdgeStack(tx dataservices.DataStoreTx, stackID portainer.EdgeStackID) (*portainer.EdgeStack, error) {
+	stack, err := tx.EdgeStack().EdgeStack(stackID)
+	if err != nil {
+		if dataservices.IsErrObjectNotFound(err) {
+			// Skip the error when the agent tries to update the status on a deleted stack
+			log.Debug().
+				Err(err).
+				Int("stackID", int(stackID)).
+				Msg("Unable to find a stack inside the database, skipping error")
+
+			return nil, nil
+		}
+
+		return nil, fmt.Errorf("unable to retrieve Edge stack from the database: %w.", err)
+	}
+
+	return stack, nil
+}
+
+// getNextUpdate returns the next already pending update for stackID without
+// blocking. Pending updates for other stacks are answered with
+// errAnotherStackUpdateInProgress and skipped.
+func (c *EdgeStackStatusUpdateCoordinator) getNextUpdate(stackID portainer.EdgeStackID) (statusRequest, bool) {
+	for {
+		select {
+		case u := <-c.updateCh:
+			// Discard the update and let the agent retry
+			if u.stackID != stackID {
+				u.respCh <- statusResponse{Error: errAnotherStackUpdateInProgress}
+
+				continue
+			}
+
+			return u, true
+
+		default:
+			return statusRequest{}, false
+		}
+	}
+}
+
+// UpdateStatus queues a status update and waits for its outcome. It returns the
+// request context's error if the context is done before the coordinator
+// accepts the update; once accepted, it waits for the response unconditionally.
+func (c *EdgeStackStatusUpdateCoordinator) UpdateStatus(
+	r *http.Request,
+	endpoint *portainer.Endpoint,
+	stackID portainer.EdgeStackID,
+	payload updateStatusPayload) (
+	*portainer.EdgeStack,
+	error,
+) {
+	respCh := make(chan statusResponse)
+	defer close(respCh)
+
+	msg := statusRequest{
+		respCh:   respCh,
+		r:        r,
+		endpoint: endpoint,
+		stackID:  stackID,
+		payload:  payload,
+	}
+
+	select {
+	case c.updateCh <- msg:
+		r := <-respCh
+
+		return r.Stack, r.Error
+
+	case <-r.Context().Done():
+		return nil, r.Context().Err()
+	}
+}
diff --git a/api/http/handler/edgestacks/handler.go b/api/http/handler/edgestacks/handler.go
index dd3b5593d..6a4a51e3e 100644
--- a/api/http/handler/edgestacks/handler.go
+++ b/api/http/handler/edgestacks/handler.go
@@ -22,6 +22,7 @@ type Handler struct {
 	GitService         portainer.GitService
 	edgeStacksService  *edgestackservice.Service
 	KubernetesDeployer portainer.KubernetesDeployer
+	stackCoordinator   *EdgeStackStatusUpdateCoordinator
 }
 
 // NewHandler creates a handler to manage environment(endpoint) group operations.
@@ -33,6 +34,10 @@ func NewHandler(bouncer security.BouncerService, dataStore dataservices.DataStor edgeStacksService: edgeStacksService, } + h.stackCoordinator = NewEdgeStackStatusUpdateCoordinator(dataStore, h.updateEdgeStackStatus) + + go h.stackCoordinator.Start() + h.Handle("/edge_stacks/create/{method}", bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeStackCreate)))).Methods(http.MethodPost) h.Handle("/edge_stacks",