fix(edgejobs): migrate to transactional code EE-5324 (#8747)

pull/8755/head
andres-portainer 2023-04-10 15:59:34 -03:00 committed by GitHub
parent a65ffe519a
commit 62128d1069
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 229 additions and 133 deletions

View File

@ -7,16 +7,26 @@ import (
"strings"
"time"
"github.com/asaskevich/govalidator"
httperror "github.com/portainer/libhttp/error"
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/endpointutils"
"github.com/portainer/portainer/api/internal/maps"
"github.com/portainer/portainer/pkg/featureflags"
"github.com/asaskevich/govalidator"
)
type edgeJobBasePayload struct {
Name string
CronExpression string
Recurring bool
Endpoints []portainer.EndpointID
EdgeGroups []portainer.EdgeGroupID
}
// @id EdgeJobCreate
// @summary Create an EdgeJob
// @description **Access policy**: administrator
@ -48,12 +58,8 @@ func (handler *Handler) edgeJobCreate(w http.ResponseWriter, r *http.Request) *h
}
type edgeJobCreateFromFileContentPayload struct {
Name string
CronExpression string
Recurring bool
Endpoints []portainer.EndpointID
EdgeGroups []portainer.EdgeGroupID
FileContent string
edgeJobBasePayload
FileContent string
}
func (payload *edgeJobCreateFromFileContentPayload) Validate(r *http.Request) error {
@ -87,32 +93,44 @@ func (handler *Handler) createEdgeJobFromFileContent(w http.ResponseWriter, r *h
return httperror.BadRequest("Invalid request payload", err)
}
edgeJob := handler.createEdgeJobObjectFromFileContentPayload(&payload)
var edgeJob *portainer.EdgeJob
if featureflags.IsEnabled(portainer.FeatureNoTx) {
edgeJob, err = handler.createEdgeJob(handler.DataStore, &payload.edgeJobBasePayload, []byte(payload.FileContent))
} else {
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
edgeJob, err = handler.createEdgeJob(tx, &payload.edgeJobBasePayload, []byte(payload.FileContent))
return err
})
}
return txResponse(w, edgeJob, err)
}
func (handler *Handler) createEdgeJob(tx dataservices.DataStoreTx, payload *edgeJobBasePayload, fileContent []byte) (*portainer.EdgeJob, error) {
var err error
edgeJob := handler.createEdgeJobObjectFromPayload(tx, payload)
var endpoints []portainer.EndpointID
if len(edgeJob.EdgeGroups) > 0 {
endpoints, err = edge.GetEndpointsFromEdgeGroups(payload.EdgeGroups, handler.DataStore)
endpoints, err = edge.GetEndpointsFromEdgeGroups(payload.EdgeGroups, tx)
if err != nil {
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
return nil, httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
}
}
err = handler.addAndPersistEdgeJob(edgeJob, []byte(payload.FileContent), endpoints)
err = handler.addAndPersistEdgeJob(tx, edgeJob, fileContent, endpoints)
if err != nil {
return httperror.InternalServerError("Unable to schedule Edge job", err)
return nil, httperror.InternalServerError("Unable to schedule Edge job", err)
}
return response.JSON(w, edgeJob)
return edgeJob, nil
}
type edgeJobCreateFromFilePayload struct {
Name string
CronExpression string
Recurring bool
Endpoints []portainer.EndpointID
EdgeGroups []portainer.EdgeGroupID
File []byte
edgeJobBasePayload
File []byte
}
func (payload *edgeJobCreateFromFilePayload) Validate(r *http.Request) error {
@ -166,66 +184,35 @@ func (handler *Handler) createEdgeJobFromFile(w http.ResponseWriter, r *http.Req
return httperror.BadRequest("Invalid request payload", err)
}
edgeJob := handler.createEdgeJobObjectFromFilePayload(payload)
var edgeJob *portainer.EdgeJob
if featureflags.IsEnabled(portainer.FeatureNoTx) {
edgeJob, err = handler.createEdgeJob(handler.DataStore, &payload.edgeJobBasePayload, payload.File)
} else {
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
edgeJob, err = handler.createEdgeJob(tx, &payload.edgeJobBasePayload, payload.File)
var endpoints []portainer.EndpointID
if len(edgeJob.EdgeGroups) > 0 {
endpoints, err = edge.GetEndpointsFromEdgeGroups(payload.EdgeGroups, handler.DataStore)
if err != nil {
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
}
return err
})
}
err = handler.addAndPersistEdgeJob(edgeJob, payload.File, endpoints)
if err != nil {
return httperror.InternalServerError("Unable to schedule Edge job", err)
}
return response.JSON(w, edgeJob)
return txResponse(w, edgeJob, err)
}
func (handler *Handler) createEdgeJobObjectFromFilePayload(payload *edgeJobCreateFromFilePayload) *portainer.EdgeJob {
edgeJobIdentifier := portainer.EdgeJobID(handler.DataStore.EdgeJob().GetNextIdentifier())
endpoints := convertEndpointsToMetaObject(payload.Endpoints)
edgeJob := &portainer.EdgeJob{
ID: edgeJobIdentifier,
func (handler *Handler) createEdgeJobObjectFromPayload(tx dataservices.DataStoreTx, payload *edgeJobBasePayload) *portainer.EdgeJob {
return &portainer.EdgeJob{
ID: portainer.EdgeJobID(tx.EdgeJob().GetNextIdentifier()),
Name: payload.Name,
CronExpression: payload.CronExpression,
Recurring: payload.Recurring,
Created: time.Now().Unix(),
Endpoints: endpoints,
Endpoints: convertEndpointsToMetaObject(payload.Endpoints),
EdgeGroups: payload.EdgeGroups,
Version: 1,
GroupLogsCollection: map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{},
}
return edgeJob
}
func (handler *Handler) createEdgeJobObjectFromFileContentPayload(payload *edgeJobCreateFromFileContentPayload) *portainer.EdgeJob {
edgeJobIdentifier := portainer.EdgeJobID(handler.DataStore.EdgeJob().GetNextIdentifier())
endpoints := convertEndpointsToMetaObject(payload.Endpoints)
edgeJob := &portainer.EdgeJob{
ID: edgeJobIdentifier,
Name: payload.Name,
CronExpression: payload.CronExpression,
Recurring: payload.Recurring,
Created: time.Now().Unix(),
Endpoints: endpoints,
EdgeGroups: payload.EdgeGroups,
Version: 1,
GroupLogsCollection: map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{},
}
return edgeJob
}
func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file []byte, endpointsFromGroups []portainer.EndpointID) error {
func (handler *Handler) addAndPersistEdgeJob(tx dataservices.DataStoreTx, edgeJob *portainer.EdgeJob, file []byte, endpointsFromGroups []portainer.EndpointID) error {
edgeCronExpression := strings.Split(edgeJob.CronExpression, " ")
if len(edgeCronExpression) == 6 {
edgeCronExpression = edgeCronExpression[1:]
@ -233,7 +220,7 @@ func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file []
edgeJob.CronExpression = strings.Join(edgeCronExpression, " ")
for ID := range edgeJob.Endpoints {
endpoint, err := handler.DataStore.Endpoint().Endpoint(ID)
endpoint, err := tx.Endpoint().Endpoint(ID)
if err != nil {
return err
}
@ -254,7 +241,7 @@ func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file []
endpointsMap = convertEndpointsToMetaObject(endpointsFromGroups)
for ID := range endpointsMap {
endpoint, err := handler.DataStore.Endpoint().Endpoint(ID)
endpoint, err := tx.Endpoint().Endpoint(ID)
if err != nil {
return err
}
@ -274,7 +261,7 @@ func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file []
}
for endpointID := range endpointsMap {
endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID)
endpoint, err := tx.Endpoint().Endpoint(endpointID)
if err != nil {
return err
}
@ -282,5 +269,5 @@ func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file []
handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob)
}
return handler.DataStore.EdgeJob().Create(edgeJob.ID, edgeJob)
return tx.EdgeJob().Create(edgeJob.ID, edgeJob)
}

View File

@ -8,8 +8,10 @@ import (
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/maps"
"github.com/portainer/portainer/pkg/featureflags"
"github.com/rs/zerolog/log"
)
@ -31,14 +33,34 @@ func (handler *Handler) edgeJobDelete(w http.ResponseWriter, r *http.Request) *h
return httperror.BadRequest("Invalid Edge job identifier route variable", err)
}
edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID))
if handler.DataStore.IsErrObjectNotFound(err) {
if featureflags.IsEnabled(portainer.FeatureNoTx) {
err = handler.deleteEdgeJob(handler.DataStore, portainer.EdgeJobID(edgeJobID))
} else {
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
return handler.deleteEdgeJob(tx, portainer.EdgeJobID(edgeJobID))
})
}
if err != nil {
if httpErr, ok := err.(*httperror.HandlerError); ok {
return httpErr
}
return httperror.InternalServerError("Unexpected error", err)
}
return response.Empty(w)
}
func (handler *Handler) deleteEdgeJob(tx dataservices.DataStoreTx, edgeJobID portainer.EdgeJobID) error {
edgeJob, err := tx.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID))
if tx.IsErrObjectNotFound(err) {
return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err)
} else if err != nil {
return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err)
}
edgeJobFolder := handler.FileService.GetEdgeJobFolder(strconv.Itoa(edgeJobID))
edgeJobFolder := handler.FileService.GetEdgeJobFolder(strconv.Itoa(int(edgeJobID)))
err = handler.FileService.RemoveDirectory(edgeJobFolder)
if err != nil {
log.Warn().Err(err).Msg("Unable to remove the files associated to the Edge job on the filesystem")
@ -48,7 +70,7 @@ func (handler *Handler) edgeJobDelete(w http.ResponseWriter, r *http.Request) *h
var endpointsMap map[portainer.EndpointID]portainer.EdgeJobEndpointMeta
if len(edgeJob.EdgeGroups) > 0 {
endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore)
endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, tx)
if err != nil {
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
}
@ -63,10 +85,10 @@ func (handler *Handler) edgeJobDelete(w http.ResponseWriter, r *http.Request) *h
handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpointID, edgeJob.ID)
}
err = handler.DataStore.EdgeJob().DeleteEdgeJob(edgeJob.ID)
err = tx.EdgeJob().DeleteEdgeJob(edgeJob.ID)
if err != nil {
return httperror.InternalServerError("Unable to remove the Edge job from the database", err)
}
return response.Empty(w)
return nil
}

View File

@ -8,8 +8,10 @@ import (
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/slices"
"github.com/portainer/portainer/pkg/featureflags"
)
// @id EdgeJobTasksClear
@ -37,53 +39,86 @@ func (handler *Handler) edgeJobTasksClear(w http.ResponseWriter, r *http.Request
return httperror.BadRequest("Invalid Task identifier route variable", err)
}
edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID))
if handler.DataStore.IsErrObjectNotFound(err) {
mutationFn := func(edgeJob *portainer.EdgeJob, endpointID portainer.EndpointID, endpointsFromGroups []portainer.EndpointID) {
if slices.Contains(endpointsFromGroups, endpointID) {
edgeJob.GroupLogsCollection[endpointID] = portainer.EdgeJobEndpointMeta{
CollectLogs: false,
LogsStatus: portainer.EdgeJobLogsStatusIdle,
}
} else {
meta := edgeJob.Endpoints[endpointID]
meta.CollectLogs = false
meta.LogsStatus = portainer.EdgeJobLogsStatusIdle
edgeJob.Endpoints[endpointID] = meta
}
}
if featureflags.IsEnabled(portainer.FeatureNoTx) {
updateEdgeJobFn := func(edgeJob *portainer.EdgeJob, endpointID portainer.EndpointID, endpointsFromGroups []portainer.EndpointID) error {
return handler.DataStore.EdgeJob().UpdateEdgeJobFunc(edgeJob.ID, func(j *portainer.EdgeJob) {
mutationFn(j, endpointID, endpointsFromGroups)
})
}
err = handler.clearEdgeJobTaskLogs(handler.DataStore, portainer.EdgeJobID(edgeJobID), portainer.EndpointID(taskID), updateEdgeJobFn)
} else {
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
updateEdgeJobFn := func(edgeJob *portainer.EdgeJob, endpointID portainer.EndpointID, endpointsFromGroups []portainer.EndpointID) error {
mutationFn(edgeJob, endpointID, endpointsFromGroups)
return tx.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob)
}
return handler.clearEdgeJobTaskLogs(tx, portainer.EdgeJobID(edgeJobID), portainer.EndpointID(taskID), updateEdgeJobFn)
})
}
if err != nil {
if httpErr, ok := err.(*httperror.HandlerError); ok {
return httpErr
}
return httperror.InternalServerError("Unexpected error", err)
}
return response.Empty(w)
}
func (handler *Handler) clearEdgeJobTaskLogs(tx dataservices.DataStoreTx, edgeJobID portainer.EdgeJobID, endpointID portainer.EndpointID, updateEdgeJob func(*portainer.EdgeJob, portainer.EndpointID, []portainer.EndpointID) error) error {
edgeJob, err := tx.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID))
if tx.IsErrObjectNotFound(err) {
return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err)
} else if err != nil {
return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err)
}
err = handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(edgeJobID), strconv.Itoa(taskID))
err = handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(int(edgeJobID)), strconv.Itoa(int(endpointID)))
if err != nil {
return httperror.InternalServerError("Unable to clear log file from disk", err)
}
endpointID := portainer.EndpointID(taskID)
endpointsFromGroups, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore)
endpointsFromGroups, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, tx)
if err != nil {
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
}
err = handler.DataStore.EdgeJob().UpdateEdgeJobFunc(edgeJob.ID, func(j *portainer.EdgeJob) {
if slices.Contains(endpointsFromGroups, endpointID) {
j.GroupLogsCollection[endpointID] = portainer.EdgeJobEndpointMeta{
CollectLogs: false,
LogsStatus: portainer.EdgeJobLogsStatusIdle,
}
} else {
meta := j.Endpoints[endpointID]
meta.CollectLogs = false
meta.LogsStatus = portainer.EdgeJobLogsStatusIdle
j.Endpoints[endpointID] = meta
}
})
err = updateEdgeJob(edgeJob, endpointID, endpointsFromGroups)
if err != nil {
return httperror.InternalServerError("Unable to persist Edge job changes in the database", err)
}
err = handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(edgeJobID), strconv.Itoa(taskID))
err = handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(int(edgeJobID)), strconv.Itoa(int(endpointID)))
if err != nil {
return httperror.InternalServerError("Unable to clear log file from disk", err)
}
endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID)
endpoint, err := tx.Endpoint().Endpoint(endpointID)
if err != nil {
return httperror.NotFound("Unable to retrieve environment from the database", err)
}
handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob)
return response.Empty(w)
return nil
}

View File

@ -39,7 +39,7 @@ func (handler *Handler) edgeJobTasksCollect(w http.ResponseWriter, r *http.Reque
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
edgeJob, err := tx.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID))
if handler.DataStore.IsErrObjectNotFound(err) {
if tx.IsErrObjectNotFound(err) {
return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err)
} else if err != nil {
return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err)

View File

@ -6,10 +6,11 @@ import (
httperror "github.com/portainer/libhttp/error"
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/maps"
"github.com/portainer/portainer/pkg/featureflags"
)
type taskContainer struct {
@ -37,20 +38,34 @@ func (handler *Handler) edgeJobTasksList(w http.ResponseWriter, r *http.Request)
return httperror.BadRequest("Invalid Edge job identifier route variable", err)
}
edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID))
if handler.DataStore.IsErrObjectNotFound(err) {
return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err)
var tasks []taskContainer
if featureflags.IsEnabled(portainer.FeatureNoTx) {
tasks, err = listEdgeJobTasks(handler.DataStore, portainer.EdgeJobID(edgeJobID))
} else {
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
tasks, err = listEdgeJobTasks(tx, portainer.EdgeJobID(edgeJobID))
return err
})
}
return txResponse(w, tasks, err)
}
func listEdgeJobTasks(tx dataservices.DataStoreTx, edgeJobID portainer.EdgeJobID) ([]taskContainer, error) {
edgeJob, err := tx.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID))
if tx.IsErrObjectNotFound(err) {
return nil, httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err)
} else if err != nil {
return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err)
return nil, httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err)
}
tasks := make([]taskContainer, 0)
endpointsMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{}
if len(edgeJob.EdgeGroups) > 0 {
endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore)
endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, tx)
if err != nil {
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
return nil, httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
}
endpointsMap = convertEndpointsToMetaObject(endpoints)
@ -67,5 +82,5 @@ func (handler *Handler) edgeJobTasksList(w http.ResponseWriter, r *http.Request)
})
}
return response.JSON(w, tasks)
return tasks, nil
}

View File

@ -7,12 +7,13 @@ import (
httperror "github.com/portainer/libhttp/error"
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/endpointutils"
"github.com/portainer/portainer/api/internal/maps"
"github.com/portainer/portainer/api/internal/slices"
"github.com/portainer/portainer/pkg/featureflags"
"github.com/asaskevich/govalidator"
)
@ -30,6 +31,7 @@ func (payload *edgeJobUpdatePayload) Validate(r *http.Request) error {
if payload.Name != nil && !govalidator.Matches(*payload.Name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]+$`) {
return errors.New("invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]")
}
return nil
}
@ -60,27 +62,41 @@ func (handler *Handler) edgeJobUpdate(w http.ResponseWriter, r *http.Request) *h
return httperror.BadRequest("Invalid request payload", err)
}
edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID))
if handler.DataStore.IsErrObjectNotFound(err) {
return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err)
} else if err != nil {
return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err)
var edgeJob *portainer.EdgeJob
if featureflags.IsEnabled(portainer.FeatureNoTx) {
edgeJob, err = handler.updateEdgeJob(handler.DataStore, portainer.EdgeJobID(edgeJobID), payload)
} else {
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
edgeJob, err = handler.updateEdgeJob(tx, portainer.EdgeJobID(edgeJobID), payload)
return err
})
}
err = handler.updateEdgeSchedule(edgeJob, &payload)
if err != nil {
return httperror.InternalServerError("Unable to update Edge job", err)
}
err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob)
if err != nil {
return httperror.InternalServerError("Unable to persist Edge job changes inside the database", err)
}
return response.JSON(w, edgeJob)
return txResponse(w, edgeJob, err)
}
func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload *edgeJobUpdatePayload) error {
func (handler *Handler) updateEdgeJob(tx dataservices.DataStoreTx, edgeJobID portainer.EdgeJobID, payload edgeJobUpdatePayload) (*portainer.EdgeJob, error) {
edgeJob, err := tx.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID))
if tx.IsErrObjectNotFound(err) {
return nil, httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err)
} else if err != nil {
return nil, httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err)
}
err = handler.updateEdgeSchedule(tx, edgeJob, &payload)
if err != nil {
return nil, httperror.InternalServerError("Unable to update Edge job", err)
}
err = tx.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob)
if err != nil {
return nil, httperror.InternalServerError("Unable to persist Edge job changes inside the database", err)
}
return edgeJob, nil
}
func (handler *Handler) updateEdgeSchedule(tx dataservices.DataStoreTx, edgeJob *portainer.EdgeJob, payload *edgeJobUpdatePayload) error {
if payload.Name != nil {
edgeJob.Name = *payload.Name
}
@ -99,7 +115,7 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload *
}
for _, endpointID := range payload.Endpoints {
endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID)
endpoint, err := tx.Endpoint().Endpoint(endpointID)
if err != nil {
return err
}
@ -120,7 +136,7 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload *
}
if len(payload.EdgeGroups) == 0 && len(edgeJob.EdgeGroups) > 0 {
endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore)
endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, tx)
if err != nil {
return errors.New("unable to get endpoints from edge groups")
}
@ -138,7 +154,7 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload *
if len(payload.EdgeGroups) > 0 {
for _, edgeGroupID := range payload.EdgeGroups {
_, err := handler.DataStore.EdgeGroup().EdgeGroup(edgeGroupID)
_, err := tx.EdgeGroup().EdgeGroup(edgeGroupID)
if err != nil {
return err
}
@ -148,7 +164,7 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload *
}
}
endpointsFromGroupsToAdd, err := edge.GetEndpointsFromEdgeGroups(edgeGroupsToAdd, handler.DataStore)
endpointsFromGroupsToAdd, err := edge.GetEndpointsFromEdgeGroups(edgeGroupsToAdd, tx)
if err != nil {
return errors.New("unable to get endpoints from edge groups")
}
@ -165,7 +181,7 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload *
}
}
endpointsFromGroupsToRemove, err := edge.GetEndpointsFromEdgeGroups(edgeGroupsToRemove, handler.DataStore)
endpointsFromGroupsToRemove, err := edge.GetEndpointsFromEdgeGroups(edgeGroupsToRemove, tx)
if err != nil {
return errors.New("unable to get endpoints from edge groups")
}
@ -212,7 +228,7 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload *
maps.Copy(endpointsFromGroupsToAddMap, edgeJob.Endpoints)
for endpointID := range endpointsFromGroupsToAddMap {
endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID)
endpoint, err := tx.Endpoint().Endpoint(endpointID)
if err != nil {
return err
}

View File

@ -3,11 +3,13 @@ package edgejobs
import (
"net/http"
"github.com/gorilla/mux"
httperror "github.com/portainer/libhttp/error"
"github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/http/security"
"github.com/gorilla/mux"
)
// Handler is the HTTP handler used to handle Edge job operations.
@ -44,6 +46,7 @@ func NewHandler(bouncer *security.RequestBouncer) *Handler {
bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobTasksCollect)))).Methods(http.MethodPost)
h.Handle("/edge_jobs/{id}/tasks/{taskID}/logs",
bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobTasksClear)))).Methods(http.MethodDelete)
return h
}
@ -56,3 +59,15 @@ func convertEndpointsToMetaObject(endpoints []portainer.EndpointID) map[portaine
return endpointsMap
}
func txResponse(w http.ResponseWriter, r any, err error) *httperror.HandlerError {
if err != nil {
if httpErr, ok := err.(*httperror.HandlerError); ok {
return httpErr
}
return httperror.InternalServerError("Unexpected error", err)
}
return response.JSON(w, r)
}

View File

@ -1565,8 +1565,14 @@ const (
)
// List of supported features
const (
FeatureFdo = "fdo"
FeatureNoTx = "noTx"
)
var SupportedFeatureFlags = []featureflags.Feature{
"fdo",
FeatureFdo,
FeatureNoTx,
}
const (