Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cmd/e2e/configs/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,12 @@ func setLogStorage(t *testing.T, c *config.Config, logStorage LogStorageType) er
case LogStorageTypePostgres:
postgresURL := testinfra.NewPostgresConfig(t)
c.PostgresURL = postgresURL
// case LogStorageTypeClickHouse:
// clickHouseConfig := testinfra.NewClickHouseConfig(t)
// c.ClickHouse.Addr = clickHouseConfig.Addr
// c.ClickHouse.Username = clickHouseConfig.Username
// c.ClickHouse.Password = clickHouseConfig.Password
// c.ClickHouse.Database = clickHouseConfig.Database
case LogStorageTypeClickHouse:
clickHouseConfig := testinfra.NewClickHouseConfig(t)
c.ClickHouse.Addr = clickHouseConfig.Addr
c.ClickHouse.Username = clickHouseConfig.Username
c.ClickHouse.Password = clickHouseConfig.Password
c.ClickHouse.Database = clickHouseConfig.Database
default:
return fmt.Errorf("invalid log storage type: %s", logStorage)
}
Expand Down
14 changes: 7 additions & 7 deletions cmd/e2e/suites_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,13 @@ func (s *basicSuite) TearDownSuite() {
s.e2eSuite.TearDownSuite()
}

// func TestCHBasicSuite(t *testing.T) {
// t.Parallel()
// if testing.Short() {
// t.Skip("skipping e2e test")
// }
// suite.Run(t, &basicSuite{logStorageType: configs.LogStorageTypeClickHouse})
// }
func TestBasicSuiteWithCH(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("skipping e2e test")
}
suite.Run(t, &basicSuite{logStorageType: configs.LogStorageTypeClickHouse})
}

func TestPGBasicSuite(t *testing.T) {
t.Parallel()
Expand Down
1 change: 1 addition & 0 deletions internal/apirouter/log_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (h *LogHandlers) ListDeliveryByEvent(c *gin.Context) {
return
}
deliveries, err := h.logStore.ListDelivery(c.Request.Context(), logstore.ListDeliveryRequest{
TenantID: event.TenantID,
EventID: event.ID,
DestinationID: c.Query("destination_id"),
})
Expand Down
2 changes: 1 addition & 1 deletion internal/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func New(config *ClickHouseConfig) (DB, error) {

// Debug: true,
// Debugf: func(format string, v ...any) {
// fmt.Printf(format+"\n", v...)
// fmt.Printf("[CH DEBUG] "+format+"\n", v...)
// },
})
return conn, err
Expand Down
71 changes: 36 additions & 35 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/caarlos0/env/v9"
"github.com/hookdeck/outpost/internal/backoff"
"github.com/hookdeck/outpost/internal/clickhouse"
"github.com/hookdeck/outpost/internal/migrator"
"github.com/hookdeck/outpost/internal/redis"
"github.com/hookdeck/outpost/internal/telemetry"
Expand Down Expand Up @@ -61,10 +62,10 @@ type Config struct {
HTTPUserAgent string `yaml:"http_user_agent" env:"HTTP_USER_AGENT" desc:"Custom HTTP User-Agent string for outgoing webhook deliveries. If unset, a default (OrganizationName/Version) is used." required:"N"`

// Infrastructure
Redis RedisConfig `yaml:"redis"`
// ClickHouse ClickHouseConfig `yaml:"clickhouse"`
PostgresURL string `yaml:"postgres" env:"POSTGRES_URL" desc:"Connection URL for PostgreSQL, used for log storage. Example: 'postgres://user:pass@host:port/dbname?sslmode=disable'." required:"Y"`
MQs *MQsConfig `yaml:"mqs"`
Redis RedisConfig `yaml:"redis"`
ClickHouse ClickHouseConfig `yaml:"clickhouse"`
PostgresURL string `yaml:"postgres" env:"POSTGRES_URL" desc:"Connection URL for PostgreSQL, used for log storage. Example: 'postgres://user:pass@host:port/dbname?sslmode=disable'." required:"N"`
MQs *MQsConfig `yaml:"mqs"`

// PublishMQ
PublishMQ PublishMQConfig `yaml:"publishmq"`
Expand Down Expand Up @@ -131,9 +132,9 @@ func (c *Config) InitDefaults() {
Host: "127.0.0.1",
Port: 6379,
}
// c.ClickHouse = ClickHouseConfig{
// Database: "outpost",
// }
c.ClickHouse = ClickHouseConfig{
Database: "outpost",
}
c.MQs = &MQsConfig{
RabbitMQ: RabbitMQConfig{
Exchange: "outpost",
Expand Down Expand Up @@ -378,24 +379,24 @@ func (c *RedisConfig) ToConfig() *redis.RedisConfig {
}
}

// type ClickHouseConfig struct {
// Addr string `yaml:"addr" env:"CLICKHOUSE_ADDR" desc:"Address (host:port) of the ClickHouse server. Example: 'localhost:9000'. Required if ClickHouse is used for log storage." required:"C"`
// Username string `yaml:"username" env:"CLICKHOUSE_USERNAME" desc:"Username for ClickHouse authentication." required:"N"`
// Password string `yaml:"password" env:"CLICKHOUSE_PASSWORD" desc:"Password for ClickHouse authentication." required:"N"`
// Database string `yaml:"database" env:"CLICKHOUSE_DATABASE" desc:"Database name in ClickHouse to use." required:"N"`
// }

// func (c *ClickHouseConfig) ToConfig() *clickhouse.ClickHouseConfig {
// if c.Addr == "" {
// return nil
// }
// return &clickhouse.ClickHouseConfig{
// Addr: c.Addr,
// Username: c.Username,
// Password: c.Password,
// Database: c.Database,
// }
// }
type ClickHouseConfig struct {
Addr string `yaml:"addr" env:"CLICKHOUSE_ADDR" desc:"Address (host:port) of the ClickHouse server. Example: 'localhost:9000'." required:"N"`
Username string `yaml:"username" env:"CLICKHOUSE_USERNAME" desc:"Username for ClickHouse authentication." required:"N"`
Password string `yaml:"password" env:"CLICKHOUSE_PASSWORD" desc:"Password for ClickHouse authentication." required:"N"`
Database string `yaml:"database" env:"CLICKHOUSE_DATABASE" desc:"Database name in ClickHouse to use." required:"N"`
}

func (c *ClickHouseConfig) ToConfig() *clickhouse.ClickHouseConfig {
if c.Addr == "" {
return nil
}
return &clickhouse.ClickHouseConfig{
Addr: c.Addr,
Username: c.Username,
Password: c.Password,
Database: c.Database,
}
}

type AlertConfig struct {
CallbackURL string `yaml:"callback_url" env:"ALERT_CALLBACK_URL" desc:"URL to which Outpost will send a POST request when an alert is triggered (e.g., for destination failures)." required:"N"`
Expand Down Expand Up @@ -447,10 +448,10 @@ func (c *Config) ToTelemetryApplicationInfo() telemetry.ApplicationInfo {
portalEnabled := c.APIKey != "" && c.APIJWTSecret != ""

entityStore := "redis"
logStore := "TODO"
// if c.ClickHouse.Addr != "" {
// logStore = "clickhouse"
// }
logStore := ""
if c.ClickHouse.Addr != "" {
logStore = "clickhouse"
}
if c.PostgresURL != "" {
logStore = "postgres"
}
Expand All @@ -471,11 +472,11 @@ func (c *Config) ToMigratorOpts() migrator.MigrationOpts {
PG: migrator.MigrationOptsPG{
URL: c.PostgresURL,
},
// CH: migrator.MigrationOptsCH{
// Addr: c.ClickHouse.Addr,
// Username: c.ClickHouse.Username,
// Password: c.ClickHouse.Password,
// Database: c.ClickHouse.Database,
// },
CH: migrator.MigrationOptsCH{
Addr: c.ClickHouse.Addr,
Username: c.ClickHouse.Username,
Password: c.ClickHouse.Password,
Database: c.ClickHouse.Database,
},
}
}
2 changes: 1 addition & 1 deletion internal/config/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func maskPostgresURLHost(url string) string {
if url == "" {
return ""
}

// postgres://user:password@host:port/database?params
if idx := strings.Index(url, "@"); idx != -1 {
rest := url[idx+1:]
Expand Down
9 changes: 1 addition & 8 deletions internal/config/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,10 @@ func (c *Config) validateRedis() error {
}

// validateLogStorage validates the ClickHouse / PG configuration
// Temporary: disable CH as it's not fully supported yet
func (c *Config) validateLogStorage() error {
// if c.ClickHouse.Addr == "" && c.PostgresURL == "" {
// return ErrMissingLogStorage
// }
if c.PostgresURL == "" {
if c.ClickHouse.Addr == "" && c.PostgresURL == "" {
return ErrMissingLogStorage
}
// if c.ClickHouse.Addr != "" {
// return fmt.Errorf("ClickHouse is not currently supported")
// }
return nil
}

Expand Down
94 changes: 47 additions & 47 deletions internal/deliverymq/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ import (
)

type RetryDeliveryMQSuite struct {
ctx context.Context
mqConfig *mqs.QueueConfig
retryMaxCount int
retryBackoff backoff.Backoff
ctx context.Context
mqConfig *mqs.QueueConfig
retryMaxCount int
retryBackoff backoff.Backoff
schedulerPollBackoff time.Duration
publisher deliverymq.Publisher
eventGetter deliverymq.EventGetter
logPublisher deliverymq.LogPublisher
destGetter deliverymq.DestinationGetter
alertMonitor deliverymq.AlertMonitor
deliveryMQ *deliverymq.DeliveryMQ
teardown func()
publisher deliverymq.Publisher
eventGetter deliverymq.EventGetter
logPublisher deliverymq.LogPublisher
destGetter deliverymq.DestinationGetter
alertMonitor deliverymq.AlertMonitor
deliveryMQ *deliverymq.DeliveryMQ
teardown func()
}

func (s *RetryDeliveryMQSuite) SetupTest(t *testing.T) {
Expand Down Expand Up @@ -136,15 +136,15 @@ func TestDeliveryMQRetry_EligibleForRetryFalse(t *testing.T) {
eventGetter.registerEvent(&event)

suite := &RetryDeliveryMQSuite{
ctx: ctx,
mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}},
publisher: publisher,
eventGetter: eventGetter,
logPublisher: newMockLogPublisher(nil),
destGetter: &mockDestinationGetter{dest: &destination},
alertMonitor: newMockAlertMonitor(),
retryMaxCount: 10,
retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond},
ctx: ctx,
mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}},
publisher: publisher,
eventGetter: eventGetter,
logPublisher: newMockLogPublisher(nil),
destGetter: &mockDestinationGetter{dest: &destination},
alertMonitor: newMockAlertMonitor(),
retryMaxCount: 10,
retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond},
schedulerPollBackoff: 10 * time.Millisecond,
}
suite.SetupTest(t)
Expand Down Expand Up @@ -199,15 +199,15 @@ func TestDeliveryMQRetry_EligibleForRetryTrue(t *testing.T) {
eventGetter.registerEvent(&event)

suite := &RetryDeliveryMQSuite{
ctx: ctx,
mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}},
publisher: publisher,
eventGetter: eventGetter,
logPublisher: newMockLogPublisher(nil),
destGetter: &mockDestinationGetter{dest: &destination},
alertMonitor: newMockAlertMonitor(),
retryMaxCount: 10,
retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond},
ctx: ctx,
mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}},
publisher: publisher,
eventGetter: eventGetter,
logPublisher: newMockLogPublisher(nil),
destGetter: &mockDestinationGetter{dest: &destination},
alertMonitor: newMockAlertMonitor(),
retryMaxCount: 10,
retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond},
schedulerPollBackoff: 10 * time.Millisecond,
}
suite.SetupTest(t)
Expand Down Expand Up @@ -257,15 +257,15 @@ func TestDeliveryMQRetry_SystemError(t *testing.T) {
eventGetter.registerEvent(&event)

suite := &RetryDeliveryMQSuite{
ctx: ctx,
mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}},
publisher: newMockPublisher(nil), // publisher won't be called due to early error
eventGetter: eventGetter,
logPublisher: newMockLogPublisher(nil),
destGetter: destGetter,
alertMonitor: newMockAlertMonitor(),
retryMaxCount: 10,
retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond},
ctx: ctx,
mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}},
publisher: newMockPublisher(nil), // publisher won't be called due to early error
eventGetter: eventGetter,
logPublisher: newMockLogPublisher(nil),
destGetter: destGetter,
alertMonitor: newMockAlertMonitor(),
retryMaxCount: 10,
retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond},
schedulerPollBackoff: 10 * time.Millisecond,
}
suite.SetupTest(t)
Expand Down Expand Up @@ -327,15 +327,15 @@ func TestDeliveryMQRetry_RetryMaxCount(t *testing.T) {
eventGetter.registerEvent(&event)

suite := &RetryDeliveryMQSuite{
ctx: ctx,
mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}},
publisher: publisher,
eventGetter: eventGetter,
logPublisher: newMockLogPublisher(nil),
destGetter: &mockDestinationGetter{dest: &destination},
alertMonitor: newMockAlertMonitor(),
retryMaxCount: 2, // 1 initial + 2 retries = 3 total attempts
retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond},
ctx: ctx,
mqConfig: &mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}},
publisher: publisher,
eventGetter: eventGetter,
logPublisher: newMockLogPublisher(nil),
destGetter: &mockDestinationGetter{dest: &destination},
alertMonitor: newMockAlertMonitor(),
retryMaxCount: 2, // 1 initial + 2 retries = 3 total attempts
retryBackoff: &backoff.ConstantBackoff{Interval: 50 * time.Millisecond},
schedulerPollBackoff: 10 * time.Millisecond,
}
suite.SetupTest(t)
Expand Down
Loading
Loading