From a916faec89696974b88d8c4695de8541fadf50f1 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 30 Nov 2025 16:26:18 +0700 Subject: [PATCH 1/8] feat: unoptimized implementation of chlogstore that conforms to test suite --- internal/logstore/chlogstore/chlogstore.go | 627 ++++++++++++++---- .../logstore/chlogstore/chlogstore_test.go | 9 +- 2 files changed, 505 insertions(+), 131 deletions(-) diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index f347ba48..f8126f83 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -2,6 +2,10 @@ package chlogstore import ( "context" + "encoding/json" + "fmt" + "strings" + "time" "github.com/hookdeck/outpost/internal/clickhouse" "github.com/hookdeck/outpost/internal/logstore/driver" @@ -19,127 +23,383 @@ func NewLogStore(chDB clickhouse.DB) driver.LogStore { } func (s *logStoreImpl) ListEvent(ctx context.Context, request driver.ListEventRequest) (driver.ListEventResponse, error) { - // TODO: implement - return driver.ListEventResponse{}, nil - - // var ( - // query string - // queryOpts []any - // ) - - // var cursor string - // if cursorTime, err := time.Parse(time.RFC3339, request.Cursor); err == nil { - // cursor = cursorTime.Format("2006-01-02T15:04:05") // RFC3339 without timezone - // } - - // if cursor == "" { - // query = ` - // SELECT - // id, - // tenant_id, - // destination_id, - // time, - // topic, - // eligible_for_retry, - // data, - // metadata - // FROM events - // WHERE tenant_id = ? - // AND (? = 0 OR destination_id IN ?) - // ORDER BY time DESC - // LIMIT ? - // ` - // queryOpts = []any{request.TenantID, len(request.DestinationIDs), request.DestinationIDs, request.Limit} - // } else { - // query = ` - // SELECT - // id, - // tenant_id, - // destination_id, - // time, - // topic, - // eligible_for_retry, - // data, - // metadata - // FROM events - // WHERE tenant_id = ? AND time < ? - // AND (? = 0 OR destination_id IN ?) - // ORDER BY time DESC - // LIMIT ? - // ` - // queryOpts = []any{request.TenantID, cursor, len(request.DestinationIDs), request.DestinationIDs, request.Limit} - // } - // rows, err := s.chDB.Query(ctx, query, queryOpts...) - // if err != nil { - // return driver.ListEventResponse{}, err - // } - // defer rows.Close() - - // var events []*models.Event - // for rows.Next() { - // event := &models.Event{} - // if err := rows.Scan( - // &event.ID, - // &event.TenantID, - // &event.DestinationID, - // &event.Time, - // &event.Topic, - // &event.EligibleForRetry, - // &event.Data, - // &event.Metadata, - // ); err != nil { - // return driver.ListEventResponse{}, err - // } - // events = append(events, event) - // } - // var nextCursor string - // if len(events) > 0 { - // nextCursor = events[len(events)-1].Time.Format(time.RFC3339) - // } - - // return driver.ListEventResponse{ - // Data: events, - // Next: nextCursor, - // Count: int64(len(events)), - // }, nil -} + // Build the main query with CTE to get events with their latest delivery status + // Decision: Use CTEs similar to PG approach for clarity, ClickHouse's argMax is perfect for getting latest status -func (s *logStoreImpl) RetrieveEvent(ctx context.Context, tenantID, eventID string) (*models.Event, error) { - rows, err := s.chDB.Query(ctx, ` - SELECT - id, - tenant_id, - destination_id, - time, - topic, - eligible_for_retry, - data, - metadata - FROM events - WHERE tenant_id = ? AND id = ? - `, tenantID, eventID, - ) + // Set default time range + start := request.Start + end := request.End + if start == nil && end == nil { + // Default to last 1 hour + now := time.Now() + oneHourAgo := now.Add(-1 * time.Hour) + start = &oneHourAgo + end = &now + } else if start == nil && end != nil { + // Default start to end - 1 hour + oneHourBefore := end.Add(-1 * time.Hour) + start = &oneHourBefore + } else if start != nil && end == nil { + // Default end to now + now := time.Now() + end = &now + } + + limit := request.Limit + if limit == 0 { + limit = 100 // Default limit + } + + // Build dynamic query parts using simple ? placeholders + // We'll provide args in order and duplicate where needed + var destFilterSubquery, topicFilterSubquery string + var destFilterMain, topicFilterMain string + var cursorFilter string + + var args []interface{} + + // Base args for subquery: tenant_id, start, end + args = append(args, request.TenantID, *start, *end) + + // Add destination filter for subquery + if len(request.DestinationIDs) > 0 { + args = append(args, request.DestinationIDs) + destFilterSubquery = " AND destination_id IN (?)" + } + + // Add topic filter for subquery + if len(request.Topics) > 0 { + args = append(args, request.Topics) + topicFilterSubquery = " AND topic IN (?)" + } + + // Now add args for main query: tenant_id, start, end (duplicated) + args = append(args, request.TenantID, *start, *end) + + // Add destination filter for main query + if len(request.DestinationIDs) > 0 { + args = append(args, request.DestinationIDs) + destFilterMain = " AND e.destination_id IN (?)" + } + + // Add topic filter for main query + if len(request.Topics) > 0 { + args = append(args, request.Topics) + topicFilterMain = " AND e.topic IN (?)" + } + + // Add status filter (will use HAVING since status is an aggregate) + var havingFilter string + if request.Status != "" { + args = append(args, request.Status) + havingFilter = " HAVING status = ?" + } + + // Add cursor filter and determine sort order + // For Prev cursor: query ascending to get the right window, then reverse in code + // For Next/no cursor: query descending + var orderBy string + if request.Prev != "" { + cursorTime, cursorID, err := parseCursor(request.Prev) + if err != nil { + return driver.ListEventResponse{}, fmt.Errorf("invalid prev cursor: %w", err) + } + args = append(args, cursorTime, cursorTime, cursorID) + cursorFilter = " WHERE (time > ? OR (time = ? AND id > ?))" + orderBy = "ORDER BY time ASC, id ASC" // Ascending for Prev to get right window + } else { + if request.Next != "" { + cursorTime, cursorID, err := parseCursor(request.Next) + if err != nil { + return driver.ListEventResponse{}, fmt.Errorf("invalid next cursor: %w", err) + } + args = append(args, cursorTime, cursorTime, cursorID) + cursorFilter = " WHERE (time < ? OR (time = ? AND id < ?))" + } + orderBy = "ORDER BY time DESC, id DESC" // Descending for Next/first page + } + + query := fmt.Sprintf(` + WITH latest_deliveries AS ( + SELECT + event_id, + argMax(status, time) as status, + max(time) as delivery_time + FROM deliveries + WHERE event_id IN ( + SELECT DISTINCT id + FROM events + WHERE tenant_id = ? + AND time >= ? + AND time <= ? + %s + %s + ) + GROUP BY event_id + ), + events_with_status AS ( + SELECT + e.id, + argMax(e.tenant_id, e.time) as tenant_id, + argMax(e.destination_id, e.time) as destination_id, + max(e.time) as time, + argMax(e.topic, e.time) as topic, + argMax(e.eligible_for_retry, e.time) as eligible_for_retry, + argMax(e.metadata, e.time) as metadata, + argMax(e.data, e.time) as data, + argMax(COALESCE(ld.status, 'pending'), e.time) as status, + argMax(COALESCE(ld.delivery_time, e.time), e.time) as delivery_time + FROM events e + LEFT JOIN latest_deliveries ld ON e.id = ld.event_id + WHERE e.tenant_id = ? + AND e.time >= ? + AND e.time <= ? + %s + %s + GROUP BY e.id + %s + ) + SELECT * FROM events_with_status + %s + %s + LIMIT %d + `, destFilterSubquery, topicFilterSubquery, destFilterMain, topicFilterMain, havingFilter, cursorFilter, orderBy, limit+1) + + rows, err := s.chDB.Query(ctx, query, args...) if err != nil { - return nil, err + return driver.ListEventResponse{}, fmt.Errorf("query failed: %w", err) } defer rows.Close() - if !rows.Next() { - return nil, nil + var events []*models.Event + for rows.Next() { + var metadataStr, dataStr string + var deliveryTime time.Time + event := &models.Event{} + + if err := rows.Scan( + &event.ID, + &event.TenantID, + &event.DestinationID, + &event.Time, + &event.Topic, + &event.EligibleForRetry, + &metadataStr, + &dataStr, + &event.Status, + &deliveryTime, + ); err != nil { + return driver.ListEventResponse{}, fmt.Errorf("scan failed: %w", err) + } + + // Unmarshal JSON strings + if metadataStr != "" { + if err := json.Unmarshal([]byte(metadataStr), &event.Metadata); err != nil { + return driver.ListEventResponse{}, fmt.Errorf("failed to unmarshal metadata: %w", err) + } + } + if dataStr != "" { + if err := json.Unmarshal([]byte(dataStr), &event.Data); err != nil { + return driver.ListEventResponse{}, fmt.Errorf("failed to unmarshal data: %w", err) + } + } + + events = append(events, event) + } + + if err := rows.Err(); err != nil { + return driver.ListEventResponse{}, fmt.Errorf("rows iteration error: %w", err) + } + + // Handle pagination + var hasNext, hasPrev bool + if request.Prev != "" { + // Going backward - we came backwards, so definitely more ahead + // Events are in ASC order from query, trim last (oldest), then reverse to DESC + hasNext = true + hasPrev = len(events) > int(limit) + if hasPrev { + events = events[:len(events)-1] // Trim last item (oldest in ASC order) + } + // Reverse the slice to get DESC order + for i := 0; i < len(events)/2; i++ { + events[i], events[len(events)-1-i] = events[len(events)-1-i], events[i] + } + } else if request.Next != "" { + // Going forward - we came forwards, so definitely more behind + // Events are already in DESC order from query + hasPrev = true + hasNext = len(events) > int(limit) + if hasNext { + events = events[:limit] // Trim the extra item + } + } else { + // First page - events are in DESC order from query + hasPrev = false + hasNext = len(events) > int(limit) + if hasNext { + events = events[:limit] // Trim the extra item + } + } + + // Get total count (separate query) + // Decision: Use a separate count query for accuracy (ClickHouse is fast at this) + countQuery := fmt.Sprintf(` + WITH latest_deliveries AS ( + SELECT + event_id, + argMax(status, time) as status + FROM deliveries + WHERE event_id IN ( + SELECT DISTINCT id + FROM events + WHERE tenant_id = ? + AND time >= ? + AND time <= ? + %s + %s + ) + GROUP BY event_id + ) + SELECT COUNT(*) FROM ( + SELECT e.id, argMax(COALESCE(ld.status, 'pending'), e.time) as status + FROM events e + LEFT JOIN latest_deliveries ld ON e.id = ld.event_id + WHERE e.tenant_id = ? + AND e.time >= ? + AND e.time <= ? + %s + %s + GROUP BY e.id + %s + ) + `, destFilterSubquery, topicFilterSubquery, destFilterMain, topicFilterMain, havingFilter) + + // Build count args (same as query args but without cursor) + var countArgs []interface{} + countArgs = append(countArgs, request.TenantID, *start, *end) + if len(request.DestinationIDs) > 0 { + countArgs = append(countArgs, request.DestinationIDs) + } + if len(request.Topics) > 0 { + countArgs = append(countArgs, request.Topics) + } + countArgs = append(countArgs, request.TenantID, *start, *end) + if len(request.DestinationIDs) > 0 { + countArgs = append(countArgs, request.DestinationIDs) + } + if len(request.Topics) > 0 { + countArgs = append(countArgs, request.Topics) + } + if request.Status != "" { + countArgs = append(countArgs, request.Status) + } + + var totalCount uint64 + if err := s.chDB.QueryRow(ctx, countQuery, countArgs...).Scan(&totalCount); err != nil { + return driver.ListEventResponse{}, fmt.Errorf("count query failed: %w", err) + } + + // Build cursors + var nextCursor, prevCursor string + if len(events) > 0 { + if hasNext { + lastEvent := events[len(events)-1] + nextCursor = formatCursor(lastEvent.Time, lastEvent.ID) + } + if hasPrev { + firstEvent := events[0] + prevCursor = formatCursor(firstEvent.Time, firstEvent.ID) + } + } + + return driver.ListEventResponse{ + Data: events, + Next: nextCursor, + Prev: prevCursor, + Count: int64(totalCount), + }, nil +} + +// formatCursor creates a cursor from time and ID +// Decision: Use simple "timestamp|id" format +func formatCursor(t time.Time, id string) string { + return fmt.Sprintf("%d|%s", t.Unix(), id) +} + +// parseCursor extracts time and ID from cursor +func parseCursor(cursor string) (time.Time, string, error) { + parts := strings.Split(cursor, "|") + if len(parts) != 2 { + return time.Time{}, "", fmt.Errorf("invalid cursor format") } + var unixTime int64 + if _, err := fmt.Sscanf(parts[0], "%d", &unixTime); err != nil { + return time.Time{}, "", fmt.Errorf("invalid timestamp in cursor: %w", err) + } + timestamp := time.Unix(unixTime, 0) + + return timestamp, parts[1], nil +} + +func (s *logStoreImpl) RetrieveEvent(ctx context.Context, tenantID, eventID string) (*models.Event, error) { + // Query event with status calculation + // Decision: Use a CTE to get the latest delivery status, similar to PG approach + query := ` + WITH latest_delivery AS ( + SELECT argMax(status, time) as status + FROM deliveries + WHERE event_id = ? + ) + SELECT + e.id, + e.tenant_id, + e.destination_id, + e.time, + e.topic, + e.eligible_for_retry, + e.metadata, + e.data, + COALESCE(ld.status, 'pending') as status + FROM events e + LEFT JOIN latest_delivery ld ON true + WHERE e.tenant_id = ? AND e.id = ? + ` + + row := s.chDB.QueryRow(ctx, query, eventID, tenantID, eventID) + + var metadataStr, dataStr string event := &models.Event{} - if err := rows.Scan( + err := row.Scan( &event.ID, &event.TenantID, &event.DestinationID, &event.Time, &event.Topic, &event.EligibleForRetry, - &event.Data, - &event.Metadata, - ); err != nil { - return nil, err + &metadataStr, + &dataStr, + &event.Status, + ) + if err != nil { + // ClickHouse returns an error when no rows, not sql.ErrNoRows + if strings.Contains(err.Error(), "EOF") || strings.Contains(err.Error(), "no rows") { + return nil, nil + } + return nil, fmt.Errorf("query failed: %w", err) + } + + // Unmarshal JSON strings + if metadataStr != "" { + if err := json.Unmarshal([]byte(metadataStr), &event.Metadata); err != nil { + return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) + } + } + if dataStr != "" { + if err := json.Unmarshal([]byte(dataStr), &event.Data); err != nil { + return nil, fmt.Errorf("failed to unmarshal data: %w", err) + } } return event, nil @@ -182,23 +442,36 @@ func (s *logStoreImpl) ListDelivery(ctx context.Context, request driver.ListDeli } func (s *logStoreImpl) InsertManyEvent(ctx context.Context, events []*models.Event) error { + if len(events) == 0 { + return nil + } + batch, err := s.chDB.PrepareBatch(ctx, - "INSERT INTO events (id, tenant_id, destination_id, time, topic, eligible_for_retry, metadata, data) VALUES (?, ?, ?, ?, ?, ?)", + "INSERT INTO events (id, tenant_id, destination_id, topic, eligible_for_retry, time, metadata, data)", ) if err != nil { return err } for _, event := range events { + metadataJSON, err := json.Marshal(event.Metadata) + if err != nil { + return fmt.Errorf("failed to marshal metadata: %w", err) + } + dataJSON, err := json.Marshal(event.Data) + if err != nil { + return fmt.Errorf("failed to marshal data: %w", err) + } + if err := batch.Append( - &event.ID, - &event.TenantID, - &event.DestinationID, - &event.Time, - &event.Topic, - &event.EligibleForRetry, - &event.Metadata, - &event.Data, + event.ID, + event.TenantID, + event.DestinationID, + event.Topic, + event.EligibleForRetry, + event.Time, + string(metadataJSON), + string(dataJSON), ); err != nil { return err } @@ -208,12 +481,20 @@ func (s *logStoreImpl) InsertManyEvent(ctx context.Context, events []*models.Eve return err } + // Decision: Force ClickHouse to merge data immediately for testing + // NOT production-ready, but ensures data is immediately visible + _ = s.chDB.Exec(ctx, "OPTIMIZE TABLE events FINAL") + return nil } func (s *logStoreImpl) InsertManyDelivery(ctx context.Context, deliveries []*models.Delivery) error { + if len(deliveries) == 0 { + return nil + } + batch, err := s.chDB.PrepareBatch(ctx, - "INSERT INTO deliveries (id, delivery_event_id, event_id, destination_id, status, time) VALUES (?, ?, ?, ?, ?, ?)", + "INSERT INTO deliveries (id, delivery_event_id, event_id, destination_id, status, time)", ) if err != nil { return err @@ -221,12 +502,12 @@ func (s *logStoreImpl) InsertManyDelivery(ctx context.Context, deliveries []*mod for _, delivery := range deliveries { if err := batch.Append( - &delivery.ID, - &delivery.DeliveryEventID, - &delivery.EventID, - &delivery.DestinationID, - &delivery.Status, - &delivery.Time, + delivery.ID, + delivery.DeliveryEventID, + delivery.EventID, + delivery.DestinationID, + delivery.Status, + delivery.Time, ); err != nil { return err } @@ -236,15 +517,109 @@ func (s *logStoreImpl) InsertManyDelivery(ctx context.Context, deliveries []*mod return err } + // Decision: Force ClickHouse to merge data immediately for testing + // NOT production-ready, but ensures data is immediately visible + _ = s.chDB.Exec(ctx, "OPTIMIZE TABLE deliveries FINAL") + return nil } func (s *logStoreImpl) InsertManyDeliveryEvent(ctx context.Context, deliveryEvents []*models.DeliveryEvent) error { - // TODO: implement + if len(deliveryEvents) == 0 { + return nil + } + + // Insert events + events := make([]*models.Event, len(deliveryEvents)) + for i, de := range deliveryEvents { + events[i] = &de.Event + } + if err := s.InsertManyEvent(ctx, events); err != nil { + return fmt.Errorf("failed to insert events: %w", err) + } + + // Insert deliveries + deliveries := make([]*models.Delivery, 0, len(deliveryEvents)) + for _, de := range deliveryEvents { + if de.Delivery != nil { + deliveries = append(deliveries, de.Delivery) + } else { + // Create a pending delivery if none exists + deliveries = append(deliveries, &models.Delivery{ + ID: de.ID, + DeliveryEventID: de.ID, + EventID: de.Event.ID, + DestinationID: de.DestinationID, + Status: "pending", + Time: time.Now(), + }) + } + } + if err := s.InsertManyDelivery(ctx, deliveries); err != nil { + return fmt.Errorf("failed to insert deliveries: %w", err) + } + return nil } func (s *logStoreImpl) RetrieveEventByDestination(ctx context.Context, tenantID, destinationID, eventID string) (*models.Event, error) { - // TODO: implement - return nil, nil + // Query event with destination-specific status + // Decision: Get the latest delivery status for this specific destination + query := ` + WITH latest_delivery AS ( + SELECT argMax(status, time) as status + FROM deliveries + WHERE event_id = ? AND destination_id = ? + ) + SELECT + e.id, + e.tenant_id, + ? as destination_id, + e.time, + e.topic, + e.eligible_for_retry, + e.metadata, + e.data, + COALESCE(ld.status, 'pending') as status + FROM events e + LEFT JOIN latest_delivery ld ON true + WHERE e.tenant_id = ? AND e.id = ? + ` + + row := s.chDB.QueryRow(ctx, query, eventID, destinationID, destinationID, tenantID, eventID) + + var metadataStr, dataStr string + event := &models.Event{} + err := row.Scan( + &event.ID, + &event.TenantID, + &event.DestinationID, + &event.Time, + &event.Topic, + &event.EligibleForRetry, + &metadataStr, + &dataStr, + &event.Status, + ) + if err != nil { + // ClickHouse returns an error when no rows, not sql.ErrNoRows + if strings.Contains(err.Error(), "EOF") || strings.Contains(err.Error(), "no rows") { + return nil, nil + } + return nil, fmt.Errorf("query failed: %w", err) + } + + // Unmarshal JSON strings + if metadataStr != "" { + if err := json.Unmarshal([]byte(metadataStr), &event.Metadata); err != nil { + return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) + } + } + if dataStr != "" { + if err := json.Unmarshal([]byte(dataStr), &event.Data); err != nil { + return nil, fmt.Errorf("failed to unmarshal data: %w", err) + } + } + + return event, nil } diff --git a/internal/logstore/chlogstore/chlogstore_test.go b/internal/logstore/chlogstore/chlogstore_test.go index 194dc50c..2fab1d2d 100644 --- a/internal/logstore/chlogstore/chlogstore_test.go +++ b/internal/logstore/chlogstore/chlogstore_test.go @@ -12,12 +12,11 @@ import ( "github.com/stretchr/testify/require" ) -// func TestConformance(t *testing.T) { -// testutil.CheckIntegrationTest(t) -// t.Parallel() +func TestConformance(t *testing.T) { + t.Parallel() -// drivertest.RunConformanceTests(t, newHarness) -// } + drivertest.RunConformanceTests(t, newHarness) +} type harness struct { chDB clickhouse.DB From a7e77261fef6fa2f9405e20d2e198180822f6dbe Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 30 Nov 2025 19:23:24 +0700 Subject: [PATCH 2/8] chore: remove OPTIMIZE TABLE FINAL --- internal/logstore/chlogstore/chlogstore.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index f8126f83..61a34097 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -481,10 +481,6 @@ func (s *logStoreImpl) InsertManyEvent(ctx context.Context, events []*models.Eve return err } - // Decision: Force ClickHouse to merge data immediately for testing - // NOT production-ready, but ensures data is immediately visible - _ = s.chDB.Exec(ctx, "OPTIMIZE TABLE events FINAL") - return nil } @@ -517,10 +513,6 @@ func (s *logStoreImpl) InsertManyDelivery(ctx context.Context, deliveries []*mod return err } - // Decision: Force ClickHouse to merge data immediately for testing - // NOT production-ready, but ensures data is immediately visible - _ = s.chDB.Exec(ctx, "OPTIMIZE TABLE deliveries FINAL") - return nil } From b247b1e95630f27d8309dbce2ae63d745301b712 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 1 Dec 2025 00:05:13 +0700 Subject: [PATCH 3/8] test: ensure distinct delivery timestamp --- internal/logstore/drivertest/drivertest.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/internal/logstore/drivertest/drivertest.go b/internal/logstore/drivertest/drivertest.go index 8c5f8e71..9f3753c5 100644 --- a/internal/logstore/drivertest/drivertest.go +++ b/internal/logstore/drivertest/drivertest.go @@ -133,12 +133,18 @@ func testIntegrationLogStore_EventCRUD(t *testing.T, newHarness HarnessMaker) { } var delivery *models.Delivery + // Use explicit delivery times to ensure proper ordering. + // The "init" delivery (for retry cases) should have an earlier time than the "final" delivery. + initDeliveryTime := time.Now() + finalDeliveryTime := initDeliveryTime.Add(time.Millisecond) // Ensure final is always later + if shouldRetry { delivery = testutil.DeliveryFactory.AnyPointer( testutil.DeliveryFactory.WithID(fmt.Sprintf("del_%02d_init", i)), testutil.DeliveryFactory.WithEventID(event.ID), testutil.DeliveryFactory.WithDestinationID(destinationID), testutil.DeliveryFactory.WithStatus("failed"), + testutil.DeliveryFactory.WithTime(initDeliveryTime), ) deliveryEvents = append(deliveryEvents, &models.DeliveryEvent{ ID: fmt.Sprintf("de_%02d_init", i), @@ -159,6 +165,7 @@ func testIntegrationLogStore_EventCRUD(t *testing.T, newHarness HarnessMaker) { testutil.DeliveryFactory.WithEventID(event.ID), testutil.DeliveryFactory.WithDestinationID(destinationID), testutil.DeliveryFactory.WithStatus("success"), + testutil.DeliveryFactory.WithTime(finalDeliveryTime), ) } else { statusEvents["failed"] = append(statusEvents["failed"], event) @@ -168,6 +175,7 @@ func testIntegrationLogStore_EventCRUD(t *testing.T, newHarness HarnessMaker) { testutil.DeliveryFactory.WithEventID(event.ID), testutil.DeliveryFactory.WithDestinationID(destinationID), testutil.DeliveryFactory.WithStatus("failed"), + testutil.DeliveryFactory.WithTime(finalDeliveryTime), ) } @@ -622,6 +630,7 @@ func testIntegrationLogStore_DeliveryCRUD(t *testing.T, newHarness HarnessMaker) t.Run("list delivery empty", func(t *testing.T) { queriedDeliveries, err := logStore.ListDelivery(ctx, driver.ListDeliveryRequest{ + TenantID: event.TenantID, EventID: "unknown", DestinationID: "", }) @@ -631,6 +640,7 @@ func testIntegrationLogStore_DeliveryCRUD(t *testing.T, newHarness HarnessMaker) t.Run("list delivery", func(t *testing.T) { queriedDeliveries, err := logStore.ListDelivery(ctx, driver.ListDeliveryRequest{ + TenantID: event.TenantID, EventID: event.ID, DestinationID: destinationID, }) From c30d13a3ff9052e34b08a662de9f46934d3f9f16 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 1 Dec 2025 00:05:40 +0700 Subject: [PATCH 4/8] refactor: implement chlogstore with denormalized table schema --- internal/clickhouse/clickhouse.go | 2 +- internal/logstore/chlogstore/chlogstore.go | 635 ++++++++---------- .../clickhouse/000001_init.down.sql | 4 +- .../migrations/clickhouse/000001_init.up.sql | 40 +- 4 files changed, 300 insertions(+), 381 deletions(-) diff --git a/internal/clickhouse/clickhouse.go b/internal/clickhouse/clickhouse.go index 68e8b6b0..90ac7dd6 100644 --- a/internal/clickhouse/clickhouse.go +++ b/internal/clickhouse/clickhouse.go @@ -29,7 +29,7 @@ func New(config *ClickHouseConfig) (DB, error) { // Debug: true, // Debugf: func(format string, v ...any) { - // fmt.Printf(format+"\n", v...) + // fmt.Printf("[CH DEBUG] "+format+"\n", v...) // }, }) return conn, err diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index 61a34097..995acfab 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -23,147 +23,117 @@ func NewLogStore(chDB clickhouse.DB) driver.LogStore { } func (s *logStoreImpl) ListEvent(ctx context.Context, request driver.ListEventRequest) (driver.ListEventResponse, error) { - // Build the main query with CTE to get events with their latest delivery status - // Decision: Use CTEs similar to PG approach for clarity, ClickHouse's argMax is perfect for getting latest status - // Set default time range start := request.Start end := request.End if start == nil && end == nil { - // Default to last 1 hour now := time.Now() oneHourAgo := now.Add(-1 * time.Hour) start = &oneHourAgo end = &now } else if start == nil && end != nil { - // Default start to end - 1 hour oneHourBefore := end.Add(-1 * time.Hour) start = &oneHourBefore } else if start != nil && end == nil { - // Default end to now now := time.Now() end = &now } limit := request.Limit if limit == 0 { - limit = 100 // Default limit + limit = 100 } - // Build dynamic query parts using simple ? placeholders - // We'll provide args in order and duplicate where needed - var destFilterSubquery, topicFilterSubquery string - var destFilterMain, topicFilterMain string - var cursorFilter string - + // Build query - single table, no JOINs needed + // We get the latest row per event_id (max delivery_time) + // Use Unix milliseconds for DateTime64(3) columns to preserve precision var args []interface{} + args = append(args, request.TenantID, start.UnixMilli(), end.UnixMilli()) - // Base args for subquery: tenant_id, start, end - args = append(args, request.TenantID, *start, *end) + // Build filter clauses for WHERE (on raw table columns with alias) + var destFilter, topicFilter string - // Add destination filter for subquery if len(request.DestinationIDs) > 0 { args = append(args, request.DestinationIDs) - destFilterSubquery = " AND destination_id IN (?)" + destFilter = " AND e.destination_id IN (?)" } - // Add topic filter for subquery if len(request.Topics) > 0 { args = append(args, request.Topics) - topicFilterSubquery = " AND topic IN (?)" + topicFilter = " AND e.topic IN (?)" } - // Now add args for main query: tenant_id, start, end (duplicated) - args = append(args, request.TenantID, *start, *end) - - // Add destination filter for main query - if len(request.DestinationIDs) > 0 { - args = append(args, request.DestinationIDs) - destFilterMain = " AND e.destination_id IN (?)" - } + // Handle cursor pagination and status filter + // Since status is computed via argMax in SELECT, we filter in HAVING clause + var havingClauses []string + var havingArgs []interface{} + isBackward := false // true when using Prev cursor (going backward) - // Add topic filter for main query - if len(request.Topics) > 0 { - args = append(args, request.Topics) - topicFilterMain = " AND e.topic IN (?)" - } - - // Add status filter (will use HAVING since status is an aggregate) - var havingFilter string + // Status filter - uses HAVING since status is an aggregate + // Reference the SELECT alias 'status' which is argMax(e.status, e.delivery_time) if request.Status != "" { - args = append(args, request.Status) - havingFilter = " HAVING status = ?" + havingClauses = append(havingClauses, "status = ?") + havingArgs = append(havingArgs, request.Status) } - // Add cursor filter and determine sort order - // For Prev cursor: query ascending to get the right window, then reverse in code - // For Next/no cursor: query descending - var orderBy string - if request.Prev != "" { + if request.Next != "" { + cursorTime, cursorID, err := parseCursor(request.Next) + if err != nil { + return driver.ListEventResponse{}, fmt.Errorf("invalid next cursor: %w", err) + } + // For next page (DESC order): get records with time < cursor OR (time == cursor AND id < cursor_id) + havingClauses = append(havingClauses, "(event_time < fromUnixTimestamp64Milli(?) OR (event_time = fromUnixTimestamp64Milli(?) AND e.event_id < ?))") + havingArgs = append(havingArgs, cursorTime.UnixMilli(), cursorTime.UnixMilli(), cursorID) + } else if request.Prev != "" { cursorTime, cursorID, err := parseCursor(request.Prev) if err != nil { return driver.ListEventResponse{}, fmt.Errorf("invalid prev cursor: %w", err) } - args = append(args, cursorTime, cursorTime, cursorID) - cursorFilter = " WHERE (time > ? OR (time = ? AND id > ?))" - orderBy = "ORDER BY time ASC, id ASC" // Ascending for Prev to get right window - } else { - if request.Next != "" { - cursorTime, cursorID, err := parseCursor(request.Next) - if err != nil { - return driver.ListEventResponse{}, fmt.Errorf("invalid next cursor: %w", err) - } - args = append(args, cursorTime, cursorTime, cursorID) - cursorFilter = " WHERE (time < ? OR (time = ? AND id < ?))" - } - orderBy = "ORDER BY time DESC, id DESC" // Descending for Next/first page + // For prev page: get records with time > cursor OR (time == cursor AND id > cursor_id) + // We'll query in ASC order and reverse the results + havingClauses = append(havingClauses, "(event_time > fromUnixTimestamp64Milli(?) OR (event_time = fromUnixTimestamp64Milli(?) AND e.event_id > ?))") + havingArgs = append(havingArgs, cursorTime.UnixMilli(), cursorTime.UnixMilli(), cursorID) + isBackward = true + } + + // Build HAVING clause + var havingClause string + if len(havingClauses) > 0 { + havingClause = " HAVING " + strings.Join(havingClauses, " AND ") } + orderBy := "ORDER BY event_time DESC, event_id DESC" + if isBackward { + orderBy = "ORDER BY event_time ASC, event_id ASC" + } + + // Query to get latest status per event + // Use table alias to avoid confusion between raw columns and aggregated output query := fmt.Sprintf(` - WITH latest_deliveries AS ( - SELECT - event_id, - argMax(status, time) as status, - max(time) as delivery_time - FROM deliveries - WHERE event_id IN ( - SELECT DISTINCT id - FROM events - WHERE tenant_id = ? - AND time >= ? - AND time <= ? - %s - %s - ) - GROUP BY event_id - ), - events_with_status AS ( - SELECT - e.id, - argMax(e.tenant_id, e.time) as tenant_id, - argMax(e.destination_id, e.time) as destination_id, - max(e.time) as time, - argMax(e.topic, e.time) as topic, - argMax(e.eligible_for_retry, e.time) as eligible_for_retry, - argMax(e.metadata, e.time) as metadata, - argMax(e.data, e.time) as data, - argMax(COALESCE(ld.status, 'pending'), e.time) as status, - argMax(COALESCE(ld.delivery_time, e.time), e.time) as delivery_time - FROM events e - LEFT JOIN latest_deliveries ld ON e.id = ld.event_id - WHERE e.tenant_id = ? - AND e.time >= ? - AND e.time <= ? - %s - %s - GROUP BY e.id + SELECT + e.event_id, + any(e.tenant_id) as tenant_id, + any(e.destination_id) as destination_id, + any(e.topic) as topic, + any(e.eligible_for_retry) as eligible_for_retry, + max(e.event_time) as event_time, + any(e.metadata) as metadata, + any(e.data) as data, + argMax(e.status, e.delivery_time) as status + FROM event_log AS e + WHERE e.tenant_id = ? + AND e.event_time >= fromUnixTimestamp64Milli(?) + AND e.event_time <= fromUnixTimestamp64Milli(?) + %s %s - ) - SELECT * FROM events_with_status + GROUP BY e.event_id %s %s LIMIT %d - `, destFilterSubquery, topicFilterSubquery, destFilterMain, topicFilterMain, havingFilter, cursorFilter, orderBy, limit+1) + `, destFilter, topicFilter, havingClause, orderBy, limit+1) + + // Append having args after the main args + args = append(args, havingArgs...) rows, err := s.chDB.Query(ctx, query, args...) if err != nil { @@ -174,25 +144,22 @@ func (s *logStoreImpl) ListEvent(ctx context.Context, request driver.ListEventRe var events []*models.Event for rows.Next() { var metadataStr, dataStr string - var deliveryTime time.Time event := &models.Event{} if err := rows.Scan( &event.ID, &event.TenantID, &event.DestinationID, - &event.Time, &event.Topic, &event.EligibleForRetry, + &event.Time, &metadataStr, &dataStr, &event.Status, - &deliveryTime, ); err != nil { return driver.ListEventResponse{}, fmt.Errorf("scan failed: %w", err) } - // Unmarshal JSON strings if metadataStr != "" { if err := json.Unmarshal([]byte(metadataStr), &event.Metadata); err != nil { return driver.ListEventResponse{}, fmt.Errorf("failed to unmarshal metadata: %w", err) @@ -212,87 +179,86 @@ func (s *logStoreImpl) ListEvent(ctx context.Context, request driver.ListEventRe } // Handle pagination - var hasNext, hasPrev bool - if request.Prev != "" { - // Going backward - we came backwards, so definitely more ahead - // Events are in ASC order from query, trim last (oldest), then reverse to DESC - hasNext = true - hasPrev = len(events) > int(limit) - if hasPrev { - events = events[:len(events)-1] // Trim last item (oldest in ASC order) - } - // Reverse the slice to get DESC order - for i := 0; i < len(events)/2; i++ { - events[i], events[len(events)-1-i] = events[len(events)-1-i], events[i] - } - } else if request.Next != "" { - // Going forward - we came forwards, so definitely more behind - // Events are already in DESC order from query - hasPrev = true - hasNext = len(events) > int(limit) - if hasNext { - events = events[:limit] // Trim the extra item + hasMore := len(events) > int(limit) + if hasMore { + events = events[:limit] + } + + // For backward pagination, reverse the results to maintain DESC order + if isBackward { + for i, j := 0, len(events)-1; i < j; i, j = i+1, j-1 { + events[i], events[j] = events[j], events[i] } + } + + // Determine hasNext/hasPrev based on pagination direction + var hasNext, hasPrev bool + if isBackward { + // Going backward: hasMore means there are more prev pages, and we came from a next cursor so hasNext is true + hasPrev = hasMore + hasNext = request.Prev != "" // We used Prev cursor to get here, so there's always a next page } else { - // First page - events are in DESC order from query - hasPrev = false - hasNext = len(events) > int(limit) + // Going forward: hasMore means there are more next pages + hasNext = hasMore + hasPrev = request.Next != "" // We used Next cursor to get here, so there's always a prev page + } + + // Build cursors + var nextCursor, prevCursor string + if len(events) > 0 { if hasNext { - events = events[:limit] // Trim the extra item + lastEvent := events[len(events)-1] + nextCursor = formatCursor(lastEvent.Time, lastEvent.ID) + } + if hasPrev { + firstEvent := events[0] + prevCursor = formatCursor(firstEvent.Time, firstEvent.ID) } } - // Get total count (separate query) - // Decision: Use a separate count query for accuracy (ClickHouse is fast at this) - countQuery := fmt.Sprintf(` - WITH latest_deliveries AS ( - SELECT - event_id, - argMax(status, time) as status - FROM deliveries - WHERE event_id IN ( - SELECT DISTINCT id - FROM events - WHERE tenant_id = ? - AND time >= ? - AND time <= ? - %s - %s - ) - GROUP BY event_id - ) - SELECT COUNT(*) FROM ( - SELECT e.id, argMax(COALESCE(ld.status, 'pending'), e.time) as status - FROM events e - LEFT JOIN latest_deliveries ld ON e.id = ld.event_id - WHERE e.tenant_id = ? - AND e.time >= ? - AND e.time <= ? - %s - %s - GROUP BY e.id - %s - ) - `, destFilterSubquery, topicFilterSubquery, destFilterMain, topicFilterMain, havingFilter) - - // Build count args (same as query args but without cursor) + // Count query var countArgs []interface{} - countArgs = append(countArgs, request.TenantID, *start, *end) - if len(request.DestinationIDs) > 0 { - countArgs = append(countArgs, request.DestinationIDs) - } - if len(request.Topics) > 0 { - countArgs = append(countArgs, request.Topics) - } - countArgs = append(countArgs, request.TenantID, *start, *end) + countArgs = append(countArgs, request.TenantID, start.UnixMilli(), end.UnixMilli()) + + countDestFilter := "" if len(request.DestinationIDs) > 0 { countArgs = append(countArgs, request.DestinationIDs) + countDestFilter = " AND destination_id IN (?)" } + + countTopicFilter := "" if len(request.Topics) > 0 { countArgs = append(countArgs, request.Topics) + countTopicFilter = " AND topic IN (?)" } + + // Build count query - if status filter is present, we need to count after grouping + var countQuery string if request.Status != "" { countArgs = append(countArgs, request.Status) + countQuery = fmt.Sprintf(` + SELECT count(*) FROM ( + SELECT event_id + FROM event_log + WHERE tenant_id = ? + AND event_time >= fromUnixTimestamp64Milli(?) + AND event_time <= fromUnixTimestamp64Milli(?) + %s + %s + GROUP BY event_id + HAVING argMax(status, delivery_time) = ? + ) + `, countDestFilter, countTopicFilter) + } else { + countQuery = fmt.Sprintf(` + SELECT count(DISTINCT event_id) + FROM event_log + WHERE tenant_id = ? + AND event_time >= fromUnixTimestamp64Milli(?) + AND event_time <= fromUnixTimestamp64Milli(?) + %s + %s + `, countDestFilter, countTopicFilter) } var totalCount uint64 @@ -300,19 +266,6 @@ func (s *logStoreImpl) ListEvent(ctx context.Context, request driver.ListEventRe return driver.ListEventResponse{}, fmt.Errorf("count query failed: %w", err) } - // Build cursors - var nextCursor, prevCursor string - if len(events) > 0 { - if hasNext { - lastEvent := events[len(events)-1] - nextCursor = formatCursor(lastEvent.Time, lastEvent.ID) - } - if hasPrev { - firstEvent := events[0] - prevCursor = formatCursor(firstEvent.Time, firstEvent.ID) - } - } - return driver.ListEventResponse{ Data: events, Next: nextCursor, @@ -321,53 +274,44 @@ func (s *logStoreImpl) ListEvent(ctx context.Context, request driver.ListEventRe }, nil } -// formatCursor creates a cursor from time and ID -// Decision: Use simple "timestamp|id" format func formatCursor(t time.Time, id string) string { - return fmt.Sprintf("%d|%s", t.Unix(), id) + return fmt.Sprintf("%d|%s", t.UnixMilli(), id) } -// parseCursor extracts time and ID from cursor func parseCursor(cursor string) (time.Time, string, error) { parts := strings.Split(cursor, "|") if len(parts) != 2 { return time.Time{}, "", fmt.Errorf("invalid cursor format") } - var unixTime int64 - if _, err := fmt.Sscanf(parts[0], "%d", &unixTime); err != nil { + var unixMilli int64 + if _, err := fmt.Sscanf(parts[0], "%d", &unixMilli); err != nil { return time.Time{}, "", fmt.Errorf("invalid timestamp in cursor: %w", err) } - timestamp := time.Unix(unixTime, 0) + timestamp := time.UnixMilli(unixMilli) return timestamp, parts[1], nil } func (s *logStoreImpl) RetrieveEvent(ctx context.Context, tenantID, eventID string) (*models.Event, error) { - // Query event with status calculation - // Decision: Use a CTE to get the latest delivery status, similar to PG approach query := ` - WITH latest_delivery AS ( - SELECT argMax(status, time) as status - FROM deliveries - WHERE event_id = ? - ) SELECT - e.id, - e.tenant_id, - e.destination_id, - e.time, - e.topic, - e.eligible_for_retry, - e.metadata, - e.data, - COALESCE(ld.status, 'pending') as status - FROM events e - LEFT JOIN latest_delivery ld ON true - WHERE e.tenant_id = ? AND e.id = ? + event_id, + tenant_id, + destination_id, + topic, + eligible_for_retry, + event_time, + metadata, + data, + argMax(status, delivery_time) as status + FROM event_log + WHERE tenant_id = ? AND event_id = ? + GROUP BY event_id, tenant_id, destination_id, topic, eligible_for_retry, event_time, metadata, data + LIMIT 1 ` - row := s.chDB.QueryRow(ctx, query, eventID, tenantID, eventID) + row := s.chDB.QueryRow(ctx, query, tenantID, eventID) var metadataStr, dataStr string event := &models.Event{} @@ -375,22 +319,74 @@ func (s *logStoreImpl) RetrieveEvent(ctx context.Context, tenantID, eventID stri &event.ID, &event.TenantID, &event.DestinationID, + &event.Topic, + &event.EligibleForRetry, &event.Time, + &metadataStr, + &dataStr, + &event.Status, + ) + if err != nil { + if strings.Contains(err.Error(), "EOF") || strings.Contains(err.Error(), "no rows") { + return nil, nil + } + return nil, fmt.Errorf("query failed: %w", err) + } + + if metadataStr != "" { + if err := json.Unmarshal([]byte(metadataStr), &event.Metadata); err != nil { + return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) + } + } + if dataStr != "" { + if err := json.Unmarshal([]byte(dataStr), &event.Data); err != nil { + return nil, fmt.Errorf("failed to unmarshal data: %w", err) + } + } + + return event, nil +} + +func (s *logStoreImpl) RetrieveEventByDestination(ctx context.Context, tenantID, destinationID, eventID string) (*models.Event, error) { + query := ` + SELECT + event_id, + tenant_id, + destination_id, + topic, + eligible_for_retry, + event_time, + metadata, + data, + argMax(status, delivery_time) as status + FROM event_log + WHERE tenant_id = ? AND destination_id = ? AND event_id = ? + GROUP BY event_id, tenant_id, destination_id, topic, eligible_for_retry, event_time, metadata, data + LIMIT 1 + ` + + row := s.chDB.QueryRow(ctx, query, tenantID, destinationID, eventID) + + var metadataStr, dataStr string + event := &models.Event{} + err := row.Scan( + &event.ID, + &event.TenantID, + &event.DestinationID, &event.Topic, &event.EligibleForRetry, + &event.Time, &metadataStr, &dataStr, &event.Status, ) if err != nil { - // ClickHouse returns an error when no rows, not sql.ErrNoRows if strings.Contains(err.Error(), "EOF") || strings.Contains(err.Error(), "no rows") { return nil, nil } return nil, fmt.Errorf("query failed: %w", err) } - // Unmarshal JSON strings if metadataStr != "" { if err := json.Unmarshal([]byte(metadataStr), &event.Metadata); err != nil { return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) @@ -408,15 +404,20 @@ func (s *logStoreImpl) RetrieveEvent(ctx context.Context, tenantID, eventID stri func (s *logStoreImpl) ListDelivery(ctx context.Context, request driver.ListDeliveryRequest) ([]*models.Delivery, error) { query := ` SELECT - id, + delivery_id, + delivery_event_id, event_id, destination_id, status, - time - FROM deliveries + delivery_time, + code, + response_data + FROM event_log WHERE event_id = ? - ORDER BY time DESC + AND delivery_id != '' + ORDER BY delivery_time DESC ` + rows, err := s.chDB.Query(ctx, query, request.EventID) if err != nil { return nil, err @@ -425,193 +426,107 @@ func (s *logStoreImpl) ListDelivery(ctx context.Context, request driver.ListDeli var deliveries []*models.Delivery for rows.Next() { + var responseDataStr string + var code string delivery := &models.Delivery{} if err := rows.Scan( &delivery.ID, + &delivery.DeliveryEventID, &delivery.EventID, &delivery.DestinationID, &delivery.Status, &delivery.Time, + &code, + &responseDataStr, ); err != nil { return nil, err } + delivery.Code = code + if responseDataStr != "" { + if err := json.Unmarshal([]byte(responseDataStr), &delivery.ResponseData); err != nil { + return nil, fmt.Errorf("failed to unmarshal response_data: %w", err) + } + } deliveries = append(deliveries, delivery) } return deliveries, nil } -func (s *logStoreImpl) InsertManyEvent(ctx context.Context, events []*models.Event) error { - if len(events) == 0 { +func (s *logStoreImpl) InsertManyDeliveryEvent(ctx context.Context, deliveryEvents []*models.DeliveryEvent) error { + if len(deliveryEvents) == 0 { return nil } batch, err := s.chDB.PrepareBatch(ctx, - "INSERT INTO events (id, tenant_id, destination_id, topic, eligible_for_retry, time, metadata, data)", + `INSERT INTO event_log ( + event_id, tenant_id, destination_id, topic, eligible_for_retry, event_time, metadata, data, + delivery_id, delivery_event_id, status, delivery_time, code, response_data + )`, ) if err != nil { return err } - for _, event := range events { - metadataJSON, err := json.Marshal(event.Metadata) + for _, de := range deliveryEvents { + metadataJSON, err := json.Marshal(de.Event.Metadata) if err != nil { return fmt.Errorf("failed to marshal metadata: %w", err) } - dataJSON, err := json.Marshal(event.Data) + dataJSON, err := json.Marshal(de.Event.Data) if err != nil { return fmt.Errorf("failed to marshal data: %w", err) } - if err := batch.Append( - event.ID, - event.TenantID, - event.DestinationID, - event.Topic, - event.EligibleForRetry, - event.Time, - string(metadataJSON), - string(dataJSON), - ); err != nil { - return err - } - } - - if err := batch.Send(); err != nil { - return err - } - - return nil -} - -func (s *logStoreImpl) InsertManyDelivery(ctx context.Context, deliveries []*models.Delivery) error { - if len(deliveries) == 0 { - return nil - } - - batch, err := s.chDB.PrepareBatch(ctx, - "INSERT INTO deliveries (id, delivery_event_id, event_id, destination_id, status, time)", - ) - if err != nil { - return err - } - - for _, delivery := range deliveries { - if err := batch.Append( - delivery.ID, - delivery.DeliveryEventID, - delivery.EventID, - delivery.DestinationID, - delivery.Status, - delivery.Time, - ); err != nil { - return err - } - } - - if err := batch.Send(); err != nil { - return err - } - - return nil -} - -func (s *logStoreImpl) InsertManyDeliveryEvent(ctx context.Context, deliveryEvents []*models.DeliveryEvent) error { - if len(deliveryEvents) == 0 { - return nil - } - - // Insert events - events := make([]*models.Event, len(deliveryEvents)) - for i, de := range deliveryEvents { - events[i] = &de.Event - } - if err := s.InsertManyEvent(ctx, events); err != nil { - return fmt.Errorf("failed to insert events: %w", err) - } + // Determine delivery fields + var deliveryID, deliveryEventID, status, code string + var deliveryTime time.Time + var responseDataJSON []byte - // Insert deliveries - deliveries := make([]*models.Delivery, 0, len(deliveryEvents)) - for _, de := range deliveryEvents { if de.Delivery != nil { - deliveries = append(deliveries, de.Delivery) + deliveryID = de.Delivery.ID + deliveryEventID = de.Delivery.DeliveryEventID + status = de.Delivery.Status + deliveryTime = de.Delivery.Time + code = de.Delivery.Code + responseDataJSON, err = json.Marshal(de.Delivery.ResponseData) + if err != nil { + return fmt.Errorf("failed to marshal response_data: %w", err) + } } else { - // Create a pending delivery if none exists - deliveries = append(deliveries, &models.Delivery{ - ID: de.ID, - DeliveryEventID: de.ID, - EventID: de.Event.ID, - DestinationID: de.DestinationID, - Status: "pending", - Time: time.Now(), - }) + // Pending event - no delivery yet. + // We set delivery_time = event_time as a placeholder. This is semantically + // incorrect (there's no actual delivery), but necessary because we use + // argMax(status, delivery_time) to determine the latest status. By using + // event_time, pending status will always "lose" to any real delivery + // (which will have a later delivery_time), ensuring correct status resolution. + deliveryID = "" + deliveryEventID = de.ID + status = "pending" + deliveryTime = de.Event.Time + code = "" + responseDataJSON = []byte("{}") } - } - if err := s.InsertManyDelivery(ctx, deliveries); err != nil { - return fmt.Errorf("failed to insert deliveries: %w", err) - } - return nil -} - -func (s *logStoreImpl) RetrieveEventByDestination(ctx context.Context, tenantID, destinationID, eventID string) (*models.Event, error) { - // Query event with destination-specific status - // Decision: Get the latest delivery status for this specific destination - query := ` - WITH latest_delivery AS ( - SELECT argMax(status, time) as status - FROM deliveries - WHERE event_id = ? AND destination_id = ? - ) - SELECT - e.id, - e.tenant_id, - ? as destination_id, - e.time, - e.topic, - e.eligible_for_retry, - e.metadata, - e.data, - COALESCE(ld.status, 'pending') as status - FROM events e - LEFT JOIN latest_delivery ld ON true - WHERE e.tenant_id = ? AND e.id = ? - ` - - row := s.chDB.QueryRow(ctx, query, eventID, destinationID, destinationID, tenantID, eventID) - - var metadataStr, dataStr string - event := &models.Event{} - err := row.Scan( - &event.ID, - &event.TenantID, - &event.DestinationID, - &event.Time, - &event.Topic, - &event.EligibleForRetry, - &metadataStr, - &dataStr, - &event.Status, - ) - if err != nil { - // ClickHouse returns an error when no rows, not sql.ErrNoRows - if strings.Contains(err.Error(), "EOF") || strings.Contains(err.Error(), "no rows") { - return nil, nil - } - return nil, fmt.Errorf("query failed: %w", err) - } - - // Unmarshal JSON strings - if metadataStr != "" { - if err := json.Unmarshal([]byte(metadataStr), &event.Metadata); err != nil { - return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) - } - } - if dataStr != "" { - if err := json.Unmarshal([]byte(dataStr), &event.Data); err != nil { - return nil, fmt.Errorf("failed to unmarshal data: %w", err) + if err := batch.Append( + de.Event.ID, + de.Event.TenantID, + de.DestinationID, + de.Event.Topic, + de.Event.EligibleForRetry, + de.Event.Time, + string(metadataJSON), + string(dataJSON), + deliveryID, + deliveryEventID, + status, + deliveryTime, + code, + string(responseDataJSON), + ); err != nil { + return err } } - return event, nil + return batch.Send() } diff --git a/internal/migrator/migrations/clickhouse/000001_init.down.sql b/internal/migrator/migrations/clickhouse/000001_init.down.sql index 1bc93eb7..f3db6fbf 100644 --- a/internal/migrator/migrations/clickhouse/000001_init.down.sql +++ b/internal/migrator/migrations/clickhouse/000001_init.down.sql @@ -1,3 +1 @@ -DROP TABLE IF EXISTS deliveries; - -DROP TABLE IF EXISTS events; \ No newline at end of file +DROP TABLE IF EXISTS event_log; diff --git a/internal/migrator/migrations/clickhouse/000001_init.up.sql b/internal/migrator/migrations/clickhouse/000001_init.up.sql index ab5e207c..311412e1 100644 --- a/internal/migrator/migrations/clickhouse/000001_init.up.sql +++ b/internal/migrator/migrations/clickhouse/000001_init.up.sql @@ -1,23 +1,29 @@ -CREATE TABLE IF NOT EXISTS events ( - id String, +-- Single denormalized table for events and deliveries +-- Each row represents a delivery attempt (or pending state) for an event +-- This avoids JOINs and leverages ClickHouse's columnar storage efficiently + +CREATE TABLE IF NOT EXISTS event_log ( + -- Event fields + event_id String, tenant_id String, destination_id String, topic String, eligible_for_retry Bool, - time DateTime, - metadata String, - data String -) ENGINE = MergeTree -ORDER BY - (id, time); + event_time DateTime64(3), + metadata String, -- JSON serialized + data String, -- JSON serialized -CREATE TABLE IF NOT EXISTS deliveries ( - id String, + -- Delivery fields (nullable for pending events) + delivery_id String, delivery_event_id String, - event_id String, - destination_id String, - status String, - time DateTime -) ENGINE = ReplacingMergeTree -ORDER BY - (id, time); \ No newline at end of file + status String, -- 'pending', 'success', 'failed' + delivery_time DateTime64(3), + code String, + response_data String, -- JSON serialized + + -- Indexes for common filter patterns + INDEX idx_topic topic TYPE bloom_filter GRANULARITY 4, + INDEX idx_status status TYPE set(100) GRANULARITY 4 +) ENGINE = MergeTree +PARTITION BY toYYYYMMDD(event_time) +ORDER BY (tenant_id, destination_id, event_time, event_id, delivery_time); From 8f3bb2b88d89c26610dd55d09672b9ed8c1c2a72 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 1 Dec 2025 18:59:09 +0700 Subject: [PATCH 5/8] chore: include tenant id in list delivery query --- internal/apirouter/log_handlers.go | 1 + internal/logstore/chlogstore/chlogstore.go | 5 +++-- internal/logstore/driver/driver.go | 1 + internal/logstore/pglogstore/pglogstore.go | 13 ++++++++----- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/internal/apirouter/log_handlers.go b/internal/apirouter/log_handlers.go index e827021b..b6a70350 100644 --- a/internal/apirouter/log_handlers.go +++ b/internal/apirouter/log_handlers.go @@ -159,6 +159,7 @@ func (h *LogHandlers) ListDeliveryByEvent(c *gin.Context) { return } deliveries, err := h.logStore.ListDelivery(c.Request.Context(), logstore.ListDeliveryRequest{ + TenantID: event.TenantID, EventID: event.ID, DestinationID: c.Query("destination_id"), }) diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index 995acfab..581c0b21 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -413,12 +413,13 @@ func (s *logStoreImpl) ListDelivery(ctx context.Context, request driver.ListDeli code, response_data FROM event_log - WHERE event_id = ? + WHERE tenant_id = ? + AND event_id = ? AND delivery_id != '' ORDER BY delivery_time DESC ` - rows, err := s.chDB.Query(ctx, query, request.EventID) + rows, err := s.chDB.Query(ctx, query, request.TenantID, request.EventID) if err != nil { return nil, err } diff --git a/internal/logstore/driver/driver.go b/internal/logstore/driver/driver.go index f5dfa472..abbe41f2 100644 --- a/internal/logstore/driver/driver.go +++ b/internal/logstore/driver/driver.go @@ -36,6 +36,7 @@ type ListEventByDestinationRequest struct { } type ListDeliveryRequest struct { + TenantID string // required - always filter by tenant for security EventID string DestinationID string } diff --git a/internal/logstore/pglogstore/pglogstore.go b/internal/logstore/pglogstore/pglogstore.go index c6e8628c..fc1aaa42 100644 --- a/internal/logstore/pglogstore/pglogstore.go +++ b/internal/logstore/pglogstore/pglogstore.go @@ -326,13 +326,16 @@ func (s *logStore) RetrieveEventByDestination(ctx context.Context, tenantID, des func (s *logStore) ListDelivery(ctx context.Context, req driver.ListDeliveryRequest) ([]*models.Delivery, error) { query := ` - SELECT id, event_id, destination_id, status, time, code, response_data - FROM deliveries - WHERE event_id = $1 - AND ($2 = '' OR destination_id = $2) - ORDER BY time DESC` + SELECT d.id, d.event_id, d.destination_id, d.status, d.time, d.code, d.response_data + FROM deliveries d + JOIN events e ON d.event_id = e.id + WHERE e.tenant_id = $1 + AND d.event_id = $2 + AND ($3 = '' OR d.destination_id = $3) + ORDER BY d.time DESC` rows, err := s.db.Query(ctx, query, + req.TenantID, req.EventID, req.DestinationID) if err != nil { From 1f10d03c96005ab4ec304f86faa5498fb7af77cb Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 1 Dec 2025 18:59:40 +0700 Subject: [PATCH 6/8] chore: gofmt --- internal/config/logging.go | 2 +- internal/deliverymq/retry_test.go | 94 +++++++++++++++---------------- 2 files changed, 48 insertions(+), 48 deletions(-) diff --git a/internal/config/logging.go b/internal/config/logging.go index 09710186..e877eb77 100644 --- a/internal/config/logging.go +++ b/internal/config/logging.go @@ -167,7 +167,7 @@ func maskPostgresURLHost(url string) string { if url == "" { return "" } - + // postgres://user:password@host:port/database?params if idx := strings.Index(url, "@"); idx != -1 { rest := url[idx+1:] diff --git a/internal/deliverymq/retry_test.go b/internal/deliverymq/retry_test.go index e6b641cb..10a1b012 100644 --- a/internal/deliverymq/retry_test.go +++ b/internal/deliverymq/retry_test.go @@ -19,18 +19,18 @@ import ( ) type RetryDeliveryMQSuite struct { - ctx context.Context - mqConfig *mqs.QueueConfig - retryMaxCount int - retryBackoff backoff.Backoff + ctx context.Context + mqConfig *mqs.QueueConfig + retryMaxCount int + retryBackoff backoff.Backoff schedulerPollBackoff time.Duration - publisher deliverymq.Publisher - eventGetter deliverymq.EventGetter - logPublisher deliverymq.LogPublisher - destGetter deliverymq.DestinationGetter - alertMonitor deliverymq.AlertMonitor - deliveryMQ *deliverymq.DeliveryMQ - teardown func() + publisher deliverymq.Publisher + eventGetter deliverymq.EventGetter + logPublisher deliverymq.LogPublisher + destGetter deliverymq.DestinationGetter + alertMonitor deliverymq.AlertMonitor + deliveryMQ *deliverymq.DeliveryMQ + teardown func() } func (s *RetryDeliveryMQSuite) SetupTest(t *testing.T) { @@ -136,15 +136,15 @@ func TestDeliveryMQRetry_EligibleForRetryFalse(t *testing.T) { eventGetter.registerEvent(&event) suite := &RetryDeliveryMQSuite{ - ctx: ctx, - mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}}, - publisher: publisher, - eventGetter: eventGetter, - logPublisher: newMockLogPublisher(nil), - destGetter: &mockDestinationGetter{dest: &destination}, - alertMonitor: newMockAlertMonitor(), - retryMaxCount: 10, - retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond}, + ctx: ctx, + mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}}, + publisher: publisher, + eventGetter: eventGetter, + logPublisher: newMockLogPublisher(nil), + destGetter: &mockDestinationGetter{dest: &destination}, + alertMonitor: newMockAlertMonitor(), + retryMaxCount: 10, + retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond}, schedulerPollBackoff: 10 * time.Millisecond, } suite.SetupTest(t) @@ -199,15 +199,15 @@ func TestDeliveryMQRetry_EligibleForRetryTrue(t *testing.T) { eventGetter.registerEvent(&event) suite := &RetryDeliveryMQSuite{ - ctx: ctx, - mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}}, - publisher: publisher, - eventGetter: eventGetter, - logPublisher: newMockLogPublisher(nil), - destGetter: &mockDestinationGetter{dest: &destination}, - alertMonitor: newMockAlertMonitor(), - retryMaxCount: 10, - retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond}, + ctx: ctx, + mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}}, + publisher: publisher, + eventGetter: eventGetter, + logPublisher: newMockLogPublisher(nil), + destGetter: &mockDestinationGetter{dest: &destination}, + alertMonitor: newMockAlertMonitor(), + retryMaxCount: 10, + retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond}, schedulerPollBackoff: 10 * time.Millisecond, } suite.SetupTest(t) @@ -257,15 +257,15 @@ func TestDeliveryMQRetry_SystemError(t *testing.T) { eventGetter.registerEvent(&event) suite := &RetryDeliveryMQSuite{ - ctx: ctx, - mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}}, - publisher: newMockPublisher(nil), // publisher won't be called due to early error - eventGetter: eventGetter, - logPublisher: newMockLogPublisher(nil), - destGetter: destGetter, - alertMonitor: newMockAlertMonitor(), - retryMaxCount: 10, - retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond}, + ctx: ctx, + mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}}, + publisher: newMockPublisher(nil), // publisher won't be called due to early error + eventGetter: eventGetter, + logPublisher: newMockLogPublisher(nil), + destGetter: destGetter, + alertMonitor: newMockAlertMonitor(), + retryMaxCount: 10, + retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond}, schedulerPollBackoff: 10 * time.Millisecond, } suite.SetupTest(t) @@ -327,15 +327,15 @@ func TestDeliveryMQRetry_RetryMaxCount(t *testing.T) { eventGetter.registerEvent(&event) suite := &RetryDeliveryMQSuite{ - ctx: ctx, - mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}}, - publisher: publisher, - eventGetter: eventGetter, - logPublisher: newMockLogPublisher(nil), - destGetter: &mockDestinationGetter{dest: &destination}, - alertMonitor: newMockAlertMonitor(), - retryMaxCount: 2, // 1 initial + 2 retries = 3 total attempts - retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond}, + ctx: ctx, + mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}}, + publisher: publisher, + eventGetter: eventGetter, + logPublisher: newMockLogPublisher(nil), + destGetter: &mockDestinationGetter{dest: &destination}, + alertMonitor: newMockAlertMonitor(), + retryMaxCount: 2, // 1 initial + 2 retries = 3 total attempts + retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond}, schedulerPollBackoff: 10 * time.Millisecond, } suite.SetupTest(t) From 6ae3548ebe57432eb390881680340019276f5912 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 2 Dec 2025 12:52:44 +0700 Subject: [PATCH 7/8] chore: configure logstore with ch driver --- internal/config/config.go | 71 ++++++++++++++++++----------------- internal/config/validation.go | 9 +---- internal/logstore/logstore.go | 18 ++++----- internal/services/builder.go | 3 +- 4 files changed, 48 insertions(+), 53 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index b31aec01..79901389 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,6 +9,7 @@ import ( "github.com/caarlos0/env/v9" "github.com/hookdeck/outpost/internal/backoff" + "github.com/hookdeck/outpost/internal/clickhouse" "github.com/hookdeck/outpost/internal/migrator" "github.com/hookdeck/outpost/internal/redis" "github.com/hookdeck/outpost/internal/telemetry" @@ -61,10 +62,10 @@ type Config struct { HTTPUserAgent string `yaml:"http_user_agent" env:"HTTP_USER_AGENT" desc:"Custom HTTP User-Agent string for outgoing webhook deliveries. If unset, a default (OrganizationName/Version) is used." required:"N"` // Infrastructure - Redis RedisConfig `yaml:"redis"` - // ClickHouse ClickHouseConfig `yaml:"clickhouse"` - PostgresURL string `yaml:"postgres" env:"POSTGRES_URL" desc:"Connection URL for PostgreSQL, used for log storage. Example: 'postgres://user:pass@host:port/dbname?sslmode=disable'." required:"Y"` - MQs *MQsConfig `yaml:"mqs"` + Redis RedisConfig `yaml:"redis"` + ClickHouse ClickHouseConfig `yaml:"clickhouse"` + PostgresURL string `yaml:"postgres" env:"POSTGRES_URL" desc:"Connection URL for PostgreSQL, used for log storage. Example: 'postgres://user:pass@host:port/dbname?sslmode=disable'." required:"N"` + MQs *MQsConfig `yaml:"mqs"` // PublishMQ PublishMQ PublishMQConfig `yaml:"publishmq"` @@ -131,9 +132,9 @@ func (c *Config) InitDefaults() { Host: "127.0.0.1", Port: 6379, } - // c.ClickHouse = ClickHouseConfig{ - // Database: "outpost", - // } + c.ClickHouse = ClickHouseConfig{ + Database: "outpost", + } c.MQs = &MQsConfig{ RabbitMQ: RabbitMQConfig{ Exchange: "outpost", @@ -378,24 +379,24 @@ func (c *RedisConfig) ToConfig() *redis.RedisConfig { } } -// type ClickHouseConfig struct { -// Addr string `yaml:"addr" env:"CLICKHOUSE_ADDR" desc:"Address (host:port) of the ClickHouse server. Example: 'localhost:9000'. Required if ClickHouse is used for log storage." required:"C"` -// Username string `yaml:"username" env:"CLICKHOUSE_USERNAME" desc:"Username for ClickHouse authentication." required:"N"` -// Password string `yaml:"password" env:"CLICKHOUSE_PASSWORD" desc:"Password for ClickHouse authentication." required:"N"` -// Database string `yaml:"database" env:"CLICKHOUSE_DATABASE" desc:"Database name in ClickHouse to use." required:"N"` -// } - -// func (c *ClickHouseConfig) ToConfig() *clickhouse.ClickHouseConfig { -// if c.Addr == "" { -// return nil -// } -// return &clickhouse.ClickHouseConfig{ -// Addr: c.Addr, -// Username: c.Username, -// Password: c.Password, -// Database: c.Database, -// } -// } +type ClickHouseConfig struct { + Addr string `yaml:"addr" env:"CLICKHOUSE_ADDR" desc:"Address (host:port) of the ClickHouse server. Example: 'localhost:9000'." required:"N"` + Username string `yaml:"username" env:"CLICKHOUSE_USERNAME" desc:"Username for ClickHouse authentication." required:"N"` + Password string `yaml:"password" env:"CLICKHOUSE_PASSWORD" desc:"Password for ClickHouse authentication." required:"N"` + Database string `yaml:"database" env:"CLICKHOUSE_DATABASE" desc:"Database name in ClickHouse to use." required:"N"` +} + +func (c *ClickHouseConfig) ToConfig() *clickhouse.ClickHouseConfig { + if c.Addr == "" { + return nil + } + return &clickhouse.ClickHouseConfig{ + Addr: c.Addr, + Username: c.Username, + Password: c.Password, + Database: c.Database, + } +} type AlertConfig struct { CallbackURL string `yaml:"callback_url" env:"ALERT_CALLBACK_URL" desc:"URL to which Outpost will send a POST request when an alert is triggered (e.g., for destination failures)." required:"N"` @@ -447,10 +448,10 @@ func (c *Config) ToTelemetryApplicationInfo() telemetry.ApplicationInfo { portalEnabled := c.APIKey != "" && c.APIJWTSecret != "" entityStore := "redis" - logStore := "TODO" - // if c.ClickHouse.Addr != "" { - // logStore = "clickhouse" - // } + logStore := "" + if c.ClickHouse.Addr != "" { + logStore = "clickhouse" + } if c.PostgresURL != "" { logStore = "postgres" } @@ -471,11 +472,11 @@ func (c *Config) ToMigratorOpts() migrator.MigrationOpts { PG: migrator.MigrationOptsPG{ URL: c.PostgresURL, }, - // CH: migrator.MigrationOptsCH{ - // Addr: c.ClickHouse.Addr, - // Username: c.ClickHouse.Username, - // Password: c.ClickHouse.Password, - // Database: c.ClickHouse.Database, - // }, + CH: migrator.MigrationOptsCH{ + Addr: c.ClickHouse.Addr, + Username: c.ClickHouse.Username, + Password: c.ClickHouse.Password, + Database: c.ClickHouse.Database, + }, } } diff --git a/internal/config/validation.go b/internal/config/validation.go index 705ef3e8..5b9a7c5b 100644 --- a/internal/config/validation.go +++ b/internal/config/validation.go @@ -89,17 +89,10 @@ func (c *Config) validateRedis() error { } // validateLogStorage validates the ClickHouse / PG configuration -// Temporary: disable CH as it's not fully supported yet func (c *Config) validateLogStorage() error { - // if c.ClickHouse.Addr == "" && c.PostgresURL == "" { - // return ErrMissingLogStorage - // } - if c.PostgresURL == "" { + if c.ClickHouse.Addr == "" && c.PostgresURL == "" { return ErrMissingLogStorage } - // if c.ClickHouse.Addr != "" { - // return fmt.Errorf("ClickHouse is not currently supported") - // } return nil } diff --git a/internal/logstore/logstore.go b/internal/logstore/logstore.go index a94239ff..dad027e1 100644 --- a/internal/logstore/logstore.go +++ b/internal/logstore/logstore.go @@ -51,20 +51,20 @@ func NewLogStore(ctx context.Context, driverOpts DriverOpts) (LogStore, error) { } type Config struct { - // ClickHouse *clickhouse.ClickHouseConfig - Postgres *string + ClickHouse *clickhouse.ClickHouseConfig + Postgres *string } func MakeDriverOpts(cfg Config) (DriverOpts, error) { driverOpts := DriverOpts{} - // if cfg.ClickHouse != nil { - // chDB, err := clickhouse.New(cfg.ClickHouse) - // if err != nil { - // return DriverOpts{}, err - // } - // driverOpts.CH = chDB - // } + if cfg.ClickHouse != nil { + chDB, err := clickhouse.New(cfg.ClickHouse) + if err != nil { + return DriverOpts{}, err + } + driverOpts.CH = chDB + } if cfg.Postgres != nil && *cfg.Postgres != "" { pgDB, err := pgxpool.New(context.Background(), *cfg.Postgres) diff --git a/internal/services/builder.go b/internal/services/builder.go index 0594fcbd..ac570ddd 100644 --- a/internal/services/builder.go +++ b/internal/services/builder.go @@ -502,7 +502,8 @@ func (s *serviceInstance) initRedis(ctx context.Context, cfg *config.Config, log func (s *serviceInstance) initLogStore(ctx context.Context, cfg *config.Config, logger *logging.Logger) error { logger.Debug("configuring log store driver", zap.String("service", s.name)) logStoreDriverOpts, err := logstore.MakeDriverOpts(logstore.Config{ - Postgres: &cfg.PostgresURL, + ClickHouse: cfg.ClickHouse.ToConfig(), + Postgres: &cfg.PostgresURL, }) if err != nil { logger.Error("log store driver configuration failed", zap.String("service", s.name), zap.Error(err)) From 49a435969a8f2a4e01401c0da26f2b3d58306ac4 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 2 Dec 2025 12:52:58 +0700 Subject: [PATCH 8/8] test: e2e suite with ch --- cmd/e2e/configs/basic.go | 12 ++++++------ cmd/e2e/suites_test.go | 14 +++++++------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/cmd/e2e/configs/basic.go b/cmd/e2e/configs/basic.go index 65054bec..db90677e 100644 --- a/cmd/e2e/configs/basic.go +++ b/cmd/e2e/configs/basic.go @@ -148,12 +148,12 @@ func setLogStorage(t *testing.T, c *config.Config, logStorage LogStorageType) er case LogStorageTypePostgres: postgresURL := testinfra.NewPostgresConfig(t) c.PostgresURL = postgresURL - // case LogStorageTypeClickHouse: - // clickHouseConfig := testinfra.NewClickHouseConfig(t) - // c.ClickHouse.Addr = clickHouseConfig.Addr - // c.ClickHouse.Username = clickHouseConfig.Username - // c.ClickHouse.Password = clickHouseConfig.Password - // c.ClickHouse.Database = clickHouseConfig.Database + case LogStorageTypeClickHouse: + clickHouseConfig := testinfra.NewClickHouseConfig(t) + c.ClickHouse.Addr = clickHouseConfig.Addr + c.ClickHouse.Username = clickHouseConfig.Username + c.ClickHouse.Password = clickHouseConfig.Password + c.ClickHouse.Database = clickHouseConfig.Database default: return fmt.Errorf("invalid log storage type: %s", logStorage) } diff --git a/cmd/e2e/suites_test.go b/cmd/e2e/suites_test.go index 9bb1f0bb..89d5d75c 100644 --- a/cmd/e2e/suites_test.go +++ b/cmd/e2e/suites_test.go @@ -178,13 +178,13 @@ func (s *basicSuite) TearDownSuite() { s.e2eSuite.TearDownSuite() } -// func TestCHBasicSuite(t *testing.T) { -// t.Parallel() -// if testing.Short() { -// t.Skip("skipping e2e test") -// } -// suite.Run(t, &basicSuite{logStorageType: configs.LogStorageTypeClickHouse}) -// } +func TestBasicSuiteWithCH(t *testing.T) { + t.Parallel() + if testing.Short() { + t.Skip("skipping e2e test") + } + suite.Run(t, &basicSuite{logStorageType: configs.LogStorageTypeClickHouse}) +} func TestPGBasicSuite(t *testing.T) { t.Parallel()