perf: 优化跳过处理逻辑

pull/189/head
xiaojunnuo 2024-09-06 10:19:03 +08:00
parent 3bad0b2685
commit b80210f24b
6 changed files with 67 additions and 41 deletions

View File

@ -67,7 +67,7 @@ jobs:
- name: Build and push - name: Build and push
uses: docker/build-push-action@v6 uses: docker/build-push-action@v6
with: with:
platforms: linux/amd64,linux/arm64 platforms: linux/amd64,linux/arm64,linux/arm/v7
push: true push: true
context: ./packages/ui/ context: ./packages/ui/
tags: | tags: |

View File

@ -12,6 +12,7 @@ import { RegistryItem } from "../registry/index.js";
import { Decorator } from "../decorator/index.js"; import { Decorator } from "../decorator/index.js";
import { IEmailService } from "../service/index.js"; import { IEmailService } from "../service/index.js";
import { FileStore } from "./file-store.js"; import { FileStore } from "./file-store.js";
import { hashUtils } from "../utils/index.js";
// import { TimeoutPromise } from "../utils/util.promise.js"; // import { TimeoutPromise } from "../utils/util.promise.js";
export type ExecutorOptions = { export type ExecutorOptions = {
@ -69,6 +70,7 @@ export class Executor {
} }
async run(runtimeId: any = 0, triggerType: string) { async run(runtimeId: any = 0, triggerType: string) {
let intervalFlushLogId: any = undefined;
try { try {
await this.init(); await this.init();
const trigger = { type: triggerType }; const trigger = { type: triggerType };
@ -76,8 +78,14 @@ export class Executor {
this.runtime = new RunHistory(runtimeId, trigger, this.pipeline); this.runtime = new RunHistory(runtimeId, trigger, this.pipeline);
this.logger.info(`pipeline.${this.pipeline.id} start`); this.logger.info(`pipeline.${this.pipeline.id} start`);
await this.notification("start"); await this.notification("start");
this.runtime.start(this.pipeline);
intervalFlushLogId = setInterval(async () => {
await this.onChanged(this.runtime);
}, 5000);
await this.runWithHistory(this.pipeline, "pipeline", async () => { await this.runWithHistory(this.pipeline, "pipeline", async () => {
await this.runStages(this.pipeline); return await this.runStages(this.pipeline);
}); });
if (this.lastRuntime && this.lastRuntime.pipeline.status?.status === ResultType.error) { if (this.lastRuntime && this.lastRuntime.pipeline.status?.status === ResultType.error) {
await this.notification("turnToSuccess"); await this.notification("turnToSuccess");
@ -87,52 +95,33 @@ export class Executor {
await this.notification("error", e); await this.notification("error", e);
this.logger.error("pipeline 执行失败", e.stack); this.logger.error("pipeline 执行失败", e.stack);
} finally { } finally {
clearInterval(intervalFlushLogId);
await this.onChanged(this.runtime);
await this.pipelineContext.setObj("lastRuntime", this.runtime); await this.pipelineContext.setObj("lastRuntime", this.runtime);
this.logger.info(`pipeline.${this.pipeline.id} end`); this.logger.info(`pipeline.${this.pipeline.id} end`);
} }
} }
async runWithHistory(runnable: Runnable, runnableType: string, run: () => Promise<void>) { async runWithHistory(runnable: Runnable, runnableType: string, run: () => Promise<ResultType | void>) {
runnable.runnableType = runnableType; runnable.runnableType = runnableType;
this.runtime.start(runnable); this.runtime.start(runnable);
await this.onChanged(this.runtime);
const lastNode = this.lastStatusMap.get(runnable.id);
const lastResult = lastNode?.status?.status;
if (runnable.strategy?.runStrategy === RunStrategy.SkipWhenSucceed) {
//如果是成功后跳过策略
let inputChanged = false;
if (runnableType === "step") {
const lastInput = JSON.stringify((lastNode as Step)?.input);
const step = runnable as Step;
const input = JSON.stringify(step.input);
if (input != null && lastInput !== input) {
//参数有变化
inputChanged = true;
}
}
if (lastResult != null && lastResult === ResultType.success && !inputChanged) {
runnable.status!.output = lastNode?.status?.output;
runnable.status!.files = lastNode?.status?.files;
this.runtime.skip(runnable);
await this.onChanged(this.runtime);
return ResultType.skip;
}
}
const intervalFlushLogId = setInterval(async () => {
await this.onChanged(this.runtime);
}, 5000);
// const timeout = runnable.timeout ?? 20 * 60 * 1000; // const timeout = runnable.timeout ?? 20 * 60 * 1000;
await this.onChanged(this.runtime);
try { try {
if (this.abort.signal.aborted) { if (this.abort.signal.aborted) {
this.runtime.cancel(runnable); this.runtime.cancel(runnable);
return ResultType.canceled; return ResultType.canceled;
} }
await run(); const resultType = await run();
if (this.abort.signal.aborted) { if (this.abort.signal.aborted) {
this.runtime.cancel(runnable); this.runtime.cancel(runnable);
return ResultType.canceled; return ResultType.canceled;
} }
if (resultType == ResultType.skip) {
this.runtime.skip(runnable);
return resultType;
}
this.runtime.success(runnable); this.runtime.success(runnable);
return ResultType.success; return ResultType.success;
} catch (e: any) { } catch (e: any) {
@ -140,7 +129,6 @@ export class Executor {
throw e; throw e;
} finally { } finally {
this.runtime.finally(runnable); this.runtime.finally(runnable);
clearInterval(intervalFlushLogId);
await this.onChanged(this.runtime); await this.onChanged(this.runtime);
} }
} }
@ -149,7 +137,7 @@ export class Executor {
const resList: ResultType[] = []; const resList: ResultType[] = [];
for (const stage of pipeline.stages) { for (const stage of pipeline.stages) {
const res: ResultType = await this.runWithHistory(stage, "stage", async () => { const res: ResultType = await this.runWithHistory(stage, "stage", async () => {
await this.runStage(stage); return await this.runStage(stage);
}); });
resList.push(res); resList.push(res);
} }
@ -162,6 +150,7 @@ export class Executor {
const runner = async () => { const runner = async () => {
return this.runWithHistory(task, "task", async () => { return this.runWithHistory(task, "task", async () => {
await this.runTask(task); await this.runTask(task);
return ResultType.success;
}); });
}; };
runnerList.push(runner); runnerList.push(runner);
@ -204,7 +193,7 @@ export class Executor {
for (const step of task.steps) { for (const step of task.steps) {
step.runnableType = "step"; step.runnableType = "step";
const res: ResultType = await this.runWithHistory(step, "step", async () => { const res: ResultType = await this.runWithHistory(step, "step", async () => {
await this.runStep(step); return await this.runStep(step);
}); });
resList.push(res); resList.push(res);
} }
@ -223,19 +212,40 @@ export class Executor {
// @ts-ignore // @ts-ignore
const define: PluginDefine = plugin.define; const define: PluginDefine = plugin.define;
//从outputContext读取输入参数 //从outputContext读取输入参数
Decorator.inject(define.input, instance, step.input, (item, key) => { const input = _.cloneDeep(step.input);
Decorator.inject(define.input, instance, input, (item, key) => {
if (item.component?.name === "pi-output-selector") { if (item.component?.name === "pi-output-selector") {
const contextKey = step.input[key]; const contextKey = input[key];
if (contextKey != null) { if (contextKey != null) {
// "cert": "step.-BNFVPMKPu2O-i9NiOQxP.cert", // "cert": "step.-BNFVPMKPu2O-i9NiOQxP.cert",
const arr = contextKey.split("."); const arr = contextKey.split(".");
const id = arr[1]; const id = arr[1];
const outputKey = arr[2]; const outputKey = arr[2];
step.input[key] = this.currentStatusMap.get(id)?.status?.output[outputKey] ?? this.lastStatusMap.get(id)?.status?.output[outputKey]; input[key] = this.currentStatusMap.get(id)?.status?.output[outputKey] ?? this.lastStatusMap.get(id)?.status?.output[outputKey];
} }
} }
}); });
const newInputHash = hashUtils.md5(JSON.stringify(input));
step.status!.inputHash = newInputHash;
//判断是否需要跳过
const lastNode = this.lastStatusMap.get(step.id);
const lastResult = lastNode?.status?.status;
if (step.strategy?.runStrategy === RunStrategy.SkipWhenSucceed) {
//如果是成功后跳过策略
let inputChanged = true;
const lastInputHash = lastNode?.status?.inputHash;
if (lastInputHash && newInputHash && lastInputHash === newInputHash) {
//参数有变化
inputChanged = false;
}
if (lastResult != null && lastResult === ResultType.success && !inputChanged) {
step.status!.output = lastNode?.status?.output;
step.status!.files = lastNode?.status?.files;
return ResultType.skip;
}
}
const http = createAxiosService({ logger: currentLogger }); const http = createAxiosService({ logger: currentLogger });
const taskCtx: TaskInstanceContext = { const taskCtx: TaskInstanceContext = {
pipeline: this.pipeline, pipeline: this.pipeline,

View File

@ -119,6 +119,7 @@ export type HistoryResultGroup = {
}; };
export type HistoryResult = { export type HistoryResult = {
// input: any; // input: any;
inputHash?: string;
output: any; output: any;
files?: FileItem[]; files?: FileItem[];
/** /**

View File

@ -4,6 +4,7 @@ export * from "./util.log.js";
export * from "./util.file.js"; export * from "./util.file.js";
export * from "./util.sp.js"; export * from "./util.sp.js";
export * as promises from "./util.promise.js"; export * as promises from "./util.promise.js";
export * from "./util.hash.js";
export const utils = { export const utils = {
sleep, sleep,
http: request, http: request,

View File

@ -0,0 +1,9 @@
import crypto from "crypto";
function md5(data: string) {
return crypto.createHash("md5").update(data).digest("hex");
}
export const hashUtils = {
md5,
};

View File

@ -79,10 +79,15 @@ export class CertReader {
} finally { } finally {
//删除临时文件 //删除临时文件
logger.info("删除临时文件"); logger.info("删除临时文件");
fs.unlinkSync(tmpCrtPath); function removeFile(filepath?: string) {
fs.unlinkSync(tmpKeyPath); if (filepath) {
fs.unlinkSync(tmpPfxPath); fs.unlinkSync(filepath);
fs.unlinkSync(tmpDerPath); }
}
removeFile(tmpCrtPath);
removeFile(tmpKeyPath);
removeFile(tmpPfxPath);
removeFile(tmpDerPath);
} }
} }