Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/v1alpha1/valkeynode_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ type ValkeyNodeStatus struct {
// +optional
Ready bool `json:"ready,omitempty"`

// Running is true when the Valkey container is in the Running state.
// This precedes readiness (cluster_state:ok) and allows ValkeyCluster to
// proceed with cluster initialization during bootstrap without waiting for
// readiness probes to pass.
// +optional
Running bool `json:"running,omitempty"`

// PodName is the name of the pod created by the workload.
// +optional
PodName string `json:"podName,omitempty"`
Expand Down Expand Up @@ -136,6 +143,7 @@ const (
// ValkeyNode is the Schema for the valkeynodes API.
// ValkeyNode is an internal CRD. Users should not create ValkeyNodes directly.
// +kubebuilder:printcolumn:name="Ready",type="boolean",JSONPath=".status.ready",description="Whether the node is ready"
// +kubebuilder:printcolumn:name="Running",type="boolean",JSONPath=".status.running",description="Whether the Valkey container is running (precedes readiness)",priority=1
// +kubebuilder:printcolumn:name="Role",type="string",JSONPath=".status.role",description="Valkey replication role"
// +kubebuilder:printcolumn:name="Pod",type="string",JSONPath=".status.podName",description="Pod name"
// +kubebuilder:printcolumn:name="IP",type="string",JSONPath=".status.podIP",description="Pod IP",priority=1
Expand Down
12 changes: 12 additions & 0 deletions config/crd/bases/valkey.io_valkeynodes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ spec:
jsonPath: .status.ready
name: Ready
type: boolean
- description: Whether the Valkey container is running (precedes readiness)
jsonPath: .status.running
name: Running
priority: 1
type: boolean
- description: Valkey replication role
jsonPath: .status.role
name: Role
Expand Down Expand Up @@ -2804,6 +2809,13 @@ spec:
description: Role is the Valkey replication role of this node (primary
or replica).
type: string
running:
description: |-
Running is true when the Valkey container is in the Running state.
This precedes readiness (cluster_state:ok) and allows ValkeyCluster to
proceed with cluster initialization during bootstrap without waiting for
readiness probes to pass.
type: boolean
type: object
required:
- metadata
Expand Down
22 changes: 9 additions & 13 deletions internal/controller/scripts/readiness-check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,12 @@ if [ "$response" != "PONG" ]; then
exit 1
fi

# valkey_status_file=/tmp/.valkey_cluster_check
# if [ ! -f "$valkey_status_file" ]; then
# response=$(
# timeout $timeout \
# valkey-cli -h localhost -p $port CLUSTER INFO | grep cluster_state | tr -d '[:space:]')

# if [ "$response" != "cluster_state:ok" ]; then
# echo "$response" >&2
# exit 1
# else
# touch "$valkey_status_file"
# fi
# fi
# Cluster-state gate. A standalone instance answers CLUSTER INFO with
# "ERR This instance has cluster support disabled", which contains no
# cluster_state line; nothing is extracted in that case and the gate is
# skipped. To be revisited once non-cluster mode is implemented.
cluster_info=$(timeout $timeout valkey-cli -h localhost -p $port CLUSTER INFO)
cluster_state=$(echo "$cluster_info" | grep '^cluster_state:' | tr -d '[:space:]')
case "$cluster_state" in
    "" | "cluster_state:ok")
        # No cluster_state line was found, or the cluster reports ok.
        ;;
    *)
        # Any other state fails the probe; report the state on stderr.
        echo "$cluster_state" >&2
        exit 1
        ;;
esac
2 changes: 1 addition & 1 deletion internal/controller/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func conditionsChanged(old, new []metav1.Condition) bool {
// nodeStatusChanged compares two ValkeyNodeStatus values and returns true if
// they differ (ignoring LastTransitionTime on conditions).
func nodeStatusChanged(old, new valkeyiov1alpha1.ValkeyNodeStatus) bool {
if old.Ready != new.Ready || old.PodName != new.PodName || old.PodIP != new.PodIP || old.Role != new.Role || old.ObservedGeneration != new.ObservedGeneration {
if old.Ready != new.Ready || old.Running != new.Running || old.PodName != new.PodName || old.PodIP != new.PodIP || old.Role != new.Role || old.ObservedGeneration != new.ObservedGeneration {
return true
}
return conditionsChanged(old.Conditions, new.Conditions)
Expand Down
7 changes: 7 additions & 0 deletions internal/controller/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,11 @@ func TestNodeStatusChanged(t *testing.T) {
new.Conditions[0].LastTransitionTime = metav1.Now()
g.Expect(nodeStatusChanged(old, new)).To(BeFalse())
})

// Running is one of the fields nodeStatusChanged compares, so two statuses
// that differ only in Running must be reported as changed.
t.Run("returns true when Running differs", func(t *testing.T) {
// NOTE(review): relies on base() leaving Running at false — confirm
// against the helper, which is outside this view.
old := base()
new := base()
new.Running = true
g.Expect(nodeStatusChanged(old, new)).To(BeTrue())
})
}
5 changes: 5 additions & 0 deletions internal/controller/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ const (
RoleSlave = "slave"
)

// ValkeyContainerName is the name given to the container that runs the
// Valkey server in each pod.
const ValkeyContainerName = "server"

// baseLabels returns the standard Kubernetes recommended labels for a Valkey
// resource with the given instance name and component type.
func baseLabels(name, component string) map[string]string {
Expand Down
68 changes: 46 additions & 22 deletions internal/controller/valkeycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,30 +131,30 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}

nodeTransitioning := false
nodes := &valkeyiov1alpha1.ValkeyNodeList{}
if err := r.List(ctx, nodes, client.InNamespace(cluster.Namespace), client.MatchingLabels(map[string]string{LabelCluster: cluster.Name})); err != nil {
log.Error(err, "failed to list ValkeyNodes")
setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonValkeyNodeListError, err.Error(), metav1.ConditionFalse)
_ = r.updateStatus(ctx, cluster, nil)
return ctrl.Result{}, err
}
operatorPassword, err := fetchSystemUserPassword(ctx, operatorUser, r.Client, cluster.Name, cluster.Namespace)
if err != nil {
log.Error(err, "failed to retrieve system user password")
setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonSystemUsersAclError, err.Error(), metav1.ConditionFalse)
_ = r.updateStatus(ctx, cluster, nil)
return ctrl.Result{}, nil
}

if requeue, err := r.reconcileValkeyNodes(ctx, cluster, nodes); err != nil {
if requeue, err := r.reconcileValkeyNodes(ctx, cluster, nodes, operatorUser, operatorPassword); err != nil {
setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonValkeyNodeError, err.Error(), metav1.ConditionFalse)
_ = r.updateStatus(ctx, cluster, nil)
return ctrl.Result{}, err
} else if requeue {
nodeTransitioning = true
setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonUpdatingNodes, "Updating ValkeyNodes", metav1.ConditionFalse)
setCondition(cluster, valkeyiov1alpha1.ConditionProgressing, valkeyiov1alpha1.ReasonUpdatingNodes, "Updating ValkeyNodes", metav1.ConditionTrue)
_ = r.updateStatus(ctx, cluster, nil)
return ctrl.Result{RequeueAfter: 2 * time.Second}, nil
}
operatorPassword, err := fetchSystemUserPassword(ctx, operatorUser, r.Client, cluster.Name, cluster.Namespace)
if err != nil {
log.Error(err, "failed to retrieve system user password")
setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonSystemUsersAclError, err.Error(), metav1.ConditionFalse)
_ = r.updateStatus(ctx, cluster, nil)
return ctrl.Result{}, nil
}
state := r.getValkeyClusterState(ctx, cluster, nodes, operatorUser, operatorPassword)
defer state.CloseClients()
Expand Down Expand Up @@ -228,6 +228,17 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}
}

// While a node is transitioning (Running but not yet Ready), skip
// downstream health checks, scale-in, and rebalancing. The cluster
// topology is transient and those checks would either make incorrect
// decisions or prematurely mark the cluster as healthy with a 30s
// requeue, delaying recovery. Requeue quickly so the phases above
// can continue integrating the node on the next pass.
if nodeTransitioning {
_ = r.updateStatus(ctx, cluster, state)
return ctrl.Result{RequeueAfter: 2 * time.Second}, nil
}

// Build the effective shard list: state.Shards plus any pending primaries
// that are scale-out leaders. During scale-out, GetClusterState places
// new slot-less primaries in PendingNodes because it can't distinguish
Expand Down Expand Up @@ -384,7 +395,7 @@ func (r *ValkeyClusterReconciler) upsertService(ctx context.Context, cluster *va
// mycluster-0-0, mycluster-0-1, mycluster-0-2,
// mycluster-1-0, mycluster-1-1, mycluster-1-2,
// mycluster-2-0, mycluster-2-1, mycluster-2-2.
func (r *ValkeyClusterReconciler) reconcileValkeyNodes(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster, nodes *valkeyiov1alpha1.ValkeyNodeList) (bool, error) {
func (r *ValkeyClusterReconciler) reconcileValkeyNodes(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster, nodes *valkeyiov1alpha1.ValkeyNodeList, username, password string) (bool, error) {
log := logf.FromContext(ctx)

nodesPerShard := 1 + int(cluster.Spec.Replicas)
Expand All @@ -398,11 +409,7 @@ func (r *ValkeyClusterReconciler) reconcileValkeyNodes(ctx context.Context, clus
// re-scraping fresh state.
var clusterState *valkey.ClusterState
if anyNodeRequiresRoll(cluster, nodes) {
operatorPassword, err := fetchSystemUserPassword(ctx, operatorUser, r.Client, cluster.Name, cluster.Namespace)
if err != nil {
return false, fmt.Errorf("failed to fetch operator password for proactive failover: %w", err)
}
clusterState = r.getValkeyClusterState(ctx, cluster, nodes, operatorUser, operatorPassword)
clusterState = r.getValkeyClusterState(ctx, cluster, nodes, username, password)
defer clusterState.CloseClients()
}

Expand All @@ -428,6 +435,23 @@ func (r *ValkeyClusterReconciler) reconcileValkeyNodes(ctx context.Context, clus
return false, nil
}

// shouldWaitForNode reports whether the reconciler must hold at the current
// ValkeyNode and requeue instead of moving on to the next one. Which status
// flag gates progress depends on the cluster's lifecycle phase:
//
//   - Cluster not yet formed (bootstrap): progress is gated on status.Running.
//     Gating on Ready here would deadlock, because readiness needs
//     cluster_state:ok, which needs MEET and slot assignment, and those only
//     happen after this gate has passed.
//
//   - Cluster already formed (rolling update): progress is gated on
//     status.Ready (cluster_state:ok confirmed), so the cluster has converged
//     before the next pod is rolled.
func shouldWaitForNode(clusterFormed bool, status valkeyiov1alpha1.ValkeyNodeStatus) bool {
	// Pick the flag that counts as "far enough along" for the current phase.
	reached := status.Running
	if clusterFormed {
		reached = status.Ready
	}
	return !reached
}

// reconcileValkeyNode reconciles a single ValkeyNode for (shardIndex, nodeIndex).
// Returns (requeue, nodeCreated, err). requeue signals the caller should stop
// iterating and wait before processing the next node.
Expand Down Expand Up @@ -490,12 +514,12 @@ func (r *ValkeyClusterReconciler) reconcileValkeyNode(ctx context.Context, clust
"observedGeneration", node.Status.ObservedGeneration)
return true, false, nil
}
if !node.Status.Ready {
// No spec change, but the node hasn't reached Ready yet (e.g.
// still starting after a prior update). Unlike Updated above, we
// only wait when not-ready; a ready unchanged node is safe to
// advance past.
log.V(1).Info("ValkeyNode not yet ready, waiting", "name", node.Name)
clusterFormed := meta.IsStatusConditionTrue(cluster.Status.Conditions, valkeyiov1alpha1.ConditionClusterFormed)
if shouldWaitForNode(clusterFormed, node.Status) {
log.V(1).Info("waiting for node", "name", node.Name,
"clusterFormed", clusterFormed,
"running", node.Status.Running,
"ready", node.Status.Ready)
return true, false, nil
}
default:
Expand Down
47 changes: 46 additions & 1 deletion internal/controller/valkeycluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ var _ = Describe("reconcileValkeyNodes", func() {
GinkgoHelper()
node := &valkeyiov1alpha1.ValkeyNode{}
Expect(k8sClient.Get(testCtx, types.NamespacedName{Name: name, Namespace: "default"}, node)).To(Succeed())
node.Status.Running = true
node.Status.Ready = true
node.Status.ObservedGeneration = node.Generation
Expect(k8sClient.Status().Update(testCtx, node)).To(Succeed())
Expand All @@ -504,6 +505,7 @@ var _ = Describe("reconcileValkeyNodes", func() {
GinkgoHelper()
node := &valkeyiov1alpha1.ValkeyNode{}
Expect(k8sClient.Get(testCtx, types.NamespacedName{Name: name, Namespace: "default"}, node)).To(Succeed())
node.Status.Running = false
node.Status.Ready = false
// Deliberately does not update ObservedGeneration: simulates the pod
// becoming not-ready while ObservedGeneration may already be stale (from
Expand Down Expand Up @@ -533,7 +535,7 @@ var _ = Describe("reconcileValkeyNodes", func() {
Expect(k8sClient.List(testCtx, nodeList,
client.InNamespace("default"),
client.MatchingLabels{LabelCluster: clusterName})).To(Succeed())
return r.reconcileValkeyNodes(testCtx, cluster, nodeList)
return r.reconcileValkeyNodes(testCtx, cluster, nodeList, "", "")
}

// createAllNodes runs a single reconcile that creates all 4 ValkeyNode CRs.
Expand Down Expand Up @@ -727,6 +729,7 @@ var _ = Describe("reconcileValkeyNode", func() {
nodeName := valkeyNodeName(clusterName, shardIndex, nodeIndex)
node := &valkeyiov1alpha1.ValkeyNode{}
Expect(k8sClient.Get(testCtx, types.NamespacedName{Name: nodeName, Namespace: "default"}, node)).To(Succeed())
node.Status.Running = ready
node.Status.Ready = ready
node.Status.ObservedGeneration = node.Generation
Expect(k8sClient.Status().Update(testCtx, node)).To(Succeed())
Expand Down Expand Up @@ -793,6 +796,7 @@ var _ = Describe("reconcileValkeyNode", func() {
nodeName := valkeyNodeName(clusterName, shardIndex, nodeIndex)
node := &valkeyiov1alpha1.ValkeyNode{}
Expect(k8sClient.Get(testCtx, types.NamespacedName{Name: nodeName, Namespace: "default"}, node)).To(Succeed())
node.Status.Running = true
node.Status.Ready = true
// deliberately NOT setting ObservedGeneration
Expect(k8sClient.Status().Update(testCtx, node)).To(Succeed())
Expand Down Expand Up @@ -827,4 +831,45 @@ var _ = Describe("reconcileValkeyNode", func() {
Expect(requeue).To(BeTrue(), "should requeue while ObservedGeneration is stale")
Expect(created).To(BeFalse())
})

// setClusterFormed sets the ClusterFormed condition to True on the shared
// cluster object and writes the status through the API client. The same
// in-memory cluster object is later passed to reconcileValkeyNode, so the
// condition is visible to it on the following call.
setClusterFormed := func() {
GinkgoHelper()
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: valkeyiov1alpha1.ConditionClusterFormed,
Status: metav1.ConditionTrue,
Reason: "Formed",
})
Expect(k8sClient.Status().Update(testCtx, cluster)).To(Succeed())
}

// With ClusterFormed set, a node that is Running but not Ready must still
// cause a requeue.
It("during rolling update (clusterFormed), waits when Running but not Ready", func() {
// Create the node, then mark the cluster as already formed.
_, _, err := r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex, nil)
Expect(err).NotTo(HaveOccurred())
setClusterFormed()

// Simulate the replacement pod: container is up but readiness probe hasn't passed yet.
nodeName := valkeyNodeName(clusterName, shardIndex, nodeIndex)
node := &valkeyiov1alpha1.ValkeyNode{}
Expect(k8sClient.Get(testCtx, types.NamespacedName{Name: nodeName, Namespace: "default"}, node)).To(Succeed())
node.Status.Running = true
node.Status.Ready = false
// ObservedGeneration is brought up to date, presumably so that the
// stale-generation wait does not fire and only the Running/Ready gate is
// exercised — confirm against reconcileValkeyNode.
node.Status.ObservedGeneration = node.Generation
Expect(k8sClient.Status().Update(testCtx, node)).To(Succeed())

// Second pass: the node exists, so this call evaluates the wait gate.
requeue, _, err := r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex, nil)
Expect(err).NotTo(HaveOccurred())
Expect(requeue).To(BeTrue(), "should wait for Ready when cluster is already formed")
})

// With ClusterFormed set, a node that is Ready must not cause a requeue.
It("during rolling update (clusterFormed), proceeds when Ready", func() {
// First pass creates the ValkeyNode.
_, _, err := r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex, nil)
Expect(err).NotTo(HaveOccurred())
setClusterFormed()
// NOTE(review): setNodeReady(true) presumably marks the node Running and
// Ready with a current ObservedGeneration — confirm against the helper.
setNodeReady(true)

// Second pass: nothing is left to wait for, so no requeue is expected.
requeue, _, err := r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex, nil)
Expect(err).NotTo(HaveOccurred())
Expect(requeue).To(BeFalse(), "should proceed when Ready and cluster is formed")
})
})
24 changes: 22 additions & 2 deletions internal/controller/valkeynode_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,22 @@ func (r *ValkeyNodeReconciler) ensureConfigMap(ctx context.Context, node *valkey
return nil
}

// podRunning reports whether the pod's Valkey server container (the one named
// ValkeyContainerName) is currently in the Running state. The container
// enters that state once its process has started, before any readiness probe
// has fired, so the ValkeyCluster controller can use it as the bootstrap gate
// and begin cluster initialization before cluster_state:ok is achievable.
//
// A nil pod, or a pod with no status entry for that container, yields false.
func podRunning(pod *corev1.Pod) bool {
	if pod == nil {
		return false
	}
	statuses := pod.Status.ContainerStatuses
	for i := range statuses {
		if statuses[i].Name != ValkeyContainerName {
			continue
		}
		// The first entry with the matching name decides the result.
		return statuses[i].State.Running != nil
	}
	return false
}

// updateStatus updates the ValkeyNode status based on workload and Pod state.
func (r *ValkeyNodeReconciler) updateStatus(ctx context.Context, node *valkeyiov1alpha1.ValkeyNode) error {
log := logf.FromContext(ctx)
Expand Down Expand Up @@ -288,6 +304,9 @@ func (r *ValkeyNodeReconciler) updateStatus(ctx context.Context, node *valkeyiov
}
}

// Set Running based on whether the Valkey server container is alive, regardless of readiness probe state.
current.Status.Running = podRunning(pod)

if nodeStatusChanged(*previous, current.Status) {
if err := r.Status().Update(ctx, current); err != nil {
log.Error(err, "failed to update ValkeyNode status")
Expand All @@ -298,9 +317,10 @@ func (r *ValkeyNodeReconciler) updateStatus(ctx context.Context, node *valkeyiov
log.V(2).Info("status unchanged, skipping update")
}

// Sync Ready back to the caller's object so the requeue check in Reconcile
// reflects the status we just wrote.
// Sync status fields back to the caller's object so the requeue check in
// Reconcile reflects the values we just wrote, without a fresh Get.
node.Status.Ready = current.Status.Ready
node.Status.Running = current.Status.Running

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/valkeynode_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func buildContainersDef(node *valkeyiov1alpha1.ValkeyNode) ([]corev1.Container,

containers := []corev1.Container{
{
Name: "server",
Name: ValkeyContainerName,
Image: image,
Resources: node.Spec.Resources,
Command: []string{
Expand Down
Loading
Loading