refactor: pipeline run log

pull/9/head^2
xiaojunnuo 2022-10-29 01:10:55 +08:00
parent 3dfdd3c31c
commit dcdb25d92d
5 changed files with 72 additions and 72 deletions

View File

@ -1,6 +1,6 @@
import { ConcurrencyStrategy, HistoryResult, HistoryResultGroup, Pipeline, ResultType, Runnable, RunStrategy, Stage, Step, Task } from "../d.ts"; import { ConcurrencyStrategy, Pipeline, ResultType, Runnable, RunStrategy, Stage, Step, Task } from "../d.ts";
import _ from "lodash"; import _ from "lodash";
import { RunHistory } from "./run-history"; import { HistoryStatus, RunHistory } from "./run-history";
import { pluginRegistry, TaskPlugin } from "../plugin"; import { pluginRegistry, TaskPlugin } from "../plugin";
import { IAccessService } from "../access/access-service"; import { IAccessService } from "../access/access-service";
import { ContextFactory, IContext } from "./context"; import { ContextFactory, IContext } from "./context";
@ -32,34 +32,37 @@ export class Executor {
this.runtime = new RunHistory(runtimeId); this.runtime = new RunHistory(runtimeId);
this.logger.info(`pipeline.${this.pipeline.id} start`); this.logger.info(`pipeline.${this.pipeline.id} start`);
await this.runWithHistory(this.pipeline, "pipeline", async () => { await this.runWithHistory(this.pipeline, "pipeline", async () => {
return await this.runStages(); await this.runStages();
}); });
} finally { } finally {
this.logger.info(`pipeline.${this.pipeline.id} end`); this.logger.info(`pipeline.${this.pipeline.id} end`);
} }
} }
async runWithHistory(runnable: Runnable, runnableType: string, run: (result: HistoryResult) => Promise<any>) { async runWithHistory(runnable: Runnable, runnableType: string, run: (status: HistoryStatus) => Promise<void>) {
const result = this.runtime.start(runnable, runnableType); runnable.runnableType = runnableType;
this.onChanged(this.runtime); const status = this.runtime.start(runnable);
await this.onChanged(this.runtime);
const contextKey = `status.${runnable.id}`; const contextKey = `status.${runnable.id}`;
if (runnable.strategy?.runStrategy === RunStrategy.SkipWhenSucceed) { if (runnable.strategy?.runStrategy === RunStrategy.SkipWhenSucceed) {
//如果是成功后跳过策略
const lastResult = await this.pipelineContext.get(contextKey); const lastResult = await this.pipelineContext.get(contextKey);
if (lastResult != null && lastResult === ResultType.success) { if (lastResult != null && lastResult === ResultType.success) {
this.runtime.log(runnable, result, "跳过"); this.runtime.log(status, "跳过");
return ResultType.skip; return ResultType.skip;
} }
} }
try { try {
await run(result); await run(status);
this.runtime.success(runnable); this.runtime.success(runnable);
this.onChanged(this.runtime); await this.onChanged(this.runtime);
await this.pipelineContext.set(contextKey, ResultType.success); await this.pipelineContext.set(contextKey, ResultType.success);
return ResultType.success; return ResultType.success;
} catch (e: any) { } catch (e: any) {
this.logger.error(e); this.logger.error(e);
this.runtime.error(runnable, e); this.runtime.error(runnable, e);
this.onChanged(this.runtime); await this.onChanged(this.runtime);
await this.pipelineContext.set(contextKey, ResultType.error); await this.pipelineContext.set(contextKey, ResultType.error);
throw e; throw e;
} finally { } finally {
@ -70,19 +73,19 @@ export class Executor {
private async runStages() { private async runStages() {
const resList: ResultType[] = []; const resList: ResultType[] = [];
for (const stage of this.pipeline.stages) { for (const stage of this.pipeline.stages) {
const res: ResultType = await this.runWithHistory(stage, "stage", async (result) => { const res: ResultType = await this.runWithHistory(stage, "stage", async () => {
return await this.runStage(stage, result); await this.runStage(stage);
}); });
resList.push(res); resList.push(res);
} }
return this.compositionResultType(resList); return this.compositionResultType(resList);
} }
async runStage(stage: Stage, result: HistoryResult) { async runStage(stage: Stage) {
const runnerList = []; const runnerList = [];
for (const task of stage.tasks) { for (const task of stage.tasks) {
const runner = this.runWithHistory(task, "task", async (result) => { const runner = this.runWithHistory(task, "task", async () => {
return await this.runTask(task, result); await this.runTask(task);
}); });
runnerList.push(runner); runnerList.push(runner);
} }
@ -115,20 +118,21 @@ export class Executor {
return ResultType.error; return ResultType.error;
} }
private async runTask(task: Task, result: HistoryResult) { private async runTask(task: Task) {
const resList: ResultType[] = []; const resList: ResultType[] = [];
for (const step of task.steps) { for (const step of task.steps) {
const res: ResultType = await this.runWithHistory(step, "step", async (result) => { step.runnableType = "step";
await this.runStep(step, result); const res: ResultType = await this.runWithHistory(step, "step", async (status) => {
await this.runStep(step, status);
}); });
resList.push(res); resList.push(res);
} }
return this.compositionResultType(resList); return this.compositionResultType(resList);
} }
private async runStep(step: Step, result: HistoryResult) { private async runStep(step: Step, status: HistoryStatus) {
//执行任务 //执行任务
const taskPlugin: TaskPlugin = await this.getPlugin(step.type, result.logger); const taskPlugin: TaskPlugin = await this.getPlugin(step.type, status.logger);
// @ts-ignore // @ts-ignore
delete taskPlugin.define; delete taskPlugin.define;
const define = taskPlugin.getDefine(); const define = taskPlugin.getDefine();

View File

@ -1,6 +1,14 @@
import { Context, HistoryResult, Log, Runnable } from "../d.ts"; import { Context, HistoryResult, Runnable } from "../d.ts";
import _ from "lodash"; import _ from "lodash";
import { buildLogger, logger } from "../utils/util.log"; import { buildLogger } from "../utils/util.log";
import { Logger } from "log4js";
export type HistoryStatus = {
runnable: Runnable;
result: HistoryResult;
logs: string[];
logger: Logger;
};
export class RunHistory { export class RunHistory {
constructor(runtimeId: any) { constructor(runtimeId: any) {
@ -8,64 +16,59 @@ export class RunHistory {
} }
id: any; id: any;
logs: Log[] = []; //运行时上下文变量
context: Context = {}; context: Context = {};
results: { status: {
[key: string]: HistoryResult; [nodeId: string]: HistoryStatus;
} = {}; } = {};
logger = logger;
start(runnable: Runnable, runnableType: string) { start(runnable: Runnable): HistoryStatus {
const status = "start";
const now = new Date().getTime(); const now = new Date().getTime();
_.merge(runnable, { status, lastTime: now }); const status: HistoryStatus = {
const result: HistoryResult = { runnable,
type: runnableType, result: {
startTime: new Date().getTime(), status: "start",
title: runnable.title, startTime: now,
status, },
logs: [], logs: [],
logger: buildLogger((text) => { logger: buildLogger((text) => {
result.logs.push(text); status.logs.push(text);
}), }),
}; };
this.results[runnable.id] = result; this.status[runnable.id] = status;
this.log(runnable, result, `${runnable.title}<id:${runnable.id}> 开始执行`); this.log(status, `开始执行`);
return result; return status;
} }
success(runnable: Runnable, res?: any) { success(runnable: Runnable) {
const status = "success";
const now = new Date().getTime(); const now = new Date().getTime();
_.merge(runnable, { status, lastTime: now }); const status = this.status[runnable.id];
const result = this.results[runnable.id]; _.merge(status, {
_.merge(result, { status, endTime: now }, res); result: {
this.log(runnable, result, `${result.title}<id:${runnable.id}> 执行成功`); status: "success",
endTime: now,
},
});
this.log(status, `执行成功`);
} }
error(runnable: Runnable, e: Error) { error(runnable: Runnable, e: Error) {
const status = "error";
const now = new Date().getTime(); const now = new Date().getTime();
_.merge(runnable, { status, lastTime: now }); const status = this.status[runnable.id];
const result = this.results[runnable.id]; _.merge(status, {
_.merge(result, { result: {
status, status: "error",
endTime: now, endTime: now,
errorMessage: e.message, message: e.message,
},
}); });
this.log(runnable, result, `${result.title}<id:${runnable.id}> 执行异常:${e.message}`, status); this.log(status, `执行异常:${e.message}`);
} }
log(runnable: Runnable, result: HistoryResult, text: string, level = "info") { log(status: HistoryStatus, text: string) {
const now = new Date().getTime(); const runnable = status.runnable;
result.logger.info(`[${runnable.title}] [${result.type}]`, text); status.logger.info(`[${runnable.title}]<id:${runnable.id}> [${runnable.runnableType}]`, text);
this.logs.push({
time: now,
level,
title: runnable.title,
text,
});
} }
finally(runnable: Runnable) { finally(runnable: Runnable) {

View File

@ -1,5 +1,3 @@
import { Logger } from "log4js";
export enum RunStrategy { export enum RunStrategy {
AlwaysRun, AlwaysRun,
SkipWhenSucceed, SkipWhenSucceed,
@ -62,6 +60,7 @@ export type Runnable = {
status?: string; status?: string;
lastTime?: number; lastTime?: number;
strategy?: RunnableStrategy; strategy?: RunnableStrategy;
runnableType?: string; // pipeline, stage, task , step
}; };
export type Pipeline = Runnable & { export type Pipeline = Runnable & {
@ -94,8 +93,6 @@ export type HistoryResultGroup = {
}; };
}; };
export type HistoryResult = { export type HistoryResult = {
type: string;
title: string;
/** /**
* *
*/ */
@ -105,8 +102,6 @@ export type HistoryResult = {
/** /**
* *
*/ */
result?: string; result?: string; //success, error,skip
errorMessage?: string; message?: string;
logs: string[];
logger: Logger;
}; };

View File

@ -1,5 +1,3 @@
import { Logger } from "log4js";
export type Registrable = { export type Registrable = {
name: string; name: string;
title: string; title: string;

View File

@ -1,4 +1,4 @@
import log4js, { Appender, Logger, LoggingEvent } from "log4js"; import log4js, { LoggingEvent } from "log4js";
const OutputAppender = { const OutputAppender = {
configure: (config: any, layouts: any, findAppender: any, levels: any) => { configure: (config: any, layouts: any, findAppender: any, levels: any) => {