diff --git a/docs/status-conditions.md b/docs/status-conditions.md index 92aa3ed6..81079194 100644 --- a/docs/status-conditions.md +++ b/docs/status-conditions.md @@ -301,6 +301,14 @@ These events are emitted during proactive primary failovers (e.g., when a primar | `FailoverTimeout` | Warning | The failover did not complete within the allowed timeout | | `FailoverCompleted` | Normal | The replica has successfully become the new primary | +### Recovery events + +These events are emitted when the operator intervenes to recover the cluster. + +| Event Type | Type | Description | +|---|---|---| +| `ReplicasTakenOver` | Normal | Orphaned replicas promoted via `CLUSTER FAILOVER TAKEOVER` after quorum loss | + ### Maintenance events These events are emitted during cluster maintenance operations. diff --git a/internal/controller/valkeycluster_controller.go b/internal/controller/valkeycluster_controller.go index 590affb3..72ef0fe3 100644 --- a/internal/controller/valkeycluster_controller.go +++ b/internal/controller/valkeycluster_controller.go @@ -82,27 +82,31 @@ type ValkeyClusterReconciler struct { // cluster one step closer to the desired state described by the ValkeyCluster // spec. The pipeline runs in the following order: // -// 1. Ensure the headless Service exists (upsertService). -// 2. Ensure the ConfigMap with valkey.conf and health-check scripts exists +// - Ensure the headless Service exists (upsertService). +// - Ensure PodDisruptionBudget exists (reconcilePodDisruptionBudget). +// - Ensure internal ACL users are configured (reconcileUsersAcl). +// - Ensure the ConfigMap with valkey.conf and health-check scripts exists // (upsertConfigMap). -// 3. Ensure one ValkeyNode per (shard, node) pair exists, creating missing +// - Ensure one ValkeyNode per (shard, node) pair exists, creating missing // nodes and propagating spec changes one at a time in shard order with // replicas updated before the primary (reconcileValkeyNodes). -// 4. List all ValkeyNodes and build the Valkey cluster state by connecting to each -// node and scraping CLUSTER INFO / CLUSTER NODES. -// 5. Forget stale nodes that no longer have a backing ValkeyNode. -// 6. Phase 1 – MEET: batch-introduce all isolated pending nodes to the -// cluster via CLUSTER MEET. Requeue to let gossip propagate. -// 7. Phase 2 – Assign slots: batch-assign hash-slot ranges to all -// primary-labeled pending nodes via CLUSTER ADDSLOTSRANGE. -// 8. Phase 3 – Replicate: batch-attach all replica-labeled pending nodes -// to their matching primaries via CLUSTER REPLICATE. -// 9. Scale-in: if the cluster has more shards than desired, drain slots +// - Build the Valkey cluster state by connecting to each node and scraping +// CLUSTER INFO / CLUSTER NODES. +// - Promote orphaned replicas via CLUSTER FAILOVER TAKEOVER when quorum +// is lost (promoteOrphanedReplicas). +// - Forget stale nodes that no longer have a backing ValkeyNode. +// - MEET: batch-introduce all isolated pending nodes to the cluster via +// CLUSTER MEET. Requeue to let gossip propagate. +// - Assign slots: batch-assign hash-slot ranges to all primary-labeled +// pending nodes via CLUSTER ADDSLOTSRANGE. +// - Replicate: batch-attach all replica-labeled pending nodes to their +// matching primaries via CLUSTER REPLICATE. +// - Scale-in: if the cluster has more shards than desired, drain slots // from excess shards via CLUSTER MIGRATESLOTS and delete their // ValkeyNodes once fully drained. -// 10. Verify that the expected number of shards and replicas exist. -// 11. Verify that all 16384 hash slots are assigned. -// 12. If everything is healthy, mark the cluster Ready and requeue after 30s +// - Verify that the expected number of shards and replicas exist. +// - Verify that all 16384 hash slots are assigned. +// - If everything is healthy, mark the cluster Ready and requeue after 30s // for periodic health checks. // // For more details, check Reconcile and its Result here: @@ -188,6 +192,12 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques state := r.getValkeyClusterState(ctx, cluster, nodes, operatorUser, operatorPassword) defer state.CloseClients() + // Promote replicas of dead primaries when quorum is lost. + // TAKEOVER before FORGET so slots remain continuously owned. + if result, handled := r.promoteOrphanedReplicas(ctx, cluster, state); handled { + return result, nil + } + r.forgetStaleNodes(ctx, cluster, state, nodes) // --- Phase 1: MEET all isolated nodes in one batch --- @@ -973,6 +983,59 @@ func (r *ValkeyClusterReconciler) replicateToShardPrimary(ctx context.Context, c return nil } +// promoteOrphanedReplicas issues CLUSTER FAILOVER TAKEOVER to replicas whose +// primary is dead and failover quorum is unreachable. With persistence enabled +// the pod will return with the same node ID, so TAKEOVER is skipped. +// Returns (result, true) if a promotion was made and reconcile should requeue. +func (r *ValkeyClusterReconciler) promoteOrphanedReplicas(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster, state *valkey.ClusterState) (ctrl.Result, bool) { + if cluster.Spec.Persistence != nil || state.HasFailoverQuorum() { + return ctrl.Result{}, false + } + log := logf.FromContext(ctx) + var promoted, attempted int + deadPrimaries := make(map[string]bool) + for _, shard := range state.Shards { + for _, node := range shard.Nodes { + primaryId := node.PrimaryIdFromSelf() + if primaryId == "-" || primaryId == "" { + continue // node is a primary + } + if deadPrimaries[primaryId] { + continue + } + if !state.IsNodeFailed(primaryId) { + continue + } + // Note: we don't check if the pod still exists in k8s here. + // Without persistence, even if the primary is merely partitioned + // (not gone), the TAKEOVER'd replica will own the slots at a + // higher epoch. A returning primary will see the higher epoch + // and demote itself; no split-brain. + deadPrimaries[primaryId] = true + } + } + for primaryId := range deadPrimaries { + replica := state.BestReplicaOf(primaryId) + if replica == nil { + continue + } + attempted++ + log.Info("promoting orphaned replica via TAKEOVER", "node", replica.Address, "deadPrimary", primaryId) + if err := replica.Client.Do(ctx, replica.Client.B().ClusterFailover().Takeover().Build()).Error(); err != nil { + log.Error(err, "CLUSTER FAILOVER TAKEOVER failed", "node", replica.Address) + } else { + promoted++ + } + } + if promoted > 0 || attempted > 0 { + if promoted > 0 { + r.Recorder.Eventf(cluster, nil, corev1.EventTypeNormal, "ReplicasTakenOver", "FailoverTakeover", "Promoted %d orphaned replica(s) via TAKEOVER", promoted) + } + return ctrl.Result{RequeueAfter: 2 * time.Second}, true + } + return ctrl.Result{}, false +} + // Check each cluster node and forget stale nodes (noaddr or status fail) func (r *ValkeyClusterReconciler) forgetStaleNodes(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster, state *valkey.ClusterState, nodes *valkeyiov1alpha1.ValkeyNodeList) { log := logf.FromContext(ctx) @@ -991,11 +1054,16 @@ func (r *ValkeyClusterReconciler) forgetStaleNodes(ctx context.Context, cluster // voting in the auto-failover election, permanently // blocking the replica's promotion. if state.HasReplicaOf(failing.Id) { - log.V(1).Info("skipping forget; failover pending for node", + if cluster.Spec.Persistence != nil || state.HasFailoverQuorum() { + log.V(1).Info("skipping forget; failover pending for node", + "address", failing.Address, "Id", failing.Id) + continue + } + log.Info("forget node despite pending replica; quorum unreachable", "address", failing.Address, "Id", failing.Id) - continue + } else { + log.V(1).Info("forget a failing node", "address", failing.Address, "Id", failing.Id) } - log.V(1).Info("forget a failing node", "address", failing.Address, "Id", failing.Id) if err := node.Client.Do(ctx, node.Client.B().ClusterForget().NodeId(failing.Id).Build()).Error(); err != nil { log.Error(err, "command failed: CLUSTER FORGET") r.Recorder.Eventf(cluster, nil, corev1.EventTypeWarning, "NodeForgetFailed", "ForgetNode", "Failed to forget node: %v", err) diff --git a/internal/valkey/clusterstate.go b/internal/valkey/clusterstate.go index d4a46156..e32d7007 100644 --- a/internal/valkey/clusterstate.go +++ b/internal/valkey/clusterstate.go @@ -268,6 +268,74 @@ func (s *ClusterState) HasReplicaOf(nodeId string) bool { return false } +// HasFailoverQuorum returns true if a majority of slot-owning primaries are +// reachable. Valkey requires a majority of primaries to vote in a failover +// election; if quorum is unreachable, no automatic failover can succeed. +// Uses cluster_size from CLUSTER INFO (total primaries with slots, including +// failed ones) as the denominator. +func (s *ClusterState) HasFailoverQuorum() bool { + if len(s.Shards) == 0 { + return false + } + var livePrimaries, clusterSize int + for _, shard := range s.Shards { + if shard.GetPrimaryNode() != nil && len(shard.Slots) > 0 { + livePrimaries++ + } + for _, node := range shard.Nodes { + // Take the max across nodes in case gossip hasn't fully propagated. + if size, err := strconv.Atoi(node.ClusterInfo["cluster_size"]); err == nil && size > clusterSize { + clusterSize = size + } + } + } + if clusterSize == 0 { + return false + } + return livePrimaries > (clusterSize / 2) +} + +// IsNodeFailed returns true if any live node reports the given node ID as +// "fail" or "fail?" in CLUSTER NODES. Includes "fail?" (pfail) because when +// majority of primaries are down, pfail can never promote to fail. +func (s *ClusterState) IsNodeFailed(nodeId string) bool { + for _, shard := range s.Shards { + for _, node := range shard.Nodes { + for line := range strings.SplitSeq(node.ClusterNodes, "\n") { + fields := strings.Fields(line) + if len(fields) < 8 || fields[0] != nodeId { + continue + } + flags := strings.Split(fields[2], ",") + if slices.Contains(flags, "fail") || slices.Contains(flags, "fail?") { + return true + } + } + } + } + return false +} + +// BestReplicaOf returns the replica with the highest replication offset +// that references the given primary ID, or nil if none found. +func (s *ClusterState) BestReplicaOf(primaryId string) *NodeState { + var best *NodeState + var bestOffset int64 + for _, shard := range s.Shards { + for _, node := range shard.Nodes { + if node.PrimaryIdFromSelf() != primaryId { + continue + } + offset, _ := strconv.ParseInt(node.Info["slave_repl_offset"], 10, 64) + if best == nil || offset > bestOffset { + best = node + bestOffset = offset + } + } + } + return best +} + // PrimaryIdFromSelf returns the primary node ID that this node reports as its // own primary in CLUSTER NODES (fields[3] of the "myself" line). Returns "-" // for primaries and the primary's node ID for replicas. diff --git a/internal/valkey/clusterstate_test.go b/internal/valkey/clusterstate_test.go index 1715b80e..59931392 100644 --- a/internal/valkey/clusterstate_test.go +++ b/internal/valkey/clusterstate_test.go @@ -311,3 +311,152 @@ func TestCurrentEpoch(t *testing.T) { } }) } + +func TestClusterState_HasFailoverQuorum(t *testing.T) { + t.Run("no shards", func(t *testing.T) { + state := &ClusterState{Shards: []*ShardState{}} + if state.HasFailoverQuorum() { + t.Error("expected false with no shards") + } + }) + + t.Run("majority alive (2 of 3)", func(t *testing.T) { + state := &ClusterState{ + Shards: []*ShardState{ + {PrimaryId: "p1", Slots: []SlotsRange{{0, 5461}}, Nodes: []*NodeState{{Id: "p1", ClusterInfo: map[string]string{"cluster_size": "3"}}}}, + {PrimaryId: "p2", Slots: []SlotsRange{{5462, 10922}}, Nodes: []*NodeState{{Id: "p2", ClusterInfo: map[string]string{"cluster_size": "3"}}}}, + }, + } + if !state.HasFailoverQuorum() { + t.Error("expected true: 2 > 3/2") + } + }) + + t.Run("no majority (1 of 3)", func(t *testing.T) { + state := &ClusterState{ + Shards: []*ShardState{ + {PrimaryId: "p1", Slots: []SlotsRange{{0, 5461}}, Nodes: []*NodeState{{Id: "p1", ClusterInfo: map[string]string{"cluster_size": "3"}}}}, + }, + } + if state.HasFailoverQuorum() { + t.Error("expected false: 1 <= 3/2") + } + }) + + t.Run("all alive (3 of 3)", func(t *testing.T) { + state := &ClusterState{ + Shards: []*ShardState{ + {PrimaryId: "p1", Slots: []SlotsRange{{0, 5461}}, Nodes: []*NodeState{{Id: "p1", ClusterInfo: map[string]string{"cluster_size": "3"}}}}, + {PrimaryId: "p2", Slots: []SlotsRange{{5462, 10922}}, Nodes: []*NodeState{{Id: "p2", ClusterInfo: map[string]string{"cluster_size": "3"}}}}, + {PrimaryId: "p3", Slots: []SlotsRange{{10923, 16383}}, Nodes: []*NodeState{{Id: "p3", ClusterInfo: map[string]string{"cluster_size": "3"}}}}, + }, + } + if !state.HasFailoverQuorum() { + t.Error("expected true: 3 > 3/2") + } + }) + + t.Run("half alive (2 of 4)", func(t *testing.T) { + state := &ClusterState{ + Shards: []*ShardState{ + {PrimaryId: "p1", Slots: []SlotsRange{{0, 4095}}, Nodes: []*NodeState{{Id: "p1", ClusterInfo: map[string]string{"cluster_size": "4"}}}}, + {PrimaryId: "p2", Slots: []SlotsRange{{4096, 8191}}, Nodes: []*NodeState{{Id: "p2", ClusterInfo: map[string]string{"cluster_size": "4"}}}}, + }, + } + if state.HasFailoverQuorum() { + t.Error("expected false: 2 <= 4/2") + } + }) + + t.Run("only replicas alive (0 primaries of 3)", func(t *testing.T) { + state := &ClusterState{ + Shards: []*ShardState{ + {PrimaryId: "", Nodes: []*NodeState{{Id: "r1", ClusterInfo: map[string]string{"cluster_size": "3"}}}}, + {PrimaryId: "", Nodes: []*NodeState{{Id: "r2", ClusterInfo: map[string]string{"cluster_size": "3"}}}}, + {PrimaryId: "", Nodes: []*NodeState{{Id: "r3", ClusterInfo: map[string]string{"cluster_size": "3"}}}}, + }, + } + if state.HasFailoverQuorum() { + t.Error("expected false: 0 live primaries") + } + }) + + t.Run("returns false when cluster_size missing", func(t *testing.T) { + state := &ClusterState{ + Shards: []*ShardState{ + {PrimaryId: "p1", Slots: []SlotsRange{{0, 8191}}, Nodes: []*NodeState{{Id: "p1", ClusterInfo: map[string]string{}}}}, + {PrimaryId: "p2", Slots: []SlotsRange{{8192, 16383}}, Nodes: []*NodeState{{Id: "p2", ClusterInfo: map[string]string{}}}}, + }, + } + // Cannot determine cluster size but assume no quorum + if state.HasFailoverQuorum() { + t.Error("expected false when cluster_size unavailable") + } + }) +} + +func TestClusterState_IsNodeFailed(t *testing.T) { + state := &ClusterState{ + Shards: []*ShardState{ + { + Nodes: []*NodeState{ + {ClusterNodes: "abc123 10.0.0.1:6379@16379 myself,master - 0 0 1 connected 0-5461\ndead1 10.0.0.99:6379@16379 master,fail - 0 0 2 connected 5462-10922\npfail1 10.0.0.98:6379@16379 master,fail? - 0 0 3 connected 10923-16383\n"}, + }, + }, + }, + } + + t.Run("fail", func(t *testing.T) { + if !state.IsNodeFailed("dead1") { + t.Error("expected true") + } + }) + + t.Run("pfail", func(t *testing.T) { + if !state.IsNodeFailed("pfail1") { + t.Error("expected true for pfail") + } + }) + + t.Run("not failed", func(t *testing.T) { + if state.IsNodeFailed("abc123") { + t.Error("expected false") + } + }) + + t.Run("unknown", func(t *testing.T) { + if state.IsNodeFailed("unknown") { + t.Error("expected false") + } + }) +} + +func TestClusterState_BestReplicaOf(t *testing.T) { + state := &ClusterState{ + Shards: []*ShardState{ + { + PrimaryId: "primary1", + Nodes: []*NodeState{ + {Id: "primary1", Flags: []string{"master"}, ClusterNodes: "primary1 10.0.0.1:6379@16379 myself,master - 0 0 1 connected 0-5461\n"}, + {Id: "r1", Flags: []string{"slave"}, Info: map[string]string{"slave_repl_offset": "100"}, ClusterNodes: "r1 10.0.0.2:6379@16379 myself,slave primary1 0 0 1 connected\n"}, + {Id: "r2", Flags: []string{"slave"}, Info: map[string]string{"slave_repl_offset": "500"}, ClusterNodes: "r2 10.0.0.3:6379@16379 myself,slave primary1 0 0 1 connected\n"}, + {Id: "r3", Flags: []string{"slave"}, Info: map[string]string{"slave_repl_offset": "200"}, ClusterNodes: "r3 10.0.0.4:6379@16379 myself,slave primary1 0 0 1 connected\n"}, + }, + }, + }, + } + + t.Run("picks highest offset", func(t *testing.T) { + best := state.BestReplicaOf("primary1") + if best == nil || best.Id != "r2" { + t.Errorf("expected r2 (offset 500), got %v", best) + } + }) + + t.Run("not found", func(t *testing.T) { + best := state.BestReplicaOf("unknown") + if best != nil { + t.Errorf("expected nil, got %v", best.Id) + } + }) +}