Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions apps/middleman-workflows/src/activities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -455,28 +455,33 @@ export const delegatorActivities = (dal: DAL, pocketRpcClient: PocketBlockchain,
/**
* Verifies the transaction status by the given transaction hash.
*
* @param {string} hash - The hash of the transaction to be verified.
* @return {Promise<readonly [boolean, number, string]>} A promise that resolves to a tuple containing the success status (boolean), the transaction code (number), and the gas used (string). Throws an error if the transaction data is incomplete or not found.
* Returns `[success, code, gasUsed]` when the tx is found on-chain. Throws a retriable
* `ApplicationFailure` when the tx is not (yet) found — the workflow's activity retry
* policy handles re-checking across blocks until the expiration window closes.
*/
async verifyTransaction(hash: string, height?: number, operatorAddress?: string) {
async verifyTransaction(
hash: string,
height?: number,
): Promise<readonly [boolean, number, string]> {
const tx = await pocketRpcClient.getTransaction(hash, height)
if (tx) {
return [tx.success, tx.code, tx.gasUsed?.toString() || '0'] as const
}

// Tier 4: all lookup methods failed — check supplier state directly
if (operatorAddress) {
log.warn('TX not found via any method, checking supplier state', { hash, operatorAddress })
const supplier = await pocketRpcClient.getSupplier(operatorAddress)
if (supplier) {
log.info('Supplier exists on-chain, marking TX as success', { hash, operatorAddress })
return [true, 0, '0'] as const
}
log.warn('Supplier not found on-chain, marking TX as failure', { hash, operatorAddress })
return [false, -1, '0'] as const
}

throw new Error('Transaction data is incomplete or not found')
throw ApplicationFailure.retryable(
'Transaction not found on-chain',
'TX_NOT_FOUND',
{ hash, height },
)
},
/**
* Checks whether a supplier exists on-chain with the given operator address.
* Used as a Tier 4 positive-only fallback for Stake transactions when `verifyTransaction`
* exhausts its retries without finding the tx hash.
*/
async checkSupplierOnChain(operatorAddress: string): Promise<boolean> {
const supplier = await pocketRpcClient.getSupplier(operatorAddress)
return !!supplier
},
/**
* Creates new nodes based on the data extracted from a provided transaction.
Expand Down
9 changes: 8 additions & 1 deletion apps/middleman-workflows/src/lib/blockchain/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,14 @@ export class Blockchain implements IBlockchain {
const maxBlocks = 30;
try {
const comet = await connectComet(this.rpcUrl);
for (let h = height; h < height + maxBlocks; h++) {
const status = await comet.status();
const latestHeight = status.syncInfo.latestBlockHeight;
// Only scan blocks that already exist on-chain. Future heights throw and would
// waste the scan budget; the workflow-level poll loop handles waiting for new
// blocks. Tier 3's role is to recover from a lagging/broken tx_index within
// already-produced blocks, not to wait for the chain to advance.
const endHeight = Math.min(height + maxBlocks, latestHeight);
for (let h = height; h <= endHeight; h++) {
try {
const block = await comet.block(h);
const txs = block.block.txs;
Expand Down
64 changes: 57 additions & 7 deletions apps/middleman-workflows/src/workflows/ExecuteTransaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export async function ExecuteTransaction(args: TransactionArgs) {
updateTransaction,
executeTransaction,
getBlockHeight,
verifyTransaction,
checkSupplierOnChain,
createNewNodesFromTransaction,
notifyProviderOfStakedAddresses,
notifyProviderOfFailedStakes,
Expand All @@ -57,6 +57,18 @@ export async function ExecuteTransaction(args: TransactionArgs) {
},
});

// verifyTransaction polls the chain on its own retry schedule: one attempt per block
// (pocket block time ≈ 1 min) up to TX_EXPIRATION_BLOCKS, matching the on-chain
// mempool expiration window. Throws retriable `TX_NOT_FOUND` until the tx lands.
const { verifyTransaction } = proxyActivities<ReturnType<typeof delegatorActivities>>({
startToCloseTimeout: "30s",
retry: {
initialInterval: "60s",
backoffCoefficient: 1,
maximumAttempts: TX_EXPIRATION_BLOCKS,
},
});

const transaction = await getTransaction(transactionId);

if (transaction.status !== TransactionStatus.Pending) {
Expand Down Expand Up @@ -127,20 +139,58 @@ export async function ExecuteTransaction(args: TransactionArgs) {
await waitForNextBlock(txHeight);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The txHeight passed here comes from line 78 (const txHeight = await getBlockHeight()), which is captured BEFORE executeTransaction. If the chain advances during the broadcast — perfectly possible under load with variable block time — txHeight is stale.

waitForNextBlock(txHeight) checks currentHeight >= txHeight + 1. Since we already advanced, it returns immediately without waiting — exactly the wait the rest of the fix relies on. verifyTransaction then goes to look for the tx in the same block where it was included, where the tx_index probably hasn't indexed it yet.

The retries with 60s backoff eventually recover, but we're burning minutes of overhead that waitForNextBlock was meant to avoid.

Fix: re-capture the height post-broadcast before calling waitForNextBlock, or wait for txHeight + 2 to have a one-block margin.

}

const [success, code, gasUsed] = await verifyTransaction(
result?.transactionHash || transaction.hash!,
transaction.executionHeight || txHeight,
extractOperatorAddress(transaction),
);
const txHash = result?.transactionHash || transaction.hash!;
const baseHeight = transaction.executionHeight || txHeight;
const operatorAddress = extractOperatorAddress(transaction);

const txStatus = success ? TransactionStatus.Success : TransactionStatus.Failure;
let success = false;
let code = -1;
let gasUsed = '0';
let txFoundOnChain = false;
let supplierFallbackHit = false;

try {
// Retries are driven by Temporal's activity retry policy — one attempt per block
// up to TX_EXPIRATION_BLOCKS. A found tx returns the tuple; a missing tx throws
// retriable TX_NOT_FOUND until the policy is exhausted.
[success, code, gasUsed] = await verifyTransaction(txHash, baseHeight);
txFoundOnChain = true;
} catch {
if (operatorAddress) {
try {
const supplierExists = await checkSupplierOnChain(operatorAddress);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkSupplierOnChain only checks whether a supplier exists for the operator address — it doesn't validate ownership. Since this path is the fallback after verifyTransaction exhausts its retries (30 attempts × 60s ≈ 30min), we land here precisely in suspicious scenarios: tx was never found on-chain via hash nor block scan.

Failure case: another operator (or the same owner via a parallel route) stakes the same operator address while our original tx was lost or failed. Tier 4 sees a supplier exists with that operator and returns true. The workflow then marks the tx as Success, code = 0, and fires createNewNodesFromTransaction + notifyProviderOfStakedAddresses below, creating node records in our DB that point to a supplier we don't own.

Minimum: have checkSupplierOnChain receive the expected ownerAddress and validate supplier.ownerAddress === expectedOwner before returning true. If owner doesn't match, return false (or an explicit differentOwner signal) and leave the tx as Failure.

if (supplierExists) {
success = true;
code = 0;
gasUsed = '0';
supplierFallbackHit = true;
}
} catch {
// supplier check also failed — tx stays marked as Failure (success stays false)
}
}
}

const txStatus = success ? TransactionStatus.Success : TransactionStatus.Failure;
const verificationHeight = await getBlockHeight();

let verificationLog: string | undefined;
if (txFoundOnChain) {
if (code !== 0) {
verificationLog = `verification failed with code ${code}`;
}
} else if (supplierFallbackHit) {
verificationLog = 'verified via supplier state fallback (tx hash not found)';
} else {
verificationLog = `tx not found on-chain after ${TX_EXPIRATION_BLOCKS} retries (baseHeight=${baseHeight})`;
}

await updateTransaction(transactionId, {
status: txStatus,
verificationHeight,
consumedFee: Number(gasUsed || 0),
code,
log: verificationLog,
});

if (transaction.type === TransactionType.Stake) {
Expand Down
Loading