diff --git a/packages/core/pipeline/src/core/executor.ts b/packages/core/pipeline/src/core/executor.ts index 9dba70b2..47e398e8 100644 --- a/packages/core/pipeline/src/core/executor.ts +++ b/packages/core/pipeline/src/core/executor.ts @@ -1,6 +1,6 @@ -import { ConcurrencyStrategy, HistoryResult, HistoryResultGroup, Pipeline, ResultType, Runnable, RunStrategy, Stage, Step, Task } from "../d.ts"; +import { ConcurrencyStrategy, Pipeline, ResultType, Runnable, RunStrategy, Stage, Step, Task } from "../d.ts"; import _ from "lodash"; -import { RunHistory } from "./run-history"; +import { HistoryStatus, RunHistory } from "./run-history"; import { pluginRegistry, TaskPlugin } from "../plugin"; import { IAccessService } from "../access/access-service"; import { ContextFactory, IContext } from "./context"; @@ -32,34 +32,37 @@ export class Executor { this.runtime = new RunHistory(runtimeId); this.logger.info(`pipeline.${this.pipeline.id} start`); await this.runWithHistory(this.pipeline, "pipeline", async () => { - return await this.runStages(); + await this.runStages(); }); } finally { this.logger.info(`pipeline.${this.pipeline.id} end`); } } - async runWithHistory(runnable: Runnable, runnableType: string, run: (result: HistoryResult) => Promise) { - const result = this.runtime.start(runnable, runnableType); - this.onChanged(this.runtime); + async runWithHistory(runnable: Runnable, runnableType: string, run: (status: HistoryStatus) => Promise) { + runnable.runnableType = runnableType; + const status = this.runtime.start(runnable); + await this.onChanged(this.runtime); const contextKey = `status.${runnable.id}`; + if (runnable.strategy?.runStrategy === RunStrategy.SkipWhenSucceed) { + //如果是成功后跳过策略 const lastResult = await this.pipelineContext.get(contextKey); if (lastResult != null && lastResult === ResultType.success) { - this.runtime.log(runnable, result, "跳过"); + this.runtime.log(status, "跳过"); return ResultType.skip; } } try { - await run(result); + await run(status); this.runtime.success(runnable); - this.onChanged(this.runtime); + await this.onChanged(this.runtime); await this.pipelineContext.set(contextKey, ResultType.success); return ResultType.success; } catch (e: any) { this.logger.error(e); this.runtime.error(runnable, e); - this.onChanged(this.runtime); + await this.onChanged(this.runtime); await this.pipelineContext.set(contextKey, ResultType.error); throw e; } finally { @@ -70,19 +73,19 @@ export class Executor { private async runStages() { const resList: ResultType[] = []; for (const stage of this.pipeline.stages) { - const res: ResultType = await this.runWithHistory(stage, "stage", async (result) => { - return await this.runStage(stage, result); + const res: ResultType = await this.runWithHistory(stage, "stage", async () => { + await this.runStage(stage); }); resList.push(res); } return this.compositionResultType(resList); } - async runStage(stage: Stage, result: HistoryResult) { + async runStage(stage: Stage) { const runnerList = []; for (const task of stage.tasks) { - const runner = this.runWithHistory(task, "task", async (result) => { - return await this.runTask(task, result); + const runner = this.runWithHistory(task, "task", async () => { + await this.runTask(task); }); runnerList.push(runner); } @@ -115,20 +118,21 @@ export class Executor { return ResultType.error; } - private async runTask(task: Task, result: HistoryResult) { + private async runTask(task: Task) { const resList: ResultType[] = []; for (const step of task.steps) { - const res: ResultType = await this.runWithHistory(step, "step", async (result) => { - await this.runStep(step, result); + step.runnableType = "step"; + const res: ResultType = await this.runWithHistory(step, "step", async (status) => { + await this.runStep(step, status); }); resList.push(res); } return this.compositionResultType(resList); } - private async runStep(step: Step, result: HistoryResult) { + private async runStep(step: Step, status: HistoryStatus) { //执行任务 - const taskPlugin: TaskPlugin = await this.getPlugin(step.type, result.logger); + const taskPlugin: TaskPlugin = await this.getPlugin(step.type, status.logger); // @ts-ignore delete taskPlugin.define; const define = taskPlugin.getDefine(); diff --git a/packages/core/pipeline/src/core/run-history.ts b/packages/core/pipeline/src/core/run-history.ts index a4987082..4ba61aee 100644 --- a/packages/core/pipeline/src/core/run-history.ts +++ b/packages/core/pipeline/src/core/run-history.ts @@ -1,6 +1,14 @@ -import { Context, HistoryResult, Log, Runnable } from "../d.ts"; +import { Context, HistoryResult, Runnable } from "../d.ts"; import _ from "lodash"; -import { buildLogger, logger } from "../utils/util.log"; +import { buildLogger } from "../utils/util.log"; +import { Logger } from "log4js"; + +export type HistoryStatus = { + runnable: Runnable; + result: HistoryResult; + logs: string[]; + logger: Logger; +}; export class RunHistory { constructor(runtimeId: any) { @@ -8,64 +16,59 @@ export class RunHistory { } id: any; - logs: Log[] = []; + //运行时上下文变量 context: Context = {}; - results: { - [key: string]: HistoryResult; + status: { + [nodeId: string]: HistoryStatus; } = {}; - logger = logger; - start(runnable: Runnable, runnableType: string) { - const status = "start"; + start(runnable: Runnable): HistoryStatus { const now = new Date().getTime(); - _.merge(runnable, { status, lastTime: now }); - const result: HistoryResult = { - type: runnableType, - startTime: new Date().getTime(), - title: runnable.title, - status, + const status: HistoryStatus = { + runnable, + result: { + status: "start", + startTime: now, + }, logs: [], logger: buildLogger((text) => { - result.logs.push(text); + status.logs.push(text); }), }; - this.results[runnable.id] = result; - this.log(runnable, result, `${runnable.title} 开始执行`); - return result; + this.status[runnable.id] = status; + this.log(status, `开始执行`); + return status; } - success(runnable: Runnable, res?: any) { - const status = "success"; + success(runnable: Runnable) { const now = new Date().getTime(); - _.merge(runnable, { status, lastTime: now }); - const result = this.results[runnable.id]; - _.merge(result, { status, endTime: now }, res); - this.log(runnable, result, `${result.title} 执行成功`); + const status = this.status[runnable.id]; + _.merge(status, { + result: { + status: "success", + endTime: now, + }, + }); + this.log(status, `执行成功`); } error(runnable: Runnable, e: Error) { - const status = "error"; const now = new Date().getTime(); - _.merge(runnable, { status, lastTime: now }); - const result = this.results[runnable.id]; - _.merge(result, { - status, - endTime: now, - errorMessage: e.message, + const status = this.status[runnable.id]; + _.merge(status, { + result: { + status: "error", + endTime: now, + message: e.message, + }, }); - this.log(runnable, result, `${result.title} 执行异常:${e.message}`, status); + this.log(status, `执行异常:${e.message}`); } - log(runnable: Runnable, result: HistoryResult, text: string, level = "info") { - const now = new Date().getTime(); - result.logger.info(`[${runnable.title}] [${result.type}]`, text); - this.logs.push({ - time: now, - level, - title: runnable.title, - text, - }); + log(status: HistoryStatus, text: string) { + const runnable = status.runnable; + status.logger.info(`[${runnable.title}] [${runnable.runnableType}]`, text); } finally(runnable: Runnable) { diff --git a/packages/core/pipeline/src/d.ts/pipeline.ts b/packages/core/pipeline/src/d.ts/pipeline.ts index 3ee8eec8..210dfb08 100644 --- a/packages/core/pipeline/src/d.ts/pipeline.ts +++ b/packages/core/pipeline/src/d.ts/pipeline.ts @@ -1,5 +1,3 @@ -import { Logger } from "log4js"; - export enum RunStrategy { AlwaysRun, SkipWhenSucceed, @@ -62,6 +60,7 @@ export type Runnable = { status?: string; lastTime?: number; strategy?: RunnableStrategy; + runnableType?: string; // pipeline, stage, task , step }; export type Pipeline = Runnable & { @@ -94,8 +93,6 @@ export type HistoryResultGroup = { }; }; export type HistoryResult = { - type: string; - title: string; /** * 任务状态 */ @@ -105,8 +102,6 @@ export type HistoryResult = { /** * 处理结果 */ - result?: string; - errorMessage?: string; - logs: string[]; - logger: Logger; + result?: string; //success, error,skip + message?: string; }; diff --git a/packages/core/pipeline/src/registry/registry.ts b/packages/core/pipeline/src/registry/registry.ts index 08dff317..23448ab4 100644 --- a/packages/core/pipeline/src/registry/registry.ts +++ b/packages/core/pipeline/src/registry/registry.ts @@ -1,5 +1,3 @@ -import { Logger } from "log4js"; - export type Registrable = { name: string; title: string; diff --git a/packages/core/pipeline/src/utils/util.log.ts b/packages/core/pipeline/src/utils/util.log.ts index 6e48c676..c5698cab 100644 --- a/packages/core/pipeline/src/utils/util.log.ts +++ b/packages/core/pipeline/src/utils/util.log.ts @@ -1,4 +1,4 @@ -import log4js, { Appender, Logger, LoggingEvent } from "log4js"; +import log4js, { LoggingEvent } from "log4js"; const OutputAppender = { configure: (config: any, layouts: any, findAppender: any, levels: any) => {