From b80210f24bf5db1c958d06ab27c9e5d3db452eda Mon Sep 17 00:00:00 2001 From: xiaojunnuo Date: Fri, 6 Sep 2024 10:19:03 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E4=BC=98=E5=8C=96=E8=B7=B3=E8=BF=87?= =?UTF-8?q?=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/build-image.yml | 2 +- packages/core/pipeline/src/core/executor.ts | 82 +++++++++++-------- packages/core/pipeline/src/dt/pipeline.ts | 1 + packages/core/pipeline/src/utils/index.ts | 1 + packages/core/pipeline/src/utils/util.hash.ts | 9 ++ .../src/plugin/cert-plugin/cert-reader.ts | 13 ++- 6 files changed, 67 insertions(+), 41 deletions(-) create mode 100644 packages/core/pipeline/src/utils/util.hash.ts diff --git a/.github/workflows/build-image.yml b/.github/workflows/build-image.yml index ff1b2dd8..7f529888 100644 --- a/.github/workflows/build-image.yml +++ b/.github/workflows/build-image.yml @@ -67,7 +67,7 @@ jobs: - name: Build and push uses: docker/build-push-action@v6 with: - platforms: linux/amd64,linux/arm64 + platforms: linux/amd64,linux/arm64,linux/arm/v7 push: true context: ./packages/ui/ tags: | diff --git a/packages/core/pipeline/src/core/executor.ts b/packages/core/pipeline/src/core/executor.ts index fa93c86e..4928dafa 100644 --- a/packages/core/pipeline/src/core/executor.ts +++ b/packages/core/pipeline/src/core/executor.ts @@ -12,6 +12,7 @@ import { RegistryItem } from "../registry/index.js"; import { Decorator } from "../decorator/index.js"; import { IEmailService } from "../service/index.js"; import { FileStore } from "./file-store.js"; +import { hashUtils } from "../utils/index.js"; // import { TimeoutPromise } from "../utils/util.promise.js"; export type ExecutorOptions = { @@ -69,6 +70,7 @@ export class Executor { } async run(runtimeId: any = 0, triggerType: string) { + let intervalFlushLogId: any = undefined; try { await this.init(); const trigger = { type: triggerType }; @@ -76,8 +78,14 @@ export class Executor { this.runtime = new RunHistory(runtimeId, trigger, this.pipeline); this.logger.info(`pipeline.${this.pipeline.id} start`); await this.notification("start"); + + this.runtime.start(this.pipeline); + intervalFlushLogId = setInterval(async () => { + await this.onChanged(this.runtime); + }, 5000); + await this.runWithHistory(this.pipeline, "pipeline", async () => { - await this.runStages(this.pipeline); + return await this.runStages(this.pipeline); }); if (this.lastRuntime && this.lastRuntime.pipeline.status?.status === ResultType.error) { await this.notification("turnToSuccess"); @@ -87,52 +95,33 @@ export class Executor { await this.notification("error", e); this.logger.error("pipeline 执行失败", e.stack); } finally { + clearInterval(intervalFlushLogId); + await this.onChanged(this.runtime); await this.pipelineContext.setObj("lastRuntime", this.runtime); this.logger.info(`pipeline.${this.pipeline.id} end`); } } - async runWithHistory(runnable: Runnable, runnableType: string, run: () => Promise) { + async runWithHistory(runnable: Runnable, runnableType: string, run: () => Promise) { runnable.runnableType = runnableType; this.runtime.start(runnable); - await this.onChanged(this.runtime); - const lastNode = this.lastStatusMap.get(runnable.id); - const lastResult = lastNode?.status?.status; - if (runnable.strategy?.runStrategy === RunStrategy.SkipWhenSucceed) { - //如果是成功后跳过策略 - let inputChanged = false; - if (runnableType === "step") { - const lastInput = JSON.stringify((lastNode as Step)?.input); - const step = runnable as Step; - const input = JSON.stringify(step.input); - if (input != null && lastInput !== input) { - //参数有变化 - inputChanged = true; - } - } - if (lastResult != null && lastResult === ResultType.success && !inputChanged) { - runnable.status!.output = lastNode?.status?.output; - runnable.status!.files = lastNode?.status?.files; - this.runtime.skip(runnable); - await this.onChanged(this.runtime); - return ResultType.skip; - } - } - const intervalFlushLogId = setInterval(async () => { - await this.onChanged(this.runtime); - }, 5000); - // const timeout = runnable.timeout ?? 20 * 60 * 1000; + await this.onChanged(this.runtime); + try { if (this.abort.signal.aborted) { this.runtime.cancel(runnable); return ResultType.canceled; } - await run(); + const resultType = await run(); if (this.abort.signal.aborted) { this.runtime.cancel(runnable); return ResultType.canceled; } + if (resultType == ResultType.skip) { + this.runtime.skip(runnable); + return resultType; + } this.runtime.success(runnable); return ResultType.success; } catch (e: any) { @@ -140,7 +129,6 @@ export class Executor { throw e; } finally { this.runtime.finally(runnable); - clearInterval(intervalFlushLogId); await this.onChanged(this.runtime); } } @@ -149,7 +137,7 @@ export class Executor { const resList: ResultType[] = []; for (const stage of pipeline.stages) { const res: ResultType = await this.runWithHistory(stage, "stage", async () => { - await this.runStage(stage); + return await this.runStage(stage); }); resList.push(res); } @@ -162,6 +150,7 @@ export class Executor { const runner = async () => { return this.runWithHistory(task, "task", async () => { await this.runTask(task); + return ResultType.success; }); }; runnerList.push(runner); @@ -204,7 +193,7 @@ export class Executor { for (const step of task.steps) { step.runnableType = "step"; const res: ResultType = await this.runWithHistory(step, "step", async () => { - await this.runStep(step); + return await this.runStep(step); }); resList.push(res); } @@ -223,19 +212,40 @@ export class Executor { // @ts-ignore const define: PluginDefine = plugin.define; //从outputContext读取输入参数 - Decorator.inject(define.input, instance, step.input, (item, key) => { + const input = _.cloneDeep(step.input); + Decorator.inject(define.input, instance, input, (item, key) => { if (item.component?.name === "pi-output-selector") { - const contextKey = step.input[key]; + const contextKey = input[key]; if (contextKey != null) { // "cert": "step.-BNFVPMKPu2O-i9NiOQxP.cert", const arr = contextKey.split("."); const id = arr[1]; const outputKey = arr[2]; - step.input[key] = this.currentStatusMap.get(id)?.status?.output[outputKey] ?? this.lastStatusMap.get(id)?.status?.output[outputKey]; + input[key] = this.currentStatusMap.get(id)?.status?.output[outputKey] ?? this.lastStatusMap.get(id)?.status?.output[outputKey]; } } }); + const newInputHash = hashUtils.md5(JSON.stringify(input)); + step.status!.inputHash = newInputHash; + //判断是否需要跳过 + const lastNode = this.lastStatusMap.get(step.id); + const lastResult = lastNode?.status?.status; + if (step.strategy?.runStrategy === RunStrategy.SkipWhenSucceed) { + //如果是成功后跳过策略 + let inputChanged = true; + const lastInputHash = lastNode?.status?.inputHash; + if (lastInputHash && newInputHash && lastInputHash === newInputHash) { + //参数有变化 + inputChanged = false; + } + if (lastResult != null && lastResult === ResultType.success && !inputChanged) { + step.status!.output = lastNode?.status?.output; + step.status!.files = lastNode?.status?.files; + return ResultType.skip; + } + } + const http = createAxiosService({ logger: currentLogger }); const taskCtx: TaskInstanceContext = { pipeline: this.pipeline, diff --git a/packages/core/pipeline/src/dt/pipeline.ts b/packages/core/pipeline/src/dt/pipeline.ts index 7cfa7885..90b74649 100644 --- a/packages/core/pipeline/src/dt/pipeline.ts +++ b/packages/core/pipeline/src/dt/pipeline.ts @@ -119,6 +119,7 @@ export type HistoryResultGroup = { }; export type HistoryResult = { // input: any; + inputHash?: string; output: any; files?: FileItem[]; /** diff --git a/packages/core/pipeline/src/utils/index.ts b/packages/core/pipeline/src/utils/index.ts index 2d90b162..76faa040 100644 --- a/packages/core/pipeline/src/utils/index.ts +++ b/packages/core/pipeline/src/utils/index.ts @@ -4,6 +4,7 @@ export * from "./util.log.js"; export * from "./util.file.js"; export * from "./util.sp.js"; export * as promises from "./util.promise.js"; +export * from "./util.hash.js"; export const utils = { sleep, http: request, diff --git a/packages/core/pipeline/src/utils/util.hash.ts b/packages/core/pipeline/src/utils/util.hash.ts new file mode 100644 index 00000000..d21f5fec --- /dev/null +++ b/packages/core/pipeline/src/utils/util.hash.ts @@ -0,0 +1,9 @@ +import crypto from "crypto"; + +function md5(data: string) { + return crypto.createHash("md5").update(data).digest("hex"); +} + +export const hashUtils = { + md5, +}; diff --git a/packages/plugins/plugin-cert/src/plugin/cert-plugin/cert-reader.ts b/packages/plugins/plugin-cert/src/plugin/cert-plugin/cert-reader.ts index af7478f8..0702b0f3 100644 --- a/packages/plugins/plugin-cert/src/plugin/cert-plugin/cert-reader.ts +++ b/packages/plugins/plugin-cert/src/plugin/cert-plugin/cert-reader.ts @@ -79,10 +79,15 @@ export class CertReader { } finally { //删除临时文件 logger.info("删除临时文件"); - fs.unlinkSync(tmpCrtPath); - fs.unlinkSync(tmpKeyPath); - fs.unlinkSync(tmpPfxPath); - fs.unlinkSync(tmpDerPath); + function removeFile(filepath?: string) { + if (filepath) { + fs.unlinkSync(filepath); + } + } + removeFile(tmpCrtPath); + removeFile(tmpKeyPath); + removeFile(tmpPfxPath); + removeFile(tmpDerPath); } }