From 5d20612d77326dacd3c759404eff60402be8d7e0 Mon Sep 17 00:00:00 2001 From: andres-portainer Date: Fri, 1 Dec 2023 16:29:10 -0300 Subject: [PATCH] fix(gitops): add data reconciliation to avoid data races EE-6114 --- api/datastore/services_tx.go | 5 +- api/stacks/deployments/deploy.go | 208 ++++++++++++++++++++----------- 2 files changed, 136 insertions(+), 77 deletions(-) diff --git a/api/datastore/services_tx.go b/api/datastore/services_tx.go index 44375baaa..113952446 100644 --- a/api/datastore/services_tx.go +++ b/api/datastore/services_tx.go @@ -66,7 +66,10 @@ func (tx *StoreTx) Snapshot() dataservices.SnapshotService { } func (tx *StoreTx) SSLSettings() dataservices.SSLSettingsService { return nil } -func (tx *StoreTx) Stack() dataservices.StackService { return nil } + +func (tx *StoreTx) Stack() dataservices.StackService { + return tx.store.StackService.Tx(tx.tx) +} func (tx *StoreTx) Tag() dataservices.TagService { return tx.store.TagService.Tx(tx.tx) diff --git a/api/stacks/deployments/deploy.go b/api/stacks/deployments/deploy.go index b5111de02..cc8997313 100644 --- a/api/stacks/deployments/deploy.go +++ b/api/stacks/deployments/deploy.go @@ -27,50 +27,61 @@ func (e *StackAuthorMissingErr) Error() string { return fmt.Sprintf("stack's %v author %s is missing", e.stackID, e.authorName) } +var errDoNothing = errors.New("do nothing") + // RedeployWhenChanged pull and redeploy the stack when git repo changed // Stack will always be redeployed if force deployment is set to true func RedeployWhenChanged(stackID portainer.StackID, deployer StackDeployer, datastore dataservices.DataStore, gitService portainer.GitService) error { log.Debug().Int("stack_id", int(stackID)).Msg("redeploying stack") - stack, err := datastore.Stack().Read(stackID) - if dataservices.IsErrObjectNotFound(err) { - return scheduler.NewPermanentError(errors.WithMessagef(err, "failed to get the stack %v", stackID)) + var stack *portainer.Stack + var endpoint *portainer.Endpoint + var user *portainer.User + var registries []portainer.Registry + + err := datastore.ViewTx(func(tx dataservices.DataStoreTx) error { + var err error + + stack, err = tx.Stack().Read(stackID) + if dataservices.IsErrObjectNotFound(err) { + return scheduler.NewPermanentError(errors.WithMessagef(err, "failed to get the stack %v", stackID)) + } else if err != nil { + return errors.WithMessagef(err, "failed to get the stack %v", stackID) + } + + if stack.GitConfig == nil { + return errDoNothing // do nothing if it isn't a git-based stack + } + + endpoint, err = tx.Endpoint().Endpoint(stack.EndpointID) + if dataservices.IsErrObjectNotFound(err) { + return scheduler.NewPermanentError( + errors.WithMessagef(err, + "failed to find the environment %v associated to the stack %v", + stack.EndpointID, + stack.ID, + ), + ) + } else if err != nil { + return errors.WithMessagef(err, "failed to find the environment %v associated to the stack %v", stack.EndpointID, stack.ID) + } + + user, err = validateAuthor(tx, stack) + if err != nil { + return err + } + + registries, err = getUserRegistries(tx, user, endpoint.ID) + if dataservices.IsErrObjectNotFound(err) { + return scheduler.NewPermanentError(err) + } + + return err + }) + if errors.Is(err, errDoNothing) { + return nil } else if err != nil { - return errors.WithMessagef(err, "failed to get the stack %v", stackID) - } - - if stack.GitConfig == nil { - return nil // do nothing if it isn't a git-based stack - } - - endpoint, err := datastore.Endpoint().Endpoint(stack.EndpointID) - if dataservices.IsErrObjectNotFound(err) { - return scheduler.NewPermanentError( - errors.WithMessagef(err, - "failed to find the environment %v associated to the stack %v", - stack.EndpointID, - stack.ID, - ), - ) - } else if err != nil { - return errors.WithMessagef(err, "failed to find the environment %v associated to the stack %v", stack.EndpointID, stack.ID) - } - - author := stack.UpdatedBy - if author == "" { - author = stack.CreatedBy - } - - user, err := datastore.User().UserByUsername(author) - if err != nil { - log.Warn(). - Int("stack_id", int(stackID)). - Str("author", author). - Str("stack", stack.Name). - Int("endpoint_id", int(stack.EndpointID)). - Msg("cannot auto update a stack, stack author user is missing") - - return &StackAuthorMissingErr{int(stack.ID), author} + return err } if !isEnvironmentOnline(endpoint) { @@ -95,58 +106,58 @@ func RedeployWhenChanged(stackID portainer.StackID, deployer StackDeployer, data return nil } - registries, err := getUserRegistries(datastore, user, endpoint.ID) - if dataservices.IsErrObjectNotFound(err) { - return scheduler.NewPermanentError(err) - } else if err != nil { + if err := deployStack(deployer, stack, endpoint, user, registries); err != nil { return err } - switch stack.Type { - case portainer.DockerComposeStack: - - if stackutils.IsRelativePathStack(stack) { - err = deployer.DeployRemoteComposeStack(stack, endpoint, registries, true, false) - } else { - err = deployer.DeployComposeStack(stack, endpoint, registries, true, false) - } - + err = datastore.UpdateTx(func(tx dataservices.DataStoreTx) error { + latestStack, err := tx.Stack().Read(stack.ID) if err != nil { - return errors.WithMessagef(err, "failed to deploy a docker compose stack %v", stackID) + return err } - case portainer.DockerSwarmStack: - if stackutils.IsRelativePathStack(stack) { - err = deployer.DeployRemoteSwarmStack(stack, endpoint, registries, true, true) - } else { - err = deployer.DeploySwarmStack(stack, endpoint, registries, true, true) - } - if err != nil { - return errors.WithMessagef(err, "failed to deploy a docker compose stack %v", stackID) - } - case portainer.KubernetesStack: - log.Debug(). - Int("stack_id", int(stackID)). - Msg("deploying a kube app") - err := deployer.DeployKubernetesStack(stack, endpoint, user) - if err != nil { - return errors.WithMessagef(err, "failed to deploy a kubernetes app stack %v", stackID) + if gitCommitChangedOrForceUpdate { + if latestStack.GitConfig != nil && stack.GitConfig != nil { + latestStack.GitConfig.ConfigHash = stack.GitConfig.ConfigHash + } + + latestStack.UpdateDate = time.Now().Unix() } - default: - return errors.Errorf("cannot update stack, type %v is unsupported", stack.Type) - } - stack.Status = portainer.StackStatusActive + latestStack.Status = portainer.StackStatusActive - if err := datastore.Stack().Update(stack.ID, stack); err != nil { + return tx.Stack().Update(stack.ID, latestStack) + }) + if err != nil { return errors.WithMessagef(err, "failed to update the stack %v", stack.ID) } return nil } -func getUserRegistries(datastore dataservices.DataStore, user *portainer.User, endpointID portainer.EndpointID) ([]portainer.Registry, error) { - registries, err := datastore.Registry().ReadAll() +func validateAuthor(tx dataservices.DataStoreTx, stack *portainer.Stack) (*portainer.User, error) { + author := stack.UpdatedBy + if author == "" { + author = stack.CreatedBy + } + + user, err := tx.User().UserByUsername(author) + if err != nil { + log.Warn(). + Int("stack_id", int(stack.ID)). + Str("author", author). + Str("stack", stack.Name). + Int("endpoint_id", int(stack.EndpointID)). + Msg("cannot auto update a stack, stack author user is missing") + + return nil, &StackAuthorMissingErr{int(stack.ID), author} + } + + return user, nil +} + +func getUserRegistries(tx dataservices.DataStoreTx, user *portainer.User, endpointID portainer.EndpointID) ([]portainer.Registry, error) { + registries, err := tx.Registry().ReadAll() if err != nil { return nil, errors.WithMessage(err, "unable to retrieve registries from the database") } @@ -155,7 +166,7 @@ func getUserRegistries(datastore dataservices.DataStore, user *portainer.User, e return registries, nil } - userMemberships, err := datastore.TeamMembership().TeamMembershipsByUserID(user.ID) + userMemberships, err := tx.TeamMembership().TeamMembershipsByUserID(user.ID) if err != nil { return nil, errors.WithMessagef(err, "failed to fetch memberships of the stack author [%s]", user.Username) } @@ -188,3 +199,48 @@ func isEnvironmentOnline(endpoint *portainer.Endpoint) bool { _, _, err = agent.GetAgentVersionAndPlatform(endpoint.URL, tlsConfig) return err == nil } + +func deployStack( + deployer StackDeployer, + stack *portainer.Stack, + endpoint *portainer.Endpoint, + user *portainer.User, + registries []portainer.Registry, +) error { + var err error + + switch stack.Type { + case portainer.DockerComposeStack: + if stackutils.IsRelativePathStack(stack) { + err = deployer.DeployRemoteComposeStack(stack, endpoint, registries, true, false) + } else { + err = deployer.DeployComposeStack(stack, endpoint, registries, true, false) + } + + if err != nil { + return errors.WithMessagef(err, "failed to deploy a docker compose stack %v", stack.ID) + } + case portainer.DockerSwarmStack: + if stackutils.IsRelativePathStack(stack) { + err = deployer.DeployRemoteSwarmStack(stack, endpoint, registries, true, true) + } else { + err = deployer.DeploySwarmStack(stack, endpoint, registries, true, true) + } + if err != nil { + return errors.WithMessagef(err, "failed to deploy a docker compose stack %v", stack.ID) + } + case portainer.KubernetesStack: + log.Debug(). + Int("stack_id", int(stack.ID)). + Msg("deploying a kube app") + + err := deployer.DeployKubernetesStack(stack, endpoint, user) + if err != nil { + return errors.WithMessagef(err, "failed to deploy a kubernetes app stack %v", stack.ID) + } + default: + return errors.Errorf("cannot update stack, type %v is unsupported", stack.Type) + } + + return nil +}