diff --git a/api/v1alpha1/valkeycluster_types.go b/api/v1alpha1/valkeycluster_types.go index 06d0660c..6538642f 100644 --- a/api/v1alpha1/valkeycluster_types.go +++ b/api/v1alpha1/valkeycluster_types.go @@ -143,6 +143,54 @@ type ValkeyClusterSpec struct { // +kubebuilder:default=Managed // +optional PodDisruptionBudget PDBPolicy `json:"podDisruptionBudget,omitempty"` + + // ExternalAccess configures reachability of the cluster from outside Kubernetes. + // When omitted, the cluster is internal-only and behaves identically to a cluster + // without this field. Requires Valkey 9.0+. + // +optional + ExternalAccess *ExternalAccessSpec `json:"externalAccess,omitempty"` +} + +// ExternalAccessSpec defines how a ValkeyCluster is exposed to clients outside the +// Kubernetes cluster. Node-to-node traffic always stays on internal pod IPs; only +// the client-facing endpoint is affected. +type ExternalAccessSpec struct { + // Enabled turns on external access for the cluster. + // +optional + Enabled bool `json:"enabled,omitempty"` + + // ServiceType is the type of the per-shard Service. NodePort ports are allocated + // by Kubernetes; LoadBalancer frontend ports are derived from the node index. + // +kubebuilder:default=NodePort + // +kubebuilder:validation:Enum=NodePort;LoadBalancer + // +optional + ServiceType corev1.ServiceType `json:"serviceType,omitempty"` + + // ExternalTrafficPolicy sets the externalTrafficPolicy of the per-shard Service. + // Use Local to preserve the client source IP, which requires DNS to resolve a + // shard hostname to the nodes hosting that shard's pods. + // +kubebuilder:validation:Enum=Cluster;Local + // +optional + ExternalTrafficPolicy corev1.ServiceExternalTrafficPolicy `json:"externalTrafficPolicy,omitempty"` + + // ServiceAnnotations are applied to each per-shard Service, for example to + // configure external-dns or a cloud load-balancer controller. + // +optional + ServiceAnnotations map[string]string `json:"serviceAnnotations,omitempty"` + + // HostnamePrefix is prepended to each shard hostname, which is announced as + // "-.". Set a unique prefix per cluster when + // several clusters share one domain. Has no effect unless domain is set. + // +kubebuilder:default=shard + // +kubebuilder:validation:Pattern=`^[a-z0-9]([-a-z0-9]*[a-z0-9])?$` + // +optional + HostnamePrefix string `json:"hostnamePrefix,omitempty"` + + // Domain is the DNS domain under which shard hostnames are announced. When set, + // each node announces the hostname "-." to + // clients in addition to its IP. The hostname must resolve to the shard's Service. + // +optional + Domain string `json:"domain,omitempty"` } // TLSConfig defines the TLS configuration for ValkeyCluster. @@ -212,6 +260,25 @@ type ValkeyClusterStatus struct { // +listMapKey=type // +optional Conditions []metav1.Condition `json:"conditions,omitempty"` + + // ExternalEndpoints lists the externally-reachable endpoint of each shard, + // populated when external access is enabled. + // +listType=map + // +listMapKey=shardIndex + // +optional + ExternalEndpoints []ShardEndpoint `json:"externalEndpoints,omitempty"` +} + +// ShardEndpoint describes the externally-reachable endpoint of a single shard. +type ShardEndpoint struct { + // ShardIndex is the index of the shard this endpoint belongs to. + ShardIndex int32 `json:"shardIndex"` + + // NodePorts are the external ports of the shard's nodes, indexed by node index + // (NodePorts[0] is the node-index 0 port). The address to reach each port + // depends on the Service type and the user's DNS configuration. + // +optional + NodePorts []int32 `json:"nodePorts,omitempty"` } const ( diff --git a/api/v1alpha1/valkeynode_types.go b/api/v1alpha1/valkeynode_types.go index 8b2b1f30..cf10e64c 100644 --- a/api/v1alpha1/valkeynode_types.go +++ b/api/v1alpha1/valkeynode_types.go @@ -120,6 +120,11 @@ type ValkeyNodeSpec struct { // the rest take effect on the next pod roll. // +optional Config map[string]string `json:"config,omitempty"` + + // ExternalAccess is copied verbatim from the owning cluster's Spec.ExternalAccess + // and configures how this node is exposed to clients outside the Kubernetes cluster. + // +optional + ExternalAccess *ExternalAccessSpec `json:"externalAccess,omitempty"` } // ValkeyNodeStatus defines the observed state of ValkeyNode. diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 0191815a..7dc27a30 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -102,6 +102,28 @@ func (in *ExporterSpec) DeepCopy() *ExporterSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ExternalAccessSpec) DeepCopyInto(out *ExternalAccessSpec) { + *out = *in + if in.ServiceAnnotations != nil { + in, out := &in.ServiceAnnotations, &out.ServiceAnnotations + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExternalAccessSpec. +func (in *ExternalAccessSpec) DeepCopy() *ExternalAccessSpec { + if in == nil { + return nil + } + out := new(ExternalAccessSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KeysAclSpec) DeepCopyInto(out *KeysAclSpec) { *out = *in @@ -173,6 +195,26 @@ func (in *PersistenceSpec) DeepCopy() *PersistenceSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ShardEndpoint) DeepCopyInto(out *ShardEndpoint) { + *out = *in + if in.NodePorts != nil { + in, out := &in.NodePorts, &out.NodePorts + *out = make([]int32, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShardEndpoint. +func (in *ShardEndpoint) DeepCopy() *ShardEndpoint { + if in == nil { + return nil + } + out := new(ShardEndpoint) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TLSConfig) DeepCopyInto(out *TLSConfig) { *out = *in @@ -334,6 +376,11 @@ func (in *ValkeyClusterSpec) DeepCopyInto(out *ValkeyClusterSpec) { *out = new(TLSConfig) **out = **in } + if in.ExternalAccess != nil { + in, out := &in.ExternalAccess, &out.ExternalAccess + *out = new(ExternalAccessSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ValkeyClusterSpec. @@ -356,6 +403,13 @@ func (in *ValkeyClusterStatus) DeepCopyInto(out *ValkeyClusterStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.ExternalEndpoints != nil { + in, out := &in.ExternalEndpoints, &out.ExternalEndpoints + *out = make([]ShardEndpoint, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ValkeyClusterStatus. @@ -487,6 +541,11 @@ func (in *ValkeyNodeSpec) DeepCopyInto(out *ValkeyNodeSpec) { (*out)[key] = val } } + if in.ExternalAccess != nil { + in, out := &in.ExternalAccess, &out.ExternalAccess + *out = new(ExternalAccessSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ValkeyNodeSpec. diff --git a/config/crd/bases/valkey.io_valkeyclusters.yaml b/config/crd/bases/valkey.io_valkeyclusters.yaml index 8495f603..4ba39052 100644 --- a/config/crd/bases/valkey.io_valkeyclusters.yaml +++ b/config/crd/bases/valkey.io_valkeyclusters.yaml @@ -2571,6 +2571,55 @@ spec: type: object type: object type: object + externalAccess: + description: |- + ExternalAccess configures reachability of the cluster from outside Kubernetes. + When omitted, the cluster is internal-only and behaves identically to a cluster + without this field. Requires Valkey 9.0+. + properties: + domain: + description: |- + Domain is the DNS domain under which shard hostnames are announced. When set, + each node announces the hostname "-." to + clients in addition to its IP. The hostname must resolve to the shard's Service. + type: string + enabled: + description: Enabled turns on external access for the cluster. + type: boolean + externalTrafficPolicy: + description: |- + ExternalTrafficPolicy sets the externalTrafficPolicy of the per-shard Service. + Use Local to preserve the client source IP, which requires DNS to resolve a + shard hostname to the nodes hosting that shard's pods. + enum: + - Cluster + - Local + type: string + hostnamePrefix: + default: shard + description: |- + HostnamePrefix is prepended to each shard hostname, which is announced as + "-.". Set a unique prefix per cluster when + several clusters share one domain. Has no effect unless domain is set. + pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?$ + type: string + serviceAnnotations: + additionalProperties: + type: string + description: |- + ServiceAnnotations are applied to each per-shard Service, for example to + configure external-dns or a cloud load-balancer controller. + type: object + serviceType: + default: NodePort + description: |- + ServiceType is the type of the per-shard Service. NodePort ports are allocated + by Kubernetes; LoadBalancer frontend ports are derived from the node index. + enum: + - NodePort + - LoadBalancer + type: string + type: object image: description: Override the default Valkey image type: string @@ -3142,6 +3191,35 @@ spec: x-kubernetes-list-map-keys: - type x-kubernetes-list-type: map + externalEndpoints: + description: |- + ExternalEndpoints lists the externally-reachable endpoint of each shard, + populated when external access is enabled. + items: + description: ShardEndpoint describes the externally-reachable endpoint + of a single shard. + properties: + nodePorts: + description: |- + NodePorts are the external ports of the shard's nodes, indexed by node index + (NodePorts[0] is the node-index 0 port). The address to reach each port + depends on the Service type and the user's DNS configuration. + items: + format: int32 + type: integer + type: array + shardIndex: + description: ShardIndex is the index of the shard this endpoint + belongs to. + format: int32 + type: integer + required: + - shardIndex + type: object + type: array + x-kubernetes-list-map-keys: + - shardIndex + x-kubernetes-list-type: map message: description: Message provides human-readable details about the current state. diff --git a/config/crd/bases/valkey.io_valkeynodes.yaml b/config/crd/bases/valkey.io_valkeynodes.yaml index d4e3ef8e..570a9d51 100644 --- a/config/crd/bases/valkey.io_valkeynodes.yaml +++ b/config/crd/bases/valkey.io_valkeynodes.yaml @@ -2580,6 +2580,54 @@ spec: type: object type: object type: object + externalAccess: + description: |- + ExternalAccess is copied verbatim from the owning cluster's Spec.ExternalAccess + and configures how this node is exposed to clients outside the Kubernetes cluster. + properties: + domain: + description: |- + Domain is the DNS domain under which shard hostnames are announced. When set, + each node announces the hostname "-." to + clients in addition to its IP. The hostname must resolve to the shard's Service. + type: string + enabled: + description: Enabled turns on external access for the cluster. + type: boolean + externalTrafficPolicy: + description: |- + ExternalTrafficPolicy sets the externalTrafficPolicy of the per-shard Service. + Use Local to preserve the client source IP, which requires DNS to resolve a + shard hostname to the nodes hosting that shard's pods. + enum: + - Cluster + - Local + type: string + hostnamePrefix: + default: shard + description: |- + HostnamePrefix is prepended to each shard hostname, which is announced as + "-.". Set a unique prefix per cluster when + several clusters share one domain. Has no effect unless domain is set. + pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?$ + type: string + serviceAnnotations: + additionalProperties: + type: string + description: |- + ServiceAnnotations are applied to each per-shard Service, for example to + configure external-dns or a cloud load-balancer controller. + type: object + serviceType: + default: NodePort + description: |- + ServiceType is the type of the per-shard Service. NodePort ports are allocated + by Kubernetes; LoadBalancer frontend ports are derived from the node index. + enum: + - NodePort + - LoadBalancer + type: string + type: object image: description: Image is the Valkey container image to use. type: string diff --git a/config/samples/kustomization.yaml b/config/samples/kustomization.yaml index a4a8d997..6646597b 100644 --- a/config/samples/kustomization.yaml +++ b/config/samples/kustomization.yaml @@ -2,4 +2,5 @@ resources: - v1alpha1_valkeycluster.yaml - v1alpha1_valkeycluster-persistent.yaml +- v1alpha1_valkeycluster-external-access.yaml # +kubebuilder:scaffold:manifestskustomizesamples diff --git a/config/samples/v1alpha1_valkeycluster-external-access.yaml b/config/samples/v1alpha1_valkeycluster-external-access.yaml new file mode 100644 index 00000000..711ab4b1 --- /dev/null +++ b/config/samples/v1alpha1_valkeycluster-external-access.yaml @@ -0,0 +1,11 @@ +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: cluster-external-access-sample +spec: + shards: 3 + replicas: 1 + externalAccess: + enabled: true + serviceType: NodePort + domain: valkey.example.com diff --git a/docs/valkeycluster.md b/docs/valkeycluster.md index c884e5e9..dace3f6c 100644 --- a/docs/valkeycluster.md +++ b/docs/valkeycluster.md @@ -12,6 +12,7 @@ - [Config](#config) - [Containers](#containers) +- [External access](#external-access) - [Metrics](#metrics) - [Persistence](#persistence) - [Pod disruption budget](#pod-disruption-budget) @@ -63,6 +64,35 @@ containers: `containers` patches the pod's container list using strategic merge patch. Containers named `server` or `metrics-exporter` are merged by name; anything else is appended as a sidecar. +### External access + +```yaml +externalAccess: + enabled: true + serviceType: NodePort + domain: valkey.example.com +``` + +`externalAccess` configures reachability of the cluster from outside Kubernetes. When omitted, the cluster is internal-only and behaves identically to a cluster without this field. Requires Valkey 9.0+. + +Enabling external access announces a human-readable node name (the ValkeyNode name, e.g. `cluster-sample-1-2`) so cluster events such as failures reference it alongside the node ID. Node-to-node traffic (gossip and replication) always stays on internal pod IPs. + +The operator creates one Service per shard, selecting that shard's pods and exposing one port per node. Each port targets a single node, so a client can reach a specific primary or replica. + +| Field | Description | +|---|---| +| `serviceType` | `NodePort` (default) or `LoadBalancer`. | +| `externalTrafficPolicy` | `Cluster` (default) or `Local`. Use `Local` to preserve the client source IP. | +| `serviceAnnotations` | Applied to each per-shard Service, e.g. for external-dns or a cloud load-balancer controller. | +| `hostnamePrefix` | Prefix for shard hostnames (default `shard`). Set a unique prefix per cluster when several clusters share one domain. | +| `domain` | DNS domain for shard hostnames. When set, each node announces `-.` to clients. | + +With `NodePort`, Kubernetes allocates the external ports; the operator reads them back and reports them per shard under `status.externalEndpoints` (indexed by node). With `LoadBalancer`, each shard's node ports are `6379 + nodeIndex`. + +When `domain` is set, each shard announces the hostname `-.` (e.g. `shard-0.valkey.example.com`). You are responsible for the DNS records that resolve these hostnames to the shard Services. The hostname is announced only as metadata until clients are switched to it; the cluster bus continues to use pod IPs. + +When TLS is enabled, the certificate must cover both the internal Service FQDN (`valkey-..svc.cluster.local`) and every shard hostname (`-0.` … `-N.`). Widen the certificate before scaling out. + ### Metrics ```yaml diff --git a/internal/controller/shard_services_test.go b/internal/controller/shard_services_test.go new file mode 100644 index 00000000..e2921047 --- /dev/null +++ b/internal/controller/shard_services_test.go @@ -0,0 +1,159 @@ +/* +Copyright 2025 Valkey Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/events" + "sigs.k8s.io/controller-runtime/pkg/client" + valkeyiov1alpha1 "valkey.io/valkey-operator/api/v1alpha1" +) + +var _ = Describe("reconcileShardServices", func() { + ctx := context.Background() + + newReconciler := func() *ValkeyClusterReconciler { + return &ValkeyClusterReconciler{ + Client: k8sClient, + APIReader: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: events.NewFakeRecorder(100), + } + } + + listShardServices := func(clusterName string) []corev1.Service { + services := &corev1.ServiceList{} + Expect(k8sClient.List(ctx, services, client.InNamespace("default"), client.MatchingLabels{LabelCluster: clusterName})).To(Succeed()) + shardServices := make([]corev1.Service, 0, len(services.Items)) + for _, svc := range services.Items { + if _, ok := svc.Labels[LabelShardIndex]; ok { + shardServices = append(shardServices, svc) + } + } + return shardServices + } + + It("creates one NodePort Service per shard pinned to each node", func() { + cluster := &valkeyiov1alpha1.ValkeyCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "shardsvc-nodeport", Namespace: "default"}, + Spec: valkeyiov1alpha1.ValkeyClusterSpec{ + Shards: 2, + Replicas: 1, + ExternalAccess: &valkeyiov1alpha1.ExternalAccessSpec{ + Enabled: true, + ServiceType: corev1.ServiceTypeNodePort, + }, + }, + } + Expect(k8sClient.Create(ctx, cluster)).To(Succeed()) + defer func() { + for _, svc := range listShardServices(cluster.Name) { + _ = k8sClient.Delete(ctx, &svc) + } + _ = k8sClient.Delete(ctx, cluster) + }() + + endpoints, err := newReconciler().reconcileShardServices(ctx, cluster) + Expect(err).NotTo(HaveOccurred()) + + services := listShardServices(cluster.Name) + Expect(services).To(HaveLen(2)) + for _, svc := range services { + Expect(svc.Spec.Type).To(Equal(corev1.ServiceTypeNodePort)) + Expect(svc.Labels).To(HaveKeyWithValue("app.kubernetes.io/managed-by", "valkey-operator")) + Expect(svc.Spec.Selector).To(HaveKeyWithValue(LabelCluster, cluster.Name)) + Expect(svc.Spec.Selector).To(HaveKey(LabelShardIndex)) + // One port per node (1 primary + 1 replica), each targeting a node-unique name. + Expect(svc.Spec.Ports).To(HaveLen(2)) + Expect(svc.Spec.Ports[0].TargetPort.StrVal).To(Equal("vk-n0")) + Expect(svc.Spec.Ports[1].TargetPort.StrVal).To(Equal("vk-n1")) + } + + // Status endpoints are returned for every shard, one port slot per node. + Expect(endpoints).To(HaveLen(2)) + for _, ep := range endpoints { + Expect(ep.NodePorts).To(HaveLen(2)) + } + }) + + It("removes shard Services on scale-in and when disabled", func() { + cluster := &valkeyiov1alpha1.ValkeyCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "shardsvc-scalein", Namespace: "default"}, + Spec: valkeyiov1alpha1.ValkeyClusterSpec{ + Shards: 3, + Replicas: 0, + ExternalAccess: &valkeyiov1alpha1.ExternalAccessSpec{Enabled: true}, + }, + } + Expect(k8sClient.Create(ctx, cluster)).To(Succeed()) + defer func() { + for _, svc := range listShardServices(cluster.Name) { + _ = k8sClient.Delete(ctx, &svc) + } + _ = k8sClient.Delete(ctx, cluster) + }() + r := newReconciler() + + _, err := r.reconcileShardServices(ctx, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(listShardServices(cluster.Name)).To(HaveLen(3)) + + By("scaling in to one shard") + cluster.Spec.Shards = 1 + _, err = r.reconcileShardServices(ctx, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(listShardServices(cluster.Name)).To(HaveLen(1)) + + By("disabling external access") + cluster.Spec.ExternalAccess.Enabled = false + _, err = r.reconcileShardServices(ctx, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(listShardServices(cluster.Name)).To(BeEmpty()) + }) + + It("is a no-op on a second reconcile and preserves allocated NodePorts", func() { + cluster := &valkeyiov1alpha1.ValkeyCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "shardsvc-stable", Namespace: "default"}, + Spec: valkeyiov1alpha1.ValkeyClusterSpec{ + Shards: 1, + Replicas: 1, + ExternalAccess: &valkeyiov1alpha1.ExternalAccessSpec{Enabled: true}, + }, + } + Expect(k8sClient.Create(ctx, cluster)).To(Succeed()) + defer func() { + for _, svc := range listShardServices(cluster.Name) { + _ = k8sClient.Delete(ctx, &svc) + } + _ = k8sClient.Delete(ctx, cluster) + }() + r := newReconciler() + + first, err := r.reconcileShardServices(ctx, cluster) + Expect(err).NotTo(HaveOccurred()) + second, err := r.reconcileShardServices(ctx, cluster) + Expect(err).NotTo(HaveOccurred()) + + // The allocated NodePorts must be stable across reconciles. + Expect(second).To(Equal(first)) + }) +}) diff --git a/internal/controller/shard_services_unit_test.go b/internal/controller/shard_services_unit_test.go new file mode 100644 index 00000000..dcc892da --- /dev/null +++ b/internal/controller/shard_services_unit_test.go @@ -0,0 +1,69 @@ +/* +Copyright 2025 Valkey Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" +) + +func TestBuildShardServicePorts(t *testing.T) { + ports := buildShardServicePorts(nil, 3) + require.Len(t, ports, 3) + + for i, p := range ports { + // Port is the in-cluster port and must be unique within the Service. + assert.Equal(t, int32(DefaultPort+i), p.Port) + // targetPort references the node-unique container port name so the port + // reaches exactly one pod. + name := "vk-n" + strconv.Itoa(i) + assert.Equal(t, name, p.Name) + assert.Equal(t, name, p.TargetPort.StrVal) + // No NodePort is requested; Kubernetes allocates it. + assert.Equal(t, int32(0), p.NodePort) + } +} + +func TestBuildShardServicePortsPreservesAllocatedNodePorts(t *testing.T) { + existing := []corev1.ServicePort{ + {Name: "vk-n0", NodePort: 31000}, + {Name: "vk-n1", NodePort: 31001}, + } + ports := buildShardServicePorts(existing, 2) + require.Len(t, ports, 2) + assert.Equal(t, int32(31000), ports[0].NodePort, "allocated NodePort must be preserved across updates") + assert.Equal(t, int32(31001), ports[1].NodePort) +} + +func TestShardEndpointFromService(t *testing.T) { + svc := &corev1.Service{} + svc.Spec.Ports = []corev1.ServicePort{ + {Name: "vk-n0", Port: DefaultPort, NodePort: 31000}, + {Name: "vk-n1", Port: DefaultPort + 1, NodePort: 31001}, + } + + nodePort := shardEndpointFromService(2, svc, corev1.ServiceTypeNodePort, 2) + assert.Equal(t, int32(2), nodePort.ShardIndex) + assert.Equal(t, []int32{31000, 31001}, nodePort.NodePorts, "NodePort type reports allocated node ports") + + loadBalancer := shardEndpointFromService(2, svc, corev1.ServiceTypeLoadBalancer, 2) + assert.Equal(t, []int32{DefaultPort, DefaultPort + 1}, loadBalancer.NodePorts, "LoadBalancer type reports frontend ports") +} diff --git a/internal/controller/utils.go b/internal/controller/utils.go index da7a9dc3..45e59b51 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -315,6 +315,21 @@ func valkeyNodeName(clusterName string, shardIndex int, nodeIndex int) string { return fmt.Sprintf("%s-%d-%d", clusterName, shardIndex, nodeIndex) } +// shardClientPortName returns the node-unique name of the server container's +// client port. A per-shard Service targets each node individually by referencing +// this name as its targetPort, so only the matching pod is added to the endpoints +// of that Service port. The name fits the 15-character port-name limit. +func shardClientPortName(nodeIndex string) string { + return "vk-n" + nodeIndex +} + +// shardHostname returns the client-facing hostname a node announces, formed as +// "-.". All nodes in a shard share one hostname; they +// are distinguished by their announced client port. +func shardHostname(prefix, shardIndex, domain string) string { + return fmt.Sprintf("%s-%s.%s", prefix, shardIndex, domain) +} + // getTLSConfig returns the TLS configuration for a ValkeyCluster. func getTLSConfig(ctx context.Context, c client.Reader, secretName, serverName, namespace string) (*tls.Config, error) { secret := &corev1.Secret{} diff --git a/internal/controller/valkeycluster_controller.go b/internal/controller/valkeycluster_controller.go index db376e0c..005ae26d 100644 --- a/internal/controller/valkeycluster_controller.go +++ b/internal/controller/valkeycluster_controller.go @@ -21,6 +21,7 @@ import ( "crypto/tls" "errors" "fmt" + "maps" "reflect" "slices" "strconv" @@ -33,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/events" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -128,6 +130,14 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } + endpoints, err := r.reconcileShardServices(ctx, cluster) + if err != nil { + setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonServiceError, err.Error(), metav1.ConditionFalse) + _ = r.updateStatus(ctx, cluster, nil) + return ctrl.Result{}, err + } + cluster.Status.ExternalEndpoints = endpoints + if err := r.reconcilePodDisruptionBudget(ctx, cluster); err != nil { setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonPodDisruptionBudgetError, err.Error(), metav1.ConditionFalse) _ = r.updateStatus(ctx, cluster, nil) @@ -482,6 +492,136 @@ func (r *ValkeyClusterReconciler) upsertService(ctx context.Context, cluster *va return nil } +func shardServiceName(clusterName string, shardIndex int) string { + return fmt.Sprintf("%s%s-shard-%d", resourcePrefix, clusterName, shardIndex) +} + +// reconcileShardServices ensures one externally-reachable Service per shard when +// external access is enabled, and removes Services left behind by a scale-in. It +// returns the external endpoint of each shard for the cluster status. +// +// Each Service selects the pods of a single shard and exposes one port per node, +// targeting the node's uniquely-named client port (see shardClientPortName) so a +// port reaches exactly one pod. NodePort ports are allocated by Kubernetes and read +// back from the Service; LoadBalancer ports are the node-indexed frontend ports. +func (r *ValkeyClusterReconciler) reconcileShardServices(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) ([]valkeyiov1alpha1.ShardEndpoint, error) { + if cluster.Spec.ExternalAccess == nil || !cluster.Spec.ExternalAccess.Enabled { + return nil, r.deleteExcessShardServices(ctx, cluster, 0) + } + + ea := cluster.Spec.ExternalAccess + nodesPerShard := 1 + int(cluster.Spec.Replicas) + serviceType := ea.ServiceType + if serviceType == "" { + serviceType = corev1.ServiceTypeNodePort + } + + endpoints := make([]valkeyiov1alpha1.ShardEndpoint, 0, int(cluster.Spec.Shards)) + for shardIndex := range int(cluster.Spec.Shards) { + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: shardServiceName(cluster.Name, shardIndex), + Namespace: cluster.Namespace, + }, + } + shardSelector := strconv.Itoa(shardIndex) + _, err := controllerutil.CreateOrUpdate(ctx, r.Client, svc, func() error { + svc.Labels = labels(cluster) + svc.Labels[LabelCluster] = cluster.Name + svc.Labels[LabelShardIndex] = shardSelector + svc.Annotations = maps.Clone(ea.ServiceAnnotations) + svc.Spec.Type = serviceType + svc.Spec.ExternalTrafficPolicy = ea.ExternalTrafficPolicy + svc.Spec.Selector = map[string]string{ + LabelCluster: cluster.Name, + LabelShardIndex: shardSelector, + } + svc.Spec.Ports = buildShardServicePorts(svc.Spec.Ports, nodesPerShard) + return controllerutil.SetControllerReference(cluster, svc, r.Scheme) + }) + if err != nil { + r.Recorder.Eventf(cluster, svc, corev1.EventTypeWarning, "ShardServiceUpdateFailed", "UpdateShardService", "Failed to upsert shard %d Service: %v", shardIndex, err) + return nil, err + } + + endpoints = append(endpoints, shardEndpointFromService(int32(shardIndex), svc, serviceType, nodesPerShard)) + } + + if err := r.deleteExcessShardServices(ctx, cluster, int(cluster.Spec.Shards)); err != nil { + return nil, err + } + return endpoints, nil +} + +// buildShardServicePorts returns one ServicePort per node, targeting the node's +// uniquely-named client port. Existing ports are passed in so Kubernetes-allocated +// NodePort values are preserved across updates. The Port value is the in-cluster +// port and only needs to be unique within the Service. +func buildShardServicePorts(existing []corev1.ServicePort, nodesPerShard int) []corev1.ServicePort { + nodePortByName := make(map[string]int32, len(existing)) + for _, p := range existing { + nodePortByName[p.Name] = p.NodePort + } + + ports := make([]corev1.ServicePort, 0, nodesPerShard) + for nodeIndex := range nodesPerShard { + name := shardClientPortName(strconv.Itoa(nodeIndex)) + ports = append(ports, corev1.ServicePort{ + Name: name, + Port: int32(DefaultPort + nodeIndex), + TargetPort: intstr.FromString(name), + NodePort: nodePortByName[name], + }) + } + return ports +} + +// shardEndpointFromService reads the externally-reachable ports of a shard Service. +// For NodePort it reports the Kubernetes-allocated node ports; otherwise the Service +// ports (the LoadBalancer frontend ports). +func shardEndpointFromService(shardIndex int32, svc *corev1.Service, serviceType corev1.ServiceType, nodesPerShard int) valkeyiov1alpha1.ShardEndpoint { + portByName := make(map[string]corev1.ServicePort, len(svc.Spec.Ports)) + for _, p := range svc.Spec.Ports { + portByName[p.Name] = p + } + + nodePorts := make([]int32, 0, nodesPerShard) + for nodeIndex := range nodesPerShard { + p := portByName[shardClientPortName(strconv.Itoa(nodeIndex))] + if serviceType == corev1.ServiceTypeNodePort { + nodePorts = append(nodePorts, p.NodePort) + } else { + nodePorts = append(nodePorts, p.Port) + } + } + return valkeyiov1alpha1.ShardEndpoint{ShardIndex: shardIndex, NodePorts: nodePorts} +} + +// deleteExcessShardServices removes shard Services whose index is at or beyond +// desiredShards. A desiredShards of 0 removes all of them (external access disabled). +func (r *ValkeyClusterReconciler) deleteExcessShardServices(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster, desiredShards int) error { + log := logf.FromContext(ctx) + services := &corev1.ServiceList{} + if err := r.List(ctx, services, client.InNamespace(cluster.Namespace), client.MatchingLabels(map[string]string{LabelCluster: cluster.Name})); err != nil { + return err + } + for i := range services.Items { + svc := &services.Items[i] + shardIndex, err := strconv.Atoi(svc.Labels[LabelShardIndex]) + if err != nil { + // Not a shard Service (e.g. the headless Service has no shard-index label). + continue + } + if shardIndex >= desiredShards { + if err := r.Delete(ctx, svc); err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("delete excess shard Service %s: %w", svc.Name, err) + } + log.V(1).Info("deleted excess shard Service", "name", svc.Name, "shard", shardIndex) + } + } + return nil +} + // reconcileValkeyNodes ensures every (shard, nodeIndex) pair has a ValkeyNode CR. // Each ValkeyNode manages exactly one Pod (Replicas=1) and is named // deterministically: @@ -706,6 +846,7 @@ func buildClusterValkeyNode(cluster *valkeyiov1alpha1.ValkeyCluster, shardIndex UsersACLSecretName: getInternalSecretName(cluster.Name), TLS: cluster.Spec.TLS, Config: cluster.Spec.Config, + ExternalAccess: cluster.Spec.ExternalAccess, }, } } @@ -1127,6 +1268,9 @@ func (r *ValkeyClusterReconciler) updateStatus(ctx context.Context, cluster *val // Apply conditions from the in-memory cluster object current.Status.Conditions = cluster.Status.Conditions + // Apply external endpoints from the in-memory cluster object + current.Status.ExternalEndpoints = cluster.Status.ExternalEndpoints + // compute Valkey Cluster state from conditions (priority order: Degraded > Ready > Progressing > Failed) readyCondition := meta.FindStatusCondition(current.Status.Conditions, valkeyiov1alpha1.ConditionReady) progressingCondition := meta.FindStatusCondition(current.Status.Conditions, valkeyiov1alpha1.ConditionProgressing) diff --git a/internal/controller/valkeynode_resources.go b/internal/controller/valkeynode_resources.go index 35cc61cf..6afb4658 100644 --- a/internal/controller/valkeynode_resources.go +++ b/internal/controller/valkeynode_resources.go @@ -288,6 +288,25 @@ func buildContainersDef(node *valkeyiov1alpha1.ValkeyNode) ([]corev1.Container, ) } + // When external access is enabled, announce a human-readable node name so + // cross-node events (e.g. failures) reference the ValkeyNode name instead of + // only the opaque node ID, and give the client port a node-unique name so a + // per-shard Service can target this node individually. + if ea := node.Spec.ExternalAccess; ea != nil && ea.Enabled { + containers[0].Command = append(containers[0].Command, + "--cluster-announce-human-nodename", node.Name) + containers[0].Ports[0].Name = shardClientPortName(node.Labels[LabelNodeIndex]) + + // Announce the shard hostname so clients can be directed to it. This is + // metadata until cluster-preferred-endpoint-type selects hostname; the + // cluster bus continues to use the node's IP. + if ea.Domain != "" { + containers[0].Command = append(containers[0].Command, + "--cluster-announce-hostname", + shardHostname(ea.HostnamePrefix, node.Labels[LabelShardIndex], ea.Domain)) + } + } + // Add exporter sidecar if enabled. if node.Spec.Exporter.Enabled { containers = append(containers, generateMetricsExporterContainerDef(node.Spec.Exporter, node.Labels[LabelCluster], node.Spec.TLS)) diff --git a/internal/controller/valkeynode_resources_test.go b/internal/controller/valkeynode_resources_test.go index a135c537..352d3d54 100644 --- a/internal/controller/valkeynode_resources_test.go +++ b/internal/controller/valkeynode_resources_test.go @@ -422,6 +422,69 @@ func TestBuildValkeyNodePodTemplateSpec_Resources(t *testing.T) { assert.Equal(t, resources, pts.Spec.Containers[0].Resources, "resource requirements should pass through") } +func TestBuildContainersDef_ExternalAccessHumanNodename(t *testing.T) { + node := newTestValkeyNode("mycluster-1-2", "test-ns") + node.Spec.ExternalAccess = &valkeyv1.ExternalAccessSpec{Enabled: true} + + containers, err := buildContainersDef(node) + require.NoError(t, err) + + assert.Equal(t, + []string{"valkey-server", "/config/valkey.conf", "--cluster-announce-ip", "$(POD_IP)", "--cluster-announce-human-nodename", "mycluster-1-2"}, + containers[0].Command, + "enabling external access should announce the ValkeyNode name as the human nodename") +} + +func TestBuildContainersDef_ExternalAccessHostname(t *testing.T) { + node := newTestValkeyNode("mycluster-1-2", "test-ns") + node.Labels = map[string]string{LabelShardIndex: "1", LabelNodeIndex: "2"} + node.Spec.ExternalAccess = &valkeyv1.ExternalAccessSpec{ + Enabled: true, + HostnamePrefix: "shard", + Domain: "example.com", + } + + containers, err := buildContainersDef(node) + require.NoError(t, err) + + assert.Contains(t, containers[0].Command, "--cluster-announce-hostname") + assert.Contains(t, containers[0].Command, "shard-1.example.com", + "the shard hostname should be -.") + // The client port is named per node so the shard Service can target it. + assert.Equal(t, "vk-n2", containers[0].Ports[0].Name) +} + +func TestBuildContainersDef_ExternalAccessNoHostnameWithoutDomain(t *testing.T) { + node := newTestValkeyNode("mycluster-1-2", "test-ns") + node.Labels = map[string]string{LabelShardIndex: "1", LabelNodeIndex: "2"} + node.Spec.ExternalAccess = &valkeyv1.ExternalAccessSpec{Enabled: true, HostnamePrefix: "shard"} + + containers, err := buildContainersDef(node) + require.NoError(t, err) + + assert.NotContains(t, containers[0].Command, "--cluster-announce-hostname", + "no hostname should be announced when domain is unset") +} + +func TestBuildContainersDef_ExternalAccessDisabledIsUnchanged(t *testing.T) { + // Backward compatibility: a nil or disabled ExternalAccess must render the + // same command as a cluster without the field at all. + base := newTestValkeyNode("mycluster-1-2", "test-ns") + baseContainers, err := buildContainersDef(base) + require.NoError(t, err) + + disabled := newTestValkeyNode("mycluster-1-2", "test-ns") + disabled.Spec.ExternalAccess = &valkeyv1.ExternalAccessSpec{Enabled: false} + disabledContainers, err := buildContainersDef(disabled) + require.NoError(t, err) + + assert.Equal(t, + []string{"valkey-server", "/config/valkey.conf", "--cluster-announce-ip", "$(POD_IP)"}, + baseContainers[0].Command) + assert.Equal(t, baseContainers[0].Command, disabledContainers[0].Command, + "a disabled ExternalAccess must not change the rendered command") +} + func TestBuildValkeyNodeConfigMap(t *testing.T) { node := newTestValkeyNode("mynode", "test-ns") cm, err := buildValkeyNodeConfigMap(node) diff --git a/internal/valkey/clusterstate_test.go b/internal/valkey/clusterstate_test.go index 677cd207..33aaff13 100644 --- a/internal/valkey/clusterstate_test.go +++ b/internal/valkey/clusterstate_test.go @@ -504,3 +504,24 @@ func TestHighestOffsetReplica(t *testing.T) { } }) } + +func TestGetFailingNodes_WithAnnouncedHostname(t *testing.T) { + // When an announced hostname is set, CLUSTER NODES appends ",hostname" to the + // address field. The address parser must still extract the node IP so the + // reconciler can correlate it with the pod IP. + node := &NodeState{ + ClusterNodes: "self 10.0.0.1:6379@16379,shard-0.example.com myself,master - 0 0 1 connected 0-5461\n" + + "dead1 10.0.0.99:6379@16379,shard-1.example.com master,fail - 0 0 2 connected 5462-10922\n", + } + + failing := node.GetFailingNodes() + if len(failing) != 1 { + t.Fatalf("expected 1 failing node, got %d", len(failing)) + } + if failing[0].Address != "10.0.0.99" { + t.Errorf("expected address 10.0.0.99, got %q", failing[0].Address) + } + if failing[0].Id != "dead1" { + t.Errorf("expected id dead1, got %q", failing[0].Id) + } +} diff --git a/test/e2e/valkeycluster_external_access_test.go b/test/e2e/valkeycluster_external_access_test.go new file mode 100644 index 00000000..a603976e --- /dev/null +++ b/test/e2e/valkeycluster_external_access_test.go @@ -0,0 +1,123 @@ +//go:build e2e +// +build e2e + +/* +Copyright 2025 Valkey Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "os/exec" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + valkeyiov1alpha1 "valkey.io/valkey-operator/api/v1alpha1" + "valkey.io/valkey-operator/test/utils" +) + +var _ = Describe("ValkeyCluster external access", Ordered, Label("ValkeyCluster", "ExternalAccess"), func() { + const clusterName = "cluster-external-access-sample" + const manifestPath = "config/samples/v1alpha1_valkeycluster-external-access.yaml" + + AfterEach(func() { + specReport := CurrentSpecReport() + if specReport.Failed() { + utils.CollectDebugInfo(namespace) + } + }) + + AfterAll(func() { + utils.Run(exec.Command("kubectl", "delete", "-f", manifestPath, "--ignore-not-found=true")) + }) + + It("exposes each shard through a NodePort Service and reports the ports in status", func() { + By("creating the external-access cluster manifest") + utils.Run(exec.Command("kubectl", "delete", "-f", manifestPath, "--ignore-not-found=true")) + _, err := utils.Run(exec.Command("kubectl", "create", "-f", manifestPath)) + Expect(err).NotTo(HaveOccurred(), "Failed to create external-access ValkeyCluster CR") + + By("verifying the cluster reaches Ready") + verifyClusterReady := func(g Gomega) { + cr, err := utils.GetValkeyClusterStatus(clusterName) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(cr.Status.State).To(Equal(valkeyiov1alpha1.ClusterStateReady)) + g.Expect(cr.Status.ReadyShards).To(Equal(int32(3))) + } + Eventually(verifyClusterReady, 10*time.Minute, 5*time.Second).Should(Succeed()) + + By("verifying a NodePort Service exists per shard") + verifyServices := func(g Gomega) { + cmd := exec.Command("kubectl", "get", "svc", + "-l", fmt.Sprintf("valkey.io/cluster=%s,valkey.io/shard-index", clusterName), + "-o", "jsonpath={range .items[*]}{.metadata.name}:{.spec.type}{\"\\n\"}{end}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + lines := utils.GetNonEmptyLines(output) + g.Expect(lines).To(HaveLen(3)) + for _, line := range lines { + g.Expect(line).To(ContainSubstring(":NodePort")) + } + } + Eventually(verifyServices).Should(Succeed()) + + By("verifying the status reports allocated external ports for every shard") + verifyStatus := func(g Gomega) { + cr, err := utils.GetValkeyClusterStatus(clusterName) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(cr.Status.ExternalEndpoints).To(HaveLen(3)) + for _, ep := range cr.Status.ExternalEndpoints { + // One primary + one replica per shard. + g.Expect(ep.NodePorts).To(HaveLen(2)) + for _, port := range ep.NodePorts { + g.Expect(port).To(BeNumerically(">=", 30000)) + g.Expect(port).To(BeNumerically("<=", 32767)) + } + } + } + Eventually(verifyStatus).Should(Succeed()) + + By("verifying each shard Service port resolves to exactly one endpoint") + verifyEndpoints := func(g Gomega) { + cmd := exec.Command("kubectl", "get", "endpointslices", + "-l", fmt.Sprintf("kubernetes.io/service-name=%s", controllerShardServiceName(clusterName, 0)), + "-o", "jsonpath={range .items[*].endpoints[*]}{.addresses[0]}{\"\\n\"}{end}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(utils.GetNonEmptyLines(output)).NotTo(BeEmpty()) + } + Eventually(verifyEndpoints).Should(Succeed()) + + By("verifying the announced shard hostname appears in CLUSTER NODES") + verifyHostname := func(g Gomega) { + cmd := exec.Command("kubectl", "exec", + fmt.Sprintf("valkey-%s-0-0-0", clusterName), "-c", "server", "--", + "valkey-cli", "CLUSTER", "NODES") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).To(ContainSubstring("shard-0.valkey.example.com")) + } + Eventually(verifyHostname).Should(Succeed()) + }) +}) + +// controllerShardServiceName mirrors controller.shardServiceName, which is +// unexported, so the e2e test can address the per-shard Service by name. +func controllerShardServiceName(clusterName string, shardIndex int) string { + return fmt.Sprintf("valkey-%s-shard-%d", clusterName, shardIndex) +}