diff --git a/services/apps/entity_merging_worker/src/activities.ts b/services/apps/entity_merging_worker/src/activities.ts index f94a6ecab8..d97deae317 100644 --- a/services/apps/entity_merging_worker/src/activities.ts +++ b/services/apps/entity_merging_worker/src/activities.ts @@ -15,7 +15,7 @@ export { notifyFrontendOrganizationMergeSuccessful, notifyFrontendOrganizationUnmergeSuccessful, syncOrganization, - recalculateActivityAffiliationsOfOrganizationSynchronous, + recalculateActivityAffiliationsOfOrganizationAsync, } from './activities/organizations' export { setMergeAction } from './activities/common' diff --git a/services/apps/entity_merging_worker/src/activities/members.ts b/services/apps/entity_merging_worker/src/activities/members.ts index aac7aa9dfa..84d6c38532 100644 --- a/services/apps/entity_merging_worker/src/activities/members.ts +++ b/services/apps/entity_merging_worker/src/activities/members.ts @@ -1,4 +1,4 @@ -import { WorkflowIdReusePolicy } from '@temporalio/workflow' +import { WorkflowIdConflictPolicy, WorkflowIdReusePolicy } from '@temporalio/workflow' import { DEFAULT_TENANT_ID } from '@crowd/common' import { @@ -47,8 +47,8 @@ export async function recalculateActivityAffiliationsOfMemberAsync( await svc.temporal.workflow.start('memberUpdate', { taskQueue: 'profiles', workflowId, - // if the workflow is already running, this policy will throw an error - workflowIdReusePolicy: WorkflowIdReusePolicy.REJECT_DUPLICATE, + workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE, + workflowIdConflictPolicy: WorkflowIdConflictPolicy.FAIL, retry: { maximumAttempts: 10, }, diff --git a/services/apps/entity_merging_worker/src/activities/organizations.ts b/services/apps/entity_merging_worker/src/activities/organizations.ts index e864d92bb4..bf35486d11 100644 --- a/services/apps/entity_merging_worker/src/activities/organizations.ts +++ b/services/apps/entity_merging_worker/src/activities/organizations.ts @@ -1,3 +1,5 @@ +import { WorkflowIdConflictPolicy, WorkflowIdReusePolicy } from '@temporalio/workflow' + import { DEFAULT_TENANT_ID } from '@crowd/common' import { moveActivityRelationsToAnotherOrganization } from '@crowd/data-access-layer/src/activityRelations' import { @@ -35,31 +37,55 @@ export async function finishOrganizationMergingUpdateActivities( await moveActivityRelationsToAnotherOrganization(qx, secondaryId, primaryId) } -export async function recalculateActivityAffiliationsOfOrganizationSynchronous( +export async function recalculateActivityAffiliationsOfOrganizationAsync( organizationId: string, ): Promise { - await svc.temporal.workflow.start('organizationUpdate', { - taskQueue: 'profiles', - workflowId: `${TemporalWorkflowId.ORGANIZATION_UPDATE}/${organizationId}`, - followRuns: true, - retry: { - maximumAttempts: 10, - }, - args: [ - { - organization: { - id: organizationId, - }, - recalculateAffiliations: true, - syncOptions: { - doSync: false, - withAggs: false, - }, + const workflowId = `${TemporalWorkflowId.ORGANIZATION_UPDATE}/${organizationId}` + + try { + const handle = svc.temporal.workflow.getHandle(workflowId) + const { status } = await handle.describe() + + if (status.name === 'RUNNING') { + await handle.result() + } + } catch (err) { + if (err.name !== 'WorkflowNotFoundError') { + svc.log.error({ err }, 'Failed to check workflow state') + throw err + } + } + + try { + await svc.temporal.workflow.start('organizationUpdate', { + taskQueue: 'profiles', + workflowId, + workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE, + workflowIdConflictPolicy: WorkflowIdConflictPolicy.FAIL, + retry: { + maximumAttempts: 10, }, - ], - }) + args: [ + { + organization: { + id: organizationId, + }, + recalculateAffiliations: true, + syncOptions: { + doSync: false, + withAggs: false, + }, + }, + ], + }) + } catch (err) { + if (err.name === 'WorkflowExecutionAlreadyStartedError') { + svc.log.info({ workflowId }, 'Workflow already started, skipping') + return + } - await svc.temporal.workflow.result(`${TemporalWorkflowId.ORGANIZATION_UPDATE}/${organizationId}`) + throw err + } } export async function syncOrganization(organizationId: string, syncStart: Date): Promise { diff --git a/services/apps/entity_merging_worker/src/workflows/all.ts b/services/apps/entity_merging_worker/src/workflows/all.ts index 06d60229cf..e68d0e77bd 100644 --- a/services/apps/entity_merging_worker/src/workflows/all.ts +++ b/services/apps/entity_merging_worker/src/workflows/all.ts @@ -11,7 +11,7 @@ const { notifyFrontendOrganizationMergeSuccessful, notifyFrontendOrganizationUnmergeSuccessful, recalculateActivityAffiliationsOfMemberAsync, - recalculateActivityAffiliationsOfOrganizationSynchronous, + recalculateActivityAffiliationsOfOrganizationAsync, setMergeAction, syncMember, syncOrganization, @@ -118,8 +118,8 @@ export async function finishOrganizationUnmerging( await setMergeAction(primaryId, secondaryId, { step: MergeActionStep.UNMERGE_ASYNC_STARTED, }) - await recalculateActivityAffiliationsOfOrganizationSynchronous(primaryId) - await recalculateActivityAffiliationsOfOrganizationSynchronous(secondaryId) + await recalculateActivityAffiliationsOfOrganizationAsync(primaryId) + await recalculateActivityAffiliationsOfOrganizationAsync(secondaryId) const syncStart = new Date() await syncOrganization(primaryId, syncStart) await syncOrganization(secondaryId, syncStart)