From e6d9cea28434acf2cc0c8c40113dea6a2a7ea25a Mon Sep 17 00:00:00 2001 From: ElianaArjona Date: Thu, 6 Nov 2025 11:31:10 -0500 Subject: [PATCH 1/7] Add retry mechanism and try catch for unhandle errors in schedule tasks --- src/index.ts | 54 ++++++--- src/scripts/pull_and_save_block_events.ts | 136 +++++++++++----------- 2 files changed, 110 insertions(+), 80 deletions(-) diff --git a/src/index.ts b/src/index.ts index 00afde4..602b88d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -137,19 +137,45 @@ createConnection(ormConfig as ConnectionOptions) process.exit(1); }); -async function schedule(connection: Connection | null, producer: Producer | null, func: any, funcName: string) { - const start = new Date().getTime(); - await func(connection, producer); - const end = new Date().getTime(); - const duration = end - start; - let wait: number; - if (duration > SECONDS_BETWEEN_RUNS * 1000) { - wait = 0; - logger.warn(`${funcName} is taking longer than desiered interval`); - } else { - wait = SECONDS_BETWEEN_RUNS * 1000 - duration; +async function schedule( + connection: Connection | null, + producer: Producer | null, + func: any, + funcName: string, + consecutiveErrors: number = 0 +) { + try { + const start = new Date().getTime(); + await func(connection, producer); + const end = new Date().getTime(); + const duration = end - start; + + if (consecutiveErrors > 0) { + logger.info(`${funcName} recovered after ${consecutiveErrors} consecutive errors`); + } + + let wait: number; + if (duration > SECONDS_BETWEEN_RUNS * 1000) { + wait = 0; + logger.warn(`${funcName} is taking longer than desired interval`); + } else { + wait = SECONDS_BETWEEN_RUNS * 1000 - duration; + } + setTimeout(() => { + schedule(connection, producer, func, funcName, 0); + }, wait); + } catch (error) { + consecutiveErrors++; + logger.error(`Error in ${funcName} (consecutive error #${consecutiveErrors}):`, error); + + const backoffDelay = Math.min( + SECONDS_BETWEEN_RUNS * 1000 * Math.pow(2, Math.min(consecutiveErrors - 1, 5)), + 60000 + ); + + logger.warn(`Retrying ${funcName} in ${backoffDelay / 1000}s`); + setTimeout(() => { + schedule(connection, producer, func, funcName, consecutiveErrors); + }, backoffDelay); } - setTimeout(() => { - schedule(connection, producer, func, funcName); - }, wait); } diff --git a/src/scripts/pull_and_save_block_events.ts b/src/scripts/pull_and_save_block_events.ts index bf33417..a40d6be 100644 --- a/src/scripts/pull_and_save_block_events.ts +++ b/src/scripts/pull_and_save_block_events.ts @@ -272,87 +272,91 @@ async function getParseSaveBlocksTransactionsEvents( requiredTxnList?: Set, ): Promise { const blockNumbers = newBlocks.map((newBlock) => newBlock.number!); - const blockRanges = findRanges(blockNumbers); logger.info(`Pulling Block Events for blocks: ${JSON.stringify(blockRanges)}`); - const newBlocksReceipts = await web3Source.getBatchBlockReceiptsAsync(blockNumbers); - - const filteredNewBlocksReceipts = newBlocksReceipts.filter( - (blockReceipts) => blockReceipts !== null && blockReceipts !== undefined, - ); - - if (newBlocksReceipts.length !== filteredNewBlocksReceipts.length) { - if (!allowPartialSuccess) { - return false; - } - const { nullOnlyAtEnd } = newBlocksReceipts.reduce( - (state, blockReceipts) => { - if (state.hasSeenNull && blockReceipts !== null) { - state.nullOnlyAtEnd = false; - } + try { + const newBlocksReceipts = await web3Source.getBatchBlockReceiptsAsync(blockNumbers); - if (blockReceipts === null) { - state.hasSeenNull = true; - } - return state; - }, - { hasSeenNull: false, nullOnlyAtEnd: true }, + const filteredNewBlocksReceipts = newBlocksReceipts.filter( + (blockReceipts) => blockReceipts !== null && blockReceipts !== undefined, ); - if (nullOnlyAtEnd) { - logger.info('Last block(s) receipts not found, retrying those block(s) on the next run'); - } else { - logger.error("Missing intermediate block receipts, can't continue. Retrying next run"); - logger.error(newBlocksReceipts); - return false; - } - } + if (newBlocksReceipts.length !== filteredNewBlocksReceipts.length) { + if (!allowPartialSuccess) { + return false; + } + const { nullOnlyAtEnd } = newBlocksReceipts.reduce( + (state, blockReceipts) => { + if (state.hasSeenNull && blockReceipts !== null) { + state.nullOnlyAtEnd = false; + } - if (filteredNewBlocksReceipts.length > 0) { - const fullBlocks: FullBlock[] = filteredNewBlocksReceipts.map((newBlockReceipts, blockIndex): FullBlock => { - const transactionsWithLogs = newBlockReceipts.map( - (txReceipt: EVMTransactionReceipt, txIndex: number): FullTransaction => { - if (txReceipt.blockHash !== newBlocks[blockIndex].hash) { - throw new BlockHashMismatchError('Wrong Block hash'); + if (blockReceipts === null) { + state.hasSeenNull = true; } - return { - ...newBlocks[blockIndex].transactions[txIndex], - ...txReceipt, - type: newBlocks[blockIndex].transactions[txIndex].type, - }; + return state; }, + { hasSeenNull: false, nullOnlyAtEnd: true }, ); - return { ...newBlocks[blockIndex], transactions: transactionsWithLogs }; - }); - const parsedFullBlocks = fullBlocks.map((fullBlock) => - parseBlockTransactionsEvents(fullBlock, requiredTxnList), - ); + if (nullOnlyAtEnd) { + logger.info('Last block(s) receipts not found, retrying those block(s) on the next run'); + } else { + logger.error("Missing intermediate block receipts, can't continue. Retrying next run"); + logger.error(newBlocksReceipts); + return false; + } + } + + if (filteredNewBlocksReceipts.length > 0) { + const fullBlocks: FullBlock[] = filteredNewBlocksReceipts.map((newBlockReceipts, blockIndex): FullBlock => { + const transactionsWithLogs = newBlockReceipts.map( + (txReceipt: EVMTransactionReceipt, txIndex: number): FullTransaction => { + if (txReceipt.blockHash !== newBlocks[blockIndex].hash) { + throw new BlockHashMismatchError('Wrong Block hash'); + } + return { + ...newBlocks[blockIndex].transactions[txIndex], + ...txReceipt, + type: newBlocks[blockIndex].transactions[txIndex].type, + }; + }, + ); + return { ...newBlocks[blockIndex], transactions: transactionsWithLogs }; + }); + + const parsedFullBlocks = fullBlocks.map((fullBlock) => + parseBlockTransactionsEvents(fullBlock, requiredTxnList), + ); - const eventTables = eventScrperProps - .filter((props) => props.enabled) - .map((props: EventScraperProps) => props.table); - - await saveFullBlocks(connection, eventTables, parsedFullBlocks); - - if (FEAT_TOKENS_FROM_TRANSFERS) { - const tokensFromTransfers = [ - ...new Set( - filteredNewBlocksReceipts - .flat() - .map((tx) => tx.logs) - .flat() - .filter((log) => log.topics.length > 0 && log.topics[0] === TRANSFER_EVENT_TOPIC_0) - .map((log) => log.address), - ), - ]; - await getParseSaveTokensAsync(connection, producer, web3Source, tokensFromTransfers); + const eventTables = eventScrperProps + .filter((props) => props.enabled) + .map((props: EventScraperProps) => props.table); + + await saveFullBlocks(connection, eventTables, parsedFullBlocks); + + if (FEAT_TOKENS_FROM_TRANSFERS) { + const tokensFromTransfers = [ + ...new Set( + filteredNewBlocksReceipts + .flat() + .map((tx) => tx.logs) + .flat() + .filter((log) => log.topics.length > 0 && log.topics[0] === TRANSFER_EVENT_TOPIC_0) + .map((log) => log.address), + ), + ]; + await getParseSaveTokensAsync(connection, producer, web3Source, tokensFromTransfers); + } + return true; } - return true; + return false; + } catch (error) { + logger.error('Blocks error:', error); + throw error; // Re-throw to be caught by the schedule function } - return false; } export class BlockEventsScraper { From 67e155be703eb8117c8f83208a31bf8dd960081f Mon Sep 17 00:00:00 2001 From: ElianaArjona Date: Thu, 6 Nov 2025 11:40:03 -0500 Subject: [PATCH 2/7] Remove not necessary changes --- src/scripts/pull_and_save_block_events.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/scripts/pull_and_save_block_events.ts b/src/scripts/pull_and_save_block_events.ts index a40d6be..ee1b9b3 100644 --- a/src/scripts/pull_and_save_block_events.ts +++ b/src/scripts/pull_and_save_block_events.ts @@ -272,11 +272,13 @@ async function getParseSaveBlocksTransactionsEvents( requiredTxnList?: Set, ): Promise { const blockNumbers = newBlocks.map((newBlock) => newBlock.number!); + const blockRanges = findRanges(blockNumbers); logger.info(`Pulling Block Events for blocks: ${JSON.stringify(blockRanges)}`); try { + const newBlocksReceipts = await web3Source.getBatchBlockReceiptsAsync(blockNumbers); const filteredNewBlocksReceipts = newBlocksReceipts.filter( @@ -355,7 +357,7 @@ async function getParseSaveBlocksTransactionsEvents( return false; } catch (error) { logger.error('Blocks error:', error); - throw error; // Re-throw to be caught by the schedule function + throw error; } } From 91f0e3ad9c1ba3ebca00c61d606dbc92ef148444 Mon Sep 17 00:00:00 2001 From: ElianaArjona Date: Thu, 6 Nov 2025 11:55:15 -0500 Subject: [PATCH 3/7] Remove try catch --- src/scripts/pull_and_save_block_events.ts | 134 +++++++++++----------- 1 file changed, 64 insertions(+), 70 deletions(-) diff --git a/src/scripts/pull_and_save_block_events.ts b/src/scripts/pull_and_save_block_events.ts index ee1b9b3..bf33417 100644 --- a/src/scripts/pull_and_save_block_events.ts +++ b/src/scripts/pull_and_save_block_events.ts @@ -277,88 +277,82 @@ async function getParseSaveBlocksTransactionsEvents( logger.info(`Pulling Block Events for blocks: ${JSON.stringify(blockRanges)}`); - try { + const newBlocksReceipts = await web3Source.getBatchBlockReceiptsAsync(blockNumbers); + + const filteredNewBlocksReceipts = newBlocksReceipts.filter( + (blockReceipts) => blockReceipts !== null && blockReceipts !== undefined, + ); - const newBlocksReceipts = await web3Source.getBatchBlockReceiptsAsync(blockNumbers); + if (newBlocksReceipts.length !== filteredNewBlocksReceipts.length) { + if (!allowPartialSuccess) { + return false; + } + const { nullOnlyAtEnd } = newBlocksReceipts.reduce( + (state, blockReceipts) => { + if (state.hasSeenNull && blockReceipts !== null) { + state.nullOnlyAtEnd = false; + } - const filteredNewBlocksReceipts = newBlocksReceipts.filter( - (blockReceipts) => blockReceipts !== null && blockReceipts !== undefined, + if (blockReceipts === null) { + state.hasSeenNull = true; + } + return state; + }, + { hasSeenNull: false, nullOnlyAtEnd: true }, ); - if (newBlocksReceipts.length !== filteredNewBlocksReceipts.length) { - if (!allowPartialSuccess) { - return false; - } - const { nullOnlyAtEnd } = newBlocksReceipts.reduce( - (state, blockReceipts) => { - if (state.hasSeenNull && blockReceipts !== null) { - state.nullOnlyAtEnd = false; - } + if (nullOnlyAtEnd) { + logger.info('Last block(s) receipts not found, retrying those block(s) on the next run'); + } else { + logger.error("Missing intermediate block receipts, can't continue. Retrying next run"); + logger.error(newBlocksReceipts); + return false; + } + } - if (blockReceipts === null) { - state.hasSeenNull = true; + if (filteredNewBlocksReceipts.length > 0) { + const fullBlocks: FullBlock[] = filteredNewBlocksReceipts.map((newBlockReceipts, blockIndex): FullBlock => { + const transactionsWithLogs = newBlockReceipts.map( + (txReceipt: EVMTransactionReceipt, txIndex: number): FullTransaction => { + if (txReceipt.blockHash !== newBlocks[blockIndex].hash) { + throw new BlockHashMismatchError('Wrong Block hash'); } - return state; + return { + ...newBlocks[blockIndex].transactions[txIndex], + ...txReceipt, + type: newBlocks[blockIndex].transactions[txIndex].type, + }; }, - { hasSeenNull: false, nullOnlyAtEnd: true }, ); + return { ...newBlocks[blockIndex], transactions: transactionsWithLogs }; + }); - if (nullOnlyAtEnd) { - logger.info('Last block(s) receipts not found, retrying those block(s) on the next run'); - } else { - logger.error("Missing intermediate block receipts, can't continue. Retrying next run"); - logger.error(newBlocksReceipts); - return false; - } - } - - if (filteredNewBlocksReceipts.length > 0) { - const fullBlocks: FullBlock[] = filteredNewBlocksReceipts.map((newBlockReceipts, blockIndex): FullBlock => { - const transactionsWithLogs = newBlockReceipts.map( - (txReceipt: EVMTransactionReceipt, txIndex: number): FullTransaction => { - if (txReceipt.blockHash !== newBlocks[blockIndex].hash) { - throw new BlockHashMismatchError('Wrong Block hash'); - } - return { - ...newBlocks[blockIndex].transactions[txIndex], - ...txReceipt, - type: newBlocks[blockIndex].transactions[txIndex].type, - }; - }, - ); - return { ...newBlocks[blockIndex], transactions: transactionsWithLogs }; - }); - - const parsedFullBlocks = fullBlocks.map((fullBlock) => - parseBlockTransactionsEvents(fullBlock, requiredTxnList), - ); + const parsedFullBlocks = fullBlocks.map((fullBlock) => + parseBlockTransactionsEvents(fullBlock, requiredTxnList), + ); - const eventTables = eventScrperProps - .filter((props) => props.enabled) - .map((props: EventScraperProps) => props.table); - - await saveFullBlocks(connection, eventTables, parsedFullBlocks); - - if (FEAT_TOKENS_FROM_TRANSFERS) { - const tokensFromTransfers = [ - ...new Set( - filteredNewBlocksReceipts - .flat() - .map((tx) => tx.logs) - .flat() - .filter((log) => log.topics.length > 0 && log.topics[0] === TRANSFER_EVENT_TOPIC_0) - .map((log) => log.address), - ), - ]; - await getParseSaveTokensAsync(connection, producer, web3Source, tokensFromTransfers); - } - return true; + const eventTables = eventScrperProps + .filter((props) => props.enabled) + .map((props: EventScraperProps) => props.table); + + await saveFullBlocks(connection, eventTables, parsedFullBlocks); + + if (FEAT_TOKENS_FROM_TRANSFERS) { + const tokensFromTransfers = [ + ...new Set( + filteredNewBlocksReceipts + .flat() + .map((tx) => tx.logs) + .flat() + .filter((log) => log.topics.length > 0 && log.topics[0] === TRANSFER_EVENT_TOPIC_0) + .map((log) => log.address), + ), + ]; + await getParseSaveTokensAsync(connection, producer, web3Source, tokensFromTransfers); } - return false; - } catch (error) { - logger.error('Blocks error:', error); - throw error; + return true; } + return false; } export class BlockEventsScraper { From 6d688ebca0a82bfe1c5053a0dd356c6ad683a04b Mon Sep 17 00:00:00 2001 From: ElianaArjona Date: Thu, 6 Nov 2025 12:08:17 -0500 Subject: [PATCH 4/7] Change schedule retry logic --- src/index.ts | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/index.ts b/src/index.ts index 602b88d..3dc07bc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -150,6 +150,7 @@ async function schedule( const end = new Date().getTime(); const duration = end - start; + // Success! Reset error counter if (consecutiveErrors > 0) { logger.info(`${funcName} recovered after ${consecutiveErrors} consecutive errors`); } @@ -167,13 +168,19 @@ async function schedule( } catch (error) { consecutiveErrors++; logger.error(`Error in ${funcName} (consecutive error #${consecutiveErrors}):`, error); - + + const MAX_CONSECUTIVE_ERRORS = 5; + if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) { + logger.fatal(`${funcName} failed ${MAX_CONSECUTIVE_ERRORS} times consecutively. Stopping.`); + process.exit(1); + } + const backoffDelay = Math.min( - SECONDS_BETWEEN_RUNS * 1000 * Math.pow(2, Math.min(consecutiveErrors - 1, 5)), - 60000 + SECONDS_BETWEEN_RUNS * 1000 * Math.pow(2, consecutiveErrors - 1), + 12000 ); - logger.warn(`Retrying ${funcName} in ${backoffDelay / 1000}s`); + logger.warn(`Retrying ${funcName} in ${backoffDelay / 1000}s (consecutive errors: ${consecutiveErrors})`); setTimeout(() => { schedule(connection, producer, func, funcName, consecutiveErrors); }, backoffDelay); From e9ff5958e615500d66b116fcf4d0c3ca712ccb9c Mon Sep 17 00:00:00 2001 From: ElianaArjona Date: Thu, 6 Nov 2025 12:09:04 -0500 Subject: [PATCH 5/7] Remove comment --- src/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/index.ts b/src/index.ts index 3dc07bc..57804f8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -150,7 +150,6 @@ async function schedule( const end = new Date().getTime(); const duration = end - start; - // Success! Reset error counter if (consecutiveErrors > 0) { logger.info(`${funcName} recovered after ${consecutiveErrors} consecutive errors`); } From 1b4ae4a143facaa760299e9c4a7b9a9590e4093c Mon Sep 17 00:00:00 2001 From: ElianaArjona Date: Thu, 6 Nov 2025 15:13:15 -0500 Subject: [PATCH 6/7] Fix resquested changes --- src/index.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/index.ts b/src/index.ts index 57804f8..d843456 100644 --- a/src/index.ts +++ b/src/index.ts @@ -168,16 +168,13 @@ async function schedule( consecutiveErrors++; logger.error(`Error in ${funcName} (consecutive error #${consecutiveErrors}):`, error); - const MAX_CONSECUTIVE_ERRORS = 5; + const MAX_CONSECUTIVE_ERRORS = 3; if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) { logger.fatal(`${funcName} failed ${MAX_CONSECUTIVE_ERRORS} times consecutively. Stopping.`); process.exit(1); } - const backoffDelay = Math.min( - SECONDS_BETWEEN_RUNS * 1000 * Math.pow(2, consecutiveErrors - 1), - 12000 - ); + const backoffDelay = SECONDS_BETWEEN_RUNS * 1000 * Math.pow(2, consecutiveErrors); logger.warn(`Retrying ${funcName} in ${backoffDelay / 1000}s (consecutive errors: ${consecutiveErrors})`); setTimeout(() => { From 3d4d10b1550157b9d6fcadc1e932ef35f4320503 Mon Sep 17 00:00:00 2001 From: ElianaArjona Date: Thu, 6 Nov 2025 20:09:01 -0500 Subject: [PATCH 7/7] Fix msg error parsing --- src/index.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/index.ts b/src/index.ts index d843456..e5c64ac 100644 --- a/src/index.ts +++ b/src/index.ts @@ -164,10 +164,14 @@ async function schedule( setTimeout(() => { schedule(connection, producer, func, funcName, 0); }, wait); - } catch (error) { + + } catch (err: any) { consecutiveErrors++; - logger.error(`Error in ${funcName} (consecutive error #${consecutiveErrors}):`, error); + const errorMessage = err?.message || err?.toString() || 'Unknown error'; + + logger.error(`Error in ${funcName} (consecutive error #${consecutiveErrors}): ${errorMessage}`); + const MAX_CONSECUTIVE_ERRORS = 3; if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) { logger.fatal(`${funcName} failed ${MAX_CONSECUTIVE_ERRORS} times consecutively. Stopping.`);