diff --git a/package-lock.json b/package-lock.json index 37decbc..f019ba2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12141,7 +12141,7 @@ }, "packages/engine": { "name": "@serverless-cd/engine", - "version": "0.0.54", + "version": "0.0.58", "license": "ISC", "dependencies": { "@serverless-cd/core": "^0.0.26", diff --git a/packages/engine/__tests__/engine.test.ts b/packages/engine/__tests__/engine.test.ts index a0a773e..2894f13 100644 --- a/packages/engine/__tests__/engine.test.ts +++ b/packages/engine/__tests__/engine.test.ts @@ -1,4 +1,5 @@ -import Engine, { IStepOptions, IContext } from '../lib'; +// import Engine, { IStepOptions, IContext } from '../lib'; +import Engine, { IStepOptions, IContext } from '../src'; import { lodash } from '@serverless-cd/core'; import * as path from 'path'; const { get, map } = lodash; @@ -258,7 +259,7 @@ test('inputs测试 env', async () => { ]); }); -test.only('测试plugin安装逻辑', async () => { +test('测试plugin安装逻辑', async () => { const steps = [ { run: 'echo "hello"', id: 'xhello' }, { plugin: '@serverless-cd/ding-talk', id: 'ding' }, @@ -277,4 +278,108 @@ test.only('测试plugin安装逻辑', async () => { { status: 'success', name: 'Run echo "hello"' }, { status: 'failure', name: 'Run @serverless-cd/ding-talk' }, ]); +}); + +// 新增性能统计测试 +test('性能统计功能测试', async () => { + const steps = [ + { run: 'echo "step1"' }, + { run: 'sleep 1 && echo "step2"' }, // 添加短暂延迟 + { run: 'echo "step3"' }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + logConfig: { logPrefix } + }); + + const res: IContext | undefined = await engine.start(); + + // 验证任务执行成功 + expect(res?.status).toBe('success'); + + // 验证性能数据存在 + expect(res?.performance).toBeDefined(); + + // 验证初始化耗时存在 + expect(res?.performance?.initTime).toBeDefined(); + expect(res?.performance?.initTime).toBeGreaterThanOrEqual(0); + + // 验证总耗时存在 + expect(res?.performance?.totalTime).toBeDefined(); + expect(res?.performance?.totalTime).toBeGreaterThanOrEqual(0); + + // 验证步骤耗时存在 + expect(res?.performance?.stepTimes).toBeDefined(); + expect(Object.keys(res?.performance?.stepTimes || {})).toHaveLength(steps.length); + + // 验证任务状态和步骤数 + expect(res?.performance?.taskStatus).toBe('success'); + + // 验证除初始化外每个步骤都有耗时记录 + for (const step of res?.steps || []) { + if (step.stepCount&&step.stepCount!=='0') { + expect(res?.performance?.stepTimes?.[step.stepCount]).toBeDefined(); + expect(res?.performance?.stepTimes?.[step.stepCount]).toBeGreaterThanOrEqual(0); + } + } +}); + +// 测试带超时的性能统计 +test('带超时的性能统计功能测试', async () => { + const steps = [ + { run: 'echo "step1"' }, + { run: 'sleep 0.1 && echo "step2"' }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + logConfig: { logPrefix }, + stepTimeout: 3 + }); + + const res: IContext | undefined = await engine.start(); + + // 验证任务执行成功 + expect(res?.status).toBe('success'); + // 验证性能数据存在 + expect(res?.performance).toBeDefined(); + // 验证超时配置被正确传递 + // 这部分验证需要查看Engine内部实现 +}); + +// 测试失败情况下的性能统计 +test('失败情况下的性能统计功能测试', async () => { + const steps = [ + { run: 'echo "step1"' }, + { run: 'exit 1' }, // 故意失败的步骤 + { run: 'echo "step3"' }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + logConfig: { logPrefix } + }); + + const res: IContext | undefined = await engine.start(); + + // 验证任务执行失败 + expect(res?.status).toBe('failure'); + + // 验证性能数据仍然存在 + expect(res?.performance).toBeDefined(); + + // 验证初始化耗时存在 + expect(res?.performance?.initTime).toBeDefined(); + expect(res?.performance?.initTime).toBeGreaterThanOrEqual(0); + + // 验证总耗时存在 + expect(res?.performance?.totalTime).toBeDefined(); + expect(res?.performance?.totalTime).toBeGreaterThanOrEqual(0); + + // 验证步骤耗时存在(即使任务失败也应该记录) + expect(res?.performance?.stepTimes).toBeDefined(); + + // 验证任务状态 + expect(res?.performance?.taskStatus).toBe('failure'); }); \ No newline at end of file diff --git a/packages/engine/__tests__/failure.test.ts b/packages/engine/__tests__/failure.test.ts index db32145..68bd192 100644 --- a/packages/engine/__tests__/failure.test.ts +++ b/packages/engine/__tests__/failure.test.ts @@ -11,6 +11,15 @@ test('某一步执行失败,错误信息记录在context.error', async () => { { run: 'echo "world"', id: 'xworld' }, ] as IStepOptions[]; + const engine = new Engine({ steps, logConfig: { logPrefix } }); + const res: IContext | undefined = await engine.start(); + expect(res.error).toBeInstanceOf(Object); +}); + +test('路径错误,正常抛出', async () => { + const steps = [ + { run: 'npm install', id: 'xhello', "working-directory": './xsadads' }, + ] as IStepOptions[]; const engine = new Engine({ steps, logConfig: { logPrefix } }); const res: IContext | undefined = await engine.start(); expect(res.error).toBeInstanceOf(Error); diff --git a/packages/engine/__tests__/fixtures/timeout-plugin/index.js b/packages/engine/__tests__/fixtures/timeout-plugin/index.js new file mode 100644 index 0000000..16bdb4e --- /dev/null +++ b/packages/engine/__tests__/fixtures/timeout-plugin/index.js @@ -0,0 +1,10 @@ +module.exports = { + run: async (inputs, context, logger) => { + // 模拟一个长时间运行的插件 + return new Promise((resolve) => { + setTimeout(() => { + resolve({ success: true }); + }, 5000); // 5秒延迟 + }); + } +}; \ No newline at end of file diff --git a/packages/engine/__tests__/mock/run.yaml b/packages/engine/__tests__/mock/run.yaml index e4cc38d..62622f6 100644 --- a/packages/engine/__tests__/mock/run.yaml +++ b/packages/engine/__tests__/mock/run.yaml @@ -16,8 +16,8 @@ steps: echo "Serverless Devs is not installed." exit 1 fi - s config add --AccessKeyID "${access_key_id}" --NewAccessKeySecret "${access_key_secret}" \ - --AccountID "${uid}" --NewSecurityToken "${security_token}" --access "${alias}" -f + s config add --AccessKeyID "${access_key_id}" --AccessKeySecret "${access_key_secret}" \ + --AccountID "${uid}" --SecurityToken "${security_token}" --access "${alias}" -f if [[ $? -ne 0 ]]; then echo "Failed to setup Serverless Devs." exit 1 diff --git a/packages/engine/__tests__/timeout.test.ts b/packages/engine/__tests__/timeout.test.ts new file mode 100644 index 0000000..cb7fb0d --- /dev/null +++ b/packages/engine/__tests__/timeout.test.ts @@ -0,0 +1,379 @@ +import Engine, { IStepOptions } from '../src'; +import * as path from 'path'; +import { TimeoutError } from '../src/types'; + +const logPrefix = path.join(__dirname, 'logs/performance'); + +describe('Engine超时相关单测', () => { + beforeEach(() => { + // 清理日志目录 + const fs = require('fs'); + if (fs.existsSync(logPrefix)) { + const rimraf = require('rimraf'); + rimraf.sync(logPrefix); + } + }); + + test('step1超时', async () => { + const steps = [ + { + run: 'sleep 1', + timeout: 2, // 2秒超时 + }, + { + run: 'sleep 3', + timeout: 2, + }, + { + run: 'sleep 1', + } + ] as IStepOptions[]; + const engine = new Engine({ + steps, + stepTimeout:2, + logConfig: { logPrefix, logLevel: 'DEBUG' }, + }); + + const result = await engine.start(); + console.log(result); + expect(result.status).toBe('timeout-failure'); + expect(result.steps[2].error).toBeInstanceOf(TimeoutError); + expect(result.steps[2].error!.message).toContain('Step'); + expect(result.steps[2].error!.message).toContain('timeout after 2s'); + }); + + test('plugin超时', async () => { + // 创建一个会超时的插件 + const steps = [ + { + plugin: path.join(__dirname, 'fixtures', 'timeout-plugin'), + timeout: 1, // 1秒超时 + }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + logConfig: { logPrefix }, + }); + + const result = await engine.start(); + expect(result.status).toBe('timeout-failure'); + expect(result.steps[1].error).toBeInstanceOf(TimeoutError); + expect(result.steps[1].error!.message).toContain('Step'); + expect(result.steps[1].error!.message).toContain('timeout after 1s'); + }); + + test('Engine全局step配置超时', async () => { + const steps = [ + { + run: 'sleep 3', + }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + stepTimeout: 1, // 1秒超时 + logConfig: { logPrefix }, + }); + + const result = await engine.start(); + expect(result.status).toBe('timeout-failure'); + expect(result.steps[1].error).toBeInstanceOf(TimeoutError); + expect(result.steps[1].error!.message).toContain('Step'); + expect(result.steps[1].error!.message).toContain('timeout after 1s'); + }); + + test('超时后不执行后续步骤', async () => { + const steps = [ + { + run: 'sleep 3', + timeout: 1, // 步骤超时1秒 + }, + { + run: 'sleep 1', // 这个步骤不会执行到 + }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + stepTimeout: 5, // 引擎默认超时5秒 + logConfig: { logPrefix }, + }); + + const result = await engine.start(); + expect(result.status).toBe('timeout-failure'); + expect(result.steps[1].error).toBeInstanceOf(TimeoutError); + expect(result.steps[1].error!.message).toContain('Step'); + expect(result.steps[1].error!.message).toContain('timeout after 1s'); + }); + + test('成功执行,不超时', async () => { + const steps = [ + { + run: 'echo "quick command"', + timeout: 5, // 5秒超时 + }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + logConfig: { logPrefix }, + }); + + const result = await engine.start(); + expect(result.status).toBe('success'); + }); + + test('continue-on-error 在 step 超时时生效', async () => { + const steps = [ + { + run: 'sleep 3', + timeout: 1, // 1秒超时 + 'continue-on-error': true, + }, + { + run: 'echo "next step"', + }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + logConfig: { logPrefix }, + }); + + const result = await engine.start(); + // 第一个步骤应该超时但继续执行 + expect(result.status).toBe('success'); + // 应该有3个步骤(包括初始化步骤) + expect(result.steps.length).toBe(3); + // 初始化步骤应该成功 + expect(result.steps[0].status).toBe('success'); + // 第一个步骤应该超时但继续 + expect(result.steps[1].status).toBe('error-with-continue'); + // 第二个步骤应该成功 + expect(result.steps[2].status).toBe('success'); + // 检查第一个步骤的错误信息 + expect(result.steps[1].error).toBeInstanceOf(TimeoutError); + expect(result.steps[1].error!.message).toContain('Step'); + expect(result.steps[1].error!.message).toContain('timeout after 1s'); + }); + + test('step超时时没有continue-on-error不继续执行', async () => { + const steps = [ + { + run: 'sleep 3', + timeout: 1, // 1秒超时 + // 没有 continue-on-error + }, + { + run: 'echo "next step"', + }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + logConfig: { logPrefix }, + }); + + const result = await engine.start(); + // 第一个步骤超时失败,不继续执行 + expect(result.status).toBe('timeout-failure'); + // 应该有3个步骤(包括初始化步骤) + expect(result.steps.length).toBe(3); + // 初始化步骤应该成功 + expect(result.steps[0].status).toBe('success'); + // 第一个步骤应该超时失败 + expect(result.steps[1].status).toBe('timeout-failure'); + // 第二个步骤应该跳过 + expect(result.steps[2].status).toBe('skipped'); + // 检查第一个步骤的错误信息 + expect(result.steps[1].error).toBeInstanceOf(TimeoutError); + expect(result.steps[1].error!.message).toContain('Step'); + expect(result.steps[1].error!.message).toContain('timeout after 1s'); + }); + + test('超时配置优先级:step配置超时优先于全局默认超时', async () => { + const steps = [ + { + run: 'sleep 3', + timeout: 2, // step配置超时2秒 + }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + stepTimeout: 1, // 全局默认超时1秒(小于step配置超时) + logConfig: { logPrefix }, + }); + + // 应该在2秒后超时,因为step配置超时优先于全局默认超时 + const startTime = Date.now(); + const result = await engine.start(); + const duration = Date.now() - startTime; + + expect(result.status).toBe('timeout-failure'); + expect(result.steps[1].error).toBeInstanceOf(TimeoutError); + expect(result.steps[1].error!.message).toContain('Step'); + expect(result.steps[1].error!.message).toContain('timeout after 2s'); + // 应该在2秒左右超时 + expect(duration).toBeGreaterThan(1500); + expect(duration).toBeLessThan(3500); + }); + + test('超时配置优先级:step配置超时优先于全局默认超时,此时应该成功', async () => { + const steps = [ + { + run: 'sleep 3', + timeout: 4, // step配置超时2秒 + }, + ] as IStepOptions[]; + const engine = new Engine({ + steps, + stepTimeout: 1, // 全局默认超时1秒 + logConfig: { logPrefix }, + }); + const startTime = Date.now(); + const result = await engine.start(); + const duration = Date.now() - startTime; + expect(result.status).toBe('success'); + expect(duration).toBeGreaterThan(2500); + expect(duration).toBeLessThan(4500); + }); + + // 全局超时测试 + test('全局超时:单个步骤执行时间正常但总时间超过全局超时', async () => { + const steps = [ + { + run: 'sleep 2', // 第一个步骤2秒 + }, + { + run: 'sleep 2', // 第二个步骤2秒 + }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + timeout: 3, // 全局超时3秒(总共需要4秒以上) + stepTimeout: 10, // 单个步骤超时10秒 + logConfig: { logPrefix }, + }); + + const startTime = Date.now(); + const result = await engine.start(); + const duration = Date.now() - startTime; + + expect(result.status).toBe('timeout-failure'); + expect(result.error).toBeInstanceOf(TimeoutError); + expect(result.error!.message).toContain('Global timeout after 3s'); + // 应该在3秒左右超时 + expect(duration).toBeGreaterThan(2500); + expect(duration).toBeLessThan(4000); + }); + + test('全局超时:在第一个步骤中就触发全局超时', async () => { + const steps = [ + { + run: 'sleep 5', // 第一个步骤5秒 + }, + { + run: 'echo "second step"', // 第二个步骤不应该执行 + }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + timeout: 2, // 全局超时2秒 + stepTimeout: 10, // 单个步骤超时10秒 + logConfig: { logPrefix }, + }); + + const startTime = Date.now(); + const result = await engine.start(); + const duration = Date.now() - startTime; + + expect(result.status).toBe('timeout-failure'); + expect(result.error).toBeInstanceOf(TimeoutError); + expect(result.error!.message).toContain('Global timeout after 2s'); + // 应该在2秒左右超时 + expect(duration).toBeGreaterThan(1500); + expect(duration).toBeLessThan(3000); + // 第二个步骤应该跳过 + expect(result.steps.length).toBe(3); // 初始化 + 第一个步骤 + 第二个步骤(未处理) + expect(result.steps[2].status).toBe('pending'); // 第二个步骤由于全局超时而未被处理 + }); + + test('全局超时:步骤超时优先级高于全局超时', async () => { + const steps = [ + { + run: 'sleep 3', + timeout: 1, // 步骤超时1秒 + }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + timeout: 5, // 全局超时5秒 + logConfig: { logPrefix }, + }); + + const startTime = Date.now(); + const result = await engine.start(); + const duration = Date.now() - startTime; + + // 应该是步骤超时,而不是全局超时 + expect(result.status).toBe('timeout-failure'); + expect(result.steps[1].error).toBeInstanceOf(TimeoutError); + expect(result.steps[1].error!.message).toContain('Step'); + expect(result.steps[1].error!.message).toContain('timeout after 1s'); + // 应该在1秒左右超时 + expect(duration).toBeGreaterThan(500); + expect(duration).toBeLessThan(2000); + }); + + test('全局超时:正常执行不超时', async () => { + const steps = [ + { + run: 'echo "step 1"', + }, + { + run: 'echo "step 2"', + }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + timeout: 10, // 全局超时10秒 + logConfig: { logPrefix }, + }); + + const result = await engine.start(); + expect(result.status).toBe('success'); + expect(result.error).toBeUndefined(); + }); + + test('全局超时:全局超时时continue-on-error不生效', async () => { + const steps = [ + { + run: 'sleep 2', + 'continue-on-error': true, + }, + { + run: 'echo "second step"', + }, + ] as IStepOptions[]; + + const engine = new Engine({ + steps, + timeout: 1, // 全局超时1秒 + logConfig: { logPrefix }, + }); + + const result = await engine.start(); + expect(result.status).toBe('timeout-failure'); + expect(result.error).toBeInstanceOf(TimeoutError); + expect(result.error!.message).toContain('Global timeout after 1s'); + // 后续步骤不应该执行 + expect(result.steps.length).toBe(3); // 初始化 + 第一个步骤 + 第二个步骤(未处理) + expect(result.steps[2].status).toBe('pending'); // 第二个步骤由于全局超时而未被处理 + }); +}); \ No newline at end of file diff --git a/packages/engine/package.json b/packages/engine/package.json index 681b464..5499895 100644 --- a/packages/engine/package.json +++ b/packages/engine/package.json @@ -1,6 +1,6 @@ { "name": "@serverless-cd/engine", - "version": "0.0.54", + "version": "0.0.59-beta.1", "description": "sevrerless-cd engine lib", "main": "./lib/index.js", "scripts": { diff --git a/packages/engine/src/daemon/libs/report.ts b/packages/engine/src/daemon/libs/report.ts index 7cf64eb..8000dee 100644 --- a/packages/engine/src/daemon/libs/report.ts +++ b/packages/engine/src/daemon/libs/report.ts @@ -15,7 +15,16 @@ interface IRecordException { message: string; } -type IReportOptions = IReportCommand | IRecordException; +interface IReportPerformance { + type: EReportType.performance, + userAgent: string; + taskId:string, // 从环境变量获取任务ID + performance: string; // JSON字符串化的性能数据, + timestamp: number; // 当前时间戳 + reportUrl: string; // 用户自定义的性能上报URL +} + +type IReportOptions = IReportCommand | IRecordException | IReportPerformance; class Report { constructor(private options = {} as IReportOptions) { } @@ -27,6 +36,10 @@ class Report { if (type === EReportType.exception) { return await this.reportException(); } + // 新增性能数据上报处理 + if (type === EReportType.performance) { + return await this.reportPerformance(); + } } async reportCommand() { const { type, userAgent, plugin } = this.options as IReportCommand; @@ -38,6 +51,12 @@ class Report { const url = `${REPORT_BASE_URL}?APIVersion=0.6.0&trackerType=${type}&userAgent=${userAgent}&plugin=${plugin}&message=${message}`; await this.report(url) } + // 新增性能数据上报方法 + async reportPerformance() { + const { userAgent, performance, taskId, timestamp, reportUrl } = this.options as IReportPerformance; + const url = `${reportUrl}?APIVersion=0.6.0&trackerType=performance&userAgent=${userAgent}&taskId=${taskId}&performance=${performance}×tamp=${timestamp}`; + await this.report(url) + } async report(url: string) { await axios.get(url, { timeout: 3000 }); } diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index 96b5637..d4b8721 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -3,11 +3,12 @@ import { createMachine, interpret } from 'xstate'; import { command } from 'execa'; import * as path from 'path'; import os from 'os'; -import { IStepOptions, IRunOptions, IPluginOptions, IRecord, IStatus, IEngineOptions, IContext, ILogConfig, STEP_STATUS, ISteps, STEP_IF, EReportType } from './types'; -import { parsePlugin, getProcessTime, getDefaultInitLog, getLogPath, getPluginRequirePath, stringify, getUserAgent } from './utils'; -import { INIT_STEP_COUNT, INIT_STEP_NAME, COMPLETED_STEP_COUNT, DEFAULT_COMPLETED_LOG, SERVERLESS_CD_KEY, SERVERLESS_CD_VALUE } from './constants'; +import { IStepOptions, IRunOptions, IPluginOptions, IRecord, IStatus, IEngineOptions, IContext, ILogConfig, STEP_STATUS, STEP_STATUS_BASE,ISteps, STEP_IF, EReportType,TimeoutError,IPerformanceData } from './types'; +import { parsePlugin, getProcessTime, getDefaultInitLog, getLogPath, getPluginRequirePath, stringify, getUserAgent, TAG_MESSAGE } from './utils'; +import { INIT_STEP_COUNT, INIT_STEP_NAME, COMPLETED_STEP_COUNT, DEFAULT_COMPLETED_LOG, SERVERLESS_CD_KEY, SERVERLESS_CD_VALUE, REPORT_BASE_URL } from './constants'; import execDaemon from './exec-daemon'; import { filter, join } from 'lodash'; +import fs from 'fs'; export { IStepOptions, IContext } from './types'; @@ -19,6 +20,10 @@ class Engine { public context = { status: STEP_STATUS.PENING, completed: false } as IContext; private record = { status: STEP_STATUS.PENING, editStatusAble: true } as IRecord; private logger: any; + private stepTimeoutId: NodeJS.Timeout | null = null; // 步骤超时定时器 + private globalTimeoutId: NodeJS.Timeout | null = null; // 全局超时定时器 + private startTime: number = 0; // 任务开始时间 + private initStartTime: number = 0; // 初始化开始时间 constructor(private options: IEngineOptions) { debug('engine start'); debug(`engine options: ${stringify(options)}`); @@ -29,6 +34,9 @@ class Engine { // 记录上下文信息 this.context.cwd = cwd; this.context.inputs = inputs as {}; + this.context.performance = { + stepTimes: {}, + } as IPerformanceData; this.doUnsetEnvs(); } private async doUnsetEnvs() { @@ -40,11 +48,13 @@ class Engine { } } private async doInit() { + this.initStartTime = Date.now(); // 记录初始化开始时间 const { events } = this.options; this.context.status = STEP_STATUS.RUNNING; const startTime = Date.now(); const filePath = getLogPath(INIT_STEP_COUNT); this.logger = this.getLogger(filePath); + this.logger.info(TAG_MESSAGE.INIT_START); this.logger.info(getDefaultInitLog()); try { const res = await events?.onInit?.(this.context, this.logger); @@ -59,7 +69,12 @@ class Engine { }; // 优先读取 doInit 返回的 steps 数据,其次 行参里的 steps 数据 const steps = await parsePlugin(res?.steps || this.options.steps, this); + const initTime = getProcessTime(this.initStartTime); + if (this.context.performance) { + this.context.performance.initTime = initTime; + } await this.doOss(filePath); + this.logger.info(TAG_MESSAGE.INIT_SUCCESS); return { ...res, steps }; } catch (error) { debug(`onInit error: ${error}`); @@ -74,49 +89,61 @@ class Engine { error, }; this.context.error = error as Error; - await this.doOss(filePath); const steps = await parsePlugin(this.options.steps as IStepOptions[], this); + const initTime = getProcessTime(this.initStartTime); + if (this.context.performance) { + this.context.performance.initTime = initTime; + } + await this.doOss(filePath); + this.logger.info(TAG_MESSAGE.INIT_FAIL); return { steps }; } } async start(): Promise { + this.startTime = Date.now(); const { steps } = await this.doInit(); if (isEmpty(steps)) { - this.recordContext(this.record.initData as IStepOptions) + this.recordContext(this.record.initData as IStepOptions); await this.doCompleted(); return this.context; } this.context.steps = map(steps as ISteps[], (item) => { item.status = STEP_STATUS.PENING; + item.timeout = item.timeout || this.options.stepTimeout; return item; }); return new Promise(async (resolve) => { - const states: any = { - init: { - on: { - INIT: get(steps, '[0].stepCount'), + // 全局超时Promise + const globalTimeoutPromise = this.createGlobalTimeoutPromise(); + + // 步骤执行Promise + const stepExecutionPromise = new Promise(async (stepResolve) => { + const states: any = { + init: { + on: { + INIT: get(steps, '[0].stepCount'), + }, }, - }, - final: { - type: 'final', - invoke: { - src: async () => { - // 执行终态是 error-with-continue 的时候,改为 success - const status = - this.record.status === STEP_STATUS.ERROR_WITH_CONTINUE - ? STEP_STATUS.SUCCESS - : this.record.status; - this.context.status = status; - await this.doCompleted(); - if (status === STEP_STATUS.SUCCESS) { - this.report(); - } - debug('engine end'); - resolve(this.context); + final: { + type: 'final', + invoke: { + src: async () => { + // 执行终态是 error-with-continue 的时候,改为 success + const status = + this.record.status === STEP_STATUS.ERROR_WITH_CONTINUE + ? STEP_STATUS.SUCCESS + : this.record.status; + this.context.status = status; + await this.doCompleted(); + if (status === STEP_STATUS.SUCCESS) { + this.report(); + } + debug('engine end'); + stepResolve(this.context); + }, }, }, - }, - }; + }; each(steps, (item, index) => { const target = steps[index + 1] ? get(steps, `[${index + 1}].stepCount`) : 'final'; @@ -162,6 +189,9 @@ class Engine { if (this.record.status === STEP_STATUS.FAILURE) { return this.doSkip(item); } + if (this.record.status === STEP_STATUS.TIMEOUT_FAILURE) { + return this.doSkip(item); + } return this.handleSrc(item); }, onDone: { @@ -172,19 +202,62 @@ class Engine { }; }); - const fetchMachine = createMachine({ - predictableActionArguments: true, - id: 'step', - initial: 'init', - states, - }); + const fetchMachine = createMachine({ + predictableActionArguments: true, + id: 'step', + initial: 'init', + states, + }); - const stepService = interpret(fetchMachine) - .onTransition((state) => { - this.logger?.debug(`step: ${state.value}`); - }) - .start(); - stepService.send('INIT'); + const stepService = interpret(fetchMachine) + .onTransition((state) => { + this.logger?.debug(`step: ${state.value}`); + }) + .start(); + stepService.send('INIT'); + }); + + // Promise竞赛:步骤执行 vs 全局超时 + try { + const result = await Promise.race([stepExecutionPromise, globalTimeoutPromise]); + resolve(result); + } catch (error) { + // 如果是全局超时错误,确保设置正确的状态 + if (error instanceof TimeoutError) { + this.context.status = STEP_STATUS.TIMEOUT_FAILURE; + this.context.error = error; + await this.doCompleted(); + } + resolve(this.context); + } + }); + } + // 创建全局超时Promise + private createGlobalTimeoutPromise(): Promise { + return new Promise((_, reject) => { + const globalTimeout = this.options.timeout; + if (globalTimeout) { + const timeoutInMs = globalTimeout * 1000; + this.globalTimeoutId = setTimeout(() => { + const errorMsg = `Global timeout after ${globalTimeout}s`; + this.logger?.error(errorMsg); + + // 设置全局状态为超时失败 + this.record.status = STEP_STATUS.TIMEOUT_FAILURE; + this.record.editStatusAble = false; + + // 清理定时器 + this.clearAllTimeouts(); + + // 杀死正在执行的子进程 + each(this.childProcess, (item) => { + item.kill(); + }); + + reject(new TimeoutError(errorMsg)); + }, timeoutInMs); + } + // 如果没有设置全局超时,这个Promise永远不会reject,让步骤执行正常进行 }); } private report() { @@ -283,6 +356,7 @@ class Engine { each(this.childProcess, (item) => { item.kill(); }); + this.clearAllTimeouts(); // 清除所有超时定时器 } private getFilterContext() { const { inputs = {} } = this.options; @@ -297,6 +371,7 @@ class Engine { }; } private async doCompleted() { + this.logger.info(TAG_MESSAGE.COMPLETED_START); this.context.completed = true; const filePath = getLogPath(COMPLETED_STEP_COUNT); this.logger = this.getLogger(filePath); @@ -307,14 +382,31 @@ class Engine { await events?.onCompleted?.(this.context, this.logger); } catch (error) { this.outputErrorLog(error as Error); + this.logger.info(TAG_MESSAGE.COMPLETED_FAIL); } } + const totalTime = getProcessTime(this.startTime); + if (this.context.performance) { + this.context.performance.totalTime = totalTime; + this.context.performance.taskStatus = this.record.status; + this.context.performance.stepLength = this.context.steps.length; + } + this.clearAllTimeouts(); // 清除所有超时定时器 await this.doOss(filePath); + this.logger.info(TAG_MESSAGE.COMPLETED_SUCCESS); } private async handleSrc(item: IStepOptions) { + this.logger.debug(`Starting step: ${item.stepCount}`); try { await this.doPreRun(item.stepCount as string); - const response: any = await this.doSrc(item); + // 设置步骤超时定时器 + const stepTimeoutPromise = this.setupStepTimeout(item); + const responsePromise = this.doSrc(item); + const response: any = await Promise.race([responsePromise, stepTimeoutPromise]); + if (this.stepTimeoutId) { + clearTimeout(this.stepTimeoutId); + this.stepTimeoutId = null; + } // 如果已取消且if条件不成功,则不执行该步骤, 并记录状态为 cancelled const isCancel = item.if !== 'true' && this.record.status === STEP_STATUS.CANCEL; if (isCancel) return this.doCancel(item); @@ -334,18 +426,49 @@ class Engine { } const process_time = getProcessTime(this.record.startTime); this.recordContext(item, { status: STEP_STATUS.SUCCESS, outputs: response, process_time }); + // 记录步骤耗时 + if (this.context.performance && this.context.performance.stepTimes) { + this.context.performance.stepTimes[item.stepCount as string] = process_time; + } await this.doPostRun(item); await this.doOss(getLogPath(item.stepCount as string)); } catch (error: any) { - const status = - item['continue-on-error'] === true ? STEP_STATUS.ERROR_WITH_CONTINUE : STEP_STATUS.FAILURE; - // 记录全局的执行状态 - if (this.record.editStatusAble) { - this.record.status = status as IStatus; + if (this.stepTimeoutId) { + clearTimeout(this.stepTimeoutId); + this.stepTimeoutId = null; } - if (status === STEP_STATUS.FAILURE) { - // 全局的执行状态一旦失败,便不可修改 - this.record.editStatusAble = false; + // 检查是否是超时错误 + let status: IStatus; + const isTimeoutError = error instanceof TimeoutError; + if (isTimeoutError) { + // 超时错误也支持continue-on-error + status = + item['continue-on-error'] === true + ? STEP_STATUS.ERROR_WITH_CONTINUE + : STEP_STATUS.TIMEOUT_FAILURE; + // 更新全局状态 + if (this.record.editStatusAble) { + // 只有超时且没有continue-on-error时才设置为TIMEOUT_FAILURE + if (status === STEP_STATUS.TIMEOUT_FAILURE) { + this.record.status = STEP_STATUS.TIMEOUT_FAILURE; + this.record.editStatusAble = false; + } + // 如果是ERROR_WITH_CONTINUE,保持当前状态或SUCCESS + } + } else { + // 非超时错误的原有逻辑 + status = + item['continue-on-error'] === true + ? STEP_STATUS.ERROR_WITH_CONTINUE + : STEP_STATUS.FAILURE; + // 记录全局的执行状态 + if (this.record.editStatusAble) { + this.record.status = status as IStatus; + } + if (status === STEP_STATUS.FAILURE) { + // 全局的执行状态一旦失败,便不可修改 + this.record.editStatusAble = false; + } } if (item.id) { this.record.steps = { @@ -357,17 +480,37 @@ class Engine { } const process_time = getProcessTime(this.record.startTime); const logPath = getLogPath(item.stepCount as string); + if (this.context.performance && this.context.performance.stepTimes) { + this.context.performance.stepTimes[item.stepCount as string] = process_time; + } if (item['continue-on-error']) { - this.recordContext(item, { status, process_time }); + // continue-on-error为true时,无论是超时还是其他错误都继续执行 + this.recordContext(item, { status, error, process_time }); await this.doOss(logPath); } else { this.recordContext(item, { status, error, process_time }); - this.outputErrorLog(error as Error); + if (error instanceof Error) { + this.outputErrorLog(error as Error); + } else { + this.outputErrorLog(new Error(error.stderr as string)); + } await this.doOss(logPath); + const runItem = item as IRunOptions; + const pluginItem = item as IPluginOptions; + if (runItem.run) this.logger.info(TAG_MESSAGE.RUN_FAIL(item.name, item.id, runItem.run)); + else if (pluginItem.plugin) this.logger.info(TAG_MESSAGE.PLUGIN_FAIL(item.name, item.id, pluginItem.plugin)); throw error; } } } + private validateWorkingDirectory(path: string) { + if (!fs.existsSync(path)) { + throw new Error(`Invalid working directory: ${path}`); + } + if (!fs.statSync(path).isDirectory()) { + throw new Error(`Path is not a directory: ${path}`); + } + } private outputErrorLog(error: Error) { const logConfig = this.options.logConfig as ILogConfig; const { customLogger } = logConfig; @@ -385,15 +528,21 @@ class Engine { debug(`run: ${runItem.run}`); let execPath = runItem['working-directory'] || this.context.cwd; execPath = path.isAbsolute(execPath) ? execPath : path.join(this.context.cwd, execPath); + if (execPath) { + this.validateWorkingDirectory(execPath); + } this.logName(item); runItem.run = this.doArtTemplateCompile(runItem.run); + this.logger.info(TAG_MESSAGE.RUN_START(runItem.name, runItem.id, runItem.run)); const cp = command(runItem.run, { cwd: execPath, env: this.parseEnv(runItem), shell: true }); this.childProcess.push(cp); const res = await this.onFinish(cp, runItem.stepCount as string); + this.logger.info(TAG_MESSAGE.RUN_SUCCESS(runItem.name, runItem.id, runItem.run)); return res; } // plugin if (pluginItem.plugin) { + this.logger.info(TAG_MESSAGE.PLUGIN_START(pluginItem.name, pluginItem.id, pluginItem.plugin)); const newEnv = this.parseEnv(runItem); for (const key in newEnv) { process.env[key] = newEnv[key]; @@ -407,9 +556,11 @@ class Engine { debug(`plugin inputs: ${stringify(newInputs)}`); debug(`plugin context: ${stringify(newContext)}`); try { - return pluginItem.type === 'run' - ? await app.run(newInputs, newContext, this.logger) - : await app.postRun(newInputs, newContext, this.logger); + const res = pluginItem.type === 'run' + ? await this.doPluginRun(app, newInputs, newContext, this.logger) + : await this.doPluginRun(app, newInputs, newContext, this.logger, true); + this.logger.info(TAG_MESSAGE.PLUGIN_SUCCESS(pluginItem.name, pluginItem.id, pluginItem.plugin)); + return res; } catch (err) { const error = err as Error; execDaemon('report.js', { type: EReportType.exception, userAgent: getUserAgent(), plugin: pluginItem.info, message: error.message }); @@ -417,6 +568,43 @@ class Engine { } } } + private async doPluginRun(app: any, inputs: any, context: any, logger: EngineLogger, postRun: boolean = false) { + const stdout: string[] = []; + const stderr: string[] = []; + + const originalStdoutWrite = process.stdout.write.bind(process.stdout); + const originalStderrWrite = process.stderr.write.bind(process.stderr); + + process.stdout.write = (chunk: any) => { + stdout.push(chunk.toString()); + return true; + }; + + process.stderr.write = (chunk: any) => { + stderr.push(chunk.toString()); + return true; + }; + try { + if (postRun) { + return await app.postRun(inputs, context, logger); + } else { + return await app.run(inputs, context, logger); + } + } catch (err: any) { + throw { + stdout: stdout.join(''), + stderr: stderr.join(''), + message: err.message + }; + } finally { + // 还原原始写入方法 + process.stdout.write = originalStdoutWrite; + process.stderr.write = originalStderrWrite; + // 将内容写回到process + process.stdout.write(stdout.join('')); + process.stderr.write(stderr.join('')); + } + } private parseEnv(item: IRunOptions) { const { inputs } = this.options; const newEnv = { ...inputs?.env, ...item.env }; @@ -513,10 +701,51 @@ class Engine { cp.on('exit', (code: number) => { code === 0 || this.record.status === STEP_STATUS.CANCEL ? resolve({}) - : reject(new Error(Buffer.concat(stderr).toString())); + : reject({ + stderr: Buffer.concat(stderr as any).toString(), + stdout: Buffer.concat(stdout as any).toString(), + }); }); }); } + // 步骤超时处理逻辑 + private setupStepTimeout(item: IStepOptions): Promise { + return new Promise((_, reject) => { + // 优先使用step自身的超时设置,其次使用默认step超时设置 + const timeout = item.timeout || this.options.stepTimeout; + if (timeout) { + const timeoutInMs = timeout * 1000; + this.stepTimeoutId = setTimeout(() => { + this.handleStepTimeout(item, timeout, reject); + }, timeoutInMs); + } + }); + } + // 步骤超时处理的具体实现 + private handleStepTimeout(item: IStepOptions, timeout: number, reject: (reason?: any) => void) { + const errorMsg = `Step '${item.stepCount}' timeout after ${timeout}s`; + this.logger?.error(errorMsg); + reject(new TimeoutError(errorMsg, item)); + } + // 超时清理逻辑 + private clearTimeout() { + if (this.stepTimeoutId) { + clearTimeout(this.stepTimeoutId); + this.stepTimeoutId = null; + } + } + + // 清理所有超时定时器 + private clearAllTimeouts() { + if (this.stepTimeoutId) { + clearTimeout(this.stepTimeoutId); + this.stepTimeoutId = null; + } + if (this.globalTimeoutId) { + clearTimeout(this.globalTimeoutId); + this.globalTimeoutId = null; + } + } } export default Engine; diff --git a/packages/engine/src/types.ts b/packages/engine/src/types.ts index 5eadb3a..bd3ed68 100644 --- a/packages/engine/src/types.ts +++ b/packages/engine/src/types.ts @@ -1,4 +1,13 @@ import { IOssConfig, LoggerLevel } from '@serverless-cd/core'; + +// 新增超时异常类型 +export class TimeoutError extends Error { + constructor(message: string, public step?: IStepOptions) { + super(message); + this.name = 'TimeoutError'; + } +} + export interface IEngineOptions { steps?: IStepOptions[]; inputs?: Record; @@ -6,6 +15,9 @@ export interface IEngineOptions { cwd?: string; // 当前工作目录 events?: IEvent; unsetEnvs?: string[]; // 需要清除的环境变量 + stepTimeout?: number; // 默认步骤超时时间(秒) + timeout?: number; // 全局超时时间(秒) + reportUrl?: string; // 耗时数据上报URL } interface IEvent { @@ -22,6 +34,7 @@ export interface ILogConfig { customLogger?: any; eol?: string; } + export interface IRunOptions { run: string; stepCount?: string; @@ -31,6 +44,7 @@ export interface IRunOptions { env?: Record; 'continue-on-error'?: boolean; 'working-directory'?: string; + timeout?: number; // 步骤超时时间(秒) } export interface IPluginOptions { @@ -44,6 +58,7 @@ export interface IPluginOptions { inputs?: Record; type?: 'run' | 'postRun'; //内部处理 用于区分是run还是postRun info?: string; // name@version + timeout?: number; // 步骤超时时间(秒) } export type IStepOptions = IRunOptions | IPluginOptions; @@ -62,6 +77,7 @@ export enum STEP_STATUS_BASE { RUNNING = 'running', PENING = 'pending', ERROR_WITH_CONTINUE = 'error-with-continue', + TIMEOUT_FAILURE = 'timeout-failure', // 新增超时失败状态 } export type IStatus = `${STEP_STATUS_BASE}`; @@ -89,6 +105,23 @@ export interface IRecord { isInit: boolean; // 是否将初始化的数据放到context.steps中 } +// 性能数据接口 +export interface IPerformanceData { + initTime?: number; // 初始化耗时(秒) + totalTime?: number; // 总耗时(秒) + stepTimes?: Record; // 每个步骤的耗时(秒) + taskStatus?: IStatus; // 任务状态 + stepLength?: number; // 步骤总数 +} + +// 上报数据接口 +export interface IReportData { + taskId: string; // 任务ID + performance: IPerformanceData; // 性能数据 + timestamp: number; // 时间戳 + userAgent: string; // 用户代理 +} + export interface IContext { cwd: string; // 当前工作目录 stepCount: string; // 记录当前执行的step @@ -98,10 +131,11 @@ export interface IContext { completed: boolean; // 记录task是否执行完成 inputs: Record; // 记录inputs的输入(魔法变量) error: Error; // 记录step的错误信息 + performance?: IPerformanceData; // 性能数据 } - export enum EReportType { command = 'command', - exception = 'exception' + exception = 'exception', + performance = 'performance', // 新增性能数据上报类型 } \ No newline at end of file diff --git a/packages/engine/src/utils/index.ts b/packages/engine/src/utils/index.ts index 57e6bff..1cb37b5 100644 --- a/packages/engine/src/utils/index.ts +++ b/packages/engine/src/utils/index.ts @@ -111,3 +111,18 @@ export const stringify = (value: any) => { return flatted.stringify(value); } }; + +export const TAG_MESSAGE = { + RUN_START: (name: string | undefined, id: string | undefined, run: string | undefined) => `##[group]run:${name}:${id}:${run}`, + RUN_FAIL: (name: string | undefined, id: string | undefined, run: string | undefined) => `##[groupend]run:${name}:${id}:${run}:fail`, + RUN_SUCCESS: (name: string | undefined, id: string | undefined, run: string | undefined) => `##[groupend]run:${name}:${id}:${run}:success`, + PLUGIN_START: (name: string | undefined, id: string | undefined, run: string | undefined) => `##[group]plugin:${name}:${id}:${run}`, + PLUGIN_FAIL: (name: string | undefined, id: string | undefined, run: string | undefined) => `##[groupend]plugin:${name}:${id}:${run}:fail`, + PLUGIN_SUCCESS: (name: string | undefined, id: string | undefined, run: string | undefined) => `##[groupend]plugin:${name}:${id}:${run}:success`, + INIT_START: '##[group]init', + INIT_FAIL: '##[groupend]init:fail', + INIT_SUCCESS: '##[groupend]init:success', + COMPLETED_START: '##[group]completed', + COMPLETED_FAIL: '##[groupend]completed:fail', + COMPLETED_SUCCESS: '##[groupend]completed:success', +};