Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/actionengine/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func TestExecutorApprovalFlowAndStorePersistence(t *testing.T) {
}
if loaded == nil {
t.Fatal("expected persisted execution")
return
}
if loaded.Status != StatusCompleted {
t.Fatalf("persisted status = %s, want %s", loaded.Status, StatusCompleted)
Expand Down
109 changes: 109 additions & 0 deletions internal/api/server_handlers_graph_store_org_policies_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package api

import (
"net/http"
"testing"
"time"

"github.com/writer/cerebro/internal/app"
"github.com/writer/cerebro/internal/graph"
)

func buildGraphStoreOrgPolicyReadTestGraph(t *testing.T) *graph.Graph {
t.Helper()

g := buildOrgPolicyTestGraph()
if _, err := graph.WriteOrganizationalPolicy(g, graph.OrganizationalPolicyWriteRequest{
Title: "Acceptable Use Policy",
PolicyVersion: "2026.03",
OwnerID: "person:owner",
ReviewCycleDays: 365,
RequiredDepartmentIDs: []string{"department:engineering"},
ObservedAt: time.Date(2026, 3, 20, 9, 0, 0, 0, time.UTC),
}); err != nil {
t.Fatalf("seed policy error = %v", err)
}
if _, err := graph.AcknowledgeOrganizationalPolicy(g, graph.OrganizationalPolicyAcknowledgmentRequest{
PolicyID: "policy:acceptable-use-policy",
PersonID: "person:alice",
AcknowledgedAt: time.Date(2026, 3, 20, 10, 0, 0, 0, time.UTC),
}); err != nil {
t.Fatalf("seed policy acknowledgment error = %v", err)
}
return g
}

func newLiveGraphStoreOrgPolicyServer(t *testing.T, g *graph.Graph, store graph.GraphStore) *Server {
t.Helper()
s := NewServerWithDependencies(serverDependencies{
Config: &app.Config{},
graphRuntime: stubGraphRuntime{graph: g, store: store},
})
t.Cleanup(func() { s.Close() })
return s
}

func TestOrgPolicyReadEndpointsUseStoreSubgraphWhenSnapshotsUnavailable(t *testing.T) {
s := newStoreBackedGraphServer(t, nilSnapshotGraphStore{GraphStore: buildGraphStoreOrgPolicyReadTestGraph(t)})

status := do(t, s, http.MethodGet, "/api/v1/org/policies/policy:acceptable-use-policy/acknowledgment-status", nil)
if status.Code != http.StatusOK {
t.Fatalf("expected acknowledgment status 200, got %d: %s", status.Code, status.Body.String())
}
statusBody := decodeJSON(t, status)
if got := int(statusBody["required_people"].(float64)); got != 2 {
t.Fatalf("required_people = %d, want 2", got)
}

assignees := do(t, s, http.MethodGet, "/api/v1/org/policies/policy:acceptable-use-policy/assignees", nil)
if assignees.Code != http.StatusOK {
t.Fatalf("expected assignee roster 200, got %d: %s", assignees.Code, assignees.Body.String())
}
assigneesBody := decodeJSON(t, assignees)
if got := int(assigneesBody["acknowledged_people"].(float64)); got != 1 {
t.Fatalf("acknowledged_people = %d, want 1", got)
}
if got := int(assigneesBody["pending_people"].(float64)); got != 1 {
t.Fatalf("pending_people = %d, want 1", got)
}

reminders := do(t, s, http.MethodGet, "/api/v1/org/policies/policy:acceptable-use-policy/reminders", nil)
if reminders.Code != http.StatusOK {
t.Fatalf("expected reminder report 200, got %d: %s", reminders.Code, reminders.Body.String())
}
remindersBody := decodeJSON(t, reminders)
if got := int(remindersBody["pending_people"].(float64)); got != 1 {
t.Fatalf("pending_people = %d, want 1", got)
}

history := do(t, s, http.MethodGet, "/api/v1/org/policies/policy:acceptable-use-policy/versions", nil)
if history.Code != http.StatusOK {
t.Fatalf("expected version history 200, got %d: %s", history.Code, history.Body.String())
}
historyBody := decodeJSON(t, history)
if got := int(historyBody["count"].(float64)); got != 1 {
t.Fatalf("history count = %d, want 1", got)
}
}

func TestOrgPolicyReadEndpointsPreferLiveGraphWhenAvailable(t *testing.T) {
g := buildGraphStoreOrgPolicyReadTestGraph(t)
store := &countingSnapshotStore{GraphStore: g}
s := newLiveGraphStoreOrgPolicyServer(t, g, store)

paths := []string{
"/api/v1/org/policies/policy:acceptable-use-policy/acknowledgment-status",
"/api/v1/org/policies/policy:acceptable-use-policy/assignees",
"/api/v1/org/policies/policy:acceptable-use-policy/reminders",
"/api/v1/org/policies/policy:acceptable-use-policy/versions",
}
for _, path := range paths {
resp := do(t, s, http.MethodGet, path, nil)
if resp.Code != http.StatusOK {
t.Fatalf("expected %s to return 200, got %d: %s", path, resp.Code, resp.Body.String())
}
}
if got := store.count.Load(); got != 0 {
t.Fatalf("expected live graph reads to avoid snapshot calls, got %d", got)
}
}
2 changes: 2 additions & 0 deletions internal/api/server_handlers_threat_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ func TestTelemetryIngestDedupesAWSVPCFlowLogsByObservationID(t *testing.T) {
}
if run == nil || run.LastCheckpoint == nil {
t.Fatalf("expected duplicate ingest run checkpoint, got %#v", run)
return
}
if got := run.LastCheckpoint.Metadata["duplicate_events"]; got != "1" {
t.Fatalf("duplicate_events = %q, want 1", got)
Expand Down Expand Up @@ -1610,6 +1611,7 @@ func TestRuntimeIngestSessionRecordObservationUsesProcessingTimeForRunUpdates(t
}
if session == nil || session.run == nil {
t.Fatal("expected runtime ingest session")
return
}

historicalObservedAt := time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC)
Expand Down
35 changes: 31 additions & 4 deletions internal/api/server_services_org_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ type serverOrgPolicyService struct {
deps *serverDependencies
}

var orgPolicyReadSubgraphOptions = graph.ExtractSubgraphOptions{
MaxDepth: 2,
Direction: graph.ExtractSubgraphDirectionBoth,
}

func newOrgPolicyService(deps *serverDependencies) orgPolicyService {
return serverOrgPolicyService{deps: deps}
}
Expand Down Expand Up @@ -121,7 +126,7 @@ func (s serverOrgPolicyService) ProgramStatus(ctx context.Context, framework str
}

func (s serverOrgPolicyService) PolicyStatus(ctx context.Context, policyID string) (*graph.OrganizationalPolicyAcknowledgmentReport, error) {
g, err := s.tenantGraph(ctx)
g, err := s.policyGraph(ctx, policyID)
if err != nil {
return nil, err
}
Expand All @@ -130,7 +135,7 @@ func (s serverOrgPolicyService) PolicyStatus(ctx context.Context, policyID strin
}

func (s serverOrgPolicyService) PolicyAssignees(ctx context.Context, policyID string) (*graph.OrganizationalPolicyAssigneeRosterReport, error) {
g, err := s.tenantGraph(ctx)
g, err := s.policyGraph(ctx, policyID)
if err != nil {
return nil, err
}
Expand All @@ -139,7 +144,7 @@ func (s serverOrgPolicyService) PolicyAssignees(ctx context.Context, policyID st
}

func (s serverOrgPolicyService) PolicyReminders(ctx context.Context, policyID string) (*graph.OrganizationalPolicyReminderReport, error) {
g, err := s.tenantGraph(ctx)
g, err := s.policyGraph(ctx, policyID)
if err != nil {
return nil, err
}
Expand All @@ -148,7 +153,7 @@ func (s serverOrgPolicyService) PolicyReminders(ctx context.Context, policyID st
}

func (s serverOrgPolicyService) PolicyVersionHistory(ctx context.Context, policyID string) ([]graph.OrganizationalPolicyVersionHistoryEntry, error) {
g, err := s.tenantGraph(ctx)
g, err := s.policyGraph(ctx, policyID)
if err != nil {
return nil, err
}
Expand All @@ -169,6 +174,28 @@ func (s serverOrgPolicyService) tenantGraph(ctx context.Context) (*graph.Graph,
return currentOrStoredTenantGraphView(ctx, s.deps)
}

func (s serverOrgPolicyService) policyGraph(ctx context.Context, policyID string) (*graph.Graph, error) {
if s.deps == nil {
return nil, graph.ErrStoreUnavailable
}
tenantID := currentTenantScopeID(ctx)
if current := s.deps.CurrentSecurityGraphForTenant(tenantID); current != nil {
return graph.ExtractSubgraph(current, strings.TrimSpace(policyID), orgPolicyReadSubgraphOptions), nil
}
store := s.deps.CurrentSecurityGraphStoreForTenant(tenantID)
if store == nil {
return nil, graph.ErrStoreUnavailable
}
view, err := store.ExtractSubgraph(ctx, strings.TrimSpace(policyID), orgPolicyReadSubgraphOptions)
if err != nil {
return nil, err
}
if view == nil {
return nil, graph.ErrStoreUnavailable
}
return view, nil
}

func wrapOrgPolicyError(op cerrors.Op, err error) error {
if err == nil {
return nil
Expand Down
6 changes: 4 additions & 2 deletions internal/graph/store_neptune.go
Original file line number Diff line number Diff line change
Expand Up @@ -1601,8 +1601,9 @@ func neptuneDecodeNode(record map[string]any) (*Node, error) {
if len(record) == 0 {
return nil, nil
}
decodedNodeID := strings.TrimSpace(readString(record, "id"))
node := &Node{
ID: strings.TrimSpace(readString(record, "id")),
ID: decodedNodeID,
Kind: NodeKind(strings.TrimSpace(readString(record, "kind"))),
Name: readString(record, "name"),
TenantID: readString(record, "tenant_id"),
Expand Down Expand Up @@ -1646,8 +1647,9 @@ func neptuneDecodeEdge(record map[string]any) (*Edge, error) {
if len(record) == 0 {
return nil, nil
}
decodedEdgeID := strings.TrimSpace(readString(record, "id"))
edge := &Edge{
ID: strings.TrimSpace(readString(record, "id")),
ID: decodedEdgeID,
Source: strings.TrimSpace(readString(record, "source")),
Target: strings.TrimSpace(readString(record, "target")),
Kind: EdgeKind(strings.TrimSpace(readString(record, "kind"))),
Expand Down
5 changes: 4 additions & 1 deletion internal/graph/vendor_risk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
)

func TestBuildVendorRiskReportFiltersAndAlerts(t *testing.T) {
now := time.Now().UTC().Truncate(time.Second)
now := time.Date(2026, 3, 20, 0, 0, 0, 0, time.UTC)
origNow := temporalNowUTC
defer func() { temporalNowUTC = origNow }()
temporalNowUTC = func() time.Time { return now }
earlier := now.Add(-14 * 24 * time.Hour)
later := now.Add(-24 * time.Hour)

Expand Down