Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/status-conditions.md
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,14 @@ These events are emitted during proactive primary failovers (e.g., when a primar
| `FailoverTimeout` | Warning | The failover did not complete within the allowed timeout |
| `FailoverCompleted` | Normal | The replica has successfully become the new primary |

### Recovery events

These events are emitted when the operator intervenes to recover the cluster.

| Event Type | Type | Description |
|---|---|---|
| `ReplicasTakenOver` | Normal | Orphaned replicas promoted via `CLUSTER FAILOVER TAKEOVER` after quorum loss |

### Maintenance events

These events are emitted during cluster maintenance operations.
Expand Down
106 changes: 87 additions & 19 deletions internal/controller/valkeycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,31 @@ type ValkeyClusterReconciler struct {
// cluster one step closer to the desired state described by the ValkeyCluster
// spec. The pipeline runs in the following order:
//
// 1. Ensure the headless Service exists (upsertService).
// 2. Ensure the ConfigMap with valkey.conf and health-check scripts exists
// - Ensure the headless Service exists (upsertService).
// - Ensure PodDisruptionBudget exists (reconcilePodDisruptionBudget).
// - Ensure internal ACL users are configured (reconcileUsersAcl).
// - Ensure the ConfigMap with valkey.conf and health-check scripts exists
// (upsertConfigMap).
// 3. Ensure one ValkeyNode per (shard, node) pair exists, creating missing
// - Ensure one ValkeyNode per (shard, node) pair exists, creating missing
// nodes and propagating spec changes one at a time in shard order with
// replicas updated before the primary (reconcileValkeyNodes).
// 4. List all ValkeyNodes and build the Valkey cluster state by connecting to each
// node and scraping CLUSTER INFO / CLUSTER NODES.
// 5. Forget stale nodes that no longer have a backing ValkeyNode.
// 6. Phase 1 – MEET: batch-introduce all isolated pending nodes to the
// cluster via CLUSTER MEET. Requeue to let gossip propagate.
// 7. Phase 2 – Assign slots: batch-assign hash-slot ranges to all
// primary-labeled pending nodes via CLUSTER ADDSLOTSRANGE.
// 8. Phase 3 – Replicate: batch-attach all replica-labeled pending nodes
// to their matching primaries via CLUSTER REPLICATE.
// 9. Scale-in: if the cluster has more shards than desired, drain slots
// - Build the Valkey cluster state by connecting to each node and scraping
// CLUSTER INFO / CLUSTER NODES.
// - Promote orphaned replicas via CLUSTER FAILOVER TAKEOVER when quorum
// is lost (promoteOrphanedReplicas).
// - Forget stale nodes that no longer have a backing ValkeyNode.
// - MEET: batch-introduce all isolated pending nodes to the cluster via
// CLUSTER MEET. Requeue to let gossip propagate.
// - Assign slots: batch-assign hash-slot ranges to all primary-labeled
// pending nodes via CLUSTER ADDSLOTSRANGE.
// - Replicate: batch-attach all replica-labeled pending nodes to their
// matching primaries via CLUSTER REPLICATE.
// - Scale-in: if the cluster has more shards than desired, drain slots
// from excess shards via CLUSTER MIGRATESLOTS and delete their
// ValkeyNodes once fully drained.
// 10. Verify that the expected number of shards and replicas exist.
// 11. Verify that all 16384 hash slots are assigned.
// 12. If everything is healthy, mark the cluster Ready and requeue after 30s
// - Verify that the expected number of shards and replicas exist.
// - Verify that all 16384 hash slots are assigned.
// - If everything is healthy, mark the cluster Ready and requeue after 30s
// for periodic health checks.
//
// For more details, check Reconcile and its Result here:
Expand Down Expand Up @@ -188,6 +192,12 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques
state := r.getValkeyClusterState(ctx, cluster, nodes, operatorUser, operatorPassword)
defer state.CloseClients()

// Promote replicas of dead primaries when quorum is lost.
// TAKEOVER before FORGET so slots remain continuously owned.
if result, handled := r.promoteOrphanedReplicas(ctx, cluster, state); handled {
return result, nil
}

r.forgetStaleNodes(ctx, cluster, state, nodes)

// --- Phase 1: MEET all isolated nodes in one batch ---
Expand Down Expand Up @@ -973,6 +983,59 @@ func (r *ValkeyClusterReconciler) replicateToShardPrimary(ctx context.Context, c
return nil
}

// promoteOrphanedReplicas sends CLUSTER FAILOVER TAKEOVER to one replica of
// every failed primary when the cluster can no longer hold a failover
// election on its own.
//
// Nothing is done when persistence is configured (the pod is expected to come
// back with the same node ID) or while failover quorum is still available.
//
// The boolean result is true when at least one TAKEOVER was attempted; the
// caller should then stop the current reconcile pass and requeue with the
// returned ctrl.Result. Otherwise it returns (ctrl.Result{}, false).
func (r *ValkeyClusterReconciler) promoteOrphanedReplicas(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster, state *valkey.ClusterState) (ctrl.Result, bool) {
	if cluster.Spec.Persistence != nil || state.HasFailoverQuorum() {
		return ctrl.Result{}, false
	}

	logger := logf.FromContext(ctx)

	// Collect the IDs of failed primaries that still have at least one
	// replica pointing at them.
	//
	// Note: pod existence in k8s is deliberately not checked. Without
	// persistence, even if the primary is merely partitioned (not gone), the
	// replica promoted by TAKEOVER owns the slots at a higher epoch; a
	// returning primary sees the higher epoch and demotes itself, so there is
	// no split-brain.
	failedPrimaries := map[string]bool{}
	for _, shard := range state.Shards {
		for _, member := range shard.Nodes {
			id := member.PrimaryIdFromSelf()
			switch {
			case id == "" || id == "-":
				// member is itself a primary
			case failedPrimaries[id]:
				// already recorded via another replica
			case state.IsNodeFailed(id):
				failedPrimaries[id] = true
			}
		}
	}

	attempts, successes := 0, 0
	for id := range failedPrimaries {
		candidate := state.BestReplicaOf(id)
		if candidate == nil {
			continue
		}
		attempts++
		logger.Info("promoting orphaned replica via TAKEOVER", "node", candidate.Address, "deadPrimary", id)
		takeover := candidate.Client.B().ClusterFailover().Takeover().Build()
		if err := candidate.Client.Do(ctx, takeover).Error(); err != nil {
			logger.Error(err, "CLUSTER FAILOVER TAKEOVER failed", "node", candidate.Address)
			continue
		}
		successes++
	}

	if attempts == 0 {
		return ctrl.Result{}, false
	}
	// Only announce promotions that actually succeeded; failed attempts still
	// trigger the requeue so they are retried on the next pass.
	if successes > 0 {
		r.Recorder.Eventf(cluster, nil, corev1.EventTypeNormal, "ReplicasTakenOver", "FailoverTakeover", "Promoted %d orphaned replica(s) via TAKEOVER", successes)
	}
	return ctrl.Result{RequeueAfter: 2 * time.Second}, true
}

// Check each cluster node and forget stale nodes (noaddr or status fail)
func (r *ValkeyClusterReconciler) forgetStaleNodes(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster, state *valkey.ClusterState, nodes *valkeyiov1alpha1.ValkeyNodeList) {
log := logf.FromContext(ctx)
Expand All @@ -991,11 +1054,16 @@ func (r *ValkeyClusterReconciler) forgetStaleNodes(ctx context.Context, cluster
// voting in the auto-failover election, permanently
// blocking the replica's promotion.
if state.HasReplicaOf(failing.Id) {
log.V(1).Info("skipping forget; failover pending for node",
if cluster.Spec.Persistence != nil || state.HasFailoverQuorum() {
log.V(1).Info("skipping forget; failover pending for node",
"address", failing.Address, "Id", failing.Id)
continue
}
log.Info("forget node despite pending replica; quorum unreachable",
"address", failing.Address, "Id", failing.Id)
continue
} else {
log.V(1).Info("forget a failing node", "address", failing.Address, "Id", failing.Id)
}
log.V(1).Info("forget a failing node", "address", failing.Address, "Id", failing.Id)
if err := node.Client.Do(ctx, node.Client.B().ClusterForget().NodeId(failing.Id).Build()).Error(); err != nil {
log.Error(err, "command failed: CLUSTER FORGET")
r.Recorder.Eventf(cluster, nil, corev1.EventTypeWarning, "NodeForgetFailed", "ForgetNode", "Failed to forget node: %v", err)
Expand Down
68 changes: 68 additions & 0 deletions internal/valkey/clusterstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,74 @@ func (s *ClusterState) HasReplicaOf(nodeId string) bool {
return false
}

// HasFailoverQuorum returns true if a majority of slot-owning primaries are
// reachable. Valkey requires a majority of primaries to vote in a failover
// election; if quorum is unreachable, no automatic failover can succeed.
// Uses cluster_size from CLUSTER INFO (total primaries with slots, including
// failed ones) as the denominator.
func (s *ClusterState) HasFailoverQuorum() bool {
if len(s.Shards) == 0 {
return false
}
var livePrimaries, clusterSize int
for _, shard := range s.Shards {
if shard.GetPrimaryNode() != nil && len(shard.Slots) > 0 {
livePrimaries++
}
for _, node := range shard.Nodes {
// Take the max across nodes in case gossip hasn't fully propagated.
if size, err := strconv.Atoi(node.ClusterInfo["cluster_size"]); err == nil && size > clusterSize {
Comment thread
jdheyburn marked this conversation as resolved.
clusterSize = size
}
}
}
if clusterSize == 0 {
return false
}
return livePrimaries > (clusterSize / 2)
}

// IsNodeFailed reports whether any node in the scraped state lists nodeId with
// a "fail" or "fail?" flag in its CLUSTER NODES output.
//
// "fail?" (pfail) is counted as failed on purpose: when the majority of
// primaries are down, a pfail can never be promoted to fail.
//
// Lines with fewer than 8 whitespace-separated fields are ignored. An unknown
// nodeId yields false.
func (s *ClusterState) IsNodeFailed(nodeId string) bool {
	isFailFlag := func(flag string) bool {
		return flag == "fail" || flag == "fail?"
	}
	for _, shard := range s.Shards {
		for _, observer := range shard.Nodes {
			for entry := range strings.SplitSeq(observer.ClusterNodes, "\n") {
				cols := strings.Fields(entry)
				// cols[0] is the node ID, cols[2] the comma-separated flags.
				if len(cols) >= 8 && cols[0] == nodeId &&
					slices.ContainsFunc(strings.Split(cols[2], ","), isFailFlag) {
					return true
				}
			}
		}
	}
	return false
}

// BestReplicaOf selects, among all nodes that report primaryId as their
// primary, the one with the highest slave_repl_offset. It returns nil when no
// node references primaryId.
//
// On equal offsets the first node encountered (in shard, then node order) is
// kept.
func (s *ClusterState) BestReplicaOf(primaryId string) *NodeState {
	var (
		winner       *NodeState
		winnerOffset int64
	)
	for _, shard := range s.Shards {
		for _, candidate := range shard.Nodes {
			if candidate.PrimaryIdFromSelf() == primaryId {
				// The parse error is intentionally ignored: a missing or
				// non-numeric offset is treated as 0.
				replOffset, _ := strconv.ParseInt(candidate.Info["slave_repl_offset"], 10, 64)
				if winner == nil || replOffset > winnerOffset {
					winner, winnerOffset = candidate, replOffset
				}
			}
		}
	}
	return winner
}
Comment thread
jdheyburn marked this conversation as resolved.

// PrimaryIdFromSelf returns the primary node ID that this node reports as its
// own primary in CLUSTER NODES (fields[3] of the "myself" line). Returns "-"
// for primaries and the primary's node ID for replicas.
Expand Down
149 changes: 149 additions & 0 deletions internal/valkey/clusterstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,152 @@ func TestCurrentEpoch(t *testing.T) {
}
})
}

func TestClusterState_HasFailoverQuorum(t *testing.T) {
t.Run("no shards", func(t *testing.T) {
state := &ClusterState{Shards: []*ShardState{}}
if state.HasFailoverQuorum() {
t.Error("expected false with no shards")
}
})

t.Run("majority alive (2 of 3)", func(t *testing.T) {
state := &ClusterState{
Shards: []*ShardState{
{PrimaryId: "p1", Slots: []SlotsRange{{0, 5461}}, Nodes: []*NodeState{{Id: "p1", ClusterInfo: map[string]string{"cluster_size": "3"}}}},
{PrimaryId: "p2", Slots: []SlotsRange{{5462, 10922}}, Nodes: []*NodeState{{Id: "p2", ClusterInfo: map[string]string{"cluster_size": "3"}}}},
},
}
if !state.HasFailoverQuorum() {
t.Error("expected true: 2 > 3/2")
}
})

t.Run("no majority (1 of 3)", func(t *testing.T) {
state := &ClusterState{
Shards: []*ShardState{
{PrimaryId: "p1", Slots: []SlotsRange{{0, 5461}}, Nodes: []*NodeState{{Id: "p1", ClusterInfo: map[string]string{"cluster_size": "3"}}}},
},
}
if state.HasFailoverQuorum() {
t.Error("expected false: 1 <= 3/2")
}
})

t.Run("all alive (3 of 3)", func(t *testing.T) {
state := &ClusterState{
Shards: []*ShardState{
{PrimaryId: "p1", Slots: []SlotsRange{{0, 5461}}, Nodes: []*NodeState{{Id: "p1", ClusterInfo: map[string]string{"cluster_size": "3"}}}},
{PrimaryId: "p2", Slots: []SlotsRange{{5462, 10922}}, Nodes: []*NodeState{{Id: "p2", ClusterInfo: map[string]string{"cluster_size": "3"}}}},
{PrimaryId: "p3", Slots: []SlotsRange{{10923, 16383}}, Nodes: []*NodeState{{Id: "p3", ClusterInfo: map[string]string{"cluster_size": "3"}}}},
},
}
if !state.HasFailoverQuorum() {
t.Error("expected true: 3 > 3/2")
}
})

t.Run("half alive (2 of 4)", func(t *testing.T) {
state := &ClusterState{
Shards: []*ShardState{
{PrimaryId: "p1", Slots: []SlotsRange{{0, 4095}}, Nodes: []*NodeState{{Id: "p1", ClusterInfo: map[string]string{"cluster_size": "4"}}}},
{PrimaryId: "p2", Slots: []SlotsRange{{4096, 8191}}, Nodes: []*NodeState{{Id: "p2", ClusterInfo: map[string]string{"cluster_size": "4"}}}},
},
}
if state.HasFailoverQuorum() {
t.Error("expected false: 2 <= 4/2")
}
})

t.Run("only replicas alive (0 primaries of 3)", func(t *testing.T) {
state := &ClusterState{
Shards: []*ShardState{
{PrimaryId: "", Nodes: []*NodeState{{Id: "r1", ClusterInfo: map[string]string{"cluster_size": "3"}}}},
{PrimaryId: "", Nodes: []*NodeState{{Id: "r2", ClusterInfo: map[string]string{"cluster_size": "3"}}}},
{PrimaryId: "", Nodes: []*NodeState{{Id: "r3", ClusterInfo: map[string]string{"cluster_size": "3"}}}},
},
}
if state.HasFailoverQuorum() {
t.Error("expected false: 0 live primaries")
}
})

t.Run("returns false when cluster_size missing", func(t *testing.T) {
state := &ClusterState{
Shards: []*ShardState{
{PrimaryId: "p1", Slots: []SlotsRange{{0, 8191}}, Nodes: []*NodeState{{Id: "p1", ClusterInfo: map[string]string{}}}},
{PrimaryId: "p2", Slots: []SlotsRange{{8192, 16383}}, Nodes: []*NodeState{{Id: "p2", ClusterInfo: map[string]string{}}}},
},
}
// Cannot determine cluster size but assume no quorum
if state.HasFailoverQuorum() {
t.Error("expected false when cluster_size unavailable")
}
})
}

// TestClusterState_IsNodeFailed checks that IsNodeFailed recognises both the
// "fail" and "fail?" flags in a node's CLUSTER NODES view and reports false
// for healthy or unknown node IDs.
func TestClusterState_IsNodeFailed(t *testing.T) {
	// One observer's CLUSTER NODES output: itself healthy, one failed primary
	// and one possibly-failed (pfail) primary.
	const observerView = "abc123 10.0.0.1:6379@16379 myself,master - 0 0 1 connected 0-5461\n" +
		"dead1 10.0.0.99:6379@16379 master,fail - 0 0 2 connected 5462-10922\n" +
		"pfail1 10.0.0.98:6379@16379 master,fail? - 0 0 3 connected 10923-16383\n"

	state := &ClusterState{
		Shards: []*ShardState{
			{Nodes: []*NodeState{{ClusterNodes: observerView}}},
		},
	}

	cases := []struct {
		name    string
		nodeID  string
		want    bool
		failure string
	}{
		{name: "fail", nodeID: "dead1", want: true, failure: "expected true"},
		{name: "pfail", nodeID: "pfail1", want: true, failure: "expected true for pfail"},
		{name: "not failed", nodeID: "abc123", want: false, failure: "expected false"},
		{name: "unknown", nodeID: "unknown", want: false, failure: "expected false"},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if state.IsNodeFailed(tc.nodeID) != tc.want {
				t.Error(tc.failure)
			}
		})
	}
}

func TestClusterState_BestReplicaOf(t *testing.T) {
state := &ClusterState{
Shards: []*ShardState{
{
PrimaryId: "primary1",
Nodes: []*NodeState{
{Id: "primary1", Flags: []string{"master"}, ClusterNodes: "primary1 10.0.0.1:6379@16379 myself,master - 0 0 1 connected 0-5461\n"},
{Id: "r1", Flags: []string{"slave"}, Info: map[string]string{"slave_repl_offset": "100"}, ClusterNodes: "r1 10.0.0.2:6379@16379 myself,slave primary1 0 0 1 connected\n"},
{Id: "r2", Flags: []string{"slave"}, Info: map[string]string{"slave_repl_offset": "500"}, ClusterNodes: "r2 10.0.0.3:6379@16379 myself,slave primary1 0 0 1 connected\n"},
{Id: "r3", Flags: []string{"slave"}, Info: map[string]string{"slave_repl_offset": "200"}, ClusterNodes: "r3 10.0.0.4:6379@16379 myself,slave primary1 0 0 1 connected\n"},
},
},
},
}

t.Run("picks highest offset", func(t *testing.T) {
best := state.BestReplicaOf("primary1")
if best == nil || best.Id != "r2" {
t.Errorf("expected r2 (offset 500), got %v", best)
}
})

t.Run("not found", func(t *testing.T) {
best := state.BestReplicaOf("unknown")
if best != nil {
t.Errorf("expected nil, got %v", best.Id)
}
})
}