perf: 流水线同一个阶段任务优化为并行执行

pull/361/head
xiaojunnuo 2025-03-11 00:46:03 +08:00
parent fc8bef5aae
commit efa9c748c5
4 changed files with 56 additions and 15 deletions

View File

@ -1,10 +1,10 @@
import crypto from 'crypto';
import crypto, { BinaryToTextEncoding } from 'crypto';
function md5(data: string) {
return crypto.createHash('md5').update(data).digest('hex');
function md5(data: string, digest: BinaryToTextEncoding = 'hex') {
return crypto.createHash('md5').update(data).digest(digest);
}
function sha256(data: string) {
return crypto.createHash('sha256').update(data).digest('hex');
function sha256(data: string, digest: BinaryToTextEncoding = 'hex') {
return crypto.createHash('sha256').update(data).digest(digest);
}
function base64(data: string) {

View File

@ -1,5 +1,15 @@
import { ResultError } from "../dt/index.js";
export class CancelError extends Error {
constructor(message: string) {
super(message);
}
}
export class RunnableError extends Error {
errors: ResultError[];
constructor(message: string, errors: ResultError[]) {
super(message);
this.errors = errors;
}
}

View File

@ -12,12 +12,23 @@ import { FileStore } from "./file-store.js";
import { cloneDeep, forEach, merge } from "lodash-es";
import { INotificationService } from "../notification/index.js";
import { taskEmitterCreate } from "../service/emit.js";
import { RunnableError } from "./exceptions.js";
export type SysInfo = {
//系统标题
title?: string;
};
export type ResultError = {
e: any;
returnType: ResultType;
runnable: Runnable;
};
export type ResultErrors = {
resultType: ResultType;
errors: ResultError[];
};
export type ExecutorOptions = {
pipeline: Pipeline;
storage: IStorage;
@ -152,10 +163,6 @@ export class Executor {
this.runtime.disabled(runnable);
return resultType;
}
if (resultType == ResultType.error) {
this.runtime.error(runnable, new Error("执行失败"));
return resultType;
}
this.runtime.success(runnable);
return ResultType.success;
} catch (e: any) {
@ -178,9 +185,6 @@ export class Executor {
const res: ResultType = await this.runWithHistory(stage, "stage", async () => {
return await this.runStage(stage);
});
if (res === ResultType.error) {
return ResultType.error;
}
resList.push(res);
}
return this.compositionResultType(resList);
@ -199,6 +203,8 @@ export class Executor {
}
let resList: ResultType[] = [];
const errorList: ResultError[] = [];
let errorMessage = "";
if (stage.concurrency === ConcurrencyStrategy.Parallel) {
//并行
const pList = [];
@ -212,11 +218,21 @@ export class Executor {
const runner = runnerList[i];
try {
resList[i] = await runner();
} catch (e:any) {
this.logger.error("任务执行异常,继续执行后续任务:", e.message);
} catch (e: any) {
const t = stage.tasks[i];
this.logger.error(`任务 ${t.title} 执行异常:`, e.message);
resList[i] = ResultType.error;
errorList.push({
e,
returnType: ResultType.error,
runnable: t,
});
errorMessage += `任务${t.title}执行失败,错误详情:${e.message}`;
}
}
if (errorList.length > 0) {
throw new RunnableError(errorMessage, errorList);
}
}
return this.compositionResultType(resList);
}
@ -414,7 +430,16 @@ export class Executor {
content = `流水线ID:${this.pipeline.id}运行ID:${this.runtime.id}`;
} else if (when === "error") {
subject = `执行失败,${this.pipeline.title}${this.pipeline.id}`;
content = `流水线ID:${this.pipeline.id}运行ID:${this.runtime.id}\n\n${this.currentStatusMap?.currentStep?.title} 执行失败\n\n错误详情:${error.message}`;
if (error instanceof RunnableError) {
const runnableError = error as RunnableError;
content = `流水线ID:${this.pipeline.id}运行ID:${this.runtime.id}\n\n`;
for (const re of runnableError.errors) {
content += ` - ${re.runnable.title} 执行失败\n 错误详情:${re.e.message}\n\n`;
}
} else {
content = `流水线ID:${this.pipeline.id}运行ID:${this.runtime.id}\n\n${this.currentStatusMap?.currentStep?.title} 执行失败\n\n错误详情:${error.message}`;
}
} else {
return;
}

View File

@ -107,6 +107,12 @@ export type Log = {
text: string;
};
export type ResultError = {
e: any;
returnType: ResultType;
runnable: Runnable;
};
export enum ResultType {
start = "start",
success = "success",