mirror of https://github.com/portainer/portainer
fix(edgejobs): fix data race in edge jobs tasks collect EE-4766 (#8542)
parent
2c247efd0f
commit
f9a09301a8
|
@ -7,6 +7,7 @@ import (
|
||||||
"github.com/portainer/libhttp/request"
|
"github.com/portainer/libhttp/request"
|
||||||
"github.com/portainer/libhttp/response"
|
"github.com/portainer/libhttp/response"
|
||||||
portainer "github.com/portainer/portainer/api"
|
portainer "github.com/portainer/portainer/api"
|
||||||
|
"github.com/portainer/portainer/api/dataservices"
|
||||||
"github.com/portainer/portainer/api/internal/edge"
|
"github.com/portainer/portainer/api/internal/edge"
|
||||||
"github.com/portainer/portainer/api/internal/slices"
|
"github.com/portainer/portainer/api/internal/slices"
|
||||||
)
|
)
|
||||||
|
@ -36,46 +37,58 @@ func (handler *Handler) edgeJobTasksCollect(w http.ResponseWriter, r *http.Reque
|
||||||
return httperror.BadRequest("Invalid Task identifier route variable", err)
|
return httperror.BadRequest("Invalid Task identifier route variable", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID))
|
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
|
||||||
if handler.DataStore.IsErrObjectNotFound(err) {
|
edgeJob, err := tx.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID))
|
||||||
return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err)
|
if handler.DataStore.IsErrObjectNotFound(err) {
|
||||||
} else if err != nil {
|
return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err)
|
||||||
return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err)
|
} else if err != nil {
|
||||||
}
|
return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err)
|
||||||
|
|
||||||
endpointID := portainer.EndpointID(taskID)
|
|
||||||
endpointsFromGroups, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore)
|
|
||||||
if err != nil {
|
|
||||||
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if slices.Contains(endpointsFromGroups, endpointID) {
|
|
||||||
edgeJob.GroupLogsCollection[endpointID] = portainer.EdgeJobEndpointMeta{
|
|
||||||
CollectLogs: true,
|
|
||||||
LogsStatus: portainer.EdgeJobLogsStatusPending,
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
meta := edgeJob.Endpoints[endpointID]
|
|
||||||
meta.CollectLogs = true
|
|
||||||
meta.LogsStatus = portainer.EdgeJobLogsStatusPending
|
|
||||||
edgeJob.Endpoints[endpointID] = meta
|
|
||||||
}
|
|
||||||
|
|
||||||
err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob)
|
endpointID := portainer.EndpointID(taskID)
|
||||||
|
endpointsFromGroups, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, tx)
|
||||||
|
if err != nil {
|
||||||
|
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if slices.Contains(endpointsFromGroups, endpointID) {
|
||||||
|
edgeJob.GroupLogsCollection[endpointID] = portainer.EdgeJobEndpointMeta{
|
||||||
|
CollectLogs: true,
|
||||||
|
LogsStatus: portainer.EdgeJobLogsStatusPending,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
meta := edgeJob.Endpoints[endpointID]
|
||||||
|
meta.CollectLogs = true
|
||||||
|
meta.LogsStatus = portainer.EdgeJobLogsStatusPending
|
||||||
|
edgeJob.Endpoints[endpointID] = meta
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tx.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob)
|
||||||
|
if err != nil {
|
||||||
|
return httperror.InternalServerError("Unable to persist Edge job changes in the database", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
endpoint, err := tx.Endpoint().Endpoint(endpointID)
|
||||||
|
if err != nil {
|
||||||
|
return httperror.InternalServerError("Unable to retrieve environment from the database", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if endpoint.Edge.AsyncMode {
|
||||||
|
return httperror.BadRequest("Async Edge Endpoints are not supported in Portainer CE", nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return httperror.InternalServerError("Unable to persist Edge job changes in the database", err)
|
if httpErr, ok := err.(*httperror.HandlerError); ok {
|
||||||
}
|
return httpErr
|
||||||
|
}
|
||||||
|
|
||||||
endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID)
|
return httperror.InternalServerError("Unexpected error", err)
|
||||||
if err != nil {
|
|
||||||
return httperror.InternalServerError("Unable to retrieve environment from the database", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if endpoint.Edge.AsyncMode {
|
|
||||||
return httperror.BadRequest("Async Edge Endpoints are not supported in Portainer CE", nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob)
|
|
||||||
|
|
||||||
return response.Empty(w)
|
return response.Empty(w)
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ func EdgeGroupSet(edgeGroupIDs []portainer.EdgeGroupID) map[portainer.EdgeGroupI
|
||||||
return set
|
return set
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetEndpointsFromEdgeGroups(edgeGroupIDs []portainer.EdgeGroupID, datastore dataservices.DataStore) ([]portainer.EndpointID, error) {
|
func GetEndpointsFromEdgeGroups(edgeGroupIDs []portainer.EdgeGroupID, datastore dataservices.DataStoreTx) ([]portainer.EndpointID, error) {
|
||||||
endpoints, err := datastore.Endpoint().Endpoints()
|
endpoints, err := datastore.Endpoint().Endpoints()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
Loading…
Reference in New Issue