fix(edgejobs): decouple the Edge Jobs from the reverse tunnel service BE-10866 (#11)

pull/12346/head
andres-portainer 2024-10-14 10:37:13 -03:00 committed by GitHub
parent 7a35b5b0e4
commit 61c5269353
14 changed files with 179 additions and 243 deletions

View File

@ -1,82 +0,0 @@
package chisel
import (
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/internal/edge/cache"
)
// EdgeJobs retrieves the edge jobs for the given environment
func (service *Service) EdgeJobs(endpointID portainer.EndpointID) []portainer.EdgeJob {
service.mu.RLock()
defer service.mu.RUnlock()
return append(
make([]portainer.EdgeJob, 0, len(service.edgeJobs[endpointID])),
service.edgeJobs[endpointID]...,
)
}
// AddEdgeJob register an EdgeJob inside the tunnel details associated to an environment(endpoint).
func (service *Service) AddEdgeJob(endpoint *portainer.Endpoint, edgeJob *portainer.EdgeJob) {
if endpoint.Edge.AsyncMode {
return
}
service.mu.Lock()
defer service.mu.Unlock()
existingJobIndex := -1
for idx, existingJob := range service.edgeJobs[endpoint.ID] {
if existingJob.ID == edgeJob.ID {
existingJobIndex = idx
break
}
}
if existingJobIndex == -1 {
service.edgeJobs[endpoint.ID] = append(service.edgeJobs[endpoint.ID], *edgeJob)
} else {
service.edgeJobs[endpoint.ID][existingJobIndex] = *edgeJob
}
cache.Del(endpoint.ID)
}
// RemoveEdgeJob will remove the specified Edge job from each tunnel it was registered with.
func (service *Service) RemoveEdgeJob(edgeJobID portainer.EdgeJobID) {
service.mu.Lock()
for endpointID := range service.edgeJobs {
n := 0
for _, edgeJob := range service.edgeJobs[endpointID] {
if edgeJob.ID != edgeJobID {
service.edgeJobs[endpointID][n] = edgeJob
n++
}
}
service.edgeJobs[endpointID] = service.edgeJobs[endpointID][:n]
cache.Del(endpointID)
}
service.mu.Unlock()
}
func (service *Service) RemoveEdgeJobFromEndpoint(endpointID portainer.EndpointID, edgeJobID portainer.EdgeJobID) {
service.mu.Lock()
defer service.mu.Unlock()
n := 0
for _, edgeJob := range service.edgeJobs[endpointID] {
if edgeJob.ID != edgeJobID {
service.edgeJobs[endpointID][n] = edgeJob
n++
}
}
service.edgeJobs[endpointID] = service.edgeJobs[endpointID][:n]
cache.Del(endpointID)
}

View File

@ -31,7 +31,6 @@ import (
"github.com/portainer/portainer/api/http/proxy" "github.com/portainer/portainer/api/http/proxy"
kubeproxy "github.com/portainer/portainer/api/http/proxy/factory/kubernetes" kubeproxy "github.com/portainer/portainer/api/http/proxy/factory/kubernetes"
"github.com/portainer/portainer/api/internal/authorization" "github.com/portainer/portainer/api/internal/authorization"
"github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/edge/edgestacks" "github.com/portainer/portainer/api/internal/edge/edgestacks"
"github.com/portainer/portainer/api/internal/endpointutils" "github.com/portainer/portainer/api/internal/endpointutils"
"github.com/portainer/portainer/api/internal/snapshot" "github.com/portainer/portainer/api/internal/snapshot"
@ -467,10 +466,6 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server {
log.Fatal().Err(err).Msg("failed initializing helm package manager") log.Fatal().Err(err).Msg("failed initializing helm package manager")
} }
if err := edge.LoadEdgeJobs(dataStore, reverseTunnelService); err != nil {
log.Fatal().Err(err).Msg("failed loading edge jobs from database")
}
applicationStatus := initStatus(instanceID) applicationStatus := initStatus(instanceID)
// channel to control when the admin user is created // channel to control when the admin user is created

View File

@ -8,6 +8,7 @@ import (
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/edge/cache"
"github.com/portainer/portainer/api/internal/endpointutils" "github.com/portainer/portainer/api/internal/endpointutils"
"github.com/portainer/portainer/api/slicesx" "github.com/portainer/portainer/api/slicesx"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
@ -55,8 +56,7 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request)
} }
var payload edgeGroupUpdatePayload var payload edgeGroupUpdatePayload
err = request.DecodeAndValidateJSONPayload(r, &payload) if err := request.DecodeAndValidateJSONPayload(r, &payload); err != nil {
if err != nil {
return httperror.BadRequest("Invalid request payload", err) return httperror.BadRequest("Invalid request payload", err)
} }
@ -105,8 +105,7 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request)
edgeGroup.PartialMatch = *payload.PartialMatch edgeGroup.PartialMatch = *payload.PartialMatch
} }
err = tx.EdgeGroup().Update(edgeGroup.ID, edgeGroup) if err := tx.EdgeGroup().Update(edgeGroup.ID, edgeGroup); err != nil {
if err != nil {
return httperror.InternalServerError("Unable to persist Edge group changes inside the database", err) return httperror.InternalServerError("Unable to persist Edge group changes inside the database", err)
} }
@ -136,8 +135,7 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request)
return httperror.InternalServerError("Unable to get Environment from database", err) return httperror.InternalServerError("Unable to get Environment from database", err)
} }
err = handler.updateEndpointStacks(tx, endpoint, edgeGroups, edgeStacks) if err := handler.updateEndpointStacks(tx, endpoint, edgeGroups, edgeStacks); err != nil {
if err != nil {
return httperror.InternalServerError("Unable to persist Environment relation changes inside the database", err) return httperror.InternalServerError("Unable to persist Environment relation changes inside the database", err)
} }
@ -156,8 +154,7 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request)
continue continue
} }
err = handler.updateEndpointEdgeJobs(edgeGroup.ID, endpoint, edgeJobs, operation) if err := handler.updateEndpointEdgeJobs(edgeGroup.ID, endpoint, edgeJobs, operation); err != nil {
if err != nil {
return httperror.InternalServerError("Unable to persist Environment Edge Jobs changes inside the database", err) return httperror.InternalServerError("Unable to persist Environment Edge Jobs changes inside the database", err)
} }
} }
@ -198,10 +195,8 @@ func (handler *Handler) updateEndpointEdgeJobs(edgeGroupID portainer.EdgeGroupID
} }
switch operation { switch operation {
case "add": case "add", "remove":
handler.ReverseTunnelService.AddEdgeJob(endpoint, &edgeJob) cache.Del(endpoint.ID)
case "remove":
handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpoint.ID, edgeJob.ID)
} }
} }

View File

@ -11,6 +11,7 @@ import (
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/edge/cache"
"github.com/portainer/portainer/api/internal/endpointutils" "github.com/portainer/portainer/api/internal/endpointutils"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
@ -113,11 +114,14 @@ func (handler *Handler) createEdgeJob(tx dataservices.DataStoreTx, payload *edge
} }
} }
err = handler.addAndPersistEdgeJob(tx, edgeJob, fileContent, endpoints) if err := handler.addAndPersistEdgeJob(tx, edgeJob, fileContent, endpoints); err != nil {
if err != nil {
return nil, httperror.InternalServerError("Unable to schedule Edge job", err) return nil, httperror.InternalServerError("Unable to schedule Edge job", err)
} }
for _, endpointID := range endpoints {
cache.Del(endpointID)
}
return edgeJob, nil return edgeJob, nil
} }
@ -144,15 +148,13 @@ func (payload *edgeJobCreateFromFilePayload) Validate(r *http.Request) error {
payload.CronExpression = cronExpression payload.CronExpression = cronExpression
var endpoints []portainer.EndpointID var endpoints []portainer.EndpointID
err = request.RetrieveMultiPartFormJSONValue(r, "Endpoints", &endpoints, true) if err := request.RetrieveMultiPartFormJSONValue(r, "Endpoints", &endpoints, true); err != nil {
if err != nil {
return errors.New("invalid environments") return errors.New("invalid environments")
} }
payload.Endpoints = endpoints payload.Endpoints = endpoints
var edgeGroups []portainer.EdgeGroupID var edgeGroups []portainer.EdgeGroupID
err = request.RetrieveMultiPartFormJSONValue(r, "EdgeGroups", &edgeGroups, true) if err := request.RetrieveMultiPartFormJSONValue(r, "EdgeGroups", &edgeGroups, true); err != nil {
if err != nil {
return errors.New("invalid edge groups") return errors.New("invalid edge groups")
} }
payload.EdgeGroups = edgeGroups payload.EdgeGroups = edgeGroups
@ -267,15 +269,6 @@ func (handler *Handler) addAndPersistEdgeJob(tx dataservices.DataStoreTx, edgeJo
return errors.New("environments or edge groups are mandatory for an Edge job") return errors.New("environments or edge groups are mandatory for an Edge job")
} }
for endpointID := range endpointsMap {
endpoint, err := tx.Endpoint().Endpoint(endpointID)
if err != nil {
return err
}
handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob)
}
return tx.EdgeJob().CreateWithID(edgeJob.ID, edgeJob) return tx.EdgeJob().CreateWithID(edgeJob.ID, edgeJob)
} }

View File

@ -9,9 +9,11 @@ import (
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/edge/cache"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
"github.com/portainer/portainer/pkg/libhttp/response" "github.com/portainer/portainer/pkg/libhttp/response"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@ -33,10 +35,9 @@ func (handler *Handler) edgeJobDelete(w http.ResponseWriter, r *http.Request) *h
return httperror.BadRequest("Invalid Edge job identifier route variable", err) return httperror.BadRequest("Invalid Edge job identifier route variable", err)
} }
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { if err := handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
return handler.deleteEdgeJob(tx, portainer.EdgeJobID(edgeJobID)) return handler.deleteEdgeJob(tx, portainer.EdgeJobID(edgeJobID))
}) }); err != nil {
if err != nil {
var handlerError *httperror.HandlerError var handlerError *httperror.HandlerError
if errors.As(err, &handlerError) { if errors.As(err, &handlerError) {
return handlerError return handlerError
@ -57,13 +58,10 @@ func (handler *Handler) deleteEdgeJob(tx dataservices.DataStoreTx, edgeJobID por
} }
edgeJobFolder := handler.FileService.GetEdgeJobFolder(strconv.Itoa(int(edgeJobID))) edgeJobFolder := handler.FileService.GetEdgeJobFolder(strconv.Itoa(int(edgeJobID)))
err = handler.FileService.RemoveDirectory(edgeJobFolder) if err := handler.FileService.RemoveDirectory(edgeJobFolder); err != nil {
if err != nil {
log.Warn().Err(err).Msg("Unable to remove the files associated to the Edge job on the filesystem") log.Warn().Err(err).Msg("Unable to remove the files associated to the Edge job on the filesystem")
} }
handler.ReverseTunnelService.RemoveEdgeJob(edgeJob.ID)
var endpointsMap map[portainer.EndpointID]portainer.EdgeJobEndpointMeta var endpointsMap map[portainer.EndpointID]portainer.EdgeJobEndpointMeta
if len(edgeJob.EdgeGroups) > 0 { if len(edgeJob.EdgeGroups) > 0 {
endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, tx) endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, tx)
@ -78,11 +76,10 @@ func (handler *Handler) deleteEdgeJob(tx dataservices.DataStoreTx, edgeJobID por
} }
for endpointID := range endpointsMap { for endpointID := range endpointsMap {
handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpointID, edgeJob.ID) cache.Del(endpointID)
} }
err = tx.EdgeJob().Delete(edgeJob.ID) if err := tx.EdgeJob().Delete(edgeJob.ID); err != nil {
if err != nil {
return httperror.InternalServerError("Unable to remove the Edge job from the database", err) return httperror.InternalServerError("Unable to remove the Edge job from the database", err)
} }

View File

@ -9,6 +9,7 @@ import (
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/edge/cache"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
"github.com/portainer/portainer/pkg/libhttp/response" "github.com/portainer/portainer/pkg/libhttp/response"
@ -53,7 +54,7 @@ func (handler *Handler) edgeJobTasksClear(w http.ResponseWriter, r *http.Request
} }
} }
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { if err := handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
updateEdgeJobFn := func(edgeJob *portainer.EdgeJob, endpointID portainer.EndpointID, endpointsFromGroups []portainer.EndpointID) error { updateEdgeJobFn := func(edgeJob *portainer.EdgeJob, endpointID portainer.EndpointID, endpointsFromGroups []portainer.EndpointID) error {
mutationFn(edgeJob, endpointID, endpointsFromGroups) mutationFn(edgeJob, endpointID, endpointsFromGroups)
@ -61,8 +62,7 @@ func (handler *Handler) edgeJobTasksClear(w http.ResponseWriter, r *http.Request
} }
return handler.clearEdgeJobTaskLogs(tx, portainer.EdgeJobID(edgeJobID), portainer.EndpointID(taskID), updateEdgeJobFn) return handler.clearEdgeJobTaskLogs(tx, portainer.EdgeJobID(edgeJobID), portainer.EndpointID(taskID), updateEdgeJobFn)
}) }); err != nil {
if err != nil {
var handlerError *httperror.HandlerError var handlerError *httperror.HandlerError
if errors.As(err, &handlerError) { if errors.As(err, &handlerError) {
return handlerError return handlerError
@ -82,8 +82,7 @@ func (handler *Handler) clearEdgeJobTaskLogs(tx dataservices.DataStoreTx, edgeJo
return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err) return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err)
} }
err = handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(int(edgeJobID)), strconv.Itoa(int(endpointID))) if err := handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(int(edgeJobID)), strconv.Itoa(int(endpointID))); err != nil {
if err != nil {
return httperror.InternalServerError("Unable to clear log file from disk", err) return httperror.InternalServerError("Unable to clear log file from disk", err)
} }
@ -92,17 +91,11 @@ func (handler *Handler) clearEdgeJobTaskLogs(tx dataservices.DataStoreTx, edgeJo
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
} }
err = updateEdgeJob(edgeJob, endpointID, endpointsFromGroups) if err := updateEdgeJob(edgeJob, endpointID, endpointsFromGroups); err != nil {
if err != nil {
return httperror.InternalServerError("Unable to persist Edge job changes in the database", err) return httperror.InternalServerError("Unable to persist Edge job changes in the database", err)
} }
endpoint, err := tx.Endpoint().Endpoint(endpointID) cache.Del(endpointID)
if err != nil {
return httperror.NotFound("Unable to retrieve environment from the database", err)
}
handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob)
return nil return nil
} }

View File

@ -8,6 +8,7 @@ import (
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/edge/cache"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
"github.com/portainer/portainer/pkg/libhttp/response" "github.com/portainer/portainer/pkg/libhttp/response"
@ -38,7 +39,7 @@ func (handler *Handler) edgeJobTasksCollect(w http.ResponseWriter, r *http.Reque
return httperror.BadRequest("Invalid Task identifier route variable", err) return httperror.BadRequest("Invalid Task identifier route variable", err)
} }
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { if err := handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
edgeJob, err := tx.EdgeJob().Read(portainer.EdgeJobID(edgeJobID)) edgeJob, err := tx.EdgeJob().Read(portainer.EdgeJobID(edgeJobID))
if tx.IsErrObjectNotFound(err) { if tx.IsErrObjectNotFound(err) {
return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err) return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err)
@ -64,8 +65,7 @@ func (handler *Handler) edgeJobTasksCollect(w http.ResponseWriter, r *http.Reque
edgeJob.Endpoints[endpointID] = meta edgeJob.Endpoints[endpointID] = meta
} }
err = tx.EdgeJob().Update(edgeJob.ID, edgeJob) if err := tx.EdgeJob().Update(edgeJob.ID, edgeJob); err != nil {
if err != nil {
return httperror.InternalServerError("Unable to persist Edge job changes in the database", err) return httperror.InternalServerError("Unable to persist Edge job changes in the database", err)
} }
@ -74,16 +74,14 @@ func (handler *Handler) edgeJobTasksCollect(w http.ResponseWriter, r *http.Reque
return httperror.InternalServerError("Unable to retrieve environment from the database", err) return httperror.InternalServerError("Unable to retrieve environment from the database", err)
} }
cache.Del(endpointID)
if endpoint.Edge.AsyncMode { if endpoint.Edge.AsyncMode {
return httperror.BadRequest("Async Edge Endpoints are not supported in Portainer CE", nil) return httperror.BadRequest("Async Edge Endpoints are not supported in Portainer CE", nil)
} }
handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob)
return nil return nil
}) }); err != nil {
if err != nil {
var handlerError *httperror.HandlerError var handlerError *httperror.HandlerError
if errors.As(err, &handlerError) { if errors.As(err, &handlerError) {
return handlerError return handlerError

View File

@ -10,6 +10,7 @@ import (
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/edge/cache"
"github.com/portainer/portainer/api/internal/endpointutils" "github.com/portainer/portainer/api/internal/endpointutils"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
@ -56,8 +57,7 @@ func (handler *Handler) edgeJobUpdate(w http.ResponseWriter, r *http.Request) *h
} }
var payload edgeJobUpdatePayload var payload edgeJobUpdatePayload
err = request.DecodeAndValidateJSONPayload(r, &payload) if err := request.DecodeAndValidateJSONPayload(r, &payload); err != nil {
if err != nil {
return httperror.BadRequest("Invalid request payload", err) return httperror.BadRequest("Invalid request payload", err)
} }
@ -78,13 +78,11 @@ func (handler *Handler) updateEdgeJob(tx dataservices.DataStoreTx, edgeJobID por
return nil, httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err) return nil, httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err)
} }
err = handler.updateEdgeSchedule(tx, edgeJob, &payload) if err := handler.updateEdgeSchedule(tx, edgeJob, &payload); err != nil {
if err != nil {
return nil, httperror.InternalServerError("Unable to update Edge job", err) return nil, httperror.InternalServerError("Unable to update Edge job", err)
} }
err = tx.EdgeJob().Update(edgeJob.ID, edgeJob) if err := tx.EdgeJob().Update(edgeJob.ID, edgeJob); err != nil {
if err != nil {
return nil, httperror.InternalServerError("Unable to persist Edge job changes inside the database", err) return nil, httperror.InternalServerError("Unable to persist Edge job changes inside the database", err)
} }
@ -149,8 +147,7 @@ func (handler *Handler) updateEdgeSchedule(tx dataservices.DataStoreTx, edgeJob
if len(payload.EdgeGroups) > 0 { if len(payload.EdgeGroups) > 0 {
for _, edgeGroupID := range payload.EdgeGroups { for _, edgeGroupID := range payload.EdgeGroups {
_, err := tx.EdgeGroup().Read(edgeGroupID) if _, err := tx.EdgeGroup().Read(edgeGroupID); err != nil {
if err != nil {
return err return err
} }
@ -203,8 +200,7 @@ func (handler *Handler) updateEdgeSchedule(tx dataservices.DataStoreTx, edgeJob
if payload.FileContent != nil && *payload.FileContent != string(fileContent) { if payload.FileContent != nil && *payload.FileContent != string(fileContent) {
fileContent = []byte(*payload.FileContent) fileContent = []byte(*payload.FileContent)
_, err := handler.FileService.StoreEdgeJobFileFromBytes(strconv.Itoa(int(edgeJob.ID)), fileContent) if _, err := handler.FileService.StoreEdgeJobFileFromBytes(strconv.Itoa(int(edgeJob.ID)), fileContent); err != nil {
if err != nil {
return err return err
} }
@ -223,16 +219,11 @@ func (handler *Handler) updateEdgeSchedule(tx dataservices.DataStoreTx, edgeJob
maps.Copy(endpointsFromGroupsToAddMap, edgeJob.Endpoints) maps.Copy(endpointsFromGroupsToAddMap, edgeJob.Endpoints)
for endpointID := range endpointsFromGroupsToAddMap { for endpointID := range endpointsFromGroupsToAddMap {
endpoint, err := tx.Endpoint().Endpoint(endpointID) cache.Del(endpointID)
if err != nil {
return err
}
handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob)
} }
for endpointID := range endpointsToRemove { for endpointID := range endpointsToRemove {
handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpointID, edgeJob.ID) cache.Del(endpointID)
} }
return nil return nil

View File

@ -9,6 +9,7 @@ import (
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/http/middlewares" "github.com/portainer/portainer/api/http/middlewares"
"github.com/portainer/portainer/api/internal/edge/cache"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
"github.com/portainer/portainer/pkg/libhttp/response" "github.com/portainer/portainer/pkg/libhttp/response"
@ -84,8 +85,7 @@ func (handler *Handler) getEdgeJobLobs(tx dataservices.DataStoreTx, endpointID p
return httperror.InternalServerError("Unable to find an edge job with the specified identifier inside the database", err) return httperror.InternalServerError("Unable to find an edge job with the specified identifier inside the database", err)
} }
err = handler.FileService.StoreEdgeJobTaskLogFileFromBytes(strconv.Itoa(int(edgeJobID)), strconv.Itoa(int(endpointID)), []byte(payload.FileContent)) if err := handler.FileService.StoreEdgeJobTaskLogFileFromBytes(strconv.Itoa(int(edgeJobID)), strconv.Itoa(int(endpoint.ID)), []byte(payload.FileContent)); err != nil {
if err != nil {
return httperror.InternalServerError("Unable to save task log to the filesystem", err) return httperror.InternalServerError("Unable to save task log to the filesystem", err)
} }
@ -96,13 +96,11 @@ func (handler *Handler) getEdgeJobLobs(tx dataservices.DataStoreTx, endpointID p
edgeJob.Endpoints[endpoint.ID] = meta edgeJob.Endpoints[endpoint.ID] = meta
} }
err = tx.EdgeJob().Update(edgeJob.ID, edgeJob) if err := tx.EdgeJob().Update(edgeJob.ID, edgeJob); err != nil {
handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob)
if err != nil {
return httperror.InternalServerError("Unable to persist edge job changes to the database", err) return httperror.InternalServerError("Unable to persist edge job changes to the database", err)
} }
cache.Del(endpointID)
return nil return nil
} }

View File

@ -170,7 +170,7 @@ func (handler *Handler) inspectStatus(tx dataservices.DataStoreTx, r *http.Reque
Credentials: tunnel.Credentials, Credentials: tunnel.Credentials,
} }
schedules, handlerErr := handler.buildSchedules(endpoint.ID) schedules, handlerErr := handler.buildSchedules(tx, endpoint.ID)
if handlerErr != nil { if handlerErr != nil {
return nil, handlerErr return nil, handlerErr
} }
@ -208,9 +208,33 @@ func parseAgentPlatform(r *http.Request) (portainer.EndpointType, error) {
} }
} }
func (handler *Handler) buildSchedules(endpointID portainer.EndpointID) ([]edgeJobResponse, *httperror.HandlerError) { func (handler *Handler) buildSchedules(tx dataservices.DataStoreTx, endpointID portainer.EndpointID) ([]edgeJobResponse, *httperror.HandlerError) {
schedules := []edgeJobResponse{} schedules := []edgeJobResponse{}
for _, job := range handler.ReverseTunnelService.EdgeJobs(endpointID) {
edgeJobs, err := tx.EdgeJob().ReadAll()
if err != nil {
return nil, httperror.InternalServerError("Unable to retrieve Edge Jobs", err)
}
for _, job := range edgeJobs {
_, endpointHasJob := job.Endpoints[endpointID]
if !endpointHasJob {
for _, edgeGroupID := range job.EdgeGroups {
member, _, err := edge.EndpointInEdgeGroup(tx, endpointID, edgeGroupID)
if err != nil {
return nil, httperror.InternalServerError("Unable to retrieve relations", err)
} else if member {
endpointHasJob = true
break
}
}
}
if !endpointHasJob {
continue
}
var collectLogs bool var collectLogs bool
if _, ok := job.GroupLogsCollection[endpointID]; ok { if _, ok := job.GroupLogsCollection[endpointID]; ok {
collectLogs = job.GroupLogsCollection[endpointID].CollectLogs collectLogs = job.GroupLogsCollection[endpointID].CollectLogs

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strconv"
"testing" "testing"
"time" "time"
@ -36,7 +37,7 @@ var endpointTestCases = []endpointTestCase{
{ {
portainer.Endpoint{ portainer.Endpoint{
ID: -1, ID: -1,
Name: "endpoint-id--1", Name: "endpoint-id-1",
Type: portainer.EdgeAgentOnDockerEnvironment, Type: portainer.EdgeAgentOnDockerEnvironment,
URL: "https://portainer.io:9443", URL: "https://portainer.io:9443",
EdgeID: "edge-id", EdgeID: "edge-id",
@ -342,28 +343,48 @@ func TestEdgeStackStatus(t *testing.T) {
func TestEdgeJobsResponse(t *testing.T) { func TestEdgeJobsResponse(t *testing.T) {
handler := mustSetupHandler(t) handler := mustSetupHandler(t)
endpointID := portainer.EndpointID(77) localCreateEndpoint := func(endpointID portainer.EndpointID, tagIDs []portainer.TagID) *portainer.Endpoint {
endpoint := portainer.Endpoint{ endpoint := portainer.Endpoint{
ID: endpointID, ID: endpointID,
Name: "test-endpoint-77", Name: "test-endpoint-" + strconv.Itoa(int(endpointID)),
Type: portainer.EdgeAgentOnDockerEnvironment, Type: portainer.EdgeAgentOnDockerEnvironment,
URL: "https://portainer.io:9443", URL: "https://portainer.io:9443",
EdgeID: "edge-id", EdgeID: "edge-id-" + strconv.Itoa(int(endpointID)),
LastCheckInDate: time.Now().Unix(), TagIDs: tagIDs,
LastCheckInDate: time.Now().Unix(),
UserTrusted: true,
}
err := createEndpoint(handler, endpoint,
portainer.EndpointRelation{EndpointID: endpointID})
require.NoError(t, err)
return &endpoint
} }
endpointRelation := portainer.EndpointRelation{ dynamicGroupTags := []portainer.TagID{1, 2, 3}
EndpointID: endpoint.ID,
}
if err := createEndpoint(handler, endpoint, endpointRelation); err != nil { endpoint := localCreateEndpoint(77, nil)
t.Fatal(err) endpointFromStaticEdgeGroup := localCreateEndpoint(78, nil)
endpointFromDynamicEdgeGroup := localCreateEndpoint(79, dynamicGroupTags)
unrelatedEndpoint := localCreateEndpoint(80, nil)
staticEdgeGroup := portainer.EdgeGroup{
ID: 1,
Endpoints: []portainer.EndpointID{endpointFromStaticEdgeGroup.ID},
} }
err := handler.DataStore.EdgeGroup().Create(&staticEdgeGroup)
require.NoError(t, err)
dynamicEdgeGroup := portainer.EdgeGroup{
ID: 2,
Dynamic: true,
TagIDs: dynamicGroupTags,
}
err = handler.DataStore.EdgeGroup().Create(&dynamicEdgeGroup)
require.NoError(t, err)
path, err := handler.FileService.StoreEdgeJobFileFromBytes("test-script", []byte("pwd")) path, err := handler.FileService.StoreEdgeJobFileFromBytes("test-script", []byte("pwd"))
if err != nil { require.NoError(t, err)
t.Fatal(err)
}
edgeJobID := portainer.EdgeJobID(35) edgeJobID := portainer.EdgeJobID(35)
edgeJob := portainer.EdgeJob{ edgeJob := portainer.EdgeJob{
@ -374,32 +395,42 @@ func TestEdgeJobsResponse(t *testing.T) {
ScriptPath: path, ScriptPath: path,
Recurring: true, Recurring: true,
Version: 57, Version: 57,
Endpoints: map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{
endpoint.ID: {},
},
EdgeGroups: []portainer.EdgeGroupID{staticEdgeGroup.ID, dynamicEdgeGroup.ID},
} }
handler.ReverseTunnelService.AddEdgeJob(&endpoint, &edgeJob) err = handler.DataStore.EdgeJob().Create(&edgeJob)
require.NoError(t, err)
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("/api/endpoints/%d/edge/status", endpoint.ID), nil) f := func(endpoint *portainer.Endpoint, scheduleLen int) {
if err != nil { req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("/api/endpoints/%d/edge/status", endpoint.ID), nil)
t.Fatal("request error:", err) require.NoError(t, err)
req.Header.Set(portainer.PortainerAgentEdgeIDHeader, endpoint.EdgeID)
req.Header.Set(portainer.HTTPResponseAgentPlatform, "1")
rec := httptest.NewRecorder()
handler.ServeHTTP(rec, req)
require.Equal(t, http.StatusOK, rec.Code)
var data endpointEdgeStatusInspectResponse
err = json.NewDecoder(rec.Body).Decode(&data)
require.NoError(t, err)
require.Len(t, data.Schedules, scheduleLen)
if scheduleLen > 0 {
require.Equal(t, edgeJob.ID, data.Schedules[0].ID)
require.Equal(t, edgeJob.CronExpression, data.Schedules[0].CronExpression)
require.Equal(t, edgeJob.Version, data.Schedules[0].Version)
}
} }
req.Header.Set(portainer.PortainerAgentEdgeIDHeader, "edge-id") f(endpoint, 1)
req.Header.Set(portainer.HTTPResponseAgentPlatform, "1") f(endpointFromStaticEdgeGroup, 1)
f(endpointFromDynamicEdgeGroup, 1)
rec := httptest.NewRecorder() f(unrelatedEndpoint, 0)
handler.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("expected a %d response, found: %d", http.StatusOK, rec.Code)
}
var data endpointEdgeStatusInspectResponse
if err := json.NewDecoder(rec.Body).Decode(&data); err != nil {
t.Fatal("error decoding response:", err)
}
assert.Len(t, data.Schedules, 1)
assert.Equal(t, edgeJob.ID, data.Schedules[0].ID)
assert.Equal(t, edgeJob.CronExpression, data.Schedules[0].CronExpression)
assert.Equal(t, edgeJob.Version, data.Schedules[0].Version)
} }

View File

@ -1,27 +0,0 @@
package edge
import (
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
)
// LoadEdgeJobs registers all edge jobs inside corresponding environment(endpoint) tunnel
func LoadEdgeJobs(dataStore dataservices.DataStore, reverseTunnelService portainer.ReverseTunnelService) error {
edgeJobs, err := dataStore.EdgeJob().ReadAll()
if err != nil {
return err
}
for _, edgeJob := range edgeJobs {
for endpointID := range edgeJob.Endpoints {
endpoint, err := dataStore.Endpoint().Endpoint(endpointID)
if err != nil {
return err
}
reverseTunnelService.AddEdgeJob(endpoint, &edgeJob)
}
}
return nil
}

View File

@ -1,8 +1,12 @@
package edge package edge
import ( import (
"slices"
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/rs/zerolog/log"
) )
// EndpointRelatedEdgeStacks returns a list of Edge stacks related to this Environment(Endpoint) // EndpointRelatedEdgeStacks returns a list of Edge stacks related to this Environment(Endpoint)
@ -39,3 +43,33 @@ func EffectiveCheckinInterval(tx dataservices.DataStoreTx, endpoint *portainer.E
return portainer.DefaultEdgeAgentCheckinIntervalInSeconds return portainer.DefaultEdgeAgentCheckinIntervalInSeconds
} }
// EndpointInEdgeGroup returns true and the edge group name if the endpoint is in the edge group
func EndpointInEdgeGroup(
tx dataservices.DataStoreTx,
endpointID portainer.EndpointID,
edgeGroupID portainer.EdgeGroupID,
) (bool, string, error) {
endpointIDs, err := GetEndpointsFromEdgeGroups(
[]portainer.EdgeGroupID{edgeGroupID}, tx,
)
if err != nil {
return false, "", err
}
if slices.Contains(endpointIDs, endpointID) {
edgeGroup, err := tx.EdgeGroup().Read(edgeGroupID)
if err != nil {
log.Warn().
Err(err).
Int("edgeGroupID", int(edgeGroupID)).
Msg("Unable to retrieve edge group")
return false, "", err
}
return true, edgeGroup.Name, nil
}
return false, "", nil
}

View File

@ -1568,10 +1568,6 @@ type (
TunnelAddr(endpoint *Endpoint) (string, error) TunnelAddr(endpoint *Endpoint) (string, error)
UpdateLastActivity(endpointID EndpointID) UpdateLastActivity(endpointID EndpointID)
KeepTunnelAlive(endpointID EndpointID, ctx context.Context, maxKeepAlive time.Duration) KeepTunnelAlive(endpointID EndpointID, ctx context.Context, maxKeepAlive time.Duration)
EdgeJobs(endpointId EndpointID) []EdgeJob
AddEdgeJob(endpoint *Endpoint, edgeJob *EdgeJob)
RemoveEdgeJob(edgeJobID EdgeJobID)
RemoveEdgeJobFromEndpoint(endpointID EndpointID, edgeJobID EdgeJobID)
} }
// Server defines the interface to serve the API // Server defines the interface to serve the API