From 61c5269353ff8fc0729d14db87cfda498bca2067 Mon Sep 17 00:00:00 2001 From: andres-portainer <91705312+andres-portainer@users.noreply.github.com> Date: Mon, 14 Oct 2024 10:37:13 -0300 Subject: [PATCH] fix(edgejobs): decouple the Edge Jobs from the reverse tunnel service BE-10866 (#11) --- api/chisel/schedules.go | 82 -------------- api/cmd/portainer/main.go | 5 - .../handler/edgegroups/edgegroup_update.go | 19 ++-- api/http/handler/edgejobs/edgejob_create.go | 23 ++-- api/http/handler/edgejobs/edgejob_delete.go | 17 ++- .../edgejobs/edgejob_tasklogs_clear.go | 19 ++-- .../edgejobs/edgejob_tasklogs_collect.go | 14 ++- api/http/handler/edgejobs/edgejob_update.go | 25 ++--- .../endpointedge/endpointedge_job_logs.go | 12 +-- .../endpointedge_status_inspect.go | 30 +++++- .../endpointedge_status_inspect_test.go | 101 ++++++++++++------ api/internal/edge/edgejob.go | 27 ----- api/internal/edge/endpoint.go | 34 ++++++ api/portainer.go | 4 - 14 files changed, 174 insertions(+), 238 deletions(-) delete mode 100644 api/chisel/schedules.go delete mode 100644 api/internal/edge/edgejob.go diff --git a/api/chisel/schedules.go b/api/chisel/schedules.go deleted file mode 100644 index 4b0203a33..000000000 --- a/api/chisel/schedules.go +++ /dev/null @@ -1,82 +0,0 @@ -package chisel - -import ( - portainer "github.com/portainer/portainer/api" - "github.com/portainer/portainer/api/internal/edge/cache" -) - -// EdgeJobs retrieves the edge jobs for the given environment -func (service *Service) EdgeJobs(endpointID portainer.EndpointID) []portainer.EdgeJob { - service.mu.RLock() - defer service.mu.RUnlock() - - return append( - make([]portainer.EdgeJob, 0, len(service.edgeJobs[endpointID])), - service.edgeJobs[endpointID]..., - ) -} - -// AddEdgeJob register an EdgeJob inside the tunnel details associated to an environment(endpoint). -func (service *Service) AddEdgeJob(endpoint *portainer.Endpoint, edgeJob *portainer.EdgeJob) { - if endpoint.Edge.AsyncMode { - return - } - - service.mu.Lock() - defer service.mu.Unlock() - - existingJobIndex := -1 - for idx, existingJob := range service.edgeJobs[endpoint.ID] { - if existingJob.ID == edgeJob.ID { - existingJobIndex = idx - - break - } - } - - if existingJobIndex == -1 { - service.edgeJobs[endpoint.ID] = append(service.edgeJobs[endpoint.ID], *edgeJob) - } else { - service.edgeJobs[endpoint.ID][existingJobIndex] = *edgeJob - } - - cache.Del(endpoint.ID) -} - -// RemoveEdgeJob will remove the specified Edge job from each tunnel it was registered with. -func (service *Service) RemoveEdgeJob(edgeJobID portainer.EdgeJobID) { - service.mu.Lock() - - for endpointID := range service.edgeJobs { - n := 0 - for _, edgeJob := range service.edgeJobs[endpointID] { - if edgeJob.ID != edgeJobID { - service.edgeJobs[endpointID][n] = edgeJob - n++ - } - } - - service.edgeJobs[endpointID] = service.edgeJobs[endpointID][:n] - - cache.Del(endpointID) - } - - service.mu.Unlock() -} - -func (service *Service) RemoveEdgeJobFromEndpoint(endpointID portainer.EndpointID, edgeJobID portainer.EdgeJobID) { - service.mu.Lock() - defer service.mu.Unlock() - - n := 0 - for _, edgeJob := range service.edgeJobs[endpointID] { - if edgeJob.ID != edgeJobID { - service.edgeJobs[endpointID][n] = edgeJob - n++ - } - } - - service.edgeJobs[endpointID] = service.edgeJobs[endpointID][:n] - - cache.Del(endpointID) -} diff --git a/api/cmd/portainer/main.go b/api/cmd/portainer/main.go index e822c5456..e264d1911 100644 --- a/api/cmd/portainer/main.go +++ b/api/cmd/portainer/main.go @@ -31,7 +31,6 @@ import ( "github.com/portainer/portainer/api/http/proxy" kubeproxy "github.com/portainer/portainer/api/http/proxy/factory/kubernetes" "github.com/portainer/portainer/api/internal/authorization" - "github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/edge/edgestacks" "github.com/portainer/portainer/api/internal/endpointutils" "github.com/portainer/portainer/api/internal/snapshot" @@ -467,10 +466,6 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server { log.Fatal().Err(err).Msg("failed initializing helm package manager") } - if err := edge.LoadEdgeJobs(dataStore, reverseTunnelService); err != nil { - log.Fatal().Err(err).Msg("failed loading edge jobs from database") - } - applicationStatus := initStatus(instanceID) // channel to control when the admin user is created diff --git a/api/http/handler/edgegroups/edgegroup_update.go b/api/http/handler/edgegroups/edgegroup_update.go index c724a228c..2d04f7cd2 100644 --- a/api/http/handler/edgegroups/edgegroup_update.go +++ b/api/http/handler/edgegroups/edgegroup_update.go @@ -8,6 +8,7 @@ import ( portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/edge" + "github.com/portainer/portainer/api/internal/edge/cache" "github.com/portainer/portainer/api/internal/endpointutils" "github.com/portainer/portainer/api/slicesx" httperror "github.com/portainer/portainer/pkg/libhttp/error" @@ -55,8 +56,7 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request) } var payload edgeGroupUpdatePayload - err = request.DecodeAndValidateJSONPayload(r, &payload) - if err != nil { + if err := request.DecodeAndValidateJSONPayload(r, &payload); err != nil { return httperror.BadRequest("Invalid request payload", err) } @@ -105,8 +105,7 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request) edgeGroup.PartialMatch = *payload.PartialMatch } - err = tx.EdgeGroup().Update(edgeGroup.ID, edgeGroup) - if err != nil { + if err := tx.EdgeGroup().Update(edgeGroup.ID, edgeGroup); err != nil { return httperror.InternalServerError("Unable to persist Edge group changes inside the database", err) } @@ -136,8 +135,7 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request) return httperror.InternalServerError("Unable to get Environment from database", err) } - err = handler.updateEndpointStacks(tx, endpoint, edgeGroups, edgeStacks) - if err != nil { + if err := handler.updateEndpointStacks(tx, endpoint, edgeGroups, edgeStacks); err != nil { return httperror.InternalServerError("Unable to persist Environment relation changes inside the database", err) } @@ -156,8 +154,7 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request) continue } - err = handler.updateEndpointEdgeJobs(edgeGroup.ID, endpoint, edgeJobs, operation) - if err != nil { + if err := handler.updateEndpointEdgeJobs(edgeGroup.ID, endpoint, edgeJobs, operation); err != nil { return httperror.InternalServerError("Unable to persist Environment Edge Jobs changes inside the database", err) } } @@ -198,10 +195,8 @@ func (handler *Handler) updateEndpointEdgeJobs(edgeGroupID portainer.EdgeGroupID } switch operation { - case "add": - handler.ReverseTunnelService.AddEdgeJob(endpoint, &edgeJob) - case "remove": - handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpoint.ID, edgeJob.ID) + case "add", "remove": + cache.Del(endpoint.ID) } } diff --git a/api/http/handler/edgejobs/edgejob_create.go b/api/http/handler/edgejobs/edgejob_create.go index 0446c107b..252770aa2 100644 --- a/api/http/handler/edgejobs/edgejob_create.go +++ b/api/http/handler/edgejobs/edgejob_create.go @@ -11,6 +11,7 @@ import ( portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/edge" + "github.com/portainer/portainer/api/internal/edge/cache" "github.com/portainer/portainer/api/internal/endpointutils" httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/portainer/portainer/pkg/libhttp/request" @@ -113,11 +114,14 @@ func (handler *Handler) createEdgeJob(tx dataservices.DataStoreTx, payload *edge } } - err = handler.addAndPersistEdgeJob(tx, edgeJob, fileContent, endpoints) - if err != nil { + if err := handler.addAndPersistEdgeJob(tx, edgeJob, fileContent, endpoints); err != nil { return nil, httperror.InternalServerError("Unable to schedule Edge job", err) } + for _, endpointID := range endpoints { + cache.Del(endpointID) + } + return edgeJob, nil } @@ -144,15 +148,13 @@ func (payload *edgeJobCreateFromFilePayload) Validate(r *http.Request) error { payload.CronExpression = cronExpression var endpoints []portainer.EndpointID - err = request.RetrieveMultiPartFormJSONValue(r, "Endpoints", &endpoints, true) - if err != nil { + if err := request.RetrieveMultiPartFormJSONValue(r, "Endpoints", &endpoints, true); err != nil { return errors.New("invalid environments") } payload.Endpoints = endpoints var edgeGroups []portainer.EdgeGroupID - err = request.RetrieveMultiPartFormJSONValue(r, "EdgeGroups", &edgeGroups, true) - if err != nil { + if err := request.RetrieveMultiPartFormJSONValue(r, "EdgeGroups", &edgeGroups, true); err != nil { return errors.New("invalid edge groups") } payload.EdgeGroups = edgeGroups @@ -267,15 +269,6 @@ func (handler *Handler) addAndPersistEdgeJob(tx dataservices.DataStoreTx, edgeJo return errors.New("environments or edge groups are mandatory for an Edge job") } - for endpointID := range endpointsMap { - endpoint, err := tx.Endpoint().Endpoint(endpointID) - if err != nil { - return err - } - - handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob) - } - return tx.EdgeJob().CreateWithID(edgeJob.ID, edgeJob) } diff --git a/api/http/handler/edgejobs/edgejob_delete.go b/api/http/handler/edgejobs/edgejob_delete.go index 9472167f8..fa78e6a50 100644 --- a/api/http/handler/edgejobs/edgejob_delete.go +++ b/api/http/handler/edgejobs/edgejob_delete.go @@ -9,9 +9,11 @@ import ( portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/edge" + "github.com/portainer/portainer/api/internal/edge/cache" httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/response" + "github.com/rs/zerolog/log" ) @@ -33,10 +35,9 @@ func (handler *Handler) edgeJobDelete(w http.ResponseWriter, r *http.Request) *h return httperror.BadRequest("Invalid Edge job identifier route variable", err) } - err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { + if err := handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { return handler.deleteEdgeJob(tx, portainer.EdgeJobID(edgeJobID)) - }) - if err != nil { + }); err != nil { var handlerError *httperror.HandlerError if errors.As(err, &handlerError) { return handlerError @@ -57,13 +58,10 @@ func (handler *Handler) deleteEdgeJob(tx dataservices.DataStoreTx, edgeJobID por } edgeJobFolder := handler.FileService.GetEdgeJobFolder(strconv.Itoa(int(edgeJobID))) - err = handler.FileService.RemoveDirectory(edgeJobFolder) - if err != nil { + if err := handler.FileService.RemoveDirectory(edgeJobFolder); err != nil { log.Warn().Err(err).Msg("Unable to remove the files associated to the Edge job on the filesystem") } - handler.ReverseTunnelService.RemoveEdgeJob(edgeJob.ID) - var endpointsMap map[portainer.EndpointID]portainer.EdgeJobEndpointMeta if len(edgeJob.EdgeGroups) > 0 { endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, tx) @@ -78,11 +76,10 @@ func (handler *Handler) deleteEdgeJob(tx dataservices.DataStoreTx, edgeJobID por } for endpointID := range endpointsMap { - handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpointID, edgeJob.ID) + cache.Del(endpointID) } - err = tx.EdgeJob().Delete(edgeJob.ID) - if err != nil { + if err := tx.EdgeJob().Delete(edgeJob.ID); err != nil { return httperror.InternalServerError("Unable to remove the Edge job from the database", err) } diff --git a/api/http/handler/edgejobs/edgejob_tasklogs_clear.go b/api/http/handler/edgejobs/edgejob_tasklogs_clear.go index 6594834d9..dfa99042f 100644 --- a/api/http/handler/edgejobs/edgejob_tasklogs_clear.go +++ b/api/http/handler/edgejobs/edgejob_tasklogs_clear.go @@ -9,6 +9,7 @@ import ( portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/edge" + "github.com/portainer/portainer/api/internal/edge/cache" httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/response" @@ -53,7 +54,7 @@ func (handler *Handler) edgeJobTasksClear(w http.ResponseWriter, r *http.Request } } - err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { + if err := handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { updateEdgeJobFn := func(edgeJob *portainer.EdgeJob, endpointID portainer.EndpointID, endpointsFromGroups []portainer.EndpointID) error { mutationFn(edgeJob, endpointID, endpointsFromGroups) @@ -61,8 +62,7 @@ func (handler *Handler) edgeJobTasksClear(w http.ResponseWriter, r *http.Request } return handler.clearEdgeJobTaskLogs(tx, portainer.EdgeJobID(edgeJobID), portainer.EndpointID(taskID), updateEdgeJobFn) - }) - if err != nil { + }); err != nil { var handlerError *httperror.HandlerError if errors.As(err, &handlerError) { return handlerError @@ -82,8 +82,7 @@ func (handler *Handler) clearEdgeJobTaskLogs(tx dataservices.DataStoreTx, edgeJo return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err) } - err = handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(int(edgeJobID)), strconv.Itoa(int(endpointID))) - if err != nil { + if err := handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(int(edgeJobID)), strconv.Itoa(int(endpointID))); err != nil { return httperror.InternalServerError("Unable to clear log file from disk", err) } @@ -92,17 +91,11 @@ func (handler *Handler) clearEdgeJobTaskLogs(tx dataservices.DataStoreTx, edgeJo return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) } - err = updateEdgeJob(edgeJob, endpointID, endpointsFromGroups) - if err != nil { + if err := updateEdgeJob(edgeJob, endpointID, endpointsFromGroups); err != nil { return httperror.InternalServerError("Unable to persist Edge job changes in the database", err) } - endpoint, err := tx.Endpoint().Endpoint(endpointID) - if err != nil { - return httperror.NotFound("Unable to retrieve environment from the database", err) - } - - handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob) + cache.Del(endpointID) return nil } diff --git a/api/http/handler/edgejobs/edgejob_tasklogs_collect.go b/api/http/handler/edgejobs/edgejob_tasklogs_collect.go index a0f5940f8..e6bc53e46 100644 --- a/api/http/handler/edgejobs/edgejob_tasklogs_collect.go +++ b/api/http/handler/edgejobs/edgejob_tasklogs_collect.go @@ -8,6 +8,7 @@ import ( portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/edge" + "github.com/portainer/portainer/api/internal/edge/cache" httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/response" @@ -38,7 +39,7 @@ func (handler *Handler) edgeJobTasksCollect(w http.ResponseWriter, r *http.Reque return httperror.BadRequest("Invalid Task identifier route variable", err) } - err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { + if err := handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { edgeJob, err := tx.EdgeJob().Read(portainer.EdgeJobID(edgeJobID)) if tx.IsErrObjectNotFound(err) { return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err) @@ -64,8 +65,7 @@ func (handler *Handler) edgeJobTasksCollect(w http.ResponseWriter, r *http.Reque edgeJob.Endpoints[endpointID] = meta } - err = tx.EdgeJob().Update(edgeJob.ID, edgeJob) - if err != nil { + if err := tx.EdgeJob().Update(edgeJob.ID, edgeJob); err != nil { return httperror.InternalServerError("Unable to persist Edge job changes in the database", err) } @@ -74,16 +74,14 @@ func (handler *Handler) edgeJobTasksCollect(w http.ResponseWriter, r *http.Reque return httperror.InternalServerError("Unable to retrieve environment from the database", err) } + cache.Del(endpointID) + if endpoint.Edge.AsyncMode { return httperror.BadRequest("Async Edge Endpoints are not supported in Portainer CE", nil) } - handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob) - return nil - }) - - if err != nil { + }); err != nil { var handlerError *httperror.HandlerError if errors.As(err, &handlerError) { return handlerError diff --git a/api/http/handler/edgejobs/edgejob_update.go b/api/http/handler/edgejobs/edgejob_update.go index 558c80f81..468fbb8b7 100644 --- a/api/http/handler/edgejobs/edgejob_update.go +++ b/api/http/handler/edgejobs/edgejob_update.go @@ -10,6 +10,7 @@ import ( portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/edge" + "github.com/portainer/portainer/api/internal/edge/cache" "github.com/portainer/portainer/api/internal/endpointutils" httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/portainer/portainer/pkg/libhttp/request" @@ -56,8 +57,7 @@ func (handler *Handler) edgeJobUpdate(w http.ResponseWriter, r *http.Request) *h } var payload edgeJobUpdatePayload - err = request.DecodeAndValidateJSONPayload(r, &payload) - if err != nil { + if err := request.DecodeAndValidateJSONPayload(r, &payload); err != nil { return httperror.BadRequest("Invalid request payload", err) } @@ -78,13 +78,11 @@ func (handler *Handler) updateEdgeJob(tx dataservices.DataStoreTx, edgeJobID por return nil, httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err) } - err = handler.updateEdgeSchedule(tx, edgeJob, &payload) - if err != nil { + if err := handler.updateEdgeSchedule(tx, edgeJob, &payload); err != nil { return nil, httperror.InternalServerError("Unable to update Edge job", err) } - err = tx.EdgeJob().Update(edgeJob.ID, edgeJob) - if err != nil { + if err := tx.EdgeJob().Update(edgeJob.ID, edgeJob); err != nil { return nil, httperror.InternalServerError("Unable to persist Edge job changes inside the database", err) } @@ -149,8 +147,7 @@ func (handler *Handler) updateEdgeSchedule(tx dataservices.DataStoreTx, edgeJob if len(payload.EdgeGroups) > 0 { for _, edgeGroupID := range payload.EdgeGroups { - _, err := tx.EdgeGroup().Read(edgeGroupID) - if err != nil { + if _, err := tx.EdgeGroup().Read(edgeGroupID); err != nil { return err } @@ -203,8 +200,7 @@ func (handler *Handler) updateEdgeSchedule(tx dataservices.DataStoreTx, edgeJob if payload.FileContent != nil && *payload.FileContent != string(fileContent) { fileContent = []byte(*payload.FileContent) - _, err := handler.FileService.StoreEdgeJobFileFromBytes(strconv.Itoa(int(edgeJob.ID)), fileContent) - if err != nil { + if _, err := handler.FileService.StoreEdgeJobFileFromBytes(strconv.Itoa(int(edgeJob.ID)), fileContent); err != nil { return err } @@ -223,16 +219,11 @@ func (handler *Handler) updateEdgeSchedule(tx dataservices.DataStoreTx, edgeJob maps.Copy(endpointsFromGroupsToAddMap, edgeJob.Endpoints) for endpointID := range endpointsFromGroupsToAddMap { - endpoint, err := tx.Endpoint().Endpoint(endpointID) - if err != nil { - return err - } - - handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob) + cache.Del(endpointID) } for endpointID := range endpointsToRemove { - handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpointID, edgeJob.ID) + cache.Del(endpointID) } return nil diff --git a/api/http/handler/endpointedge/endpointedge_job_logs.go b/api/http/handler/endpointedge/endpointedge_job_logs.go index c6cf3de96..4ca01a65f 100644 --- a/api/http/handler/endpointedge/endpointedge_job_logs.go +++ b/api/http/handler/endpointedge/endpointedge_job_logs.go @@ -9,6 +9,7 @@ import ( portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/http/middlewares" + "github.com/portainer/portainer/api/internal/edge/cache" httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/response" @@ -84,8 +85,7 @@ func (handler *Handler) getEdgeJobLobs(tx dataservices.DataStoreTx, endpointID p return httperror.InternalServerError("Unable to find an edge job with the specified identifier inside the database", err) } - err = handler.FileService.StoreEdgeJobTaskLogFileFromBytes(strconv.Itoa(int(edgeJobID)), strconv.Itoa(int(endpointID)), []byte(payload.FileContent)) - if err != nil { + if err := handler.FileService.StoreEdgeJobTaskLogFileFromBytes(strconv.Itoa(int(edgeJobID)), strconv.Itoa(int(endpoint.ID)), []byte(payload.FileContent)); err != nil { return httperror.InternalServerError("Unable to save task log to the filesystem", err) } @@ -96,13 +96,11 @@ func (handler *Handler) getEdgeJobLobs(tx dataservices.DataStoreTx, endpointID p edgeJob.Endpoints[endpoint.ID] = meta } - err = tx.EdgeJob().Update(edgeJob.ID, edgeJob) - - handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob) - - if err != nil { + if err := tx.EdgeJob().Update(edgeJob.ID, edgeJob); err != nil { return httperror.InternalServerError("Unable to persist edge job changes to the database", err) } + cache.Del(endpointID) + return nil } diff --git a/api/http/handler/endpointedge/endpointedge_status_inspect.go b/api/http/handler/endpointedge/endpointedge_status_inspect.go index de22b195f..9bd341561 100644 --- a/api/http/handler/endpointedge/endpointedge_status_inspect.go +++ b/api/http/handler/endpointedge/endpointedge_status_inspect.go @@ -170,7 +170,7 @@ func (handler *Handler) inspectStatus(tx dataservices.DataStoreTx, r *http.Reque Credentials: tunnel.Credentials, } - schedules, handlerErr := handler.buildSchedules(endpoint.ID) + schedules, handlerErr := handler.buildSchedules(tx, endpoint.ID) if handlerErr != nil { return nil, handlerErr } @@ -208,9 +208,33 @@ func parseAgentPlatform(r *http.Request) (portainer.EndpointType, error) { } } -func (handler *Handler) buildSchedules(endpointID portainer.EndpointID) ([]edgeJobResponse, *httperror.HandlerError) { +func (handler *Handler) buildSchedules(tx dataservices.DataStoreTx, endpointID portainer.EndpointID) ([]edgeJobResponse, *httperror.HandlerError) { schedules := []edgeJobResponse{} - for _, job := range handler.ReverseTunnelService.EdgeJobs(endpointID) { + + edgeJobs, err := tx.EdgeJob().ReadAll() + if err != nil { + return nil, httperror.InternalServerError("Unable to retrieve Edge Jobs", err) + } + + for _, job := range edgeJobs { + _, endpointHasJob := job.Endpoints[endpointID] + if !endpointHasJob { + for _, edgeGroupID := range job.EdgeGroups { + member, _, err := edge.EndpointInEdgeGroup(tx, endpointID, edgeGroupID) + if err != nil { + return nil, httperror.InternalServerError("Unable to retrieve relations", err) + } else if member { + endpointHasJob = true + + break + } + } + } + + if !endpointHasJob { + continue + } + var collectLogs bool if _, ok := job.GroupLogsCollection[endpointID]; ok { collectLogs = job.GroupLogsCollection[endpointID].CollectLogs diff --git a/api/http/handler/endpointedge/endpointedge_status_inspect_test.go b/api/http/handler/endpointedge/endpointedge_status_inspect_test.go index 2b9be5722..8bfaa9814 100644 --- a/api/http/handler/endpointedge/endpointedge_status_inspect_test.go +++ b/api/http/handler/endpointedge/endpointedge_status_inspect_test.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "strconv" "testing" "time" @@ -36,7 +37,7 @@ var endpointTestCases = []endpointTestCase{ { portainer.Endpoint{ ID: -1, - Name: "endpoint-id--1", + Name: "endpoint-id-1", Type: portainer.EdgeAgentOnDockerEnvironment, URL: "https://portainer.io:9443", EdgeID: "edge-id", @@ -342,28 +343,48 @@ func TestEdgeStackStatus(t *testing.T) { func TestEdgeJobsResponse(t *testing.T) { handler := mustSetupHandler(t) - endpointID := portainer.EndpointID(77) - endpoint := portainer.Endpoint{ - ID: endpointID, - Name: "test-endpoint-77", - Type: portainer.EdgeAgentOnDockerEnvironment, - URL: "https://portainer.io:9443", - EdgeID: "edge-id", - LastCheckInDate: time.Now().Unix(), + localCreateEndpoint := func(endpointID portainer.EndpointID, tagIDs []portainer.TagID) *portainer.Endpoint { + endpoint := portainer.Endpoint{ + ID: endpointID, + Name: "test-endpoint-" + strconv.Itoa(int(endpointID)), + Type: portainer.EdgeAgentOnDockerEnvironment, + URL: "https://portainer.io:9443", + EdgeID: "edge-id-" + strconv.Itoa(int(endpointID)), + TagIDs: tagIDs, + LastCheckInDate: time.Now().Unix(), + UserTrusted: true, + } + err := createEndpoint(handler, endpoint, + portainer.EndpointRelation{EndpointID: endpointID}) + require.NoError(t, err) + + return &endpoint } - endpointRelation := portainer.EndpointRelation{ - EndpointID: endpoint.ID, + dynamicGroupTags := []portainer.TagID{1, 2, 3} + + endpoint := localCreateEndpoint(77, nil) + endpointFromStaticEdgeGroup := localCreateEndpoint(78, nil) + endpointFromDynamicEdgeGroup := localCreateEndpoint(79, dynamicGroupTags) + unrelatedEndpoint := localCreateEndpoint(80, nil) + + staticEdgeGroup := portainer.EdgeGroup{ + ID: 1, + Endpoints: []portainer.EndpointID{endpointFromStaticEdgeGroup.ID}, } + err := handler.DataStore.EdgeGroup().Create(&staticEdgeGroup) + require.NoError(t, err) - if err := createEndpoint(handler, endpoint, endpointRelation); err != nil { - t.Fatal(err) + dynamicEdgeGroup := portainer.EdgeGroup{ + ID: 2, + Dynamic: true, + TagIDs: dynamicGroupTags, } + err = handler.DataStore.EdgeGroup().Create(&dynamicEdgeGroup) + require.NoError(t, err) path, err := handler.FileService.StoreEdgeJobFileFromBytes("test-script", []byte("pwd")) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) edgeJobID := portainer.EdgeJobID(35) edgeJob := portainer.EdgeJob{ @@ -374,32 +395,42 @@ func TestEdgeJobsResponse(t *testing.T) { ScriptPath: path, Recurring: true, Version: 57, + Endpoints: map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{ + endpoint.ID: {}, + }, + EdgeGroups: []portainer.EdgeGroupID{staticEdgeGroup.ID, dynamicEdgeGroup.ID}, } - handler.ReverseTunnelService.AddEdgeJob(&endpoint, &edgeJob) + err = handler.DataStore.EdgeJob().Create(&edgeJob) + require.NoError(t, err) - req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("/api/endpoints/%d/edge/status", endpoint.ID), nil) - if err != nil { - t.Fatal("request error:", err) - } + f := func(endpoint *portainer.Endpoint, scheduleLen int) { + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("/api/endpoints/%d/edge/status", endpoint.ID), nil) + require.NoError(t, err) - req.Header.Set(portainer.PortainerAgentEdgeIDHeader, "edge-id") - req.Header.Set(portainer.HTTPResponseAgentPlatform, "1") + req.Header.Set(portainer.PortainerAgentEdgeIDHeader, endpoint.EdgeID) + req.Header.Set(portainer.HTTPResponseAgentPlatform, "1") - rec := httptest.NewRecorder() - handler.ServeHTTP(rec, req) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) - if rec.Code != http.StatusOK { - t.Fatalf("expected a %d response, found: %d", http.StatusOK, rec.Code) - } + require.Equal(t, http.StatusOK, rec.Code) - var data endpointEdgeStatusInspectResponse - if err := json.NewDecoder(rec.Body).Decode(&data); err != nil { - t.Fatal("error decoding response:", err) + var data endpointEdgeStatusInspectResponse + err = json.NewDecoder(rec.Body).Decode(&data) + require.NoError(t, err) + + require.Len(t, data.Schedules, scheduleLen) + + if scheduleLen > 0 { + require.Equal(t, edgeJob.ID, data.Schedules[0].ID) + require.Equal(t, edgeJob.CronExpression, data.Schedules[0].CronExpression) + require.Equal(t, edgeJob.Version, data.Schedules[0].Version) + } } - assert.Len(t, data.Schedules, 1) - assert.Equal(t, edgeJob.ID, data.Schedules[0].ID) - assert.Equal(t, edgeJob.CronExpression, data.Schedules[0].CronExpression) - assert.Equal(t, edgeJob.Version, data.Schedules[0].Version) + f(endpoint, 1) + f(endpointFromStaticEdgeGroup, 1) + f(endpointFromDynamicEdgeGroup, 1) + f(unrelatedEndpoint, 0) } diff --git a/api/internal/edge/edgejob.go b/api/internal/edge/edgejob.go deleted file mode 100644 index 4838dbc6a..000000000 --- a/api/internal/edge/edgejob.go +++ /dev/null @@ -1,27 +0,0 @@ -package edge - -import ( - portainer "github.com/portainer/portainer/api" - "github.com/portainer/portainer/api/dataservices" -) - -// LoadEdgeJobs registers all edge jobs inside corresponding environment(endpoint) tunnel -func LoadEdgeJobs(dataStore dataservices.DataStore, reverseTunnelService portainer.ReverseTunnelService) error { - edgeJobs, err := dataStore.EdgeJob().ReadAll() - if err != nil { - return err - } - - for _, edgeJob := range edgeJobs { - for endpointID := range edgeJob.Endpoints { - endpoint, err := dataStore.Endpoint().Endpoint(endpointID) - if err != nil { - return err - } - - reverseTunnelService.AddEdgeJob(endpoint, &edgeJob) - } - } - - return nil -} diff --git a/api/internal/edge/endpoint.go b/api/internal/edge/endpoint.go index 090fe8ef9..27cc50b3b 100644 --- a/api/internal/edge/endpoint.go +++ b/api/internal/edge/endpoint.go @@ -1,8 +1,12 @@ package edge import ( + "slices" + portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" + + "github.com/rs/zerolog/log" ) // EndpointRelatedEdgeStacks returns a list of Edge stacks related to this Environment(Endpoint) @@ -39,3 +43,33 @@ func EffectiveCheckinInterval(tx dataservices.DataStoreTx, endpoint *portainer.E return portainer.DefaultEdgeAgentCheckinIntervalInSeconds } + +// EndpointInEdgeGroup returns true and the edge group name if the endpoint is in the edge group +func EndpointInEdgeGroup( + tx dataservices.DataStoreTx, + endpointID portainer.EndpointID, + edgeGroupID portainer.EdgeGroupID, +) (bool, string, error) { + endpointIDs, err := GetEndpointsFromEdgeGroups( + []portainer.EdgeGroupID{edgeGroupID}, tx, + ) + if err != nil { + return false, "", err + } + + if slices.Contains(endpointIDs, endpointID) { + edgeGroup, err := tx.EdgeGroup().Read(edgeGroupID) + if err != nil { + log.Warn(). + Err(err). + Int("edgeGroupID", int(edgeGroupID)). + Msg("Unable to retrieve edge group") + + return false, "", err + } + + return true, edgeGroup.Name, nil + } + + return false, "", nil +} diff --git a/api/portainer.go b/api/portainer.go index a087887fe..20aee9a8f 100644 --- a/api/portainer.go +++ b/api/portainer.go @@ -1568,10 +1568,6 @@ type ( TunnelAddr(endpoint *Endpoint) (string, error) UpdateLastActivity(endpointID EndpointID) KeepTunnelAlive(endpointID EndpointID, ctx context.Context, maxKeepAlive time.Duration) - EdgeJobs(endpointId EndpointID) []EdgeJob - AddEdgeJob(endpoint *Endpoint, edgeJob *EdgeJob) - RemoveEdgeJob(edgeJobID EdgeJobID) - RemoveEdgeJobFromEndpoint(endpointID EndpointID, edgeJobID EdgeJobID) } // Server defines the interface to serve the API