diff --git a/pkg/libstack/compose/composeplugin_test.go b/pkg/libstack/compose/composeplugin_test.go index 9f56e1c3b..dd423b700 100644 --- a/pkg/libstack/compose/composeplugin_test.go +++ b/pkg/libstack/compose/composeplugin_test.go @@ -59,7 +59,7 @@ services: require.True(t, containerExists(composeContainerName)) - waitResult := <-w.WaitForStatus(ctx, projectName, libstack.StatusCompleted) + waitResult := w.WaitForStatus(ctx, projectName, libstack.StatusCompleted) require.Empty(t, waitResult.ErrorMsg) require.Equal(t, libstack.StatusCompleted, waitResult.Status) diff --git a/pkg/libstack/compose/status.go b/pkg/libstack/compose/status.go index f5c3a945c..bbbe2264b 100644 --- a/pkg/libstack/compose/status.go +++ b/pkg/libstack/compose/status.go @@ -111,74 +111,66 @@ func aggregateStatuses(services []service) (libstack.Status, string) { } -func (c *ComposeDeployer) WaitForStatus(ctx context.Context, name string, status libstack.Status) <-chan libstack.WaitResult { - waitResultCh := make(chan libstack.WaitResult) +func (c *ComposeDeployer) WaitForStatus(ctx context.Context, name string, status libstack.Status) libstack.WaitResult { waitResult := libstack.WaitResult{Status: status} - go func() { - for { - select { - case <-ctx.Done(): - waitResult.ErrorMsg = "failed to wait for status: " + ctx.Err().Error() - waitResultCh <- waitResult - default: - } + for { + if ctx.Err() != nil { + waitResult.ErrorMsg = "failed to wait for status: " + ctx.Err().Error() - time.Sleep(1 * time.Second) + return waitResult + } - var containerSummaries []api.ContainerSummary + time.Sleep(1 * time.Second) - if err := withComposeService(ctx, nil, libstack.Options{ProjectName: name}, func(composeService api.Service, project *types.Project) error { - var err error + var containerSummaries []api.ContainerSummary - psCtx, cancelFunc := context.WithTimeout(context.Background(), time.Minute) - defer cancelFunc() - containerSummaries, err = composeService.Ps(psCtx, name, api.PsOptions{All: true}) + if err := withComposeService(ctx, nil, libstack.Options{ProjectName: name}, func(composeService api.Service, project *types.Project) error { + var err error - return err - }); err != nil { - log.Debug(). - Str("project_name", name). - Err(err). - Msg("error from docker compose ps") - - continue - } - - services := serviceListFromContainerSummary(containerSummaries) - - if len(services) == 0 && status == libstack.StatusRemoved { - waitResultCh <- waitResult - return - } - - aggregateStatus, errorMessage := aggregateStatuses(services) - if aggregateStatus == status { - waitResultCh <- waitResult - return - } - - if status == libstack.StatusRunning && aggregateStatus == libstack.StatusCompleted { - waitResult.Status = libstack.StatusCompleted - waitResultCh <- waitResult - return - } - - if errorMessage != "" { - waitResult.ErrorMsg = errorMessage - waitResultCh <- waitResult - return - } + psCtx, cancelFunc := context.WithTimeout(context.Background(), time.Minute) + defer cancelFunc() + containerSummaries, err = composeService.Ps(psCtx, name, api.PsOptions{All: true}) + return err + }); err != nil { log.Debug(). Str("project_name", name). - Str("required_status", string(status)). - Str("status", string(aggregateStatus)). - Msg("waiting for status") - } - }() + Err(err). + Msg("error from docker compose ps") - return waitResultCh + continue + } + + services := serviceListFromContainerSummary(containerSummaries) + + if len(services) == 0 && status == libstack.StatusRemoved { + return waitResult + } + + aggregateStatus, errorMessage := aggregateStatuses(services) + if aggregateStatus == status { + return waitResult + } + + if status == libstack.StatusRunning && aggregateStatus == libstack.StatusCompleted { + waitResult.Status = libstack.StatusCompleted + + return waitResult + } + + if errorMessage != "" { + waitResult.ErrorMsg = errorMessage + + return waitResult + } + + log.Debug(). + Str("project_name", name). + Str("required_status", string(status)). + Str("status", string(aggregateStatus)). + Msg("waiting for status") + } } func serviceListFromContainerSummary(containerSummaries []api.ContainerSummary) []service { diff --git a/pkg/libstack/compose/status_integration_test.go b/pkg/libstack/compose/status_integration_test.go index bc4800cd3..ad38914d2 100644 --- a/pkg/libstack/compose/status_integration_test.go +++ b/pkg/libstack/compose/status_integration_test.go @@ -106,8 +106,7 @@ func waitForStatus(deployer libstack.Deployer, ctx context.Context, stackName st ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) defer cancel() - statusCh := deployer.WaitForStatus(ctx, stackName, requiredStatus) - result := <-statusCh + result := deployer.WaitForStatus(ctx, stackName, requiredStatus) if result.ErrorMsg == "" { return result.Status, "", nil } diff --git a/pkg/libstack/libstack.go b/pkg/libstack/libstack.go index 5b9b6effb..4e7b184ea 100644 --- a/pkg/libstack/libstack.go +++ b/pkg/libstack/libstack.go @@ -18,7 +18,7 @@ type Deployer interface { Pull(ctx context.Context, filePaths []string, options Options) error Run(ctx context.Context, filePaths []string, serviceName string, options RunOptions) error Validate(ctx context.Context, filePaths []string, options Options) error - WaitForStatus(ctx context.Context, name string, status Status) <-chan WaitResult + WaitForStatus(ctx context.Context, name string, status Status) WaitResult Config(ctx context.Context, filePaths []string, options Options) ([]byte, error) GetExistingEdgeStacks(ctx context.Context) ([]EdgeStack, error) }