diff --git a/backend/docs/data-enrichment-pipeline.md b/backend/docs/data-enrichment-pipeline.md new file mode 100644 index 0000000..c05f991 --- /dev/null +++ b/backend/docs/data-enrichment-pipeline.md @@ -0,0 +1,56 @@ +# Data Enrichment Pipeline + +Incoming records are enriched before persistence so stored data carries source metadata, rule-based tags, and derived fields that downstream search, alerting, and review workflows can use. + +## Flow + +1. Ingestion normalizes an incoming payload into the service-specific record shape. +2. The enrichment pipeline selects provider adapters that support the record type. +3. Each adapter returns a patch containing `metadata`, `tags`, and/or `derivedFields`. +4. The pipeline merges patches, validates the final enrichment output, and retries transient adapter failures. +5. The enriched record is stored with explicit enrichment columns and embedded source attribution. + +For bridge incidents, enrichment runs in `IncidentIngestionService.ingest()` before duplicate checks, incident inserts, review queue writes, and ingestion history writes. + +## Incident Enrichment + +The default incident adapters add: + +- Metadata: provider, record type, source type, source external ID, source host, receipt timestamp, asset presence, severity weight, and follow-up action count. +- Tags: source, severity, bridge, asset, stablecoin classification, source host, and manual-review workflow tags. +- Derived fields: normalized asset code, source host, occurred-at ISO value, priority score, risk band, and age in milliseconds. + +Persisted incident fields: + +- `bridge_incidents.enrichment_metadata` +- `bridge_incidents.enrichment_tags` +- `bridge_incidents.derived_fields` +- `bridge_incidents.enrichment_validation` +- `bridge_incident_review_queue.enriched_payload` +- `bridge_incident_ingestion_history.enrichment_metadata` +- `bridge_incident_ingestion_history.enrichment_tags` +- `bridge_incident_ingestion_history.derived_fields` + +## Provider Adapters + +Adapters implement `EnrichmentProviderAdapter` and can be added to `createDefaultEnrichmentAdapters()`: + +```ts +export interface EnrichmentProviderAdapter { + name: string; + supports(record: EnrichmentRecord): boolean; + enrich(record: EnrichmentRecord): Promise | EnrichmentPatch; +} +``` + +Adapters should keep external lookups narrow and return only enrichment patches. The pipeline owns retries, merging, and validation. + +## Validation And Retries + +Validation requires: + +- `metadata` is an object. +- `tags` is an array of normalized strings matching `^[a-z0-9:_-]+$`. +- `derivedFields` is an object. + +Adapter failures are classified by `RetryPolicyService`. Transient, timeout, and rate-limit failures are retried with exponential backoff and jitter; permanent failures are surfaced immediately. diff --git a/backend/src/database/migrations/031_incident_enrichment_pipeline.ts b/backend/src/database/migrations/031_incident_enrichment_pipeline.ts new file mode 100644 index 0000000..7d855f9 --- /dev/null +++ b/backend/src/database/migrations/031_incident_enrichment_pipeline.ts @@ -0,0 +1,46 @@ +import type { Knex } from "knex"; + +export async function up(knex: Knex): Promise { + await knex.schema.alterTable("bridge_incidents", (table) => { + table.jsonb("enrichment_metadata").notNullable().defaultTo(knex.raw("'{}'::jsonb")); + table.specificType("enrichment_tags", "text[]").notNullable().defaultTo(knex.raw("'{}'::text[]")); + table.jsonb("derived_fields").notNullable().defaultTo(knex.raw("'{}'::jsonb")); + table.jsonb("enrichment_validation").notNullable().defaultTo(knex.raw("'{}'::jsonb")); + }); + + await knex.schema.alterTable("bridge_incident_review_queue", (table) => { + table.jsonb("enriched_payload").notNullable().defaultTo(knex.raw("'{}'::jsonb")); + }); + + await knex.schema.alterTable("bridge_incident_ingestion_history", (table) => { + table.jsonb("enrichment_metadata").notNullable().defaultTo(knex.raw("'{}'::jsonb")); + table.specificType("enrichment_tags", "text[]").notNullable().defaultTo(knex.raw("'{}'::text[]")); + table.jsonb("derived_fields").notNullable().defaultTo(knex.raw("'{}'::jsonb")); + }); + + await knex.schema.raw(` + CREATE INDEX IF NOT EXISTS bridge_incidents_enrichment_tags_idx + ON bridge_incidents USING GIN (enrichment_tags) + `); +} + +export async function down(knex: Knex): Promise { + await knex.schema.raw("DROP INDEX IF EXISTS bridge_incidents_enrichment_tags_idx"); + + await knex.schema.alterTable("bridge_incident_ingestion_history", (table) => { + table.dropColumn("enrichment_metadata"); + table.dropColumn("enrichment_tags"); + table.dropColumn("derived_fields"); + }); + + await knex.schema.alterTable("bridge_incident_review_queue", (table) => { + table.dropColumn("enriched_payload"); + }); + + await knex.schema.alterTable("bridge_incidents", (table) => { + table.dropColumn("enrichment_metadata"); + table.dropColumn("enrichment_tags"); + table.dropColumn("derived_fields"); + table.dropColumn("enrichment_validation"); + }); +} diff --git a/backend/src/services/enrichment/enrichmentPipeline.service.ts b/backend/src/services/enrichment/enrichmentPipeline.service.ts new file mode 100644 index 0000000..ea4cbd0 --- /dev/null +++ b/backend/src/services/enrichment/enrichmentPipeline.service.ts @@ -0,0 +1,123 @@ +import { retryPolicyService, type RetryPolicyService } from "../retryPolicy.service.js"; +import type { + EnrichmentPatch, + EnrichmentProviderAdapter, + EnrichmentRecord, + EnrichmentResult, + EnrichmentValidationResult, +} from "./types.js"; +import { createDefaultEnrichmentAdapters } from "./providerAdapters.js"; + +function uniqueTags(tags: string[]): string[] { + return Array.from(new Set(tags.filter(Boolean))).sort(); +} + +function mergePatch(target: EnrichmentPatch, patch: EnrichmentPatch): EnrichmentPatch { + return { + metadata: { + ...(target.metadata ?? {}), + ...(patch.metadata ?? {}), + }, + tags: uniqueTags([...(target.tags ?? []), ...(patch.tags ?? [])]), + derivedFields: { + ...(target.derivedFields ?? {}), + ...(patch.derivedFields ?? {}), + }, + }; +} + +export class EnrichmentValidationError extends Error { + constructor(public readonly validation: EnrichmentValidationResult) { + super("Enrichment validation failed"); + } +} + +export class EnrichmentPipelineService { + constructor( + private readonly adapters: EnrichmentProviderAdapter[] = createDefaultEnrichmentAdapters(), + private readonly retryPolicy: RetryPolicyService = retryPolicyService, + ) {} + + async enrich>(record: EnrichmentRecord): Promise> { + const adapters = this.adapters.filter((adapter) => adapter.supports(record)); + let patch: EnrichmentPatch = { metadata: {}, tags: [], derivedFields: {} }; + let attempts = 0; + + for (const adapter of adapters) { + const adapterPatch = await this.runAdapterWithRetry(adapter, record); + attempts += adapterPatch.attempts; + patch = mergePatch(patch, adapterPatch.patch); + } + + const result: EnrichmentResult = { + record, + metadata: patch.metadata ?? {}, + tags: uniqueTags(patch.tags ?? []), + derivedFields: patch.derivedFields ?? {}, + validation: { valid: true, issues: [] }, + attempts, + }; + + result.validation = this.validate(result); + if (!result.validation.valid) { + throw new EnrichmentValidationError(result.validation); + } + + return result; + } + + validate(result: Pick): EnrichmentValidationResult { + const issues: EnrichmentValidationResult["issues"] = []; + + if (!result.metadata || typeof result.metadata !== "object" || Array.isArray(result.metadata)) { + issues.push({ field: "metadata", code: "invalid_metadata", message: "Metadata must be an object" }); + } + + if (!Array.isArray(result.tags)) { + issues.push({ field: "tags", code: "invalid_tags", message: "Tags must be an array" }); + } else { + result.tags.forEach((tag, index) => { + if (typeof tag !== "string" || !/^[a-z0-9:_-]+$/.test(tag)) { + issues.push({ field: `tags.${index}`, code: "invalid_tag", message: "Tags must be normalized strings" }); + } + }); + } + + if (!result.derivedFields || typeof result.derivedFields !== "object" || Array.isArray(result.derivedFields)) { + issues.push({ field: "derivedFields", code: "invalid_derived_fields", message: "Derived fields must be an object" }); + } + + return { valid: issues.length === 0, issues }; + } + + private async runAdapterWithRetry>( + adapter: EnrichmentProviderAdapter, + record: EnrichmentRecord, + ): Promise<{ patch: EnrichmentPatch; attempts: number }> { + const policy = this.retryPolicy.getPolicy({ operation: `enrichment.${adapter.name}`, maxRetries: 2, baseDelayMs: 25 }); + let attempt = 0; + + while (attempt <= policy.maxRetries) { + attempt += 1; + try { + return { patch: await adapter.enrich(record), attempts: attempt }; + } catch (error) { + const failureClass = this.retryPolicy.classifyFailure(error); + const exhausted = attempt > policy.maxRetries || !this.retryPolicy.isRetryable(error); + this.retryPolicy.recordRetryMetric( + `enrichment.${adapter.name}`, + exhausted ? "exhausted" : "scheduled", + attempt, + failureClass, + ); + + if (exhausted) throw error; + await new Promise((resolve) => setTimeout(resolve, this.retryPolicy.getDelayMs(attempt, policy))); + } + } + + return { patch: {}, attempts: attempt }; + } +} + +export const enrichmentPipelineService = new EnrichmentPipelineService(); diff --git a/backend/src/services/enrichment/index.ts b/backend/src/services/enrichment/index.ts new file mode 100644 index 0000000..04c287c --- /dev/null +++ b/backend/src/services/enrichment/index.ts @@ -0,0 +1,3 @@ +export * from "./types.js"; +export * from "./providerAdapters.js"; +export * from "./enrichmentPipeline.service.js"; diff --git a/backend/src/services/enrichment/providerAdapters.ts b/backend/src/services/enrichment/providerAdapters.ts new file mode 100644 index 0000000..b2dd2c6 --- /dev/null +++ b/backend/src/services/enrichment/providerAdapters.ts @@ -0,0 +1,148 @@ +import type { EnrichmentProviderAdapter, EnrichmentRecord } from "./types.js"; + +function normalizeString(value: unknown): string | null { + if (typeof value !== "string") return null; + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : null; +} + +function normalizeTag(value: string): string { + return value + .trim() + .toLowerCase() + .replace(/[^a-z0-9:_-]+/g, "-") + .replace(/-+/g, "-") + .replace(/^-|-$/g, ""); +} + +function hostFromUrl(value: unknown): string | null { + const url = normalizeString(value); + if (!url) return null; + + try { + return new URL(url).hostname.toLowerCase(); + } catch { + return null; + } +} + +function severityWeight(severity: unknown): number { + switch (normalizeString(severity)?.toLowerCase()) { + case "critical": + return 100; + case "high": + return 80; + case "medium": + return 50; + case "low": + return 20; + default: + return 50; + } +} + +function riskBand(weight: number): "critical" | "elevated" | "standard" { + if (weight >= 90) return "critical"; + if (weight >= 70) return "elevated"; + return "standard"; +} + +export class IncidentMetadataAdapter implements EnrichmentProviderAdapter { + name = "incident-metadata"; + + supports(record: EnrichmentRecord): boolean { + return record.recordType === "incident"; + } + + enrich(record: EnrichmentRecord) { + const occurredAt = normalizeString(record.data.occurredAt); + const sourceHost = hostFromUrl(record.data.sourceUrl); + const assetCode = normalizeString(record.data.assetCode); + const sourceType = normalizeString(record.data.sourceType) ?? "webhook"; + + return { + metadata: { + provider: record.provider, + recordType: record.recordType, + sourceType, + sourceExternalId: normalizeString(record.data.sourceExternalId), + sourceHost, + receivedAt: new Date().toISOString(), + hasAssetCode: Boolean(assetCode), + }, + derivedFields: { + occurredAtIso: occurredAt, + sourceHost, + normalizedAssetCode: assetCode?.toUpperCase() ?? null, + }, + }; + } +} + +export class IncidentTaggingAdapter implements EnrichmentProviderAdapter { + name = "incident-tagging"; + + supports(record: EnrichmentRecord): boolean { + return record.recordType === "incident"; + } + + enrich(record: EnrichmentRecord) { + const tags = [ + `source:${normalizeString(record.data.sourceType) ?? "webhook"}`, + `severity:${normalizeString(record.data.severity) ?? "medium"}`, + ]; + + const bridgeId = normalizeString(record.data.bridgeId); + const assetCode = normalizeString(record.data.assetCode); + const sourceHost = hostFromUrl(record.data.sourceUrl); + + if (bridgeId) tags.push(`bridge:${bridgeId}`); + if (assetCode) { + tags.push(`asset:${assetCode}`); + if (["USDC", "USDT", "EURC", "DAI"].includes(assetCode.toUpperCase())) { + tags.push("asset:stablecoin"); + } + } + if (sourceHost) tags.push(`source-host:${sourceHost}`); + if (record.data.requiresManualReview === true) tags.push("workflow:manual-review"); + + return { + tags: tags.map(normalizeTag).filter(Boolean), + }; + } +} + +export class IncidentDerivedFieldsAdapter implements EnrichmentProviderAdapter { + name = "incident-derived-fields"; + + supports(record: EnrichmentRecord): boolean { + return record.recordType === "incident"; + } + + enrich(record: EnrichmentRecord) { + const weight = severityWeight(record.data.severity); + const followUpActions = Array.isArray(record.data.followUpActions) ? record.data.followUpActions : []; + const occurredAt = normalizeString(record.data.occurredAt); + const occurredMs = occurredAt ? new Date(occurredAt).getTime() : Number.NaN; + + return { + metadata: { + severityWeight: weight, + followUpActionCount: followUpActions.length, + }, + derivedFields: { + priorityScore: Math.min(100, weight + Math.min(20, followUpActions.length * 5)), + riskBand: riskBand(weight), + ageMs: Number.isNaN(occurredMs) ? null : Math.max(0, Date.now() - occurredMs), + }, + }; + } +} + +export function createDefaultEnrichmentAdapters(): EnrichmentProviderAdapter[] { + return [ + new IncidentMetadataAdapter(), + new IncidentTaggingAdapter(), + new IncidentDerivedFieldsAdapter(), + ]; +} diff --git a/backend/src/services/enrichment/types.ts b/backend/src/services/enrichment/types.ts new file mode 100644 index 0000000..3b22c5d --- /dev/null +++ b/backend/src/services/enrichment/types.ts @@ -0,0 +1,40 @@ +export type EnrichmentRecordType = "incident" | "transaction" | "asset" | "bridge" | string; + +export interface EnrichmentRecord = Record> { + recordType: EnrichmentRecordType; + provider: string; + data: TData; + context?: Record; +} + +export interface EnrichmentPatch { + metadata?: Record; + tags?: string[]; + derivedFields?: Record; +} + +export interface EnrichmentValidationIssue { + field: string; + code: string; + message: string; +} + +export interface EnrichmentValidationResult { + valid: boolean; + issues: EnrichmentValidationIssue[]; +} + +export interface EnrichmentResult = Record> { + record: EnrichmentRecord; + metadata: Record; + tags: string[]; + derivedFields: Record; + validation: EnrichmentValidationResult; + attempts: number; +} + +export interface EnrichmentProviderAdapter = Record> { + name: string; + supports(record: EnrichmentRecord): boolean; + enrich(record: EnrichmentRecord): Promise | EnrichmentPatch; +} diff --git a/backend/src/services/incident.service.ts b/backend/src/services/incident.service.ts index f4e3e4c..f44cd6f 100644 --- a/backend/src/services/incident.service.ts +++ b/backend/src/services/incident.service.ts @@ -1,5 +1,6 @@ import { getDatabase } from "../database/connection.js"; import { logger } from "../utils/logger.js"; +import { enrichmentPipelineService } from "./enrichment/index.js"; export type IncidentSeverity = "critical" | "high" | "medium" | "low"; export type IncidentStatus = "open" | "investigating" | "resolved"; @@ -19,6 +20,10 @@ export interface BridgeIncident { sourceRepoAvatarUrl: string | null; sourceActor: string | null; sourceAttribution: Record; + enrichmentMetadata: Record; + enrichmentTags: string[]; + derivedFields: Record; + enrichmentValidation: Record; requiresManualReview: boolean; ingestionAttemptCount: number; lastIngestionError: string | null; @@ -52,6 +57,10 @@ export interface CreateIncidentPayload { sourceRepoAvatarUrl?: string; sourceActor?: string; sourceAttribution?: Record; + enrichmentMetadata?: Record; + enrichmentTags?: string[]; + derivedFields?: Record; + enrichmentValidation?: Record; followUpActions?: string[]; occurredAt?: string; } @@ -86,6 +95,32 @@ export class IncidentService { } async createIncident(payload: CreateIncidentPayload): Promise { + const enrichment = await enrichmentPipelineService.enrich({ + recordType: "incident", + provider: payload.sourceType ?? "manual", + data: { + sourceType: payload.sourceType ?? "manual", + sourceExternalId: payload.sourceExternalId ?? null, + bridgeId: payload.bridgeId, + assetCode: payload.assetCode ?? null, + severity: payload.severity, + title: payload.title, + description: payload.description, + sourceUrl: payload.sourceUrl ?? null, + occurredAt: payload.occurredAt ?? new Date().toISOString(), + followUpActions: payload.followUpActions ?? [], + requiresManualReview: false, + }, + context: { + rawMetadata: payload.sourceAttribution ?? {}, + }, + }); + + const enrichmentMetadata = { + ...enrichment.metadata, + rawMetadata: payload.sourceAttribution ?? {}, + }; + const [row] = await this.db("bridge_incidents") .insert({ bridge_id: payload.bridgeId, @@ -99,7 +134,23 @@ export class IncidentService { source_repository: payload.sourceRepository ?? null, source_repo_avatar_url: payload.sourceRepoAvatarUrl ?? null, source_actor: payload.sourceActor ?? null, - source_attribution: JSON.stringify(payload.sourceAttribution ?? {}), + source_attribution: JSON.stringify({ + ...(payload.sourceAttribution ?? {}), + enrichment: { + metadata: enrichmentMetadata, + tags: enrichment.tags, + derivedFields: enrichment.derivedFields, + validation: enrichment.validation, + attempts: enrichment.attempts, + }, + }), + enrichment_metadata: JSON.stringify(payload.enrichmentMetadata ?? enrichmentMetadata), + enrichment_tags: payload.enrichmentTags ?? enrichment.tags, + derived_fields: JSON.stringify(payload.derivedFields ?? enrichment.derivedFields), + enrichment_validation: JSON.stringify(payload.enrichmentValidation ?? { + ...enrichment.validation, + attempts: enrichment.attempts, + }), follow_up_actions: JSON.stringify(payload.followUpActions ?? []), occurred_at: payload.occurredAt ? new Date(payload.occurredAt) : new Date(), }) @@ -159,6 +210,18 @@ export class IncidentService { sourceAttribution: typeof row.source_attribution === "object" && row.source_attribution !== null ? (row.source_attribution as Record) : JSON.parse((row.source_attribution as string) || "{}"), + enrichmentMetadata: typeof row.enrichment_metadata === "object" && row.enrichment_metadata !== null + ? (row.enrichment_metadata as Record) + : JSON.parse((row.enrichment_metadata as string) || "{}"), + enrichmentTags: Array.isArray(row.enrichment_tags) + ? (row.enrichment_tags as string[]) + : [], + derivedFields: typeof row.derived_fields === "object" && row.derived_fields !== null + ? (row.derived_fields as Record) + : JSON.parse((row.derived_fields as string) || "{}"), + enrichmentValidation: typeof row.enrichment_validation === "object" && row.enrichment_validation !== null + ? (row.enrichment_validation as Record) + : JSON.parse((row.enrichment_validation as string) || "{}"), requiresManualReview: Boolean(row.requires_manual_review), ingestionAttemptCount: Number(row.ingestion_attempt_count ?? 0), lastIngestionError: (row.last_ingestion_error as string | null) ?? null, diff --git a/backend/src/services/incidentIngestion.service.ts b/backend/src/services/incidentIngestion.service.ts index a6e2aa0..1c0be4a 100644 --- a/backend/src/services/incidentIngestion.service.ts +++ b/backend/src/services/incidentIngestion.service.ts @@ -1,6 +1,7 @@ import crypto from "node:crypto"; import { getDatabase } from "../database/connection.js"; import { logger } from "../utils/logger.js"; +import { enrichmentPipelineService, type EnrichmentResult } from "./enrichment/index.js"; import { IncidentService, type IncidentSeverity, type BridgeIncident } from "./incident.service.js"; export type IncidentSourceType = "github" | "webhook" | "partner" | "manual"; @@ -52,6 +53,10 @@ interface NormalizedIncident { sourceRepoAvatarUrl: string | null; sourceActor: string | null; sourceAttribution: Record; + enrichmentMetadata: Record; + enrichmentTags: string[]; + derivedFields: Record; + enrichmentValidation: Record; normalizedFingerprint: string; requiresManualReview: boolean; reviewReason: string | null; @@ -141,6 +146,10 @@ export class IncidentIngestionService { sourceUrl, metadata: raw.metadata ?? {}, }, + enrichmentMetadata: {}, + enrichmentTags: [], + derivedFields: {}, + enrichmentValidation: {}, normalizedFingerprint, requiresManualReview: missing.length > 0, reviewReason: missing.length > 0 ? missing.join(",") : null, @@ -148,7 +157,7 @@ export class IncidentIngestionService { } async ingest(raw: RawIncidentPayload): Promise { - const normalized = this.normalize(raw); + const normalized = await this.enrichNormalized(this.normalize(raw), raw); if (normalized.requiresManualReview) { await this.enqueueReview(normalized, raw); @@ -284,6 +293,10 @@ export class IncidentIngestionService { source_repo_avatar_url: normalized.sourceRepoAvatarUrl, source_actor: normalized.sourceActor, source_attribution: JSON.stringify(normalized.sourceAttribution), + enrichment_metadata: JSON.stringify(normalized.enrichmentMetadata), + enrichment_tags: normalized.enrichmentTags, + derived_fields: JSON.stringify(normalized.derivedFields), + enrichment_validation: JSON.stringify(normalized.enrichmentValidation), normalized_fingerprint: normalized.normalizedFingerprint, requires_manual_review: false, ingestion_attempt_count: 1, @@ -299,6 +312,12 @@ export class IncidentIngestionService { source_type: normalized.sourceType, source_external_id: normalized.sourceExternalId, raw_payload: JSON.stringify(raw), + enriched_payload: JSON.stringify({ + metadata: normalized.enrichmentMetadata, + tags: normalized.enrichmentTags, + derivedFields: normalized.derivedFields, + validation: normalized.enrichmentValidation, + }), reason: normalized.reviewReason, status: "pending", incident_id: null, @@ -319,12 +338,73 @@ export class IncidentIngestionService { source_external_id: input.normalized.sourceExternalId, event_type: input.eventType, payload: JSON.stringify(input.normalized.sourceAttribution), + enrichment_metadata: JSON.stringify(input.normalized.enrichmentMetadata), + enrichment_tags: input.normalized.enrichmentTags, + derived_fields: JSON.stringify(input.normalized.derivedFields), status: input.status, error_message: input.errorMessage ?? null, attempt_number: input.attemptNumber, }); } + private async enrichNormalized(normalized: NormalizedIncident, raw: RawIncidentPayload): Promise { + const enrichment = await enrichmentPipelineService.enrich({ + recordType: "incident", + provider: normalized.sourceType, + data: { + sourceType: normalized.sourceType, + sourceExternalId: normalized.sourceExternalId, + bridgeId: normalized.bridgeId, + assetCode: normalized.assetCode, + severity: normalized.severity, + title: normalized.title, + description: normalized.description, + sourceUrl: normalized.sourceUrl, + occurredAt: normalized.occurredAt, + followUpActions: normalized.followUpActions, + requiresManualReview: normalized.requiresManualReview, + }, + context: { + rawMetadata: raw.metadata ?? {}, + repository: normalized.sourceRepository, + actor: normalized.sourceActor, + }, + }); + + return this.applyEnrichment(normalized, enrichment); + } + + private applyEnrichment( + normalized: NormalizedIncident, + enrichment: EnrichmentResult, + ): NormalizedIncident { + const enrichmentMetadata = { + ...enrichment.metadata, + rawMetadata: normalized.sourceAttribution.metadata ?? {}, + }; + + return { + ...normalized, + sourceAttribution: { + ...normalized.sourceAttribution, + enrichment: { + metadata: enrichmentMetadata, + tags: enrichment.tags, + derivedFields: enrichment.derivedFields, + validation: enrichment.validation, + attempts: enrichment.attempts, + }, + }, + enrichmentMetadata, + enrichmentTags: enrichment.tags, + derivedFields: enrichment.derivedFields, + enrichmentValidation: { + ...enrichment.validation, + attempts: enrichment.attempts, + }, + }; + } + private mapSeverity(sourceSeverity: string | undefined): IncidentSeverity { const key = this.normalizeString(sourceSeverity)?.toLowerCase(); if (!key) return "medium"; diff --git a/backend/tests/services/enrichment/enrichmentPipeline.service.test.ts b/backend/tests/services/enrichment/enrichmentPipeline.service.test.ts new file mode 100644 index 0000000..2e3c2cc --- /dev/null +++ b/backend/tests/services/enrichment/enrichmentPipeline.service.test.ts @@ -0,0 +1,90 @@ +import { describe, expect, it, vi } from "vitest"; +import { EnrichmentPipelineService } from "../../../src/services/enrichment/enrichmentPipeline.service.js"; +import type { EnrichmentProviderAdapter } from "../../../src/services/enrichment/types.js"; + +function retryPolicyStub() { + return { + getPolicy: vi.fn(() => ({ + maxRetries: 2, + baseDelayMs: 1, + maxDelayMs: 1, + backoffMultiplier: 1, + jitterRatio: 0, + })), + classifyFailure: vi.fn(() => "transient"), + isRetryable: vi.fn(() => true), + recordRetryMetric: vi.fn(), + getDelayMs: vi.fn(() => 1), + }; +} + +describe("EnrichmentPipelineService", () => { + it("applies incident metadata, tags, and derived fields", async () => { + const service = new EnrichmentPipelineService(undefined, retryPolicyStub() as any); + + const result = await service.enrich({ + recordType: "incident", + provider: "github", + data: { + sourceType: "github", + sourceExternalId: "evt-1", + bridgeId: "wormhole", + assetCode: "USDC", + severity: "high", + sourceUrl: "https://github.com/StellaBridge/Bridge-Watch/issues/1", + occurredAt: "2026-04-25T10:30:00.000Z", + followUpActions: ["Verify reserves"], + }, + }); + + expect(result.metadata).toMatchObject({ + provider: "github", + recordType: "incident", + sourceType: "github", + sourceHost: "github.com", + severityWeight: 80, + }); + expect(result.tags).toEqual(expect.arrayContaining([ + "asset:stablecoin", + "asset:usdc", + "bridge:wormhole", + "severity:high", + "source:github", + ])); + expect(result.derivedFields).toMatchObject({ + normalizedAssetCode: "USDC", + priorityScore: 85, + riskBand: "elevated", + sourceHost: "github.com", + }); + expect(result.validation.valid).toBe(true); + }); + + it("retries retryable provider adapter failures", async () => { + const retry = retryPolicyStub(); + const adapter: EnrichmentProviderAdapter = { + name: "flaky", + supports: () => true, + enrich: vi.fn() + .mockRejectedValueOnce(new Error("temporary timeout")) + .mockResolvedValueOnce({ tags: ["source:webhook"] }), + }; + const service = new EnrichmentPipelineService([adapter], retry as any); + + const result = await service.enrich({ + recordType: "incident", + provider: "webhook", + data: {}, + }); + + expect(adapter.enrich).toHaveBeenCalledTimes(2); + expect(retry.recordRetryMetric).toHaveBeenCalledWith( + "enrichment.flaky", + "scheduled", + 1, + "transient", + ); + expect(result.tags).toEqual(["source:webhook"]); + expect(result.attempts).toBe(2); + }); +}); diff --git a/backend/tests/services/incidentIngestion.service.test.ts b/backend/tests/services/incidentIngestion.service.test.ts index 1c02639..5320a3d 100644 --- a/backend/tests/services/incidentIngestion.service.test.ts +++ b/backend/tests/services/incidentIngestion.service.test.ts @@ -22,6 +22,19 @@ vi.mock("../../src/utils/logger.js", () => ({ logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, })); +vi.mock("../../src/services/enrichment/index.js", () => ({ + enrichmentPipelineService: { + enrich: vi.fn(async () => ({ + metadata: { provider: "github", sourceHost: "github.com" }, + tags: ["source:github", "severity:high", "asset:usdc"], + derivedFields: { normalizedAssetCode: "USDC", priorityScore: 85 }, + validation: { valid: true, issues: [] }, + attempts: 1, + record: {}, + })), + }, +})); + describe("IncidentIngestionService.normalize", () => { const service = new IncidentIngestionService(); @@ -63,3 +76,82 @@ describe("IncidentIngestionService.normalize", () => { expect(normalized.reviewReason).toContain("missing_title"); }); }); + +describe("IncidentIngestionService.ingest", () => { + it("persists enrichment fields before inserting an incident", async () => { + const inserts: Array<{ table: string; payload: Record }> = []; + const incidentRow = { + id: "incident-1", + bridge_id: "wormhole", + asset_code: "USDC", + severity: "high", + status: "open", + title: "Liquidity drift detected", + description: "Pool balance diverged beyond threshold", + source_url: "https://github.com/StellaBridge/Bridge-Watch/issues/1", + source_type: "github", + source_external_id: "evt-123", + source_repository: "StellaBridge/Bridge-Watch", + source_repo_avatar_url: null, + source_actor: "bridge-bot", + source_attribution: "{}", + enrichment_metadata: "{}", + enrichment_tags: ["source:github"], + derived_fields: "{}", + enrichment_validation: "{}", + requires_manual_review: false, + ingestion_attempt_count: 1, + last_ingestion_error: null, + normalized_fingerprint: "a".repeat(64), + follow_up_actions: "[]", + occurred_at: new Date("2026-04-25T10:30:00.000Z"), + resolved_at: null, + created_at: new Date("2026-04-25T10:31:00.000Z"), + updated_at: new Date("2026-04-25T10:31:00.000Z"), + }; + + const db = (table: string) => { + const chain = { + where: vi.fn().mockReturnThis(), + first: vi.fn().mockResolvedValue(null), + insert: vi.fn((payload: Record) => { + inserts.push({ table, payload }); + return chain; + }), + returning: vi.fn().mockResolvedValue([incidentRow]), + orderBy: vi.fn().mockReturnThis(), + limit: vi.fn().mockReturnThis(), + select: vi.fn().mockResolvedValue([]), + }; + return chain; + }; + + const { getDatabase } = await import("../../src/database/connection.js"); + vi.mocked(getDatabase).mockReturnValue(db as any); + const service = new IncidentIngestionService(); + + await service.ingest({ + sourceType: "github", + externalId: "evt-123", + bridgeId: "wormhole", + assetCode: "USDC", + severity: "sev1", + title: "Liquidity drift detected", + description: "Pool balance diverged beyond threshold", + sourceUrl: "https://github.com/StellaBridge/Bridge-Watch/issues/1", + occurredAt: "2026-04-25T10:30:00.000Z", + }); + + const incidentInsert = inserts.find((insert) => insert.table === "bridge_incidents"); + expect(incidentInsert?.payload).toMatchObject({ + enrichment_metadata: JSON.stringify({ + provider: "github", + sourceHost: "github.com", + rawMetadata: {}, + }), + enrichment_tags: ["source:github", "severity:high", "asset:usdc"], + derived_fields: JSON.stringify({ normalizedAssetCode: "USDC", priorityScore: 85 }), + enrichment_validation: JSON.stringify({ valid: true, issues: [], attempts: 1 }), + }); + }); +});