mirror of
https://github.com/allinssl/allinssl.git
synced 2025-12-15 09:55:37 +08:00
Add files via upload
This commit is contained in:
33
backend/internal/workflow/context.go
Normal file
33
backend/internal/workflow/context.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package workflow
|
||||
|
||||
import "ALLinSSL/backend/public"
|
||||
|
||||
func NewExecutionContext(RunID string) *ExecutionContext {
|
||||
Logger, _ := public.NewLogger(public.GetSettingIgnoreError("workflow_log_path") + RunID + ".log")
|
||||
return &ExecutionContext{
|
||||
Data: make(map[string]any),
|
||||
Status: make(map[string]ExecutionStatus),
|
||||
RunID: RunID,
|
||||
Logger: Logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (ctx *ExecutionContext) SetOutput(nodeID string, output any, status ExecutionStatus) {
|
||||
ctx.mu.Lock()
|
||||
defer ctx.mu.Unlock()
|
||||
ctx.Data[nodeID] = output
|
||||
ctx.Status[nodeID] = status
|
||||
}
|
||||
|
||||
func (ctx *ExecutionContext) GetOutput(nodeID string) (any, bool) {
|
||||
ctx.mu.RLock()
|
||||
defer ctx.mu.RUnlock()
|
||||
out, ok := ctx.Data[nodeID]
|
||||
return out, ok
|
||||
}
|
||||
|
||||
func (ctx *ExecutionContext) GetStatus(nodeID string) ExecutionStatus {
|
||||
ctx.mu.RLock()
|
||||
defer ctx.mu.RUnlock()
|
||||
return ctx.Status[nodeID]
|
||||
}
|
||||
107
backend/internal/workflow/executor.go
Normal file
107
backend/internal/workflow/executor.go
Normal file
@@ -0,0 +1,107 @@
|
||||
package workflow
|
||||
|
||||
import (
|
||||
"ALLinSSL/backend/internal/cert"
|
||||
certApply "ALLinSSL/backend/internal/cert/apply"
|
||||
certDeploy "ALLinSSL/backend/internal/cert/deploy"
|
||||
"ALLinSSL/backend/internal/report"
|
||||
"ALLinSSL/backend/public"
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// var executors map[string]func(map[string]any) (any, error)
|
||||
//
|
||||
// func RegistExector(executorName string, executor func(map[string]any) (any, error)) {
|
||||
// executors[executorName] = executor
|
||||
// }
|
||||
|
||||
func Executors(exec string, params map[string]any) (any, error) {
|
||||
switch exec {
|
||||
case "apply":
|
||||
return apply(params)
|
||||
case "deploy":
|
||||
return deploy(params)
|
||||
case "upload":
|
||||
return upload(params)
|
||||
case "notify":
|
||||
return notify(params)
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
func apply(params map[string]any) (any, error) {
|
||||
logger := params["logger"].(*public.Logger)
|
||||
|
||||
logger.Info("=============申请证书=============")
|
||||
certificate, err := certApply.Apply(params, logger)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
logger.Info("=============申请失败=============")
|
||||
return nil, err
|
||||
}
|
||||
logger.Info("=============申请成功=============")
|
||||
return certificate, nil
|
||||
}
|
||||
|
||||
func deploy(params map[string]any) (any, error) {
|
||||
logger := params["logger"].(*public.Logger)
|
||||
logger.Info("=============部署证书=============")
|
||||
certificate := params["certificate"]
|
||||
if certificate == nil {
|
||||
logger.Error("证书不存在")
|
||||
logger.Info("=============部署失败=============")
|
||||
return nil, errors.New("证书不存在")
|
||||
}
|
||||
err := certDeploy.Deploy(params, logger)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
logger.Info("=============部署失败=============")
|
||||
} else {
|
||||
logger.Info("=============部署成功=============")
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func upload(params map[string]any) (any, error) {
|
||||
logger := params["logger"].(*public.Logger)
|
||||
logger.Info("=============上传证书=============")
|
||||
|
||||
keyStr, ok := params["key"].(string)
|
||||
if !ok {
|
||||
logger.Error("上传的密钥有误")
|
||||
logger.Info("=============上传失败=============")
|
||||
return nil, errors.New("上传的密钥有误")
|
||||
}
|
||||
certStr, ok := params["cert"].(string)
|
||||
if !ok {
|
||||
logger.Error("上传的证书有误")
|
||||
logger.Info("=============上传失败=============")
|
||||
return nil, errors.New("上传的证书有误")
|
||||
}
|
||||
err := cert.UploadCert(keyStr, certStr)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
logger.Info("=============上传失败=============")
|
||||
return nil, err
|
||||
}
|
||||
logger.Info("=============上传成功=============")
|
||||
|
||||
return params, nil
|
||||
}
|
||||
|
||||
func notify(params map[string]any) (any, error) {
|
||||
// fmt.Println("通知:", params)
|
||||
logger := params["logger"].(*public.Logger)
|
||||
logger.Info("=============发送通知=============")
|
||||
logger.Debug(fmt.Sprintf("发送通知:%s", params["subject"].(string)))
|
||||
err := report.Notify(params)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
logger.Info("=============发送失败=============")
|
||||
return nil, err
|
||||
}
|
||||
logger.Info("=============发送成功=============")
|
||||
return fmt.Sprintf("通知到: %s", params["message"]), nil
|
||||
}
|
||||
49
backend/internal/workflow/models.go
Normal file
49
backend/internal/workflow/models.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package workflow
|
||||
|
||||
import (
|
||||
"ALLinSSL/backend/public"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type ExecutionStatus string
|
||||
|
||||
const (
|
||||
StatusSuccess ExecutionStatus = "success"
|
||||
StatusFailed ExecutionStatus = "fail"
|
||||
)
|
||||
|
||||
type WorkflowNodeParams struct {
|
||||
Name string `json:"name"`
|
||||
FromNodeID string `json:"fromNodeId,omitempty"`
|
||||
}
|
||||
|
||||
type WorkflowNode struct {
|
||||
Id string `json:"id"`
|
||||
Type string `json:"type"`
|
||||
Name string `json:"name"`
|
||||
|
||||
Config map[string]any `json:"config"`
|
||||
Inputs []WorkflowNodeParams `json:"inputs"`
|
||||
// Outputs []WorkflowNodeParams `json:"outputs"`
|
||||
|
||||
ChildNode *WorkflowNode `json:"childNode,omitempty"`
|
||||
ConditionNodes []*WorkflowNode `json:"conditionNodes,omitempty"`
|
||||
|
||||
Validated bool `json:"validated"`
|
||||
}
|
||||
|
||||
type ExecutionContext struct {
|
||||
Data map[string]any
|
||||
Status map[string]ExecutionStatus
|
||||
mu sync.RWMutex
|
||||
RunID string
|
||||
Logger *public.Logger
|
||||
}
|
||||
|
||||
type ExecTime struct {
|
||||
Type string `json:"type"`
|
||||
Month int `json:"month,omitempty"`
|
||||
Week int `json:"week,omitempty"`
|
||||
Hour int `json:"hour"`
|
||||
Minute int `json:"minute"`
|
||||
}
|
||||
294
backend/internal/workflow/workflow.go
Normal file
294
backend/internal/workflow/workflow.go
Normal file
@@ -0,0 +1,294 @@
|
||||
package workflow
|
||||
|
||||
import (
|
||||
"ALLinSSL/backend/public"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func GetSqlite() (*public.Sqlite, error) {
|
||||
s, err := public.NewSqlite("data/data.db", "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.Connect()
|
||||
s.TableName = "workflow"
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func GetList(search string, p, limit int64) ([]map[string]any, int, error) {
|
||||
var data []map[string]any
|
||||
var count int64
|
||||
s, err := GetSqlite()
|
||||
if err != nil {
|
||||
return data, 0, err
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
var limits []int64
|
||||
if p >= 0 && limit >= 0 {
|
||||
limits = []int64{0, limit}
|
||||
if p > 1 {
|
||||
limits[0] = (p - 1) * limit
|
||||
limits[1] = p * limit
|
||||
}
|
||||
}
|
||||
|
||||
if search != "" {
|
||||
count, err = s.Where("name like ?", []interface{}{"%" + search + "%"}).Count()
|
||||
data, err = s.Where("name like ?", []interface{}{"%" + search + "%"}).Order("update_time", "desc").Limit(limits).Select()
|
||||
} else {
|
||||
count, err = s.Count()
|
||||
data, err = s.Order("update_time", "desc").Limit(limits).Select()
|
||||
}
|
||||
if err != nil {
|
||||
return data, 0, err
|
||||
}
|
||||
return data, int(count), nil
|
||||
}
|
||||
|
||||
func AddWorkflow(name, content, execType, active, execTime string) error {
|
||||
var node WorkflowNode
|
||||
err := json.Unmarshal([]byte(content), &node)
|
||||
if err != nil {
|
||||
return fmt.Errorf("检测到工作流配置有问题:%v", err)
|
||||
}
|
||||
|
||||
s, err := GetSqlite()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.Close()
|
||||
now := time.Now().Format("2006-01-02 15:04:05")
|
||||
_, err = s.Insert(map[string]interface{}{
|
||||
"name": name,
|
||||
"content": content,
|
||||
"exec_type": execType,
|
||||
"active": active,
|
||||
"exec_time": execTime,
|
||||
"create_time": now,
|
||||
"update_time": now,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func DelWorkflow(id string) error {
|
||||
s, err := GetSqlite()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.Close()
|
||||
_, err = s.Where("id=?", []interface{}{id}).Delete()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func UpdDb(id string, data map[string]any) error {
|
||||
s, err := GetSqlite()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.Close()
|
||||
data["update_time"] = time.Now().Format("2006-01-02 15:04:05")
|
||||
_, err = s.Where("id=?", []interface{}{id}).Update(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func UpdWorkflow(id, name, content, execType, active, execTime string) error {
|
||||
var node WorkflowNode
|
||||
err := json.Unmarshal([]byte(content), &node)
|
||||
if err != nil {
|
||||
return fmt.Errorf("检测到工作流配置有问题:%v", err)
|
||||
}
|
||||
err = UpdDb(id, map[string]interface{}{
|
||||
"name": name,
|
||||
"content": content,
|
||||
"exec_type": execType,
|
||||
"active": active,
|
||||
"exec_time": execTime,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func UpdExecType(id, execType string) error {
|
||||
err := UpdDb(id, map[string]interface{}{
|
||||
"exec_type": execType,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func UpdActive(id, active string) error {
|
||||
err := UpdDb(id, map[string]interface{}{
|
||||
"active": active,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func ExecuteWorkflow(id string) error {
|
||||
s, err := GetSqlite()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.Close()
|
||||
data, err := s.Where("id=?", []interface{}{id}).Select()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return fmt.Errorf("workflow not found")
|
||||
}
|
||||
if data[0]["last_run_status"] != nil && data[0]["last_run_status"].(string) == "running" {
|
||||
return fmt.Errorf("工作流正在执行中")
|
||||
}
|
||||
content := data[0]["content"].(string)
|
||||
|
||||
go func(id, c string) {
|
||||
// defer wg.Done()
|
||||
// WorkflowID := strconv.FormatInt(id, 10)
|
||||
RunID, err := AddWorkflowHistory(id, "manual")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ctx := NewExecutionContext(RunID)
|
||||
defer ctx.Logger.Close()
|
||||
err = RunWorkflow(c, ctx)
|
||||
if err != nil {
|
||||
fmt.Println("执行工作流失败:", err)
|
||||
SetWorkflowStatus(id, RunID, "fail")
|
||||
} else {
|
||||
SetWorkflowStatus(id, RunID, "success")
|
||||
}
|
||||
}(id, content)
|
||||
return nil
|
||||
}
|
||||
|
||||
func SetWorkflowStatus(id, RunID, status string) {
|
||||
_ = UpdateWorkflowHistory(RunID, status)
|
||||
_ = UpdDb(id, map[string]interface{}{"last_run_status": status})
|
||||
}
|
||||
|
||||
func resolveInputs(inputs []WorkflowNodeParams, ctx *ExecutionContext) map[string]any {
|
||||
resolved := make(map[string]any)
|
||||
for _, input := range inputs {
|
||||
if input.FromNodeID != "" {
|
||||
if val, ok := ctx.GetOutput(input.FromNodeID); ok {
|
||||
switch strings.Split(strings.TrimPrefix(input.FromNodeID, "-"), "-")[0] {
|
||||
case "apply":
|
||||
input.Name = "certificate"
|
||||
case "upload":
|
||||
input.Name = "certificate"
|
||||
}
|
||||
resolved[input.Name] = val
|
||||
}
|
||||
}
|
||||
}
|
||||
return resolved
|
||||
}
|
||||
|
||||
func RunNode(node *WorkflowNode, ctx *ExecutionContext) error {
|
||||
// 获取上下文
|
||||
inputs := resolveInputs(node.Inputs, ctx)
|
||||
// 组装参数
|
||||
if node.Config == nil {
|
||||
node.Config = make(map[string]any)
|
||||
}
|
||||
for k, v := range inputs {
|
||||
node.Config[k] = v
|
||||
}
|
||||
node.Config["_runId"] = ctx.RunID
|
||||
node.Config["logger"] = ctx.Logger
|
||||
|
||||
// 执行当前节点
|
||||
result, err := Executors(node.Type, node.Config)
|
||||
|
||||
var status ExecutionStatus
|
||||
if err != nil {
|
||||
status = StatusFailed
|
||||
if node.ChildNode == nil || node.ChildNode.Type != "execute_result_branch" {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
status = StatusSuccess
|
||||
}
|
||||
|
||||
ctx.SetOutput(node.Id, result, status)
|
||||
|
||||
// 普通的并行
|
||||
if node.Type == "branch" {
|
||||
if len(node.ConditionNodes) > 0 {
|
||||
var wg sync.WaitGroup
|
||||
errChan := make(chan error, len(node.ConditionNodes))
|
||||
for _, branch := range node.ConditionNodes {
|
||||
wg.Add(1)
|
||||
go func(node *WorkflowNode) {
|
||||
defer wg.Done()
|
||||
if err = RunNode(node, ctx); err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
}(branch)
|
||||
}
|
||||
wg.Wait()
|
||||
close(errChan)
|
||||
for err := range errChan {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// 条件分支
|
||||
if node.Type == "execute_result_branch" {
|
||||
//
|
||||
if len(node.ConditionNodes) > 0 {
|
||||
lastStatus := ctx.GetStatus(node.Config["fromNodeId"].(string))
|
||||
for _, branch := range node.ConditionNodes {
|
||||
if branch.Config["type"] == string(lastStatus) {
|
||||
return RunNode(branch, ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if node.ChildNode != nil {
|
||||
return RunNode(node.ChildNode, ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func RunWorkflow(content string, ctx *ExecutionContext) error {
|
||||
var node WorkflowNode
|
||||
err := json.Unmarshal([]byte(content), &node)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
ctx.Logger.Info("=============开始执行=============")
|
||||
err = RunNode(&node, ctx)
|
||||
// fmt.Println(err)
|
||||
if err != nil {
|
||||
ctx.Logger.Info("=============执行失败=============")
|
||||
return err
|
||||
}
|
||||
ctx.Logger.Info("=============执行完成=============")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
117
backend/internal/workflow/workflow_history.go
Normal file
117
backend/internal/workflow/workflow_history.go
Normal file
@@ -0,0 +1,117 @@
|
||||
package workflow
|
||||
|
||||
import (
|
||||
"ALLinSSL/backend/public"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
)
|
||||
|
||||
// GetSqliteObjWH 工作流执行历史记录表对象
|
||||
func GetSqliteObjWH() (*public.Sqlite, error) {
|
||||
s, err := public.NewSqlite("data/data.db", "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.Connect()
|
||||
s.TableName = "workflow_history"
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// GetListWH 获取工作流执行历史记录列表
|
||||
func GetListWH(id string, p, limit int64) ([]map[string]any, int, error) {
|
||||
var data []map[string]any
|
||||
var count int64
|
||||
s, err := GetSqliteObjWH()
|
||||
if err != nil {
|
||||
return data, 0, err
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
var limits []int64
|
||||
if p >= 0 && limit >= 0 {
|
||||
limits = []int64{0, limit}
|
||||
if p > 1 {
|
||||
limits[0] = (p - 1) * limit
|
||||
limits[1] = p * limit
|
||||
}
|
||||
}
|
||||
if id == "" {
|
||||
count, err = s.Count()
|
||||
data, err = s.Limit(limits).Order("create_time", "desc").Select()
|
||||
} else {
|
||||
count, err = s.Where("workflow_id=?", []interface{}{id}).Count()
|
||||
data, err = s.Where("workflow_id=?", []interface{}{id}).Limit(limits).Order("create_time", "desc").Select()
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return data, 0, err
|
||||
}
|
||||
return data, int(count), nil
|
||||
}
|
||||
|
||||
// 添加工作流执行历史记录
|
||||
func AddWorkflowHistory(workflowID, execType string) (string, error) {
|
||||
s, err := GetSqliteObjWH()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer s.Close()
|
||||
now := time.Now().Format("2006-01-02 15:04:05")
|
||||
ID := public.GenerateUUID()
|
||||
_, err = s.Insert(map[string]interface{}{
|
||||
"id": ID,
|
||||
"workflow_id": workflowID,
|
||||
"status": "running",
|
||||
"exec_type": execType,
|
||||
"create_time": now,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
_ = UpdDb(workflowID, map[string]interface{}{"last_run_status": "running", "last_run_time": now})
|
||||
return ID, nil
|
||||
}
|
||||
|
||||
// 工作流执行结束
|
||||
func UpdateWorkflowHistory(id, status string) error {
|
||||
s, err := GetSqliteObjWH()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.Close()
|
||||
now := time.Now().Format("2006-01-02 15:04:05")
|
||||
_, err = s.Where("id=?", []interface{}{id}).Update(map[string]interface{}{
|
||||
"status": status,
|
||||
"end_time": now,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func StopWorkflow(id string) error {
|
||||
s, err := GetSqliteObjWH()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.Close()
|
||||
data, err := s.Where("id=?", []interface{}{id}).Select()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
SetWorkflowStatus(data[0]["workflow_id"].(string), id, "fail")
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetExecLog(id string) (string, error) {
|
||||
log, err := os.ReadFile(filepath.Join(public.GetSettingIgnoreError("workflow_log_path"), id+".log"))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(log), nil
|
||||
}
|
||||
Reference in New Issue
Block a user