Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,144 changes: 881 additions & 263 deletions gen/proto/p2pstream/v1/management.pb.go

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions gen/proto/p2pstream/v1/p2pstreamv1connect/management.connect.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 80 additions & 1 deletion internal/db/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ func (db *DB) migrate() error {
status_code INTEGER NOT NULL,
duration_ms INTEGER NOT NULL,
error_kind TEXT NOT NULL DEFAULT '',
method TEXT NOT NULL DEFAULT '',
host TEXT NOT NULL DEFAULT '',
path_prefix TEXT NOT NULL DEFAULT '',
listener_id INTEGER,
route_target_id INTEGER,
route_id INTEGER,
Expand Down Expand Up @@ -266,7 +269,23 @@ func (db *DB) migrate() error {
response_bytes INTEGER NOT NULL DEFAULT 0,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (bucket_unix_millis, listener_id, route_target_id, route_id, agent_id, error_kind, status_class)
PRIMARY KEY (bucket_unix_millis, listener_id, route_target_id, route_id, agent_id, error_kind, status_class)
);

CREATE TABLE IF NOT EXISTS proxy_request_status_rollup_minutes (
bucket_unix_millis INTEGER NOT NULL,
status_code INTEGER NOT NULL,
requests INTEGER NOT NULL DEFAULT 0,
success INTEGER NOT NULL DEFAULT 0,
client_error INTEGER NOT NULL DEFAULT 0,
server_error INTEGER NOT NULL DEFAULT 0,
internal_error INTEGER NOT NULL DEFAULT 0,
duration_ms_sum INTEGER NOT NULL DEFAULT 0,
request_bytes INTEGER NOT NULL DEFAULT 0,
response_bytes INTEGER NOT NULL DEFAULT 0,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (bucket_unix_millis, status_code)
);

CREATE TABLE IF NOT EXISTS agent_stat_rollup_minutes (
Expand Down Expand Up @@ -605,6 +624,9 @@ func (db *DB) migrate() error {
`ALTER TABLE proxy_request_events ADD COLUMN cache_rule_id INTEGER`,
`ALTER TABLE proxy_request_events ADD COLUMN cache_status TEXT NOT NULL DEFAULT ''`,
`ALTER TABLE proxy_request_events ADD COLUMN cache_bytes INTEGER NOT NULL DEFAULT 0`,
`ALTER TABLE proxy_request_events ADD COLUMN method TEXT NOT NULL DEFAULT ''`,
`ALTER TABLE proxy_request_events ADD COLUMN host TEXT NOT NULL DEFAULT ''`,
`ALTER TABLE proxy_request_events ADD COLUMN path_prefix TEXT NOT NULL DEFAULT ''`,
`ALTER TABLE public_routes ADD COLUMN target_load_balancing TEXT NOT NULL DEFAULT 'round_robin'`,
`ALTER TABLE public_routes ADD COLUMN is_default INTEGER NOT NULL DEFAULT 0`,
`ALTER TABLE public_tls_certificates ADD COLUMN source TEXT NOT NULL DEFAULT 'manual'`,
Expand Down Expand Up @@ -655,6 +677,9 @@ func (db *DB) migrate() error {
if _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_proxy_request_events_agent_id ON proxy_request_events (agent_id)`); err != nil {
return err
}
if _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_proxy_request_events_recent_problem ON proxy_request_events (occurred_at DESC) WHERE status_code >= 400 OR error_kind != ''`); err != nil {
return err
}
if _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_agent_stats_agent_id ON agent_stats (agent_id)`); err != nil {
return err
}
Expand Down Expand Up @@ -1112,6 +1137,22 @@ func (db *DB) migrateObservabilityRollups() error {
PRIMARY KEY (bucket_unix_millis, listener_id, route_target_id, route_id, agent_id, error_kind, status_class)
);

CREATE TABLE IF NOT EXISTS proxy_request_status_rollup_minutes (
bucket_unix_millis INTEGER NOT NULL,
status_code INTEGER NOT NULL,
requests INTEGER NOT NULL DEFAULT 0,
success INTEGER NOT NULL DEFAULT 0,
client_error INTEGER NOT NULL DEFAULT 0,
server_error INTEGER NOT NULL DEFAULT 0,
internal_error INTEGER NOT NULL DEFAULT 0,
duration_ms_sum INTEGER NOT NULL DEFAULT 0,
request_bytes INTEGER NOT NULL DEFAULT 0,
response_bytes INTEGER NOT NULL DEFAULT 0,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (bucket_unix_millis, status_code)
);

CREATE TABLE IF NOT EXISTS agent_stat_rollup_minutes (
bucket_unix_millis INTEGER PRIMARY KEY,
samples INTEGER NOT NULL DEFAULT 0,
Expand Down Expand Up @@ -1155,6 +1196,40 @@ func (db *DB) migrateObservabilityRollups() error {
if err != nil {
return err
}
var statusRollupRows, proxyBackfilledThroughID, proxyBackfillUpperID int64
if err := db.QueryRow(`
SELECT
(SELECT COUNT(*) FROM proxy_request_status_rollup_minutes),
proxy_backfilled_through_id,
proxy_backfill_upper_id
FROM observability_rollup_state
WHERE id = 1
`).Scan(&statusRollupRows, &proxyBackfilledThroughID, &proxyBackfillUpperID); err != nil {
return err
}
if statusRollupRows == 0 && proxyBackfilledThroughID >= proxyBackfillUpperID {
if _, err := db.Exec(`
INSERT INTO proxy_request_status_rollup_minutes (
bucket_unix_millis, status_code, requests, success, client_error, server_error,
internal_error, duration_ms_sum, request_bytes, response_bytes
)
SELECT
CAST((unixepoch(occurred_at) / 60) * 60 * 1000 AS INTEGER) AS bucket_unix_millis,
status_code,
COUNT(*) AS requests,
CAST(COALESCE(SUM(CASE WHEN status_code >= 200 AND status_code < 400 THEN 1 ELSE 0 END), 0) AS INTEGER) AS success,
CAST(COALESCE(SUM(CASE WHEN status_code >= 400 AND status_code < 500 THEN 1 ELSE 0 END), 0) AS INTEGER) AS client_error,
CAST(COALESCE(SUM(CASE WHEN status_code >= 500 THEN 1 ELSE 0 END), 0) AS INTEGER) AS server_error,
CAST(COALESCE(SUM(CASE WHEN error_kind != '' THEN 1 ELSE 0 END), 0) AS INTEGER) AS internal_error,
CAST(COALESCE(SUM(duration_ms), 0) AS INTEGER) AS duration_ms_sum,
CAST(COALESCE(SUM(request_bytes), 0) AS INTEGER) AS request_bytes,
CAST(COALESCE(SUM(response_bytes), 0) AS INTEGER) AS response_bytes
FROM proxy_request_events
GROUP BY bucket_unix_millis, status_code
`); err != nil {
return err
}
}
return db.migrateProxyObservabilityTargetOnly()
}

Expand Down Expand Up @@ -1184,6 +1259,7 @@ func (db *DB) migrateProxyObservabilityTargetOnly() error {
DELETE FROM proxy_request_events;
DELETE FROM proxy_request_rollup_minutes;
DELETE FROM proxy_request_tuple_rollup_minutes;
DELETE FROM proxy_request_status_rollup_minutes;
UPDATE observability_rollup_state
SET proxy_backfill_upper_id = 0,
proxy_backfilled_through_id = 0,
Expand All @@ -1201,6 +1277,9 @@ func (db *DB) migrateProxyObservabilityTargetOnly() error {
status_code INTEGER NOT NULL,
duration_ms INTEGER NOT NULL,
error_kind TEXT NOT NULL DEFAULT '',
method TEXT NOT NULL DEFAULT '',
host TEXT NOT NULL DEFAULT '',
path_prefix TEXT NOT NULL DEFAULT '',
listener_id INTEGER,
route_target_id INTEGER,
route_id INTEGER,
Expand Down
16 changes: 11 additions & 5 deletions internal/db/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestMigrationCreatesMultiAgentRoutingSchema(t *testing.T) {
}
defer func() { _ = database.Close() }()

for _, table := range []string{"agents", "public_agent_labels", "public_route_targets", "public_route_target_upstream_headers", "public_route_target_response_headers", "public_waf_captcha_providers", "public_waf_rules", "public_waf_settings", "public_cache_settings", "public_cache_rules", "public_cache_entries", "proxy_request_rollup_minutes", "proxy_request_tuple_rollup_minutes", "agent_stat_rollup_minutes", "observability_rollup_state"} {
for _, table := range []string{"agents", "public_agent_labels", "public_route_targets", "public_route_target_upstream_headers", "public_route_target_response_headers", "public_waf_captcha_providers", "public_waf_rules", "public_waf_settings", "public_cache_settings", "public_cache_rules", "public_cache_entries", "proxy_request_rollup_minutes", "proxy_request_tuple_rollup_minutes", "proxy_request_status_rollup_minutes", "agent_stat_rollup_minutes", "observability_rollup_state"} {
var name string
if err := database.QueryRowContext(context.Background(), `SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?`, table).Scan(&name); err != nil {
t.Fatalf("expected table %s: %v", table, err)
Expand All @@ -97,7 +97,7 @@ func TestMigrationCreatesMultiAgentRoutingSchema(t *testing.T) {
}

proxyEventColumns := tableColumns(t, database, "proxy_request_events")
for _, column := range []string{"request_bytes", "response_bytes", "waf_rule_id", "waf_action", "cache_rule_id", "cache_status", "cache_bytes"} {
for _, column := range []string{"request_bytes", "response_bytes", "waf_rule_id", "waf_action", "cache_rule_id", "cache_status", "cache_bytes", "method", "host", "path_prefix"} {
if !containsString(proxyEventColumns, column) {
t.Fatalf("proxy_request_events missing column %s in %v", column, proxyEventColumns)
}
Expand All @@ -116,6 +116,7 @@ func TestMigrationCreatesMultiAgentRoutingSchema(t *testing.T) {
for _, index := range []string{
"idx_proxy_request_events_route_id",
"idx_proxy_request_events_agent_id",
"idx_proxy_request_events_recent_problem",
"idx_proxy_request_events_waf_rule_id",
"idx_proxy_request_events_cache_rule_id",
"idx_public_waf_rules_priority",
Expand Down Expand Up @@ -427,7 +428,7 @@ func TestMigrationUpgradesLegacySchemaWithAgentColumns(t *testing.T) {
for table, columns := range map[string][]string{
"connections": {"agent_id"},
"agent_stats": {"agent_id", "req_internal_error", "cpu_percent"},
"proxy_request_events": {"agent_id", "listener_id", "route_id", "route_target_id", "waf_rule_id", "waf_action", "request_bytes", "response_bytes", "cache_rule_id", "cache_status", "cache_bytes"},
"proxy_request_events": {"agent_id", "listener_id", "route_id", "route_target_id", "waf_rule_id", "waf_action", "request_bytes", "response_bytes", "cache_rule_id", "cache_status", "cache_bytes", "method", "host", "path_prefix"},
"public_cache_rules": {"allow_cookie_requests"},
} {
got := tableColumns(t, database, table)
Expand Down Expand Up @@ -455,14 +456,15 @@ func TestMigrationUpgradesLegacySchemaWithAgentColumns(t *testing.T) {
for _, index := range []string{
"idx_proxy_request_events_route_id",
"idx_proxy_request_events_agent_id",
"idx_proxy_request_events_recent_problem",
"idx_proxy_request_events_waf_rule_id",
"idx_proxy_request_events_cache_rule_id",
} {
if !indexExists(t, database, index) {
t.Fatalf("expected %s after migration", index)
}
}
for _, table := range []string{"public_cache_settings", "public_cache_rules", "public_cache_entries", "proxy_request_rollup_minutes", "proxy_request_tuple_rollup_minutes", "agent_stat_rollup_minutes", "observability_rollup_state"} {
for _, table := range []string{"public_cache_settings", "public_cache_rules", "public_cache_entries", "proxy_request_rollup_minutes", "proxy_request_tuple_rollup_minutes", "proxy_request_status_rollup_minutes", "agent_stat_rollup_minutes", "observability_rollup_state"} {
var name string
if err := database.QueryRowContext(context.Background(), `SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?`, table).Scan(&name); err != nil {
t.Fatalf("expected migrated table %s: %v", table, err)
Expand Down Expand Up @@ -637,6 +639,9 @@ func TestMigrationResetsProxyObservabilityAndRenamesWAFRouteTargetTrigger(t *tes
if countRows(t, database, `SELECT COUNT(*) FROM proxy_request_tuple_rollup_minutes`) != 0 {
t.Fatal("expected proxy request tuple rollups to be reset")
}
if countRows(t, database, `SELECT COUNT(*) FROM proxy_request_status_rollup_minutes`) != 0 {
t.Fatal("expected proxy request status rollups to be reset")
}
if countRows(t, database, `SELECT COUNT(*) FROM agent_stats`) != 1 {
t.Fatal("expected agent stats to be preserved")
}
Expand Down Expand Up @@ -674,7 +679,8 @@ func TestMigrationResetsProxyObservabilityAndRenamesWAFRouteTargetTrigger(t *tes
defer func() { _ = database.Close() }()
if countRows(t, database, `SELECT COUNT(*) FROM proxy_request_events`) != 0 ||
countRows(t, database, `SELECT COUNT(*) FROM proxy_request_rollup_minutes`) != 0 ||
countRows(t, database, `SELECT COUNT(*) FROM proxy_request_tuple_rollup_minutes`) != 0 {
countRows(t, database, `SELECT COUNT(*) FROM proxy_request_tuple_rollup_minutes`) != 0 ||
countRows(t, database, `SELECT COUNT(*) FROM proxy_request_status_rollup_minutes`) != 0 {
t.Fatal("expected proxy observability reset migration to be idempotent")
}
if countRows(t, database, `SELECT COUNT(*) FROM agent_stats`) != 1 {
Expand Down
Loading