From ef16cf5e7a73d15ca820c149d3102bf6ce6b9201 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Tue, 16 Dec 2025 14:18:53 +0530 Subject: [PATCH 1/4] fix: handle organizationUpdate duplicate start during unmerge --- backend/package.json | 3 +- .../trigger-organization-unmerge-workflow.ts | 90 +++++++++++++++++++ .../src/activities/organizations.ts | 53 ++++++----- 3 files changed, 125 insertions(+), 21 deletions(-) create mode 100644 backend/src/bin/scripts/trigger-organization-unmerge-workflow.ts diff --git a/backend/package.json b/backend/package.json index 6136ed5c37..813decfdf5 100644 --- a/backend/package.json +++ b/backend/package.json @@ -35,7 +35,8 @@ "script:refreshGithubRepoSettings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/refresh-github-repo-settings.ts", "script:fix-duplicate-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-duplicate-members.ts", "script:fix-members-activities-after-unaffilation": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-members-activities-after-unaffilation.ts", - "script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts" + "script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts", + "script:trigger-organization-unmerge-workflow": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/trigger-organization-unmerge-workflow.ts" }, "lint-staged": { "**/*.ts": [ diff --git a/backend/src/bin/scripts/trigger-organization-unmerge-workflow.ts b/backend/src/bin/scripts/trigger-organization-unmerge-workflow.ts new file mode 100644 index 0000000000..86ebbb68c9 --- /dev/null +++ b/backend/src/bin/scripts/trigger-organization-unmerge-workflow.ts @@ -0,0 +1,90 @@ +import commandLineArgs from 'command-line-args' + +import { DEFAULT_TENANT_ID } from '@crowd/common' +import { findOrgById, OrganizationField, pgpQx } from '@crowd/data-access-layer' +import { getDbConnection } from '@crowd/data-access-layer/src/database' +import { getServiceLogger } from '@crowd/logging' +import { getTemporalClient } from '@crowd/temporal' + +import { DB_CONFIG, TEMPORAL_CONFIG } from '@/conf' + +const log = getServiceLogger() + +const options = [ + { + name: 'primaryId', + alias: 'p', + type: String, + description: 'Primary organization id', + }, + { + name: 'secondaryId', + alias: 's', + type: String, + description: 'Secondary organization id', + }, + { + name: 'userId', + alias: 'u', + type: String, + description: 'User id', + }, + { + name: 'help', + alias: 'h', + type: Boolean, + description: 'Print this usage guide.', + }, +] + +const parameters = commandLineArgs(options) + +setImmediate(async () => { + const primaryId = parameters.primaryId + const secondaryId = parameters.secondaryId + + const userId = parameters.userId + + const db = await getDbConnection({ + host: DB_CONFIG.readHost, + port: DB_CONFIG.port, + database: DB_CONFIG.database, + user: DB_CONFIG.username, + password: DB_CONFIG.password, + }) + + const qx = pgpQx(db) + const temporal = await getTemporalClient(TEMPORAL_CONFIG) + + log.info({ primaryId, secondaryId }, 'Running script with the following parameters!') + + const primaryOrganization = await findOrgById(qx, primaryId, [OrganizationField.ID, OrganizationField.DISPLAY_NAME]) + const secondaryOrganization = await findOrgById(qx, secondaryId, [OrganizationField.ID, OrganizationField.DISPLAY_NAME]) + + if (!primaryOrganization || !secondaryOrganization) { + log.error({ primaryId, secondaryId }, 'Primary or secondary organization not found!') + process.exit(1) + } + + try { + await temporal.workflow.start('finishOrganizationUnmerging', { + taskQueue: 'entity-merging', + workflowId: `finishOrganizationUnmerging/${primaryId}/${secondaryId}`, + retry: { + maximumAttempts: 10, + }, + args: [primaryOrganization.id, secondaryOrganization.id, primaryOrganization.displayName, secondaryOrganization.displayName, userId], + searchAttributes: { + TenantId: [DEFAULT_TENANT_ID], + }, + }) + + // wait till the workflow is finished + await temporal.workflow.result(`finishOrganizationUnmerging/${primaryId}/${secondaryId}`) + } catch (err) { + log.error({ primaryId, secondaryId, err }, 'Failed to trigger workflow for organization unmerge!') + throw err + } + + process.exit(0) +}) diff --git a/services/apps/entity_merging_worker/src/activities/organizations.ts b/services/apps/entity_merging_worker/src/activities/organizations.ts index e864d92bb4..64800a4f8a 100644 --- a/services/apps/entity_merging_worker/src/activities/organizations.ts +++ b/services/apps/entity_merging_worker/src/activities/organizations.ts @@ -1,3 +1,5 @@ +import { WorkflowIdReusePolicy } from '@temporalio/workflow' + import { DEFAULT_TENANT_ID } from '@crowd/common' import { moveActivityRelationsToAnotherOrganization } from '@crowd/data-access-layer/src/activityRelations' import { @@ -38,28 +40,39 @@ export async function finishOrganizationMergingUpdateActivities( export async function recalculateActivityAffiliationsOfOrganizationSynchronous( organizationId: string, ): Promise { - await svc.temporal.workflow.start('organizationUpdate', { - taskQueue: 'profiles', - workflowId: `${TemporalWorkflowId.ORGANIZATION_UPDATE}/${organizationId}`, - followRuns: true, - retry: { - maximumAttempts: 10, - }, - args: [ - { - organization: { - id: organizationId, - }, - recalculateAffiliations: true, - syncOptions: { - doSync: false, - withAggs: false, - }, + const workflowId = `${TemporalWorkflowId.ORGANIZATION_UPDATE}/${organizationId}` + + try { + await svc.temporal.workflow.start('organizationUpdate', { + taskQueue: 'profiles', + workflowId, + workflowIdReusePolicy: WorkflowIdReusePolicy.REJECT_DUPLICATE, + followRuns: true, + retry: { + maximumAttempts: 10, }, - ], - }) + args: [ + { + organization: { + id: organizationId, + }, + recalculateAffiliations: true, + syncOptions: { + doSync: false, + withAggs: false, + }, + }, + ], + }) + } catch (err) { + if (err.name !== 'WorkflowExecutionAlreadyStartedError') { + throw err + } + } - await svc.temporal.workflow.result(`${TemporalWorkflowId.ORGANIZATION_UPDATE}/${organizationId}`) + // wait for the workflow to finish + const handle = svc.temporal.workflow.getHandle(workflowId) + await handle.result() } export async function syncOrganization(organizationId: string, syncStart: Date): Promise { From 6c39443209284af40ffbdc2b35d16444bcaa9eb6 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Tue, 16 Dec 2025 18:00:48 +0530 Subject: [PATCH 2/4] refactor: improve error handling in organization unmerge workflow --- .../trigger-organization-unmerge-workflow.ts | 43 +++++++++++++------ .../entity_merging_worker/src/activities.ts | 2 +- .../src/activities/organizations.ts | 28 ++++++++---- .../src/workflows/all.ts | 6 +-- 4 files changed, 53 insertions(+), 26 deletions(-) diff --git a/backend/src/bin/scripts/trigger-organization-unmerge-workflow.ts b/backend/src/bin/scripts/trigger-organization-unmerge-workflow.ts index 86ebbb68c9..f5471cc443 100644 --- a/backend/src/bin/scripts/trigger-organization-unmerge-workflow.ts +++ b/backend/src/bin/scripts/trigger-organization-unmerge-workflow.ts @@ -1,7 +1,7 @@ import commandLineArgs from 'command-line-args' import { DEFAULT_TENANT_ID } from '@crowd/common' -import { findOrgById, OrganizationField, pgpQx } from '@crowd/data-access-layer' +import { OrganizationField, findOrgById, pgpQx } from '@crowd/data-access-layer' import { getDbConnection } from '@crowd/data-access-layer/src/database' import { getServiceLogger } from '@crowd/logging' import { getTemporalClient } from '@crowd/temporal' @@ -58,33 +58,48 @@ setImmediate(async () => { log.info({ primaryId, secondaryId }, 'Running script with the following parameters!') - const primaryOrganization = await findOrgById(qx, primaryId, [OrganizationField.ID, OrganizationField.DISPLAY_NAME]) - const secondaryOrganization = await findOrgById(qx, secondaryId, [OrganizationField.ID, OrganizationField.DISPLAY_NAME]) + const primaryOrganization = await findOrgById(qx, primaryId, [ + OrganizationField.ID, + OrganizationField.DISPLAY_NAME, + ]) + const secondaryOrganization = await findOrgById(qx, secondaryId, [ + OrganizationField.ID, + OrganizationField.DISPLAY_NAME, + ]) if (!primaryOrganization || !secondaryOrganization) { log.error({ primaryId, secondaryId }, 'Primary or secondary organization not found!') process.exit(1) } - try { + try { await temporal.workflow.start('finishOrganizationUnmerging', { - taskQueue: 'entity-merging', - workflowId: `finishOrganizationUnmerging/${primaryId}/${secondaryId}`, - retry: { + taskQueue: 'entity-merging', + workflowId: `finishOrganizationUnmerging/${primaryId}/${secondaryId}`, + retry: { maximumAttempts: 10, - }, - args: [primaryOrganization.id, secondaryOrganization.id, primaryOrganization.displayName, secondaryOrganization.displayName, userId], - searchAttributes: { + }, + args: [ + primaryOrganization.id, + secondaryOrganization.id, + primaryOrganization.displayName, + secondaryOrganization.displayName, + userId, + ], + searchAttributes: { TenantId: [DEFAULT_TENANT_ID], - }, + }, }) // wait till the workflow is finished await temporal.workflow.result(`finishOrganizationUnmerging/${primaryId}/${secondaryId}`) - } catch (err) { - log.error({ primaryId, secondaryId, err }, 'Failed to trigger workflow for organization unmerge!') + } catch (err) { + log.error( + { primaryId, secondaryId, err }, + 'Failed to trigger workflow for organization unmerge!', + ) throw err - } + } process.exit(0) }) diff --git a/services/apps/entity_merging_worker/src/activities.ts b/services/apps/entity_merging_worker/src/activities.ts index f94a6ecab8..d97deae317 100644 --- a/services/apps/entity_merging_worker/src/activities.ts +++ b/services/apps/entity_merging_worker/src/activities.ts @@ -15,7 +15,7 @@ export { notifyFrontendOrganizationMergeSuccessful, notifyFrontendOrganizationUnmergeSuccessful, syncOrganization, - recalculateActivityAffiliationsOfOrganizationSynchronous, + recalculateActivityAffiliationsOfOrganizationAsync, } from './activities/organizations' export { setMergeAction } from './activities/common' diff --git a/services/apps/entity_merging_worker/src/activities/organizations.ts b/services/apps/entity_merging_worker/src/activities/organizations.ts index 64800a4f8a..9a3e0b6fd9 100644 --- a/services/apps/entity_merging_worker/src/activities/organizations.ts +++ b/services/apps/entity_merging_worker/src/activities/organizations.ts @@ -37,17 +37,30 @@ export async function finishOrganizationMergingUpdateActivities( await moveActivityRelationsToAnotherOrganization(qx, secondaryId, primaryId) } -export async function recalculateActivityAffiliationsOfOrganizationSynchronous( +export async function recalculateActivityAffiliationsOfOrganizationAsync( organizationId: string, ): Promise { const workflowId = `${TemporalWorkflowId.ORGANIZATION_UPDATE}/${organizationId}` + try { + const handle = svc.temporal.workflow.getHandle(workflowId) + const { status } = await handle.describe() + + if (status.name === 'RUNNING') { + await handle.result() + } + } catch (err) { + if (err.name !== 'WorkflowNotFoundError') { + svc.log.error({ err }, 'Failed to check workflow state') + throw err + } + } + try { await svc.temporal.workflow.start('organizationUpdate', { taskQueue: 'profiles', workflowId, workflowIdReusePolicy: WorkflowIdReusePolicy.REJECT_DUPLICATE, - followRuns: true, retry: { maximumAttempts: 10, }, @@ -65,14 +78,13 @@ export async function recalculateActivityAffiliationsOfOrganizationSynchronous( ], }) } catch (err) { - if (err.name !== 'WorkflowExecutionAlreadyStartedError') { - throw err + if (err.name === 'WorkflowExecutionAlreadyStartedError') { + svc.log.info({ workflowId }, 'Workflow already started, skipping') + return } - } - // wait for the workflow to finish - const handle = svc.temporal.workflow.getHandle(workflowId) - await handle.result() + throw err + } } export async function syncOrganization(organizationId: string, syncStart: Date): Promise { diff --git a/services/apps/entity_merging_worker/src/workflows/all.ts b/services/apps/entity_merging_worker/src/workflows/all.ts index 06d60229cf..e68d0e77bd 100644 --- a/services/apps/entity_merging_worker/src/workflows/all.ts +++ b/services/apps/entity_merging_worker/src/workflows/all.ts @@ -11,7 +11,7 @@ const { notifyFrontendOrganizationMergeSuccessful, notifyFrontendOrganizationUnmergeSuccessful, recalculateActivityAffiliationsOfMemberAsync, - recalculateActivityAffiliationsOfOrganizationSynchronous, + recalculateActivityAffiliationsOfOrganizationAsync, setMergeAction, syncMember, syncOrganization, @@ -118,8 +118,8 @@ export async function finishOrganizationUnmerging( await setMergeAction(primaryId, secondaryId, { step: MergeActionStep.UNMERGE_ASYNC_STARTED, }) - await recalculateActivityAffiliationsOfOrganizationSynchronous(primaryId) - await recalculateActivityAffiliationsOfOrganizationSynchronous(secondaryId) + await recalculateActivityAffiliationsOfOrganizationAsync(primaryId) + await recalculateActivityAffiliationsOfOrganizationAsync(secondaryId) const syncStart = new Date() await syncOrganization(primaryId, syncStart) await syncOrganization(secondaryId, syncStart) From f9dd20e6b70ffdb53c28573c3785b5b00bd60f47 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Tue, 16 Dec 2025 19:49:13 +0530 Subject: [PATCH 3/4] fix: update workflow ID policies to allow duplicates and handle conflicts --- .../apps/entity_merging_worker/src/activities/members.ts | 6 +++--- .../entity_merging_worker/src/activities/organizations.ts | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/services/apps/entity_merging_worker/src/activities/members.ts b/services/apps/entity_merging_worker/src/activities/members.ts index aac7aa9dfa..84d6c38532 100644 --- a/services/apps/entity_merging_worker/src/activities/members.ts +++ b/services/apps/entity_merging_worker/src/activities/members.ts @@ -1,4 +1,4 @@ -import { WorkflowIdReusePolicy } from '@temporalio/workflow' +import { WorkflowIdConflictPolicy, WorkflowIdReusePolicy } from '@temporalio/workflow' import { DEFAULT_TENANT_ID } from '@crowd/common' import { @@ -47,8 +47,8 @@ export async function recalculateActivityAffiliationsOfMemberAsync( await svc.temporal.workflow.start('memberUpdate', { taskQueue: 'profiles', workflowId, - // if the workflow is already running, this policy will throw an error - workflowIdReusePolicy: WorkflowIdReusePolicy.REJECT_DUPLICATE, + workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE, + workflowIdConflictPolicy: WorkflowIdConflictPolicy.FAIL, retry: { maximumAttempts: 10, }, diff --git a/services/apps/entity_merging_worker/src/activities/organizations.ts b/services/apps/entity_merging_worker/src/activities/organizations.ts index 9a3e0b6fd9..bf35486d11 100644 --- a/services/apps/entity_merging_worker/src/activities/organizations.ts +++ b/services/apps/entity_merging_worker/src/activities/organizations.ts @@ -1,4 +1,4 @@ -import { WorkflowIdReusePolicy } from '@temporalio/workflow' +import { WorkflowIdConflictPolicy, WorkflowIdReusePolicy } from '@temporalio/workflow' import { DEFAULT_TENANT_ID } from '@crowd/common' import { moveActivityRelationsToAnotherOrganization } from '@crowd/data-access-layer/src/activityRelations' @@ -60,7 +60,8 @@ export async function recalculateActivityAffiliationsOfOrganizationAsync( await svc.temporal.workflow.start('organizationUpdate', { taskQueue: 'profiles', workflowId, - workflowIdReusePolicy: WorkflowIdReusePolicy.REJECT_DUPLICATE, + workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE, + workflowIdConflictPolicy: WorkflowIdConflictPolicy.FAIL, retry: { maximumAttempts: 10, }, From e621c63d21fecfaa885d9f9d04eaff7dd33c9812 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Tue, 16 Dec 2025 20:11:03 +0530 Subject: [PATCH 4/4] chore: remove trigger-organization-unmerge-workflow script --- backend/package.json | 3 +- .../trigger-organization-unmerge-workflow.ts | 105 ------------------ 2 files changed, 1 insertion(+), 107 deletions(-) delete mode 100644 backend/src/bin/scripts/trigger-organization-unmerge-workflow.ts diff --git a/backend/package.json b/backend/package.json index 813decfdf5..6136ed5c37 100644 --- a/backend/package.json +++ b/backend/package.json @@ -35,8 +35,7 @@ "script:refreshGithubRepoSettings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/refresh-github-repo-settings.ts", "script:fix-duplicate-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-duplicate-members.ts", "script:fix-members-activities-after-unaffilation": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-members-activities-after-unaffilation.ts", - "script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts", - "script:trigger-organization-unmerge-workflow": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/trigger-organization-unmerge-workflow.ts" + "script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts" }, "lint-staged": { "**/*.ts": [ diff --git a/backend/src/bin/scripts/trigger-organization-unmerge-workflow.ts b/backend/src/bin/scripts/trigger-organization-unmerge-workflow.ts deleted file mode 100644 index f5471cc443..0000000000 --- a/backend/src/bin/scripts/trigger-organization-unmerge-workflow.ts +++ /dev/null @@ -1,105 +0,0 @@ -import commandLineArgs from 'command-line-args' - -import { DEFAULT_TENANT_ID } from '@crowd/common' -import { OrganizationField, findOrgById, pgpQx } from '@crowd/data-access-layer' -import { getDbConnection } from '@crowd/data-access-layer/src/database' -import { getServiceLogger } from '@crowd/logging' -import { getTemporalClient } from '@crowd/temporal' - -import { DB_CONFIG, TEMPORAL_CONFIG } from '@/conf' - -const log = getServiceLogger() - -const options = [ - { - name: 'primaryId', - alias: 'p', - type: String, - description: 'Primary organization id', - }, - { - name: 'secondaryId', - alias: 's', - type: String, - description: 'Secondary organization id', - }, - { - name: 'userId', - alias: 'u', - type: String, - description: 'User id', - }, - { - name: 'help', - alias: 'h', - type: Boolean, - description: 'Print this usage guide.', - }, -] - -const parameters = commandLineArgs(options) - -setImmediate(async () => { - const primaryId = parameters.primaryId - const secondaryId = parameters.secondaryId - - const userId = parameters.userId - - const db = await getDbConnection({ - host: DB_CONFIG.readHost, - port: DB_CONFIG.port, - database: DB_CONFIG.database, - user: DB_CONFIG.username, - password: DB_CONFIG.password, - }) - - const qx = pgpQx(db) - const temporal = await getTemporalClient(TEMPORAL_CONFIG) - - log.info({ primaryId, secondaryId }, 'Running script with the following parameters!') - - const primaryOrganization = await findOrgById(qx, primaryId, [ - OrganizationField.ID, - OrganizationField.DISPLAY_NAME, - ]) - const secondaryOrganization = await findOrgById(qx, secondaryId, [ - OrganizationField.ID, - OrganizationField.DISPLAY_NAME, - ]) - - if (!primaryOrganization || !secondaryOrganization) { - log.error({ primaryId, secondaryId }, 'Primary or secondary organization not found!') - process.exit(1) - } - - try { - await temporal.workflow.start('finishOrganizationUnmerging', { - taskQueue: 'entity-merging', - workflowId: `finishOrganizationUnmerging/${primaryId}/${secondaryId}`, - retry: { - maximumAttempts: 10, - }, - args: [ - primaryOrganization.id, - secondaryOrganization.id, - primaryOrganization.displayName, - secondaryOrganization.displayName, - userId, - ], - searchAttributes: { - TenantId: [DEFAULT_TENANT_ID], - }, - }) - - // wait till the workflow is finished - await temporal.workflow.result(`finishOrganizationUnmerging/${primaryId}/${secondaryId}`) - } catch (err) { - log.error( - { primaryId, secondaryId, err }, - 'Failed to trigger workflow for organization unmerge!', - ) - throw err - } - - process.exit(0) -})