diff --git a/src/activity/activity.module.ts b/src/activity/activity.module.ts index 9d272e89e..781691464 100644 --- a/src/activity/activity.module.ts +++ b/src/activity/activity.module.ts @@ -301,7 +301,7 @@ export class ActivityModuleImpl implements ActivityModule { this.logger.debug("Initializing the exe-unit for activity", { activityId: activity.id }); try { - await exe.setup(); + await exe.setup(options?.setupSignalOrTimeout); const refreshedActivity = await this.refreshActivity(activity).catch(() => { this.logger.warn("Failed to refresh activity after work context initialization", { activityId: activity.id }); return activity; diff --git a/src/activity/exe-unit/exe-unit.ts b/src/activity/exe-unit/exe-unit.ts index 3569a90b2..51cfe6d56 100644 --- a/src/activity/exe-unit/exe-unit.ts +++ b/src/activity/exe-unit/exe-unit.ts @@ -45,7 +45,17 @@ export interface ExeUnitOptions { /** this function is called before the exe unit is destroyed */ teardown?: LifecycleFunction; executionOptions?: ExecutionOptions; + /** + * Abort signal or timeout for the entire lifecycle of the exe unit. + * If this signal is aborted or timeout is reached at any point, all ongoing operations + * will be stopped, no matter if it's activity deployment, setup, command execution or teardown. + */ signalOrTimeout?: number | AbortSignal; + /** + * Abort signal or timeout for the setup phase only. + * If this signal is aborted or timeout is reached after the setup phase is completed, it will be ignored. + */ + setupSignalOrTimeout?: number | AbortSignal; volumes?: Record; } @@ -115,21 +125,41 @@ export class ExeUnit { * This function initializes the exe unit by deploying the image to the remote machine * and preparing and running the environment. * This process also includes running setup function if the user has defined it + * + * @param signalOrTimeout - Abort signal or timeout for the setup phase only. + * If this signal is aborted or timeout is reached after the setup phase is completed, it will be ignored. */ - async setup(): Promise { + async setup(signalOrTimeout?: number | AbortSignal): Promise { + const setupSignal = createAbortSignalFromTimeout(signalOrTimeout); + const throwIfAborted = () => { + if (this.abortSignal.aborted) { + throw new GolemAbortError("ExeUnit has been aborted", this.abortSignal.reason); + } + if (setupSignal.aborted) { + throw setupSignal.reason.name === "TimeoutError" + ? new GolemTimeoutError("ExeUnit setup has been aborted due to a timeout", setupSignal.reason) + : new GolemAbortError("ExeUnit setup has been aborted", setupSignal.reason); + } + }; try { + throwIfAborted(); let state = await this.fetchState(); + throwIfAborted(); + if (state === ActivityStateEnum.Ready) { await this.setupActivity(); return; } if (state === ActivityStateEnum.Initialized) { - await this.deployActivity(); + await this.deployActivity(setupSignal); + throwIfAborted(); } await sleep(1000, true); + throwIfAborted(); state = await this.fetchState(); + throwIfAborted(); if (state !== ActivityStateEnum.Ready) { throw new GolemWorkError( @@ -141,6 +171,7 @@ export class ExeUnit { ); } await this.setupActivity(); + throwIfAborted(); } catch (error) { if (this.abortSignal.aborted) { throw this.abortSignal.reason.name === "TimeoutError" @@ -164,7 +195,7 @@ export class ExeUnit { } } - private async deployActivity() { + private async deployActivity(setupAbortSignal?: AbortSignal) { try { const executionMetadata = await this.executor.execute( new Script([ @@ -175,7 +206,7 @@ export class ExeUnit { new Start(), ]).getExeScriptRequest(), ); - const result$ = this.executor.getResultsObservable(executionMetadata); + const result$ = this.executor.getResultsObservable(executionMetadata, false, setupAbortSignal); // if any result is an error, throw an error await lastValueFrom( result$.pipe( diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index 7774d755d..9f0398cad 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -607,7 +607,7 @@ export class GolemNetwork { * @param options.order - represents the order specifications which will result in access to LeaseProcess. * @param options.poolSize {Object | number} - can be defined as a number or an object with min and max fields, if defined as a number it will be treated as a min parameter. * @param options.poolSize.min - the minimum pool size to achieve ready state (default = 0) - * @param options.poolSize.max - the maximum pool size, if reached, the next pool element will only be available if the borrowed resource is released or destroyed (dafault = 100) + * @param options.poolSize.max - the maximum pool size, if reached, the next pool element will only be available if the borrowed resource is released or destroyed (default = 100) * @param options.setup - an optional function that is called as soon as the exe unit is ready * @param options.teardown - an optional function that is called before the exe unit is destroyed */ diff --git a/src/market/demand/demand.ts b/src/market/demand/demand.ts index e401a64a6..ade918fb6 100644 --- a/src/market/demand/demand.ts +++ b/src/market/demand/demand.ts @@ -88,7 +88,7 @@ export type OrderDemandOptions = Partial<{ /** Demand properties that determine payment related terms & conditions of the agreement */ payment: Partial; }> & - /** Demand properties that determine most common paramters of the agreement (based on golemsp implementation */ + /** Demand properties that determine most common parameters of the agreement (based on golemsp implementation */ Partial; export interface IDemandRepository { diff --git a/src/market/market.module.ts b/src/market/market.module.ts index ab1a6c02f..cf2be7811 100644 --- a/src/market/market.module.ts +++ b/src/market/market.module.ts @@ -207,7 +207,7 @@ export interface MarketModule { }): Observable; /** - * Estimate the budget for the given order and maximum numbers of agreemnets. + * Estimate the budget for the given order and maximum numbers of agreements. * Keep in mind that this is just an estimate and the actual cost may vary. * The method returns the estimated budget in GLM. * @param params diff --git a/src/network/network.test.ts b/src/network/network.test.ts index 28f3fcae2..7f5f7ed51 100644 --- a/src/network/network.test.ts +++ b/src/network/network.test.ts @@ -64,7 +64,7 @@ describe("Network", () => { new GolemNetworkError(`Unable to add node network-node-id to removed network`, NetworkErrorCode.NetworkRemoved), ); }); - test("should get first avialble ip adrress", () => { + test("should get first avialble ip address", () => { const network = new Network("network-id", "192.168.0.0/24"); expect(network.getFirstAvailableIpAddress().toString()).toEqual("192.168.0.1"); }); diff --git a/src/resource-rental/resource-rental.ts b/src/resource-rental/resource-rental.ts index 20254cf5b..39887fec5 100644 --- a/src/resource-rental/resource-rental.ts +++ b/src/resource-rental/resource-rental.ts @@ -143,18 +143,11 @@ export class ResourceRental { if (this.currentExeUnit !== null) { return this.currentExeUnit; } - const abortController = new AbortController(); - this.finalizeAbortController.signal.addEventListener("abort", () => - abortController.abort(this.finalizeAbortController.signal.reason), - ); - if (signalOrTimeout) { - const abortSignal = createAbortSignalFromTimeout(signalOrTimeout); - abortSignal.addEventListener("abort", () => abortController.abort(abortSignal.reason)); - if (signalOrTimeout instanceof AbortSignal && signalOrTimeout.aborted) { - abortController.abort(signalOrTimeout.reason); - } + const signal = createAbortSignalFromTimeout(signalOrTimeout); + if (signal.aborted) { + throw new GolemAbortError("Initializing of the exe-unit has been aborted", signal.reason); } - return this.createExeUnit(abortController.signal); + return this.createExeUnit(signal); } /** @@ -194,7 +187,8 @@ export class ResourceRental { storageProvider: this.storageProvider, networkNode: this.resourceRentalOptions?.networkNode, executionOptions: this.resourceRentalOptions?.activity, - signalOrTimeout: abortSignal, + signalOrTimeout: this.finalizeAbortController.signal, + setupSignalOrTimeout: this.setupAbortController.signal, ...this.resourceRentalOptions?.exeUnit, }); this.events.emit("exeUnitCreated", activity); diff --git a/src/shared/yagna/adapters/market-api-adapter.ts b/src/shared/yagna/adapters/market-api-adapter.ts index 25f70bd34..e61964f82 100644 --- a/src/shared/yagna/adapters/market-api-adapter.ts +++ b/src/shared/yagna/adapters/market-api-adapter.ts @@ -26,7 +26,7 @@ import { cancelYagnaApiCall } from "../../utils/cancel"; /** * A bit more user-friendly type definition of DemandOfferBaseDTO from ya-ts-client * - * That's probably one of the most confusing elements around Golem Protocol and the API specificiation: + * That's probably one of the most confusing elements around Golem Protocol and the API specification: * * - Providers create Offers * - Requestors create Demands diff --git a/tests/e2e/resourceRentalPool.spec.ts b/tests/e2e/resourceRentalPool.spec.ts index 94177771f..1913d299b 100644 --- a/tests/e2e/resourceRentalPool.spec.ts +++ b/tests/e2e/resourceRentalPool.spec.ts @@ -220,6 +220,19 @@ describe("ResourceRentalPool", () => { ); }); + it("should not interrupt running commands on the exe-unit when aborting the setup signal after setup is done", async () => { + const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { poolSize: 1 }); + const rental = await pool.acquire(); + const abortController = new AbortController(); + const exeUnit = await rental.getExeUnit(abortController.signal); + const runPromise = exeUnit.run("sleep 5 && echo Hello World"); + // wait 2 seconds and abort the signal - the setup is already done so it should not affect the running command + await new Promise((res) => setTimeout(res, 2000)); + abortController.abort(); + const result = await runPromise; + expect(result.stdout?.toString().trim()).toEqual("Hello World"); + }); + it("should abort getting the newly created exe-unit by signal", async () => { const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { poolSize: 1 }); const abortController = new AbortController();