mirror of https://github.com/usual2970/certimate
				
				
				
			
		
			
				
	
	
		
			125 lines
		
	
	
		
			3.3 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			125 lines
		
	
	
		
			3.3 KiB
		
	
	
	
		
			Go
		
	
	
| package workflow
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/usual2970/certimate/internal/app"
 | |
| 	"github.com/usual2970/certimate/internal/domain"
 | |
| 	"github.com/usual2970/certimate/internal/domain/dtos"
 | |
| 	"github.com/usual2970/certimate/internal/workflow/dispatcher"
 | |
| )
 | |
| 
 | |
| type workflowRepository interface {
 | |
| 	ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error)
 | |
| 	GetById(ctx context.Context, id string) (*domain.Workflow, error)
 | |
| 	Save(ctx context.Context, workflow *domain.Workflow) (*domain.Workflow, error)
 | |
| }
 | |
| 
 | |
| type workflowRunRepository interface {
 | |
| 	GetById(ctx context.Context, id string) (*domain.WorkflowRun, error)
 | |
| 	Save(ctx context.Context, workflowRun *domain.WorkflowRun) (*domain.WorkflowRun, error)
 | |
| }
 | |
| 
 | |
| type WorkflowService struct {
 | |
| 	dispatcher *dispatcher.WorkflowDispatcher
 | |
| 
 | |
| 	workflowRepo    workflowRepository
 | |
| 	workflowRunRepo workflowRunRepository
 | |
| }
 | |
| 
 | |
| func NewWorkflowService(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowService {
 | |
| 	srv := &WorkflowService{
 | |
| 		dispatcher: dispatcher.GetSingletonDispatcher(workflowRepo, workflowRunRepo),
 | |
| 
 | |
| 		workflowRepo:    workflowRepo,
 | |
| 		workflowRunRepo: workflowRunRepo,
 | |
| 	}
 | |
| 	return srv
 | |
| }
 | |
| 
 | |
| func (s *WorkflowService) InitSchedule(ctx context.Context) error {
 | |
| 	workflows, err := s.workflowRepo.ListEnabledAuto(ctx)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	scheduler := app.GetScheduler()
 | |
| 	for _, workflow := range workflows {
 | |
| 		var errs []error
 | |
| 
 | |
| 		err := scheduler.Add(fmt.Sprintf("workflow#%s", workflow.Id), workflow.TriggerCron, func() {
 | |
| 			s.StartRun(ctx, &dtos.WorkflowStartRunReq{
 | |
| 				WorkflowId: workflow.Id,
 | |
| 				RunTrigger: domain.WorkflowTriggerTypeAuto,
 | |
| 			})
 | |
| 		})
 | |
| 		if err != nil {
 | |
| 			errs = append(errs, err)
 | |
| 		}
 | |
| 
 | |
| 		if len(errs) > 0 {
 | |
| 			return errors.Join(errs...)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error {
 | |
| 	workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if workflow.LastRunStatus == domain.WorkflowRunStatusTypePending || workflow.LastRunStatus == domain.WorkflowRunStatusTypeRunning {
 | |
| 		return errors.New("workflow is already pending or running")
 | |
| 	}
 | |
| 
 | |
| 	run := &domain.WorkflowRun{
 | |
| 		WorkflowId: workflow.Id,
 | |
| 		Status:     domain.WorkflowRunStatusTypePending,
 | |
| 		Trigger:    req.RunTrigger,
 | |
| 		StartedAt:  time.Now(),
 | |
| 	}
 | |
| 	if resp, err := s.workflowRunRepo.Save(ctx, run); err != nil {
 | |
| 		return err
 | |
| 	} else {
 | |
| 		run = resp
 | |
| 	}
 | |
| 
 | |
| 	s.dispatcher.Dispatch(&dispatcher.WorkflowWorkerData{
 | |
| 		WorkflowId:      workflow.Id,
 | |
| 		WorkflowContent: workflow.Content,
 | |
| 		RunId:           run.Id,
 | |
| 	})
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *WorkflowService) CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error {
 | |
| 	workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	workflowRun, err := s.workflowRunRepo.GetById(ctx, req.RunId)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	} else if workflowRun.WorkflowId != workflow.Id {
 | |
| 		return errors.New("workflow run not found")
 | |
| 	} else if workflowRun.Status != domain.WorkflowRunStatusTypePending && workflowRun.Status != domain.WorkflowRunStatusTypeRunning {
 | |
| 		return errors.New("workflow run is not pending or running")
 | |
| 	}
 | |
| 
 | |
| 	s.dispatcher.Cancel(workflowRun.Id)
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *WorkflowService) Shutdown(ctx context.Context) {
 | |
| 	s.dispatcher.Shutdown()
 | |
| }
 |