mirror of https://github.com/portainer/portainer
fix(pending-actions): Small improvements to pending actions (R8S-350) (#949)
parent
2c08becf6c
commit
129b9d5db9
|
@ -2,6 +2,7 @@ package postinit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
"github.com/docker/docker/client"
|
"github.com/docker/docker/client"
|
||||||
|
@ -83,17 +84,27 @@ func (postInitMigrator *PostInitMigrator) PostInitMigrate() error {
|
||||||
|
|
||||||
// try to create a post init migration pending action. If it already exists, do nothing
|
// try to create a post init migration pending action. If it already exists, do nothing
|
||||||
// this function exists for readability, not reusability
|
// this function exists for readability, not reusability
|
||||||
// TODO: This should be moved into pending actions as part of the pending action migration
|
|
||||||
func (postInitMigrator *PostInitMigrator) createPostInitMigrationPendingAction(environmentID portainer.EndpointID) error {
|
func (postInitMigrator *PostInitMigrator) createPostInitMigrationPendingAction(environmentID portainer.EndpointID) error {
|
||||||
// If there are no pending actions for the given endpoint, create one
|
action := portainer.PendingAction{
|
||||||
err := postInitMigrator.dataStore.PendingActions().Create(&portainer.PendingAction{
|
|
||||||
EndpointID: environmentID,
|
EndpointID: environmentID,
|
||||||
Action: actions.PostInitMigrateEnvironment,
|
Action: actions.PostInitMigrateEnvironment,
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Err(err).Msgf("Error creating pending action for environment %d", environmentID)
|
|
||||||
}
|
}
|
||||||
|
pendingActions, err := postInitMigrator.dataStore.PendingActions().ReadAll()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to retrieve pending actions: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, dba := range pendingActions {
|
||||||
|
if dba.EndpointID == action.EndpointID && dba.Action == action.Action {
|
||||||
|
log.Debug().
|
||||||
|
Str("action", action.Action).
|
||||||
|
Int("endpoint_id", int(action.EndpointID)).
|
||||||
|
Msg("pending action already exists for environment, skipping...")
|
||||||
return nil
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return postInitMigrator.dataStore.PendingActions().Create(&action)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MigrateEnvironment runs migrations on a single environment
|
// MigrateEnvironment runs migrations on a single environment
|
||||||
|
|
|
@ -8,10 +8,12 @@ import (
|
||||||
|
|
||||||
portainer "github.com/portainer/portainer/api"
|
portainer "github.com/portainer/portainer/api"
|
||||||
"github.com/portainer/portainer/api/datastore"
|
"github.com/portainer/portainer/api/datastore"
|
||||||
|
"github.com/portainer/portainer/api/pendingactions/actions"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
"github.com/docker/docker/client"
|
"github.com/docker/docker/client"
|
||||||
"github.com/segmentio/encoding/json"
|
"github.com/segmentio/encoding/json"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -73,3 +75,96 @@ func TestMigrateGPUs(t *testing.T) {
|
||||||
require.False(t, migratedEndpoint.PostInitMigrations.MigrateGPUs)
|
require.False(t, migratedEndpoint.PostInitMigrations.MigrateGPUs)
|
||||||
require.True(t, migratedEndpoint.EnableGPUManagement)
|
require.True(t, migratedEndpoint.EnableGPUManagement)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPostInitMigrate_PendingActionsCreated(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
existingPendingActions []*portainer.PendingAction
|
||||||
|
expectedPendingActions int
|
||||||
|
expectedAction string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "when existing non-matching action exists, should add migration action",
|
||||||
|
existingPendingActions: []*portainer.PendingAction{
|
||||||
|
{
|
||||||
|
EndpointID: 7,
|
||||||
|
Action: "some-other-action",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedPendingActions: 2,
|
||||||
|
expectedAction: actions.PostInitMigrateEnvironment,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "when matching action exists, should not add duplicate",
|
||||||
|
existingPendingActions: []*portainer.PendingAction{
|
||||||
|
{
|
||||||
|
EndpointID: 7,
|
||||||
|
Action: actions.PostInitMigrateEnvironment,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedPendingActions: 1,
|
||||||
|
expectedAction: actions.PostInitMigrateEnvironment,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "when no actions exist, should add migration action",
|
||||||
|
existingPendingActions: []*portainer.PendingAction{},
|
||||||
|
expectedPendingActions: 1,
|
||||||
|
expectedAction: actions.PostInitMigrateEnvironment,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
is := assert.New(t)
|
||||||
|
_, store := datastore.MustNewTestStore(t, true, true)
|
||||||
|
|
||||||
|
// Create test endpoint
|
||||||
|
endpoint := &portainer.Endpoint{
|
||||||
|
ID: 7,
|
||||||
|
UserTrusted: true,
|
||||||
|
Type: portainer.EdgeAgentOnDockerEnvironment,
|
||||||
|
Edge: portainer.EnvironmentEdgeSettings{
|
||||||
|
AsyncMode: false,
|
||||||
|
},
|
||||||
|
EdgeID: "edgeID",
|
||||||
|
}
|
||||||
|
err := store.Endpoint().Create(endpoint)
|
||||||
|
is.NoError(err, "error creating endpoint")
|
||||||
|
|
||||||
|
// Create any existing pending actions
|
||||||
|
for _, action := range tt.existingPendingActions {
|
||||||
|
err = store.PendingActions().Create(action)
|
||||||
|
is.NoError(err, "error creating pending action")
|
||||||
|
}
|
||||||
|
|
||||||
|
migrator := NewPostInitMigrator(
|
||||||
|
nil, // kubeFactory not needed for this test
|
||||||
|
nil, // dockerFactory not needed for this test
|
||||||
|
store,
|
||||||
|
"", // assetsPath not needed for this test
|
||||||
|
nil, // kubernetesDeployer not needed for this test
|
||||||
|
)
|
||||||
|
|
||||||
|
err = migrator.PostInitMigrate()
|
||||||
|
is.NoError(err, "PostInitMigrate should not return error")
|
||||||
|
|
||||||
|
// Verify the results
|
||||||
|
pendingActions, err := store.PendingActions().ReadAll()
|
||||||
|
is.NoError(err, "error reading pending actions")
|
||||||
|
is.Len(pendingActions, tt.expectedPendingActions, "unexpected number of pending actions")
|
||||||
|
|
||||||
|
// If we expect any actions, verify at least one has the expected action type
|
||||||
|
if tt.expectedPendingActions > 0 {
|
||||||
|
hasExpectedAction := false
|
||||||
|
for _, action := range pendingActions {
|
||||||
|
if action.Action == tt.expectedAction {
|
||||||
|
hasExpectedAction = true
|
||||||
|
is.Equal(endpoint.ID, action.EndpointID, "action should reference correct endpoint")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
is.True(hasExpectedAction, "should have found action of expected type")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -460,3 +460,39 @@ func WithStacks(stacks []portainer.Stack) datastoreOption {
|
||||||
d.stack = &stubStacksService{stacks: stacks}
|
d.stack = &stubStacksService{stacks: stacks}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type stubPendingActionService struct {
|
||||||
|
actions []portainer.PendingAction
|
||||||
|
dataservices.PendingActionsService
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithPendingActions(pendingActions []portainer.PendingAction) datastoreOption {
|
||||||
|
return func(d *testDatastore) {
|
||||||
|
d.pendingActionsService = &stubPendingActionService{
|
||||||
|
actions: pendingActions,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stubPendingActionService) ReadAll(predicates ...func(portainer.PendingAction) bool) ([]portainer.PendingAction, error) {
|
||||||
|
filtered := s.actions
|
||||||
|
|
||||||
|
for _, predicate := range predicates {
|
||||||
|
filtered = slicesx.Filter(filtered, predicate)
|
||||||
|
}
|
||||||
|
|
||||||
|
return filtered, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stubPendingActionService) Delete(ID portainer.PendingActionID) error {
|
||||||
|
actions := []portainer.PendingAction{}
|
||||||
|
|
||||||
|
for _, action := range s.actions {
|
||||||
|
if action.ID != ID {
|
||||||
|
actions = append(actions, action)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.actions = actions
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -71,10 +71,14 @@ func (service *PendingActionsService) execute(environmentID portainer.EndpointID
|
||||||
|
|
||||||
isKubernetesEndpoint := endpointutils.IsKubernetesEndpoint(endpoint) && !endpointutils.IsEdgeEndpoint(endpoint)
|
isKubernetesEndpoint := endpointutils.IsKubernetesEndpoint(endpoint) && !endpointutils.IsEdgeEndpoint(endpoint)
|
||||||
|
|
||||||
// EndpointStatusUp is only relevant for non-Kubernetes endpoints
|
|
||||||
// Sometimes the endpoint is UP but the status is not updated in the database
|
|
||||||
if !isKubernetesEndpoint {
|
if !isKubernetesEndpoint {
|
||||||
if endpoint.Status != portainer.EndpointStatusUp {
|
// Edge environments check the heartbeat
|
||||||
|
// Other environments check the endpoint status
|
||||||
|
if endpointutils.IsEdgeEndpoint(endpoint) {
|
||||||
|
if !endpoint.Heartbeat {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else if endpoint.Status != portainer.EndpointStatusUp {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -0,0 +1,89 @@
|
||||||
|
package pendingactions
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
portainer "github.com/portainer/portainer/api"
|
||||||
|
"github.com/portainer/portainer/api/internal/testhelpers"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestExecute(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
endpoint *portainer.Endpoint
|
||||||
|
pendingActions []portainer.PendingAction
|
||||||
|
shouldExecute bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Edge endpoint with heartbeat should execute",
|
||||||
|
// Create test endpoint
|
||||||
|
endpoint: &portainer.Endpoint{
|
||||||
|
ID: 1,
|
||||||
|
Heartbeat: true,
|
||||||
|
Type: portainer.EdgeAgentOnDockerEnvironment,
|
||||||
|
EdgeID: "edge-1",
|
||||||
|
},
|
||||||
|
pendingActions: []portainer.PendingAction{
|
||||||
|
{ID: 1, EndpointID: 1, Action: "test-action"},
|
||||||
|
},
|
||||||
|
shouldExecute: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Edge endpoint without heartbeat should not execute",
|
||||||
|
endpoint: &portainer.Endpoint{
|
||||||
|
ID: 2,
|
||||||
|
EdgeID: "edge-2",
|
||||||
|
Heartbeat: false,
|
||||||
|
Type: portainer.EdgeAgentOnDockerEnvironment,
|
||||||
|
},
|
||||||
|
pendingActions: []portainer.PendingAction{
|
||||||
|
{ID: 2, EndpointID: 2, Action: "test-action"},
|
||||||
|
},
|
||||||
|
shouldExecute: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Regular endpoint with status UP should execute",
|
||||||
|
endpoint: &portainer.Endpoint{
|
||||||
|
ID: 3,
|
||||||
|
Status: portainer.EndpointStatusUp,
|
||||||
|
Type: portainer.AgentOnDockerEnvironment,
|
||||||
|
},
|
||||||
|
pendingActions: []portainer.PendingAction{
|
||||||
|
{ID: 3, EndpointID: 3, Action: "test-action"},
|
||||||
|
},
|
||||||
|
shouldExecute: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Regular endpoint with status DOWN should not execute",
|
||||||
|
endpoint: &portainer.Endpoint{
|
||||||
|
ID: 4,
|
||||||
|
Status: portainer.EndpointStatusDown,
|
||||||
|
Type: portainer.AgentOnDockerEnvironment,
|
||||||
|
},
|
||||||
|
pendingActions: []portainer.PendingAction{
|
||||||
|
{ID: 4, EndpointID: 4, Action: "test-action"},
|
||||||
|
},
|
||||||
|
shouldExecute: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
// Setup services
|
||||||
|
store := testhelpers.NewDatastore(testhelpers.WithEndpoints([]portainer.Endpoint{*tt.endpoint}), testhelpers.WithPendingActions(tt.pendingActions))
|
||||||
|
service := NewService(store, nil)
|
||||||
|
|
||||||
|
// Execute
|
||||||
|
service.execute(tt.endpoint.ID)
|
||||||
|
|
||||||
|
// Verify expectations
|
||||||
|
pendingActions, _ := store.PendingActions().ReadAll()
|
||||||
|
if tt.shouldExecute {
|
||||||
|
assert.Equal(t, len(tt.pendingActions)-1, len(pendingActions))
|
||||||
|
} else {
|
||||||
|
assert.Equal(t, len(tt.pendingActions), len(pendingActions))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue