From a9542b431e6be3154ea668b506edceb6e8a5c77f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Sun, 24 May 2026 15:11:51 +0200 Subject: [PATCH 01/11] Add chaos test suite for long-running fault injection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce a standalone chaos test framework in test/chaos/ that continuously injects faults into a ValkeyCluster until failure. Scenarios include pod deletion, workload deletion, network partitions, container pauses, shard scaling, rolling updates, controller pod deletion, worker node pauses, and full cluster delete/recreate. - Dedicated build tag (//go:build chaos) and Kind cluster - New Makefile target: make test-chaos - CI step to verify compilation - Configurable via environment variables (scenarios, shards, replicas, workload type, tolerations, CPU pressure ...) - CPU pressure mode: randomly throttle Kind worker node CPUs per iteration to simulate loaded nodes (CHAOS_CPU_PRESSURE=true) - Network partition scenarios block all traffic (not just Valkey ports) to simulate fully unreachable nodes - Scenarios disabled by default: network-partition-primary, network-partition-replica, pause-worker-node (require tolerations for meaningful testing) Examples: # Run all default scenarios make test-chaos # Stress test random scaling between 3 and 9 shards CHAOS_SCENARIOS=scale-shards CHAOS_MIN_SHARDS=3 CHAOS_MAX_SHARDS=9 make test-chaos # Randomly alternate between pod deletion and scaling CHAOS_SCENARIOS=delete-primary-pod,scale-shards make test-chaos # Alternate between pod deletion and scaling in sequence CHAOS_SCENARIOS=delete-primary-pod,scale-shards CHAOS_MODE=sequential make test-chaos # Repeatedly kill the primary pod of a random shard CHAOS_SCENARIOS=delete-primary-pod make test-chaos # Stress test with CPU pressure CHAOS_CPU_PRESSURE=true make test-chaos # Test network partitions with pod evictions 
CHAOS_SCENARIOS=network-partition-primary,pause-worker-node \ CHAOS_TOLERATION_SECONDS=10 make test-chaos # Repeatedly delete pods across multiple shards CHAOS_SCENARIOS=delete-multiple-shard-pods \ CHAOS_SHARDS=7 CHAOS_REPLICAS=2 KIND_WORKERS=3 make test-chaos # Run with Deployment workload type CHAOS_WORKLOAD_TYPE=Deployment make test-chaos Note: Network partition, container pause, and CPU pressure scenarios require Docker access to Kind worker nodes. These only work when running against a local Kind cluster, not remote clusters. Signed-off-by: Björn Svensson --- .github/workflows/test.yml | 3 + Makefile | 14 +- docs/chaos-testing.md | 140 +++++ docs/developer-guide.md | 1 + test/chaos/chaos_suite_test.go | 120 +++++ test/chaos/chaos_test.go | 934 +++++++++++++++++++++++++++++++++ test/utils/chaos.go | 659 +++++++++++++++++++++++ test/utils/utils.go | 1 + 8 files changed, 1870 insertions(+), 2 deletions(-) create mode 100644 docs/chaos-testing.md create mode 100644 test/chaos/chaos_suite_test.go create mode 100644 test/chaos/chaos_test.go create mode 100644 test/utils/chaos.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6578d7a2..290cdd86 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,3 +23,6 @@ jobs: run: | go mod tidy make test + + - name: Verify that chaos tests compile + run: go test -tags=chaos -c ./test/chaos/ -o /dev/null diff --git a/Makefile b/Makefile index 3f74aca6..7d6ab478 100644 --- a/Makefile +++ b/Makefile @@ -80,6 +80,8 @@ test: manifests generate fmt vet setup-envtest ## Run tests. 
# CertManager is installed by default; skip with: # - CERT_MANAGER_INSTALL_SKIP=true KIND_CLUSTER ?= valkey-operator-test-e2e +KIND_CLUSTER_CHAOS ?= valkey-operator-test-chaos +KIND_WORKERS ?= 2 .PHONY: setup-test-e2e setup-test-e2e: ## Set up a Kind cluster for e2e tests if it does not exist @@ -91,8 +93,10 @@ setup-test-e2e: ## Set up a Kind cluster for e2e tests if it does not exist *"$(KIND_CLUSTER)"*) \ echo "Kind cluster '$(KIND_CLUSTER)' already exists. Skipping creation." ;; \ *) \ - echo "Creating Kind cluster '$(KIND_CLUSTER)'..."; \ - echo '{"kind": "Cluster", "apiVersion": "kind.x-k8s.io/v1alpha4", "nodes": [{"role": "control-plane"}, {"role": "worker"}, {"role": "worker"}]}' | $(KIND) create cluster --name $(KIND_CLUSTER) --config - ;; \ + echo "Creating Kind cluster '$(KIND_CLUSTER)' with $(KIND_WORKERS) workers..."; \ + echo '{"kind":"Cluster","apiVersion":"kind.x-k8s.io/v1alpha4","nodes":[{"role":"control-plane"}' \ + $$(for i in $$(seq 1 $(KIND_WORKERS)); do echo -n ',{"role":"worker"}'; done) \ + ']}' | $(KIND) create cluster --name $(KIND_CLUSTER) --config - ;; \ esac @echo "Switching kubectl context to kind-$(KIND_CLUSTER)..." @"$(KUBECTL)" config use-context kind-$(KIND_CLUSTER) @@ -102,6 +106,12 @@ test-e2e: setup-test-e2e manifests generate fmt vet ## Run the e2e tests. 
Expect KIND=$(KIND) KIND_CLUSTER=$(KIND_CLUSTER) go test -tags=e2e ./test/e2e/ -v -ginkgo.v -ginkgo.label-filter "${TEST_LABELS}" -timeout 30m $(MAKE) cleanup-test-e2e +.PHONY: test-chaos +test-chaos: manifests generate fmt vet ## Run chaos tests in separate cluster (runs until failure) + $(MAKE) setup-test-e2e KIND_CLUSTER=$(KIND_CLUSTER_CHAOS) + KIND=$(KIND) KIND_CLUSTER=$(KIND_CLUSTER_CHAOS) go run github.com/onsi/ginkgo/v2/ginkgo --tags=chaos -v --timeout=24h ./test/chaos/ + $(MAKE) cleanup-test-e2e KIND_CLUSTER=$(KIND_CLUSTER_CHAOS) + .PHONY: cleanup-test-e2e cleanup-test-e2e: ## Tear down the Kind cluster used for e2e tests @$(KIND) delete cluster --name $(KIND_CLUSTER) diff --git a/docs/chaos-testing.md b/docs/chaos-testing.md new file mode 100644 index 00000000..a080be01 --- /dev/null +++ b/docs/chaos-testing.md @@ -0,0 +1,140 @@ +# Chaos Testing + +The chaos test suite continuously injects faults into a running ValkeyCluster and verifies that the operator recovers the cluster to a healthy state without losing data. + +## Prerequisites + +- Kind +- Docker +- `kubectl` configured to the target cluster + +## Running + +```bash +make test-chaos +``` + +This creates a Kind cluster, builds and deploys the operator, then runs the chaos test loop until a failure occurs or it is interrupted. + +To save the output to a log file (useful for post-mortem analysis): + +```bash +make test-chaos 2>&1 | tee --ignore-interrupts chaos.log +``` + +The `--ignore-interrupts` flag ensures the log is fully written even if you Ctrl+C the test. + +## How It Works + +1. Creates a ValkeyCluster with the configured number of shards and replicas +2. Waits for the cluster to become Ready and healthy +3. Seeds test data across all shards +4. 
Loops indefinitely: + - Picks a scenario (random or sequential) + - Logs cluster state and per-shard key counts + - Injects the fault + - Waits for recovery (Ready state + cluster health) + - Verifies data integrity (unless the scenario is expected to lose data) +5. On failure, collects controller logs, pod states, and CLUSTER NODES output + +## Configuration + +All configuration is via environment variables: + +| Variable | Default | Description | +|----------|---------|-------------| +| `KIND_WORKERS` | `2` | Number of Kind worker nodes to create | +| `CHAOS_SHARDS` | `3` | Number of shards to create | +| `CHAOS_MIN_SHARDS` | value of `CHAOS_SHARDS` | Minimum shards for scale scenarios | +| `CHAOS_MAX_SHARDS` | `CHAOS_SHARDS + 3` | Maximum shards for scale scenarios | +| `CHAOS_REPLICAS` | `1` | Replicas per shard | +| `CHAOS_WORKLOAD_TYPE` | `StatefulSet` | `StatefulSet` or `Deployment` | +| `CHAOS_PERSISTENCE` | `false` | Enable persistence (requires StatefulSet) | +| `CHAOS_SCENARIOS` | all except disabled | Comma-separated list of scenarios to run | +| `CHAOS_SEED` | current time | Random seed for reproducibility | +| `CHAOS_MODE` | `random` | `random` or `sequential` scenario selection | +| `CHAOS_TARGET_SHARD` | `random` | Shard index to target, or `random` | +| `CHAOS_RECOVERY_TIMEOUT` | `5m` | Max time to wait for cluster recovery | +| `CHAOS_TOLERATION_SECONDS` | `0` | Pod toleration seconds for not-ready/unreachable (0 = not set) | +| `CHAOS_NUM_KEYS` | `100000` | Number of keys to seed | +| `CHAOS_DATA_SIZE` | `3` | Size of each key's value in bytes | +| `CHAOS_CPU_PRESSURE` | `false` | Throttle Kind worker node CPUs each iteration | +| `CHAOS_CPU_MIN` | `0.3` | Minimum CPU limit when throttling | +| `CHAOS_CPU_MAX` | `1.0` | Maximum CPU limit when throttling | + +## Scenarios + +| Scenario | Description | +|----------|-------------| +| `delete-primary-pod` | Deletes the primary pod of a shard | +| `delete-replica-pod` | Deletes a replica pod | +| 
`delete-shard-pods` | Deletes the primary pod and one replica pod of a shard at the same time | +| `delete-multiple-shard-pods` | Deletes the primary pod and one replica pod in each of a random number of shards | +| `delete-primary-workload` | Deletes the primary's Deployment/StatefulSet | +| `delete-replica-workload` | Deletes a replica's Deployment/StatefulSet | +| `pause-primary-container` | Pauses the primary container | +| `pause-replica-container` | Pauses a replica container | +| `scale-shards` | Scales shards up or down randomly | +| `rolling-update` | Changes cluster config to trigger a rolling update | +| `delete-recreate-cluster` | Deletes and recreates the ValkeyCluster | +| `delete-controller-pod` | Kills the operator controller pod | +| `pause-worker-node` | Pauses the Kind worker node hosting the primary (disabled by default) | +| `network-partition-primary` | Isolates the primary from the cluster (disabled by default) | +| `network-partition-replica` | Isolates a replica from the cluster (disabled by default) | + +Scenarios that target replicas, including `delete-shard-pods` and `delete-multiple-shard-pods`, are skipped when `CHAOS_REPLICAS=0`. + +Scenarios marked "disabled by default" are excluded unless explicitly listed in `CHAOS_SCENARIOS`. +They require `CHAOS_TOLERATION_SECONDS` to be set for meaningful eviction testing. + +## Reading the Output + +Each iteration logs: + +``` +--- Iteration 5: scenario=rolling-update --- + PODS before: ... + CLUSTER NODES before: ... + KEY COUNT before: ... + + STEP: Iteration 5: waiting for cluster recovery + + PODS after: ... + CLUSTER NODES after: ... + Iteration 5: PASSED +``` + +- **PODS before/after** — pod placement showing node, IP, and readiness. +- **CLUSTER NODES before/after** — full cluster topology showing roles, slots, and node IDs. +- **KEY COUNT before** — per-shard key distribution before fault injection. All shards should have keys. If a shard shows 0, data was already lost before this iteration. 
+ +On failure, the test collects extensive debug information: + +- **Keyspace counts** — total and per-shard key counts at the time of failure +- **Pod listing** — all cluster pods with node placement and readiness +- **ValkeyCluster CR status** — full YAML of the ValkeyCluster resource +- **ValkeyNodes** — all ValkeyNode resources for the cluster +- **CLUSTER NODES** — topology output from every pod +- **Server logs** — last 100 lines from each pod's server container +- **Controller logs** — operator controller-manager logs and pod description +- **Kubernetes events** — events in the operator namespace + +## Examples + +```bash +# Run all default scenarios with Deployment workload type +CHAOS_WORKLOAD_TYPE=Deployment make test-chaos + +# Stress test with CPU pressure on worker nodes +CHAOS_CPU_PRESSURE=true make test-chaos + +# Test network partitions with pod evictions +CHAOS_SCENARIOS=network-partition-primary,pause-worker-node \ + CHAOS_TOLERATION_SECONDS=10 make test-chaos + +# Large cluster with more workers +CHAOS_SHARDS=7 CHAOS_REPLICAS=2 KIND_WORKERS=3 make test-chaos + +# Run scenarios in sequence instead of random +CHAOS_SCENARIOS=delete-primary-pod,scale-shards CHAOS_MODE=sequential make test-chaos +``` diff --git a/docs/developer-guide.md b/docs/developer-guide.md index 683a9fbd..370f40e7 100644 --- a/docs/developer-guide.md +++ b/docs/developer-guide.md @@ -16,6 +16,7 @@ make build # Build the manager binary (runs manifests, generate, fmt, ve make manifests # Regenerate CRD manifests and RBAC from controller-runtime markers make generate # Regenerate DeepCopy methods make test-e2e # Run end-to-end tests in a Kind cluster (creates one if needed, tears it down after) +make test-chaos # Run chaos tests (see docs/chaos-testing.md) ``` After modifying types in `api/v1alpha1/`, always run `make manifests generate` before testing. 
diff --git a/test/chaos/chaos_suite_test.go b/test/chaos/chaos_suite_test.go
new file mode 100644
index 00000000..8edea5c1
--- /dev/null
+++ b/test/chaos/chaos_suite_test.go
@@ -0,0 +1,138 @@
+//go:build chaos
+// +build chaos
+
+/*
+Copyright 2026 Valkey Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package chaos
+
+import (
+	"fmt"
+	"os"
+	"os/exec"
+	"testing"
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	"valkey.io/valkey-operator/test/utils"
+)
+
+var (
+	// managerImage is the operator image that BeforeSuite builds, loads into
+	// Kind, and passes to `make deploy`.
+	managerImage = "valkey/valkey-operator:v0.0.1"
+	// shouldCleanupCertManager records that setupCertManager installed
+	// CertManager, so teardownCertManager removes only what this suite added.
+	shouldCleanupCertManager = false
+)
+
+// TestChaos is the `go test` entry point: it registers Ginkgo's Fail as the
+// Gomega fail handler and runs the "Chaos Suite" specs.
+func TestChaos(t *testing.T) {
+	RegisterFailHandler(Fail)
+	_, _ = fmt.Fprintf(GinkgoWriter, "Starting valkey-operator chaos test suite\n")
+	RunSpecs(t, "Chaos Suite")
+}
+
+// BeforeSuite runs once before all specs: it builds the manager image, loads
+// it into Kind, installs CertManager when needed, and deploys the operator
+// (namespace, CRDs, controller-manager).
+var _ = BeforeSuite(func() {
+	// Force KUBECTL_KUBERC to "false" unless the caller explicitly set "true".
+	if os.Getenv("KUBECTL_KUBERC") != "true" {
+		_ = os.Setenv("KUBECTL_KUBERC", "false")
+	}
+
+	By("building the manager image")
+	cmd := exec.Command("make", "docker-build", fmt.Sprintf("IMG=%s", managerImage))
+	_, err := utils.Run(cmd)
+	// Plain Expect, not ExpectWithOffset(1, ...): the assertion is made here,
+	// not in a helper, so the failure location must not skip a stack frame.
+	Expect(err).NotTo(HaveOccurred(), "Failed to build the manager image")
+
+	By("loading the manager image on Kind")
+	err = utils.LoadImageToKindClusterWithName(managerImage)
+	Expect(err).NotTo(HaveOccurred(), "Failed to load the manager image into Kind")
+
+	setupCertManager()
+
+	SetDefaultEventuallyTimeout(2 * time.Minute)
+	SetDefaultEventuallyPollingInterval(time.Second)
+
+	By("creating manager namespace")
+	cmd = exec.Command("kubectl", "create", "ns", namespace)
+	_, _ = utils.Run(cmd) // ignore if already exists
+
+	By("labeling the namespace to enforce the restricted security policy")
+	cmd = exec.Command("kubectl", "label", "--overwrite", "ns", namespace,
+		"pod-security.kubernetes.io/enforce=restricted")
+	_, err = utils.Run(cmd)
+	Expect(err).NotTo(HaveOccurred(), "Failed to label namespace with restricted policy")
+
+	By("installing CRDs")
+	cmd = exec.Command("make", "install")
+	_, err = utils.Run(cmd)
+	Expect(err).NotTo(HaveOccurred(), "Failed to install CRDs")
+
+	By("deploying the controller-manager")
+	cmd = exec.Command("make", "deploy", fmt.Sprintf("IMG=%s", managerImage))
+	_, err = utils.Run(cmd)
+	Expect(err).NotTo(HaveOccurred(), "Failed to deploy the controller-manager")
+})
+
+// AfterSuite undoes the BeforeSuite deployment. Every step is best-effort:
+// command errors are discarded and the remaining steps still run.
+var _ = AfterSuite(func() {
+	teardownCertManager()
+
+	By("undeploying the controller-manager")
+	cmd := exec.Command("make", "undeploy")
+	_, _ = utils.Run(cmd)
+
+	By("uninstalling CRDs")
+	cmd = exec.Command("make", "uninstall")
+	_, _ = utils.Run(cmd)
+
+	By("removing manager namespace")
+	cmd = exec.Command("kubectl", "delete", "ns", namespace)
+	_, _ = utils.Run(cmd)
+})
+
+// setupCertManager installs CertManager unless CERT_MANAGER_INSTALL_SKIP=true
+// or its CRDs are already installed; it flags the install for later teardown.
+func setupCertManager() {
+	if os.Getenv("CERT_MANAGER_INSTALL_SKIP") == "true" {
+		return
+	}
+	if utils.IsCertManagerCRDsInstalled() {
+		return
+	}
+	shouldCleanupCertManager = true
+	By("installing CertManager")
+	Expect(utils.InstallCertManager()).To(Succeed(), "Failed to install CertManager")
+}
+
+// teardownCertManager uninstalls CertManager, but only when setupCertManager
+// installed it during this run.
+func teardownCertManager() {
+	if !shouldCleanupCertManager {
+		return
+	}
+	By("uninstalling CertManager")
+	utils.UninstallCertManager()
+}
diff --git a/test/chaos/chaos_test.go b/test/chaos/chaos_test.go
new file mode 100644
index 00000000..7625b475
--- /dev/null
+++ b/test/chaos/chaos_test.go
@@ -0,0 +1,934 @@
+//go:build chaos
+// +build chaos
+
+/*
+Copyright 2026 Valkey Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package chaos
+
+//----------------------------------------------------------
+// Example:
+//
+//   CHAOS_SCENARIOS=delete-recreate-cluster \
+//   CHAOS_WORKLOAD_TYPE=Deployment \
+//   make test-chaos 2>&1 | tee --ignore-interrupts chaos.log
+//
+// To run in parallel with e2e tests (separate Kind cluster),
+// set KUBECONFIG to avoid context conflicts:
+//
+//   KUBECONFIG=/tmp/chaos-kube.conf make test-chaos
+//
+//----------------------------------------------------------
+
+import (
+	"fmt"
+	"math/rand"
+	"os"
+	"os/exec"
+	"strconv"
+	"strings"
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	valkeyiov1alpha1 "valkey.io/valkey-operator/api/v1alpha1"
+	"valkey.io/valkey-operator/test/utils"
+)
+
+const namespace = "valkey-operator-system" // namespace the suite creates for the operator manager
+
+// ChaosContext holds the configuration for a chaos test iteration.
+type ChaosContext struct {
+	ClusterName   string     // name of the ValkeyCluster under test
+	Namespace     string     // namespace used to look up the cluster's pods and workloads
+	WorkloadType  string     // "StatefulSet" or "Deployment" (CHAOS_WORKLOAD_TYPE)
+	TargetShard   int        // shard index chosen for this iteration
+	Shards        int        // current shard count; the test loop copies it back after Inject
+	MinShards     int        // lower shard bound (CHAOS_MIN_SHARDS)
+	MaxShards     int        // upper shard bound (CHAOS_MAX_SHARDS)
+	Replicas      int        // replicas per shard; replica scenarios return a "skip:" error when 0
+	TolerationSec int        // CHAOS_TOLERATION_SECONDS; 0 means no tolerations were set
+	Rand          *rand.Rand // seeded random source shared with the test loop
+}
+
+// Scenario defines a named chaos scenario.
+type Scenario struct { + Name string + LosesData bool // may lose data, skip verification and re-seed + LosesDataIfNoReplica bool // loses data only when replicas == 0 + DisabledByDefault bool // excluded unless explicitly listed in CHAOS_SCENARIOS + Inject func(ctx *ChaosContext) error +} + +func (s Scenario) losesData(replicas int) bool { + return s.LosesData || (s.LosesDataIfNoReplica && replicas == 0) +} + +var allScenarios = []Scenario{ + {Name: "delete-primary-pod", LosesDataIfNoReplica: true, Inject: deletePrimaryPod}, + {Name: "delete-replica-pod", Inject: deleteReplicaPod}, + {Name: "delete-shard-pods", LosesData: true, Inject: deleteShardPods}, + {Name: "delete-multiple-shard-pods", LosesData: true, Inject: deleteMultipleShardPods}, + {Name: "delete-primary-workload", LosesDataIfNoReplica: true, Inject: deletePrimaryWorkload}, + {Name: "delete-replica-workload", Inject: deleteReplicaWorkload}, + {Name: "pause-primary-container", Inject: pausePrimaryContainer}, + {Name: "pause-replica-container", Inject: pauseReplicaContainer}, + {Name: "scale-shards", Inject: scaleShards}, + {Name: "rolling-update", Inject: rollingUpdate}, + {Name: "delete-recreate-cluster", LosesData: true, Inject: deleteRecreateCluster}, + {Name: "delete-controller-pod", Inject: deleteControllerPod}, + {Name: "pause-worker-node", DisabledByDefault: true, LosesData: true, Inject: pauseWorkerNode}, + {Name: "network-partition-primary", DisabledByDefault: true, LosesDataIfNoReplica: true, Inject: networkPartitionPrimary}, + {Name: "network-partition-replica", DisabledByDefault: true, Inject: networkPartitionReplica}, +} + +var _ = Describe("ValkeyCluster Chaos", Label("chaos"), Ordered, func() { + var ( + clusterName = "chaos-test-cluster" + workloadType string + persistence bool + shards int + minShards int + maxShards int + replicas int + numKeys int + dataSize int + seededKeys int + recoveryTimeout time.Duration + tolerationSec int + targetShard string + mode string + seed int64 + rnd 
*rand.Rand + scenarios []Scenario + cpuPressure bool + cpuMin float64 + cpuMax float64 + throttledNodes []string + workerNodes []string + ) + + BeforeAll(func() { + // Parse configuration from environment + workloadType = envOneOf("CHAOS_WORKLOAD_TYPE", "StatefulSet", []string{"StatefulSet", "Deployment"}) + persistence = envBool("CHAOS_PERSISTENCE", false) + shards = envIntOrDefault("CHAOS_SHARDS", 3, 1 /* min */) + minShards = envIntOrDefault("CHAOS_MIN_SHARDS", shards, 1 /* min */) + maxShards = envIntOrDefault("CHAOS_MAX_SHARDS", shards+3, minShards /* min */) + replicas = envIntOrDefault("CHAOS_REPLICAS", 1, 0 /* min */) + numKeys = envIntOrDefault("CHAOS_NUM_KEYS", 100000, 0 /* min */) + dataSize = envIntOrDefault("CHAOS_DATA_SIZE", 3, 1 /* min */) + targetShard = envOneOfOrInt("CHAOS_TARGET_SHARD", "random", []string{"random"}) + mode = envOneOf("CHAOS_MODE", "random", []string{"random", "sequential"}) + recoveryTimeout = envDurationOrDefault("CHAOS_RECOVERY_TIMEOUT", 5*time.Minute) + tolerationSec = envIntOrDefault("CHAOS_TOLERATION_SECONDS", 0, 0 /* min */) + seed = envInt64OrDefault("CHAOS_SEED", time.Now().UnixNano()) + scenarios = filterScenarios(allScenarios, envOrDefault("CHAOS_SCENARIOS", "")) + cpuPressure = envBool("CHAOS_CPU_PRESSURE", false) + cpuMin = envFloat64OrDefault("CHAOS_CPU_MIN", 0.3, 0.1) + cpuMax = envFloat64OrDefault("CHAOS_CPU_MAX", 1.0, cpuMin) + if cpuPressure { + workerNodes = utils.GetWorkerNodes() + } + + rnd = rand.New(rand.NewSource(seed)) + + // Log configuration + _, _ = fmt.Fprintf(GinkgoWriter, "=== Chaos Test Configuration ===\n") + _, _ = fmt.Fprintf(GinkgoWriter, " WorkloadType: %s\n", workloadType) + _, _ = fmt.Fprintf(GinkgoWriter, " Persistence: %v\n", persistence) + _, _ = fmt.Fprintf(GinkgoWriter, " Shards: %d (min=%d, max=%d)\n", shards, minShards, maxShards) + _, _ = fmt.Fprintf(GinkgoWriter, " Replicas: %d\n", replicas) + _, _ = fmt.Fprintf(GinkgoWriter, " NumKeys: %d\n", numKeys) + _, _ = 
fmt.Fprintf(GinkgoWriter, " DataSize: %d\n", dataSize) + _, _ = fmt.Fprintf(GinkgoWriter, " TargetShard: %s\n", targetShard) + _, _ = fmt.Fprintf(GinkgoWriter, " RecoveryTimeout: %s\n", recoveryTimeout) + if tolerationSec > 0 { + _, _ = fmt.Fprintf(GinkgoWriter, " Tolerations: not-ready=%ds, unreachable=%ds (default: 300s)\n", tolerationSec, tolerationSec) + } else { + _, _ = fmt.Fprintf(GinkgoWriter, " Tolerations: not set (default: 300s, no evictions will be triggered)\n") + } + _, _ = fmt.Fprintf(GinkgoWriter, " Seed: %d\n", seed) + _, _ = fmt.Fprintf(GinkgoWriter, " Mode: %s\n", mode) + _, _ = fmt.Fprintf(GinkgoWriter, " CpuPressure: %v (min=%.2f, max=%.2f)\n", cpuPressure, cpuMin, cpuMax) + _, _ = fmt.Fprintf(GinkgoWriter, " Scenarios:\n") + enabledNames := make(map[string]bool) + for _, s := range scenarios { + enabledNames[s.Name] = true + } + for _, s := range allScenarios { + status := "enabled" + if !enabledNames[s.Name] { + status = "disabled" + } + _, _ = fmt.Fprintf(GinkgoWriter, " - %-30s [%s]\n", s.Name, status) + } + _, _ = fmt.Fprintf(GinkgoWriter, "================================\n") + + // Create a cluster + By("creating ValkeyCluster for chaos testing") + manifest := fmt.Sprintf(`apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: %s + namespace: default +spec: + shards: %d + replicas: %d + workloadType: %s +`, clusterName, shards, replicas, workloadType) + + if tolerationSec > 0 { + manifest += fmt.Sprintf(` tolerations: + - key: node.kubernetes.io/not-ready + operator: Exists + effect: NoExecute + tolerationSeconds: %d + - key: node.kubernetes.io/unreachable + operator: Exists + effect: NoExecute + tolerationSeconds: %d +`, tolerationSec, tolerationSec) + } + + if persistence { + if workloadType != "StatefulSet" { + Fail("CHAOS_PERSISTENCE=true requires CHAOS_WORKLOAD_TYPE=StatefulSet") + } + manifest += ` persistence: + size: 1Gi + reclaimPolicy: Delete +` + } + + cmd := exec.Command("kubectl", "apply", "-f", "-") + 
cmd.Stdin = strings.NewReader(manifest) + _, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred(), "Failed to create chaos ValkeyCluster") + + By("waiting for cluster to become Ready") + Eventually(func(g Gomega) { + cr, err := utils.GetValkeyClusterStatus(clusterName) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(cr.Status.State).To(Equal(valkeyiov1alpha1.ClusterStateReady)) + g.Expect(cr.Status.ReadyShards).To(Equal(int32(shards))) + err = utils.VerifyClusterHealth(clusterName, "default", shards, replicas) + g.Expect(err).NotTo(HaveOccurred()) + }, recoveryTimeout, 5*time.Second).Should(Succeed()) + + By("seeding test data") + seededKeys, err = utils.SeedTestData(clusterName, "default", numKeys, dataSize, seed) + Expect(err).NotTo(HaveOccurred(), "Failed to seed test data") + _, _ = fmt.Fprintf(GinkgoWriter, " Seeded keys: %d\n", seededKeys) + }) + + AfterEach(func() { + // Always remove CPU pressure to avoid leaving nodes throttled + if cpuPressure { + utils.UnthrottleNodes(workerNodes) + } + + if CurrentSpecReport().Failed() { + // Dump keyspace counts + if total, perShard, err := utils.GetTotalKeyCount(clusterName, "default"); err == nil { + _, _ = fmt.Fprintf(GinkgoWriter, "KEY COUNT at failure: total=%d, per-shard=%v\n", total, perShard) + } + + // Dump cluster-specific state + By("collecting chaos cluster debug info") + cmd := exec.Command("kubectl", "get", "pods", "-l", + fmt.Sprintf("valkey.io/cluster=%s", clusterName), "-o", "wide") + if output, err := utils.Run(cmd); err == nil { + _, _ = fmt.Fprintf(GinkgoWriter, "Cluster pods:\n%s\n", output) + } + cmd = exec.Command("kubectl", "get", "valkeycluster", clusterName, "-o", "yaml") + if output, err := utils.Run(cmd); err == nil { + _, _ = fmt.Fprintf(GinkgoWriter, "ValkeyCluster status:\n%s\n", output) + } + cmd = exec.Command("kubectl", "get", "valkeynodes", "-l", + fmt.Sprintf("valkey.io/cluster=%s", clusterName), "-o", "wide") + if output, err := utils.Run(cmd); err == nil { + _, _ = 
fmt.Fprintf(GinkgoWriter, "ValkeyNodes:\n%s\n", output) + } + + // Collect logs and CLUSTER NODES from all valkey node pods + cmd = exec.Command("kubectl", "get", "pods", "-l", + fmt.Sprintf("valkey.io/cluster=%s", clusterName), + "-o", "jsonpath={range .items[*]}{.metadata.name}{\"\\n\"}{end}") + if podList, err := utils.Run(cmd); err == nil { + for _, pod := range utils.GetNonEmptyLines(podList) { + cmd = exec.Command("kubectl", "exec", pod, "-c", "server", "--", + "valkey-cli", "CLUSTER", "NODES") + if output, err := utils.Run(cmd); err == nil { + _, _ = fmt.Fprintf(GinkgoWriter, "CLUSTER NODES from %s:\n%s\n", pod, output) + } + cmd = exec.Command("kubectl", "logs", pod, "-c", "server", "--tail=100") + if logs, err := utils.Run(cmd); err == nil { + _, _ = fmt.Fprintf(GinkgoWriter, "Logs for %s:\n%s\n", pod, logs) + } + } + } + + // Controller logs and K8s events + utils.CollectDebugInfo(namespace) + } + }) + + AfterAll(func() { + By("cleaning up chaos cluster") + cmd := exec.Command("kubectl", "delete", "valkeycluster", clusterName, "--ignore-not-found=true") + _, _ = utils.Run(cmd) + }) + + It("runs fault injection until failure", func() { + scenarioCount := make(map[string]int) + + for iteration := 1; ; iteration++ { + var scenario Scenario + if mode == "sequential" { + scenario = scenarios[(iteration-1)%len(scenarios)] + } else { + scenario = scenarios[rnd.Intn(len(scenarios))] + } + + shard := 0 + if targetShard == "random" { + shard = rnd.Intn(shards) + } else { + shard, _ = strconv.Atoi(targetShard) + } + + _, _ = fmt.Fprintf(GinkgoWriter, "\n--- Iteration %d: scenario=%s ---\n", + iteration, scenario.Name) + + logClusterState(clusterName, "default", "before") + + // Log per-shard key counts before scenario + if _, perShard, err := utils.GetTotalKeyCount(clusterName, "default"); err == nil { + _, _ = fmt.Fprintf(GinkgoWriter, " KEY COUNT before: %v\n", perShard) + } + + // Mark iteration start in all valkey node logs + logMsg := fmt.Sprintf("CHAOS-TEST: 
iteration %d scenario=%s", iteration, scenario.Name) + cmd := exec.Command("kubectl", "get", "pods", "-l", + fmt.Sprintf("valkey.io/cluster=%s", clusterName), + "-o", "jsonpath={range .items[*]}{.metadata.name}{\"\\n\"}{end}") + if podList, err := utils.Run(cmd); err == nil { + for _, pod := range utils.GetNonEmptyLines(podList) { + cmd = exec.Command("kubectl", "exec", pod, "-c", "server", "--", + "valkey-cli", "EVAL", fmt.Sprintf("return server.log(server.LOG_WARNING, '%s')", logMsg), "0") + _, _ = utils.Run(cmd) + } + } + + ctx := &ChaosContext{ + ClusterName: clusterName, + Namespace: "default", + WorkloadType: workloadType, + TargetShard: shard, + Shards: shards, + MinShards: minShards, + MaxShards: maxShards, + Replicas: replicas, + TolerationSec: tolerationSec, + Rand: rnd, + } + + // Apply random CPU pressure to Kind worker nodes + if cpuPressure { + utils.UnthrottleNodes(workerNodes) + throttledNodes = utils.ThrottleRandomWorkerNodes(rnd, workerNodes, cpuMin, cpuMax) + if len(throttledNodes) > 0 { + _, _ = fmt.Fprintf(GinkgoWriter, " CPU pressure on %v\n", throttledNodes) + } + } + + err := scenario.Inject(ctx) + if err != nil { + if strings.Contains(err.Error(), "skip:") { + _, _ = fmt.Fprintf(GinkgoWriter, " Skipped: %s\n", err) + utils.UnthrottleNodes(throttledNodes) + continue + } + Fail(fmt.Sprintf("Iteration %d: scenario %s failed to inject: %v", iteration, scenario.Name, err)) + } + + // Update shards/replicas in case a scale scenario changed them + shards = ctx.Shards + replicas = ctx.Replicas + + // Wait until CR is Ready, all pods are Running, and cluster health is ok. 
+ By(fmt.Sprintf("Iteration %d: waiting for cluster recovery", iteration)) + Eventually(func(g Gomega) { + cr, err := utils.GetValkeyClusterStatus(clusterName) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(cr.Status.State).To(Equal(valkeyiov1alpha1.ClusterStateReady), + fmt.Sprintf("cluster state: %s, reason: %s", cr.Status.State, cr.Status.Reason)) + g.Expect(cr.Status.ReadyShards).To(Equal(int32(shards))) + err = utils.VerifyK8sResources(clusterName, "default", workloadType, shards, replicas) + g.Expect(err).NotTo(HaveOccurred(), "K8s resources not ready: %v", err) + err = utils.VerifyClusterHealth(clusterName, "default", shards, replicas) + g.Expect(err).NotTo(HaveOccurred(), "cluster health: %v", err) + }, recoveryTimeout, 5*time.Second).Should(Succeed(), + fmt.Sprintf("Iteration %d: cluster did not recover after %s (scenario=%s, shard=%d, seed=%d)", + iteration, recoveryTimeout, scenario.Name, shard, seed)) + + // Remove CPU pressure after recovery + utils.UnthrottleNodes(throttledNodes) + + // Log cluster state after recovery + logClusterState(clusterName, "default", "after") + + if !scenario.losesData(replicas) { + By(fmt.Sprintf("Iteration %d: verifying test data integrity", iteration)) + Eventually(func() error { + return utils.VerifyTestData(clusterName, "default", seededKeys) + }, 60*time.Second).Should(Succeed(), + fmt.Sprintf("Iteration %d: data integrity check failed (seed=%d)", iteration, seed)) + } else { + By(fmt.Sprintf("Iteration %d: re-seeding test data after data-loss scenario", iteration)) + err := utils.FlushAll(clusterName, "default") + Expect(err).NotTo(HaveOccurred(), "Failed to flush data") + seededKeys, err = utils.SeedTestData(clusterName, "default", numKeys, dataSize, seed) + Expect(err).NotTo(HaveOccurred(), "Failed to re-seed test data") + _, _ = fmt.Fprintf(GinkgoWriter, " Seeded keys: %d\n", seededKeys) + } + + _, _ = fmt.Fprintf(GinkgoWriter, " Iteration %d: PASSED\n", iteration) + + // Print statistics every 10th iteration + 
scenarioCount[scenario.Name]++ + if len(scenarios) > 1 && iteration%10 == 0 { + _, _ = fmt.Fprintf(GinkgoWriter, "\n=== Scenario Statistics (after %d iterations) ===\n", iteration) + for _, s := range scenarios { + _, _ = fmt.Fprintf(GinkgoWriter, " %-30s %d\n", s.Name, scenarioCount[s.Name]) + } + _, _ = fmt.Fprintf(GinkgoWriter, "=================================================\n") + } + } + }) +}) + +// Fault scenario implementations + +func deletePrimaryPod(ctx *ChaosContext) error { + pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) + if err != nil { + return err + } + _, _ = fmt.Fprintf(GinkgoWriter, " Deleting primary pod: %s\n", pod) + return utils.DeletePod(pod, ctx.Namespace) +} + +func deleteReplicaPod(ctx *ChaosContext) error { + if ctx.Replicas == 0 { + return fmt.Errorf("skip: no replicas configured") + } + pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) + if err != nil { + return fmt.Errorf("skip: %w", err) + } + _, _ = fmt.Fprintf(GinkgoWriter, " Deleting replica pod: %s\n", pod) + return utils.DeletePod(pod, ctx.Namespace) +} + +func deleteShardPods(ctx *ChaosContext) error { + if ctx.Replicas == 0 { + return fmt.Errorf("skip: no replicas configured") + } + primary, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) + if err != nil { + return err + } + replica, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) + if err != nil { + return fmt.Errorf("skip: %w", err) + } + _, _ = fmt.Fprintf(GinkgoWriter, " Deleting primary pod %s and replica pod %s simultaneously\n", primary, replica) + // Delete both without waiting so they're removed at the same time + if err := utils.DeletePod(primary, ctx.Namespace); err != nil { + return err + } + return utils.DeletePod(replica, ctx.Namespace) +} + +func deleteMultipleShardPods(ctx *ChaosContext) error { + if ctx.Replicas == 0 { + return fmt.Errorf("skip: no replicas 
configured") + } + // Pick a random number of shards to kill: [1, ctx.Shards] + count := ctx.Rand.Intn(ctx.Shards) + 1 + // Pick which shards to target + shardIndices := ctx.Rand.Perm(ctx.Shards)[:count] + _, _ = fmt.Fprintf(GinkgoWriter, " Deleting all pods for %d shards of %d %v\n", count, ctx.Shards, shardIndices) + + // Collect all pod names first, before deleting any + var pods []string + for _, shard := range shardIndices { + primary, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, shard) + if err != nil { + return err + } + replica, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, shard) + if err != nil { + return fmt.Errorf("skip: %w", err) + } + pods = append(pods, primary, replica) + } + + // Delete all collected pods + for _, pod := range pods { + if err := utils.DeletePod(pod, ctx.Namespace); err != nil { + return err + } + } + return nil +} + +func deletePrimaryWorkload(ctx *ChaosContext) error { + pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) + if err != nil { + return err + } + workload, err := utils.GetWorkloadForPod(pod, ctx.Namespace, ctx.WorkloadType) + if err != nil { + return err + } + _, _ = fmt.Fprintf(GinkgoWriter, " Deleting primary %s: %s\n", ctx.WorkloadType, workload) + return utils.DeleteWorkload(workload, ctx.Namespace, ctx.WorkloadType) +} + +func deleteReplicaWorkload(ctx *ChaosContext) error { + if ctx.Replicas == 0 { + return fmt.Errorf("skip: no replicas configured") + } + pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) + if err != nil { + return fmt.Errorf("skip: %w", err) + } + workload, err := utils.GetWorkloadForPod(pod, ctx.Namespace, ctx.WorkloadType) + if err != nil { + return err + } + _, _ = fmt.Fprintf(GinkgoWriter, " Deleting replica %s: %s\n", ctx.WorkloadType, workload) + return utils.DeleteWorkload(workload, ctx.Namespace, ctx.WorkloadType) +} + +func networkPartitionPrimary(ctx *ChaosContext) error { + pod, 
err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) + if err != nil { + return err + } + nodeName, err := utils.GetPodNodeName(pod, ctx.Namespace) + if err != nil { + return err + } + // Without tolerations: 3-5s, enough to trigger failover. + // With tolerations: extends up to eviction threshold + 20s to also test pod rescheduling. + maxDuration := 5 * time.Second + if ctx.TolerationSec > 0 { + evictionThreshold := 40*time.Second + time.Duration(ctx.TolerationSec)*time.Second + maxDuration = evictionThreshold + 20*time.Second + } + duration := randomDuration(ctx.Rand, 3*time.Second, maxDuration) + _, _ = fmt.Fprintf(GinkgoWriter, " Partitioning node %s (primary pod: %s) for %s\n", nodeName, pod, duration.Truncate(time.Millisecond)) + logIfControllerNode(nodeName) + if err := utils.PartitionNode(nodeName); err != nil { + return err + } + time.Sleep(duration) + _, _ = fmt.Fprintf(GinkgoWriter, " Healing node %s\n", nodeName) + return utils.HealNode(nodeName) +} + +func networkPartitionReplica(ctx *ChaosContext) error { + if ctx.Replicas == 0 { + return fmt.Errorf("skip: no replicas configured") + } + pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) + if err != nil { + return fmt.Errorf("skip: %w", err) + } + nodeName, err := utils.GetPodNodeName(pod, ctx.Namespace) + if err != nil { + return err + } + // Without tolerations: 3-5s, enough to trigger failover. + // With tolerations: extends up to eviction threshold + 20s to also test pod rescheduling. 
+ maxDuration := 5 * time.Second + if ctx.TolerationSec > 0 { + evictionThreshold := 40*time.Second + time.Duration(ctx.TolerationSec)*time.Second + maxDuration = evictionThreshold + 20*time.Second + } + duration := randomDuration(ctx.Rand, 3*time.Second, maxDuration) + _, _ = fmt.Fprintf(GinkgoWriter, " Partitioning node %s (replica pod: %s) for %s\n", nodeName, pod, duration.Truncate(time.Millisecond)) + logIfControllerNode(nodeName) + if err := utils.PartitionNode(nodeName); err != nil { + return err + } + time.Sleep(duration) + _, _ = fmt.Fprintf(GinkgoWriter, " Healing node %s\n", nodeName) + return utils.HealNode(nodeName) +} + +func pausePrimaryContainer(ctx *ChaosContext) error { + pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) + if err != nil { + return err + } + // 1-5s covers both non-failover (<2s timeout) and failover (>2s) cases + duration := randomDuration(ctx.Rand, 1*time.Second, 5*time.Second) + _, _ = fmt.Fprintf(GinkgoWriter, " Pausing primary container in pod: %s for %s\n", pod, duration.Truncate(time.Millisecond)) + if err := utils.PauseContainer(pod, ctx.Namespace); err != nil { + return err + } + time.Sleep(duration) + _, _ = fmt.Fprintf(GinkgoWriter, " Unpausing primary container in pod: %s\n", pod) + return utils.UnpauseContainer(pod, ctx.Namespace) +} + +func pauseReplicaContainer(ctx *ChaosContext) error { + if ctx.Replicas == 0 { + return fmt.Errorf("skip: no replicas configured") + } + pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) + if err != nil { + return fmt.Errorf("skip: %w", err) + } + // 1-5s covers both non-failover (<2s timeout) and failover (>2s) cases + duration := randomDuration(ctx.Rand, 1*time.Second, 5*time.Second) + _, _ = fmt.Fprintf(GinkgoWriter, " Pausing replica container in pod: %s for %s\n", pod, duration.Truncate(time.Millisecond)) + if err := utils.PauseContainer(pod, ctx.Namespace); err != nil { + return err + } + time.Sleep(duration) 
+ _, _ = fmt.Fprintf(GinkgoWriter, " Unpausing replica container in pod: %s\n", pod) + return utils.UnpauseContainer(pod, ctx.Namespace) +} + +func pauseWorkerNode(ctx *ChaosContext) error { + pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) + if err != nil { + return err + } + nodeName, err := utils.GetPodNodeName(pod, ctx.Namespace) + if err != nil { + return err + } + // Eviction threshold: 40s (node-monitor-grace) + tolerationSeconds. + // Range spans below and above threshold to cover both eviction and non-eviction cases. + evictionThreshold := 40*time.Second + time.Duration(ctx.TolerationSec)*time.Second + duration := randomDuration(ctx.Rand, 3*time.Second, evictionThreshold+30*time.Second) + _, _ = fmt.Fprintf(GinkgoWriter, " Pausing Kind node %s (primary pod: %s) for %s\n", nodeName, pod, duration.Truncate(time.Second)) + logIfControllerNode(nodeName) + cmd := exec.Command("docker", "pause", nodeName) + if _, err := utils.Run(cmd); err != nil { + return err + } + time.Sleep(duration) + _, _ = fmt.Fprintf(GinkgoWriter, " Unpausing Kind node %s\n", nodeName) + cmd = exec.Command("docker", "unpause", nodeName) + _, err = utils.Run(cmd) + return err +} + +func scaleShards(ctx *ChaosContext) error { + newShards := ctx.MinShards + ctx.Rand.Intn(ctx.MaxShards-ctx.MinShards+1) + if newShards == ctx.Shards { + // Ensure we actually change something + if newShards < ctx.MaxShards { + newShards++ + } else { + newShards-- + } + } + _, _ = fmt.Fprintf(GinkgoWriter, " Scaling shards from %d to %d\n", ctx.Shards, newShards) + cmd := exec.Command("kubectl", "patch", "valkeycluster", ctx.ClusterName, + "-n", ctx.Namespace, "--type=merge", + "-p", fmt.Sprintf(`{"spec":{"shards":%d}}`, newShards)) + if _, err := utils.Run(cmd); err != nil { + return err + } + ctx.Shards = newShards + return nil +} + +func deleteRecreateCluster(ctx *ChaosContext) error { + // Capture the current spec before deleting + cmd := exec.Command("kubectl", "get", 
"valkeycluster", ctx.ClusterName, + "-n", ctx.Namespace, "-o", "jsonpath={.spec}") + spec, err := utils.Run(cmd) + if err != nil { + return fmt.Errorf("failed to get ValkeyCluster spec: %w", err) + } + + _, _ = fmt.Fprintf(GinkgoWriter, " Deleting ValkeyCluster %s and waiting for removal\n", ctx.ClusterName) + cmd = exec.Command("kubectl", "delete", "valkeycluster", ctx.ClusterName, + "-n", ctx.Namespace, "--wait=true", "--timeout=120s") + if _, err := utils.Run(cmd); err != nil { + return fmt.Errorf("failed to delete ValkeyCluster: %w", err) + } + + // Wait for all pods to be gone to ensure the new cluster won't have + // GetClusterState accidentally connect to a still-terminating old pod. + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pods", "-l", + fmt.Sprintf("valkey.io/cluster=%s", ctx.ClusterName), + "-n", ctx.Namespace, "-o", "jsonpath={.items[*].metadata.name}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(strings.TrimSpace(output)).To(BeEmpty(), "pods still exist: %s", output) + }, 120*time.Second, 2*time.Second).Should(Succeed()) + + _, _ = fmt.Fprintf(GinkgoWriter, " Recreating ValkeyCluster %s with captured spec\n", ctx.ClusterName) + manifest := fmt.Sprintf(`{"apiVersion":"valkey.io/v1alpha1","kind":"ValkeyCluster","metadata":{"name":"%s","namespace":"%s"},"spec":%s}`, + ctx.ClusterName, ctx.Namespace, spec) + + cmd = exec.Command("kubectl", "apply", "-f", "-") + cmd.Stdin = strings.NewReader(manifest) + if _, err := utils.Run(cmd); err != nil { + return fmt.Errorf("failed to recreate ValkeyCluster: %w", err) + } + return nil +} + +func rollingUpdate(ctx *ChaosContext) error { + // Toggle io-threads between 1 and 2 to trigger a restart-requiring config change. 
+ cmd := exec.Command("kubectl", "get", "valkeycluster", ctx.ClusterName, + "-n", ctx.Namespace, "-o", "jsonpath={.spec.config.io-threads}") + output, _ := utils.Run(cmd) + current := strings.TrimSpace(output) + next := "2" + if current == "2" { + next = "1" + } + + // Capture current pod UIDs to detect restarts. + cmd = exec.Command("kubectl", "get", "pods", "-l", + fmt.Sprintf("valkey.io/cluster=%s", ctx.ClusterName), + "-n", ctx.Namespace, "-o", "jsonpath={range .items[*]}{.metadata.uid}{\"\\n\"}{end}") + uidsBefore, _ := utils.Run(cmd) + + _, _ = fmt.Fprintf(GinkgoWriter, " Patching config io-threads=%s (was %q)\n", next, current) + cmd = exec.Command("kubectl", "patch", "valkeycluster", ctx.ClusterName, + "-n", ctx.Namespace, "--type=merge", + "-p", fmt.Sprintf(`{"spec":{"config":{"io-threads":"%s"}}}`, next)) + if _, err := utils.Run(cmd); err != nil { + return err + } + + // Wait for at least one pod to be replaced (new UID). + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pods", "-l", + fmt.Sprintf("valkey.io/cluster=%s", ctx.ClusterName), + "-n", ctx.Namespace, "-o", "jsonpath={range .items[*]}{.metadata.uid}{\"\\n\"}{end}") + uidsAfter, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(uidsAfter).NotTo(Equal(uidsBefore), "no pods restarted after config change") + }, 60*time.Second, 2*time.Second).Should(Succeed()) + + return nil +} + +func deleteControllerPod(_ *ChaosContext) error { + cmd := exec.Command("kubectl", "get", "pods", "-l", "control-plane=controller-manager", + "-n", namespace, "-o", "jsonpath={.items[0].metadata.name}") + podName, err := utils.Run(cmd) + if err != nil { + return err + } + podName = strings.TrimSpace(podName) + _, _ = fmt.Fprintf(GinkgoWriter, " Deleting controller pod: %s\n", podName) + cmd = exec.Command("kubectl", "delete", "pod", podName, "-n", namespace, "--grace-period=0", "--force") + if _, err := utils.Run(cmd); err != nil { + return err + } + // Wait for the new controller 
pod to become Ready. + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pods", "-l", "control-plane=controller-manager", + "-n", namespace, "-o", "jsonpath={.items[0].status.conditions[?(@.type==\"Ready\")].status}") + out, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(strings.TrimSpace(out)).To(Equal("True")) + }, 60*time.Second, 2*time.Second).Should(Succeed()) + return nil +} + +//---------------------------------------------------------- + +// logClusterState logs pod placement and CLUSTER NODES with a label (e.g. "before" or "after"). +func logClusterState(clusterName, namespace, label string) { + if output, err := utils.GetPodsWide(clusterName, namespace); err == nil { + _, _ = fmt.Fprintf(GinkgoWriter, " PODS %s:\n%s\n", label, output) + } + if output, err := utils.GetClusterNodes(clusterName, namespace); err == nil { + _, _ = fmt.Fprintf(GinkgoWriter, " CLUSTER NODES %s:\n%s", label, output) + } +} + +// randomDuration returns a random duration between min and max. +func randomDuration(rnd *rand.Rand, min, max time.Duration) time.Duration { + if max <= min { + return max + } + return min + time.Duration(rnd.Int63n(int64(max-min))) +} + +// getControllerNodeName returns the node hosting the controller-manager pod. +func getControllerNodeName() string { + cmd := exec.Command("kubectl", "get", "pods", "-l", "control-plane=controller-manager", + "-n", namespace, "-o", "jsonpath={.items[0].spec.nodeName}") + out, err := utils.Run(cmd) + if err != nil { + return "" + } + return strings.TrimSpace(out) +} + +// logIfControllerNode emits a warning if the target node hosts the controller-manager. 
+func logIfControllerNode(nodeName string) { + if nodeName == getControllerNodeName() { + _, _ = fmt.Fprintf(GinkgoWriter, " WARNING: target node %s hosts the controller-manager; operator will be disrupted\n", nodeName) + } +} + +// Helper functions for configuration parsing + +func envOrDefault(key, defaultVal string) string { + if v := os.Getenv(key); v != "" { + return v + } + return defaultVal +} + +func envBool(key string, defaultVal bool) bool { + v := os.Getenv(key) + if v == "" { + return defaultVal + } + b, err := strconv.ParseBool(v) + if err != nil { + Fail(fmt.Sprintf("%s=%q is not a valid boolean", key, v)) + } + return b +} + +func envOneOfOrInt(key, defaultVal string, valid []string) string { + v := envOrDefault(key, defaultVal) + for _, opt := range valid { + if v == opt { + return v + } + } + if _, err := strconv.Atoi(v); err == nil { + return v + } + Fail(fmt.Sprintf("%s=%q is invalid, must be one of %v or an integer", key, v, valid)) + return "" +} + +func envOneOf(key, defaultVal string, valid []string) string { + v := envOrDefault(key, defaultVal) + for _, opt := range valid { + if v == opt { + return v + } + } + Fail(fmt.Sprintf("%s=%q is invalid, must be one of %v", key, v, valid)) + return "" +} + +func envIntOrDefault(key string, defaultVal int, minVal ...int) int { + v := os.Getenv(key) + if v == "" { + return defaultVal + } + i, err := strconv.Atoi(v) + if err != nil { + Fail(fmt.Sprintf("%s=%q is not a valid integer", key, v)) + } + if len(minVal) > 0 && i < minVal[0] { + Fail(fmt.Sprintf("%s=%d must be >= %d", key, i, minVal[0])) + } + return i +} + +func envInt64OrDefault(key string, defaultVal int64) int64 { + v := os.Getenv(key) + if v == "" { + return defaultVal + } + i, err := strconv.ParseInt(v, 10, 64) + if err != nil { + Fail(fmt.Sprintf("%s=%q is not a valid integer", key, v)) + } + return i +} + +func envDurationOrDefault(key string, defaultVal time.Duration) time.Duration { + v := os.Getenv(key) + if v == "" { + return 
defaultVal + } + d, err := time.ParseDuration(v) + if err != nil { + Fail(fmt.Sprintf("%s=%q is not a valid duration (e.g. 30s, 5m)", key, v)) + } + return d +} + +func filterScenarios(all []Scenario, filter string) []Scenario { + if filter == "" { + var result []Scenario + for _, s := range all { + if !s.DisabledByDefault { + result = append(result, s) + } + } + return result + } + requested := make(map[string]bool) + for _, name := range strings.Split(filter, ",") { + requested[strings.TrimSpace(name)] = true + } + var result []Scenario + for _, s := range all { + if requested[s.Name] { + result = append(result, s) + delete(requested, s.Name) + } + } + for name := range requested { + Fail(fmt.Sprintf("CHAOS_SCENARIOS contains unknown scenario: %q", name)) + } + return result +} + +func envFloat64OrDefault(key string, defaultVal float64, minVal float64) float64 { + v := os.Getenv(key) + if v == "" { + return defaultVal + } + f, err := strconv.ParseFloat(v, 64) + if err != nil { + Fail(fmt.Sprintf("%s=%q is not a valid float", key, v)) + } + if f < minVal { + Fail(fmt.Sprintf("%s=%.2f must be >= %.2f", key, f, minVal)) + } + return f +} diff --git a/test/utils/chaos.go b/test/utils/chaos.go new file mode 100644 index 00000000..4c633c44 --- /dev/null +++ b/test/utils/chaos.go @@ -0,0 +1,659 @@ +//go:build chaos +// +build chaos + +/* +Copyright 2026 Valkey Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package utils + +import ( + "fmt" + "math/rand" + "os/exec" + "strconv" + "strings" +) + +// DeletePod deletes a pod by name in the given namespace. +func DeletePod(name, namespace string) error { + cmd := exec.Command("kubectl", "delete", "pod", name, "-n", namespace, "--grace-period=0", "--force") + _, err := Run(cmd) + return err +} + +// GetPodNameByLabels returns the first pod name matching the given labels. +func GetPodNameByLabels(namespace string, labels map[string]string) (string, error) { + selector := labelsToSelector(labels) + cmd := exec.Command("kubectl", "get", "pods", "-n", namespace, + "-l", selector, "-o", "jsonpath={.items[0].metadata.name}") + output, err := Run(cmd) + if err != nil { + return "", err + } + return strings.TrimSpace(output), nil +} + +// DeleteWorkload deletes a StatefulSet or Deployment by name. +func DeleteWorkload(name, namespace, kind string) error { + cmd := exec.Command("kubectl", "delete", strings.ToLower(kind), name, "-n", namespace, "--wait=false") + _, err := Run(cmd) + return err +} + +// GetClusterNodes returns the CLUSTER NODES output from any pod in the cluster. +func GetClusterNodes(clusterName, namespace string) (string, error) { + anyPod, err := GetPodNameByLabels(namespace, map[string]string{ + "valkey.io/cluster": clusterName, + }) + if err != nil { + return "", err + } + cmd := exec.Command("kubectl", "exec", anyPod, "-n", namespace, "-c", "server", "--", + "valkey-cli", "CLUSTER", "NODES") + return Run(cmd) +} + +func GetPodsWide(clusterName, namespace string) (string, error) { + cmd := exec.Command("kubectl", "get", "pods", "-n", namespace, "-l", + fmt.Sprintf("valkey.io/cluster=%s", clusterName), + "-o", "wide", "--no-headers") + return Run(cmd) +} + +// GetShardPrimaryPod queries CLUSTER NODES to find the actual primary pod for a shard. 
+func GetShardPrimaryPod(clusterName, namespace string, shardIndex int) (string, error) { + // Get any pod from the cluster to query CLUSTER NODES + anyPod, err := GetPodNameByLabels(namespace, map[string]string{ + "valkey.io/cluster": clusterName, + }) + if err != nil { + return "", fmt.Errorf("failed to get any pod for cluster: %w", err) + } + + cmd := exec.Command("kubectl", "exec", anyPod, "-n", namespace, "-c", "server", "--", + "valkey-cli", "CLUSTER", "NODES") + output, err := Run(cmd) + if err != nil { + return "", fmt.Errorf("failed to run CLUSTER NODES: %w", err) + } + + // Parse CLUSTER NODES output to find primaries with slots + primaries := parsePrimariesFromClusterNodes(output) + if shardIndex >= len(primaries) { + return "", fmt.Errorf("shard index %d out of range (found %d primaries)", shardIndex, len(primaries)) + } + + ip := primaries[shardIndex] + return GetPodByIP(namespace, ip) +} + +// GetShardReplicaPod queries CLUSTER NODES to find a replica pod for a shard. +func GetShardReplicaPod(clusterName, namespace string, shardIndex int) (string, error) { + anyPod, err := GetPodNameByLabels(namespace, map[string]string{ + "valkey.io/cluster": clusterName, + }) + if err != nil { + return "", fmt.Errorf("failed to get any pod for cluster: %w", err) + } + + cmd := exec.Command("kubectl", "exec", anyPod, "-n", namespace, "-c", "server", "--", + "valkey-cli", "CLUSTER", "NODES") + output, err := Run(cmd) + if err != nil { + return "", fmt.Errorf("failed to run CLUSTER NODES: %w", err) + } + + primaries := parsePrimariesFromClusterNodes(output) + if shardIndex >= len(primaries) { + return "", fmt.Errorf("shard index %d out of range", shardIndex) + } + + // Find the primary's node ID, then find a replica of that primary + primaryIP := primaries[shardIndex] + replicaIP, err := findReplicaOfPrimary(output, primaryIP) + if err != nil { + return "", err + } + + return GetPodByIP(namespace, replicaIP) +} + +// GetPodByIP finds a pod name by its IP address. 
+func GetPodByIP(namespace, ip string) (string, error) { + cmd := exec.Command("kubectl", "get", "pods", "-n", namespace, + "--field-selector", fmt.Sprintf("status.podIP=%s", ip), + "-o", "jsonpath={.items[0].metadata.name}") + output, err := Run(cmd) + if err != nil { + return "", fmt.Errorf("failed to find pod with IP %s: %w", ip, err) + } + name := strings.TrimSpace(output) + if name == "" { + return "", fmt.Errorf("no pod found with IP %s", ip) + } + return name, nil +} + +// GetWorkloadForPod returns the owning workload name and kind for a pod. +func GetWorkloadForPod(podName, namespace, workloadType string) (string, error) { + // Get the pod's owner reference + jsonpath := "{.metadata.ownerReferences[0].name}" + if strings.EqualFold(workloadType, "statefulset") { + // For StatefulSet, pod owner is the StatefulSet directly + cmd := exec.Command("kubectl", "get", "pod", podName, "-n", namespace, + "-o", "jsonpath="+jsonpath) + output, err := Run(cmd) + if err != nil { + return "", err + } + return strings.TrimSpace(output), nil + } + // For Deployment, pod owner is a ReplicaSet; get the RS owner + cmd := exec.Command("kubectl", "get", "pod", podName, "-n", namespace, + "-o", "jsonpath="+jsonpath) + rsName, err := Run(cmd) + if err != nil { + return "", err + } + cmd = exec.Command("kubectl", "get", "replicaset", strings.TrimSpace(rsName), "-n", namespace, + "-o", "jsonpath="+jsonpath) + output, err := Run(cmd) + if err != nil { + return "", err + } + return strings.TrimSpace(output), nil +} + +// VerifyClusterHealth checks that all pods report cluster_state:ok, +// the topology has no stale nodes, correct node count, and no shard merges. 
+func VerifyClusterHealth(clusterName, namespace string, shards, replicas int) error { + expectedNodes := shards * (1 + replicas) + cmd := exec.Command("kubectl", "get", "pods", "-n", namespace, + "-l", fmt.Sprintf("valkey.io/cluster=%s", clusterName), + "-o", "jsonpath={range .items[*]}{.metadata.name}{\"\\n\"}{end}") + output, err := Run(cmd) + if err != nil { + return err + } + + pods := GetNonEmptyLines(output) + if len(pods) != expectedNodes { + return fmt.Errorf("expected %d pods, got %d", expectedNodes, len(pods)) + } + + for _, pod := range pods { + cmd = exec.Command("kubectl", "exec", pod, "-n", namespace, "-c", "server", "--", + "valkey-cli", "CLUSTER", "INFO") + info, err := Run(cmd) + if err != nil { + return fmt.Errorf("CLUSTER INFO failed on %s: %w", pod, err) + } + if !strings.Contains(info, "cluster_state:ok") { + return fmt.Errorf("pod %s reports cluster_state not ok: %s", pod, info) + } + + cmd = exec.Command("kubectl", "exec", pod, "-n", namespace, "-c", "server", "--", + "valkey-cli", "CLUSTER", "NODES") + nodes, err := Run(cmd) + if err != nil { + return fmt.Errorf("CLUSTER NODES failed on %s: %w", pod, err) + } + if err := verifyClusterNodesOutput(nodes, shards, replicas, pod); err != nil { + return err + } + } + return nil +} + +func verifyClusterNodesOutput(output string, shards, replicas int, pod string) error { + expectedNodes := shards * (1 + replicas) + + var healthy int + replicasOf := map[string]int{} // primary node ID to replica count + for line := range strings.SplitSeq(strings.TrimSpace(output), "\n") { + fields := strings.Fields(line) + if len(fields) < 8 { + continue + } + flags := fields[2] + if strings.Contains(flags, "fail") || strings.Contains(flags, "noaddr") { + return fmt.Errorf("[%s] stale node in topology: %s (flags=%s)\nCLUSTER NODES:\n%s", pod, fields[1], flags, output) + } + healthy++ + if strings.Contains(flags, "slave") { + primaryId := fields[3] + replicasOf[primaryId]++ + } + } + if healthy != expectedNodes { + 
return fmt.Errorf("[%s] expected %d nodes in topology, got %d\nCLUSTER NODES:\n%s", + pod, expectedNodes, healthy, output) + } + for primaryId, count := range replicasOf { + if count != replicas { + return fmt.Errorf("[%s] primary %s has %d replicas (expected %d)\nCLUSTER NODES:\n%s", + pod, primaryId, count, replicas, output) + } + } + return nil +} + +// VerifyK8sResources verifies the correct number of pods, ValkeyNodes, and workloads exist. +func VerifyK8sResources(clusterName, namespace, workloadType string, shards, replicas int) error { + expectedTotal := shards * (1 + replicas) + + // Check pods + cmd := exec.Command("kubectl", "get", "pods", "-n", namespace, + "-l", fmt.Sprintf("valkey.io/cluster=%s", clusterName), + "--field-selector", "status.phase=Running", + "-o", "jsonpath={range .items[*]}{.metadata.name}{\"\\n\"}{end}") + output, err := Run(cmd) + if err != nil { + return err + } + pods := GetNonEmptyLines(output) + if len(pods) != expectedTotal { + return fmt.Errorf("expected %d Running pods, got %d", expectedTotal, len(pods)) + } + + // Check ValkeyNodes + cmd = exec.Command("kubectl", "get", "valkeynodes", "-n", namespace, + "-l", fmt.Sprintf("valkey.io/cluster=%s", clusterName), + "-o", "jsonpath={range .items[*]}{.metadata.name}{\"\\n\"}{end}") + output, err = Run(cmd) + if err != nil { + return err + } + nodes := GetNonEmptyLines(output) + if len(nodes) != expectedTotal { + return fmt.Errorf("expected %d ValkeyNodes, got %d", expectedTotal, len(nodes)) + } + + // Check workloads + kind := "statefulsets" + if strings.EqualFold(workloadType, "deployment") { + kind = "deployments" + } + cmd = exec.Command("kubectl", "get", kind, "-n", namespace, + "-l", fmt.Sprintf("valkey.io/cluster=%s", clusterName), + "-o", "jsonpath={range .items[*]}{.metadata.name}{\"\\n\"}{end}") + output, err = Run(cmd) + if err != nil { + return err + } + workloads := GetNonEmptyLines(output) + if len(workloads) != expectedTotal { + return fmt.Errorf("expected %d %s, got 
%d", expectedTotal, kind, len(workloads)) + } + + return nil +} + +// FlushAll runs FLUSHALL on every primary in the cluster. +func FlushAll(clusterName, namespace string) error { + anyPod, err := GetPodNameByLabels(namespace, map[string]string{ + "valkey.io/cluster": clusterName, + }) + if err != nil { + return err + } + cmd := exec.Command("kubectl", "exec", anyPod, "-n", namespace, "-c", "server", "--", + "valkey-cli", "CLUSTER", "NODES") + output, err := Run(cmd) + if err != nil { + return err + } + for _, ip := range parsePrimariesFromClusterNodes(output) { + pod, err := GetPodByIP(namespace, ip) + if err != nil { + return err + } + cmd = exec.Command("kubectl", "exec", pod, "-n", namespace, "-c", "server", "--", + "valkey-cli", "FLUSHALL") + if _, err := Run(cmd); err != nil { + return fmt.Errorf("FLUSHALL failed on %s: %w", pod, err) + } + } + return nil +} + +// SeedTestData uses valkey-benchmark to write keys across the cluster. +// Returns the actual number of keys stored (from INFO keyspace). +func SeedTestData(clusterName, namespace string, numKeys, dataSize int, seed int64) (int, error) { + anyPod, err := GetPodNameByLabels(namespace, map[string]string{ + "valkey.io/cluster": clusterName, + }) + if err != nil { + return 0, err + } + cmd := exec.Command("kubectl", "exec", anyPod, "-n", namespace, "-c", "server", "--", + "valkey-benchmark", "-n", strconv.Itoa(numKeys), + "--cluster", "-r", strconv.Itoa(numKeys), + "-d", strconv.Itoa(dataSize), "-q", + "--seed", strconv.FormatInt(seed, 10), + "SET", "key:{tag}:__rand_int__", "__data__") + if _, err := Run(cmd); err != nil { + return 0, fmt.Errorf("valkey-benchmark failed: %w", err) + } + total, _, err := GetTotalKeyCount(clusterName, namespace) + return total, err +} + +// VerifyTestData checks that the total key count across all primaries matches expected. 
+func VerifyTestData(clusterName, namespace string, seededKeys int) error { + totalKeys, perShard, err := GetTotalKeyCount(clusterName, namespace) + if err != nil { + return fmt.Errorf("failed to get keyspace info: %w", err) + } + if totalKeys != seededKeys { + return fmt.Errorf( + "keyspace count mismatch: expected %d keys, INFO keyspace reports %d (per-shard: %v)", + seededKeys, totalKeys, perShard) + } + return nil +} + +// GetTotalKeyCount sums the key count from INFO keyspace across all primaries. +// Returns total keys and per-shard key counts. +func GetTotalKeyCount(clusterName, namespace string) (int, map[string]int, error) { + anyPod, err := GetPodNameByLabels(namespace, map[string]string{ + "valkey.io/cluster": clusterName, + }) + if err != nil { + return 0, nil, err + } + + cmd := exec.Command("kubectl", "exec", anyPod, "-n", namespace, "-c", "server", "--", + "valkey-cli", "CLUSTER", "NODES") + output, err := Run(cmd) + if err != nil { + return 0, nil, err + } + + primaryIPs := parsePrimariesFromClusterNodes(output) + total := 0 + perShard := make(map[string]int, len(primaryIPs)) + for _, ip := range primaryIPs { + pod, err := GetPodByIP(namespace, ip) + if err != nil { + return 0, nil, err + } + cmd = exec.Command("kubectl", "exec", pod, "-n", namespace, "-c", "server", "--", + "valkey-cli", "INFO", "keyspace") + info, err := Run(cmd) + if err != nil { + return 0, nil, fmt.Errorf("INFO keyspace failed on %s: %w", pod, err) + } + keys := parseKeysFromInfoKeyspace(info) + perShard[pod] = keys + total += keys + } + return total, perShard, nil +} + +// parseKeysFromInfoKeyspace parses INFO keyspace output and sums all db keys. 
// Format: db0:keys=123,expires=0,avg_ttl=0
// Lines that do not start with "db" are ignored. Surrounding whitespace
// (including a trailing carriage return) is stripped from each line first.
func parseKeysFromInfoKeyspace(info string) int {
	total := 0
	for _, line := range strings.Split(info, "\n") {
		line = strings.TrimSpace(line)
		if !strings.HasPrefix(line, "db") {
			continue
		}
		_, after, ok := strings.Cut(line, ":")
		if !ok {
			continue
		}
		for _, field := range strings.Split(after, ",") {
			if v, ok := strings.CutPrefix(field, "keys="); ok {
				// A malformed count contributes zero rather than failing the parse.
				n, _ := strconv.Atoi(v)
				total += n
			}
		}
	}
	return total
}

// parsePrimariesFromClusterNodes parses CLUSTER NODES output and returns primary IPs
// sorted by their slot ranges (shard 0 = lowest slots, etc.)
//
// Only lines whose flags contain "master" and that have at least nine fields
// (i.e. something in the slot column) are considered. The IP is everything
// before the last ':' of the address field, so IPv6 addresses, which contain
// ':' themselves, are returned whole.
func parsePrimariesFromClusterNodes(output string) []string {
	type primaryInfo struct {
		ip        string
		slotStart int
	}
	var primaries []primaryInfo

	for _, line := range strings.Split(output, "\n") {
		fields := strings.Fields(line)
		if len(fields) < 9 {
			continue
		}
		if !strings.Contains(fields[2], "master") {
			continue
		}
		// fields[1] is ip:port@cport, optionally followed by ",hostname".
		// Split at the LAST colon: the port separator is always the final one.
		ip := fields[1]
		if i := strings.LastIndex(ip, ":"); i >= 0 {
			ip = ip[:i]
		}
		// Slot info starts at field 8; a value that is not numeric sorts as 0.
		slotStart, _ := strconv.Atoi(strings.SplitN(fields[8], "-", 2)[0])

		// Insert in ascending slotStart order to get consistent shard ordering.
		primaries = append(primaries, primaryInfo{ip: ip, slotStart: slotStart})
		for i := len(primaries) - 1; i > 0 && primaries[i].slotStart < primaries[i-1].slotStart; i-- {
			primaries[i], primaries[i-1] = primaries[i-1], primaries[i]
		}
	}

	result := make([]string, len(primaries))
	for i, p := range primaries {
		result[i] = p.ip
	}
	return result
}

// findReplicaOfPrimary finds a replica IP that replicates the primary at the given IP.
+func findReplicaOfPrimary(clusterNodesOutput, primaryIP string) (string, error) { + // First find the node ID of the primary + var primaryNodeID string + for line := range strings.SplitSeq(clusterNodesOutput, "\n") { + fields := strings.Fields(line) + if len(fields) < 3 { + continue + } + ip, _, _ := strings.Cut(fields[1], ":") + if ip == primaryIP && strings.Contains(fields[2], "master") { + primaryNodeID = fields[0] + break + } + } + if primaryNodeID == "" { + return "", fmt.Errorf("could not find primary node ID for IP %s", primaryIP) + } + + // Find a replica that references this primary's node ID + for line := range strings.SplitSeq(clusterNodesOutput, "\n") { + fields := strings.Fields(line) + if len(fields) < 4 { + continue + } + if !strings.Contains(fields[2], "slave") { + continue + } + // fields[3] is the master node ID this replica follows + if fields[3] == primaryNodeID { + ip, _, _ := strings.Cut(fields[1], ":") + return ip, nil + } + } + return "", fmt.Errorf("no replica found for primary %s (nodeID=%s)", primaryIP, primaryNodeID) +} + +func labelsToSelector(labels map[string]string) string { + parts := make([]string, 0, len(labels)) + for k, v := range labels { + parts = append(parts, fmt.Sprintf("%s=%s", k, v)) + } + return strings.Join(parts, ",") +} + +// GetPodNodeName returns the node name where a pod is running. +func GetPodNodeName(podName, namespace string) (string, error) { + cmd := exec.Command("kubectl", "get", "pod", podName, "-n", namespace, + "-o", "jsonpath={.spec.nodeName}") + output, err := Run(cmd) + if err != nil { + return "", err + } + return strings.TrimSpace(output), nil +} + +// PartitionNode adds iptables DROP rules to isolate a node from the cluster network. +func PartitionNode(nodeName string) error { + // In Kind, nodes are docker containers. We exec into the node container to add iptables rules. + // Block all network traffic to simulate a fully unreachable node. 
+ // docker exec still works as it uses the Docker daemon socket, not the container's network. + rules := [][]string{ + {"iptables", "-A", "INPUT", "-j", "DROP"}, + {"iptables", "-A", "OUTPUT", "-j", "DROP"}, + } + for _, rule := range rules { + cmd := exec.Command("docker", append([]string{"exec", nodeName}, rule...)...) + if _, err := Run(cmd); err != nil { + return fmt.Errorf("failed to partition node %s: %w", nodeName, err) + } + } + return nil +} + +// HealNode removes iptables DROP rules to restore network connectivity. +func HealNode(nodeName string) error { + cmd := exec.Command("docker", "exec", nodeName, "iptables", "-F") + _, err := Run(cmd) + return err +} + +// PauseContainer pauses the valkey container in a pod using ctr on the Kind node. +func PauseContainer(podName, namespace string) error { + containerID, err := GetContainerID(podName, namespace) + if err != nil { + return err + } + nodeName, err := GetPodNodeName(podName, namespace) + if err != nil { + return err + } + cmd := exec.Command("docker", "exec", nodeName, "ctr", "-n", "k8s.io", "task", "pause", containerID) + _, err = Run(cmd) + return err +} + +// UnpauseContainer unpauses a previously paused container. +func UnpauseContainer(podName, namespace string) error { + containerID, err := GetContainerID(podName, namespace) + if err != nil { + return err + } + nodeName, err := GetPodNodeName(podName, namespace) + if err != nil { + return err + } + cmd := exec.Command("docker", "exec", nodeName, "ctr", "-n", "k8s.io", "task", "resume", containerID) + _, err = Run(cmd) + return err +} + +// GetContainerID returns the docker container ID for the server container in a pod. 
+func GetContainerID(podName, namespace string) (string, error) { + cmd := exec.Command("kubectl", "get", "pod", podName, "-n", namespace, + "-o", "jsonpath={.status.containerStatuses[?(@.name=='server')].containerID}") + output, err := Run(cmd) + if err != nil { + return "", err + } + // containerID format: containerd://abc123 or docker://abc123 + id := strings.TrimSpace(output) + if idx := strings.Index(id, "://"); idx >= 0 { + id = id[idx+3:] + } + return id, nil +} + +// GetWorkerNodes returns the names of all worker nodes in the cluster. +func GetWorkerNodes() []string { + cmd := exec.Command("kubectl", "get", "nodes", + "--selector=!node-role.kubernetes.io/control-plane", + "-o", "jsonpath={range .items[*]}{.metadata.name}{\"\\n\"}{end}") + output, err := Run(cmd) + if err != nil { + return nil + } + return GetNonEmptyLines(output) +} + +// ThrottleNodes applies a CPU limit to the given Docker containers. +func ThrottleNodes(nodes []string, cpus float64) []string { + var throttled []string + for _, node := range nodes { + cmd := exec.Command("docker", "update", "--cpus", fmt.Sprintf("%.2f", cpus), node) + if _, err := Run(cmd); err != nil { + continue + } + throttled = append(throttled, node) + } + return throttled +} + +var hostCPUs string + +func getHostCPUs() string { + if hostCPUs == "" { + cmd := exec.Command("docker", "info", "--format", "{{.NCPU}}") + output, err := Run(cmd) + if err == nil { + hostCPUs = strings.TrimSpace(output) + } + } + return hostCPUs +} + +// UnthrottleNodes removes CPU limits from the given Docker containers. +// docker update --cpus 0 is a no-op; we must set cpus to the host max. +func UnthrottleNodes(nodes []string) { + cpus := getHostCPUs() + for _, node := range nodes { + cmd := exec.Command("docker", "update", "--cpus", cpus, node) + _, _ = Run(cmd) + } +} + +// ThrottleRandomWorkerNodes picks a random subset of the given nodes and applies a random CPU limit per node. 
+func ThrottleRandomWorkerNodes(rnd *rand.Rand, nodes []string, cpuMin, cpuMax float64) []string { + if len(nodes) == 0 { + return nil + } + count := 1 + rnd.Intn(len(nodes)) + perm := rnd.Perm(len(nodes)) + var throttled []string + for i := 0; i < count; i++ { + cpus := cpuMin + rnd.Float64()*(cpuMax-cpuMin) + if result := ThrottleNodes([]string{nodes[perm[i]]}, cpus); len(result) > 0 { + throttled = append(throttled, result...) + } + } + return throttled +} diff --git a/test/utils/utils.go b/test/utils/utils.go index b23ebcfd..a0450d33 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -174,6 +174,7 @@ func GetProjectDir() (string, error) { return wd, fmt.Errorf("failed to get current working directory: %w", err) } wd = strings.ReplaceAll(wd, "/test/e2e", "") + wd = strings.ReplaceAll(wd, "/test/chaos", "") return wd, nil } From b5ffb1a1cdc8a3510a6a940033fbb6ca0f62d9ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 9 Jun 2026 23:45:05 +0200 Subject: [PATCH 02/11] fixup: check existing kind worker nodes vs. set KIND_WORKERS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- Makefile | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 7d6ab478..b97aded5 100644 --- a/Makefile +++ b/Makefile @@ -91,7 +91,13 @@ setup-test-e2e: ## Set up a Kind cluster for e2e tests if it does not exist } @case "$$($(KIND) get clusters)" in \ *"$(KIND_CLUSTER)"*) \ - echo "Kind cluster '$(KIND_CLUSTER)' already exists. Skipping creation." ;; \ + echo "Kind cluster '$(KIND_CLUSTER)' already exists. 
Skipping creation."; \ + ACTUAL_WORKERS=$$($(KIND) get nodes --name $(KIND_CLUSTER) | grep -c worker); \ + if [ "$$ACTUAL_WORKERS" != "$(KIND_WORKERS)" ]; then \ + echo "ERROR: Cluster has $$ACTUAL_WORKERS worker(s) but KIND_WORKERS=$(KIND_WORKERS) was requested."; \ + echo " Delete the cluster first: kind delete cluster --name $(KIND_CLUSTER)"; \ + exit 1; \ + fi ;; \ *) \ echo "Creating Kind cluster '$(KIND_CLUSTER)' with $(KIND_WORKERS) workers..."; \ echo '{"kind":"Cluster","apiVersion":"kind.x-k8s.io/v1alpha4","nodes":[{"role":"control-plane"}' \ From a2b45b1668069b50ab5e2518ca0c71fc33dcd0ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 9 Jun 2026 23:49:11 +0200 Subject: [PATCH 03/11] fixup: replace CHAOS_TARGET_SHARD with CHAOS_TARGET_SHARDS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All scenarios now accept multiple target shards via CHAOS_TARGET_SHARDS (comma-separated indices, "all", or "random"). This removes the delete-multiple-shard-pods scenario by merging it into delete-shard-pods, and enables multi-shard testing for all fault injection scenarios. Signed-off-by: Björn Svensson --- docs/chaos-testing.md | 16 +- test/chaos/chaos_test.go | 338 +++++++++++++++++++++++---------------- 2 files changed, 209 insertions(+), 145 deletions(-) diff --git a/docs/chaos-testing.md b/docs/chaos-testing.md index a080be01..41b467e0 100644 --- a/docs/chaos-testing.md +++ b/docs/chaos-testing.md @@ -53,7 +53,7 @@ All configuration is via environment variables: | `CHAOS_SCENARIOS` | all except disabled | Comma-separated list of scenarios to run | | `CHAOS_SEED` | current time | Random seed for reproducibility | | `CHAOS_MODE` | `random` | `random` or `sequential` scenario selection | -| `CHAOS_TARGET_SHARD` | `random` | Shard index to target, or `random` | +| `CHAOS_TARGET_SHARDS` | `random` | Shards to target: `random`, `all`, or comma-separated indices (e.g. 
`0,2`) | | `CHAOS_RECOVERY_TIMEOUT` | `5m` | Max time to wait for cluster recovery | | `CHAOS_TOLERATION_SECONDS` | `0` | Pod toleration seconds for not-ready/unreachable (0 = not set) | | `CHAOS_NUM_KEYS` | `100000` | Number of keys to seed | @@ -66,10 +66,9 @@ All configuration is via environment variables: | Scenario | Description | |----------|-------------| -| `delete-primary-pod` | Deletes the primary pod of a shard | -| `delete-replica-pod` | Deletes a replica pod | -| `delete-shard-pods` | Deletes all pods in a shard | -| `delete-multiple-shard-pods` | Deletes pods across multiple shards | +| `delete-primary-pod` | Deletes the primary pod of targeted shards | +| `delete-replica-pod` | Deletes a replica pod of targeted shards | +| `delete-shard-pods` | Deletes all pods in targeted shards | | `delete-primary-workload` | Deletes the primary's Deployment/StatefulSet | | `delete-replica-workload` | Deletes a replica's Deployment/StatefulSet | | `pause-primary-container` | Pauses the primary container | @@ -137,4 +136,11 @@ CHAOS_SHARDS=7 CHAOS_REPLICAS=2 KIND_WORKERS=3 make test-chaos # Run scenarios in sequence instead of random CHAOS_SCENARIOS=delete-primary-pod,scale-shards CHAOS_MODE=sequential make test-chaos + +# Kill primaries on specific shards (triggers quorum loss + TAKEOVER) +CHAOS_SHARDS=3 CHAOS_REPLICAS=1 CHAOS_TARGET_SHARDS=0,1 \ + CHAOS_SCENARIOS=delete-primary-pod make test-chaos + +# Delete all pods in all shards +CHAOS_TARGET_SHARDS=all CHAOS_SCENARIOS=delete-shard-pods make test-chaos ``` diff --git a/test/chaos/chaos_test.go b/test/chaos/chaos_test.go index 7625b475..7563ecda 100644 --- a/test/chaos/chaos_test.go +++ b/test/chaos/chaos_test.go @@ -38,6 +38,7 @@ import ( "math/rand" "os" "os/exec" + "slices" "strconv" "strings" "time" @@ -56,7 +57,7 @@ type ChaosContext struct { ClusterName string Namespace string WorkloadType string - TargetShard int + TargetShards []int Shards int MinShards int MaxShards int @@ -82,7 +83,6 @@ var 
allScenarios = []Scenario{ {Name: "delete-primary-pod", LosesDataIfNoReplica: true, Inject: deletePrimaryPod}, {Name: "delete-replica-pod", Inject: deleteReplicaPod}, {Name: "delete-shard-pods", LosesData: true, Inject: deleteShardPods}, - {Name: "delete-multiple-shard-pods", LosesData: true, Inject: deleteMultipleShardPods}, {Name: "delete-primary-workload", LosesDataIfNoReplica: true, Inject: deletePrimaryWorkload}, {Name: "delete-replica-workload", Inject: deleteReplicaWorkload}, {Name: "pause-primary-container", Inject: pausePrimaryContainer}, @@ -110,7 +110,7 @@ var _ = Describe("ValkeyCluster Chaos", Label("chaos"), Ordered, func() { seededKeys int recoveryTimeout time.Duration tolerationSec int - targetShard string + targetShards string mode string seed int64 rnd *rand.Rand @@ -132,7 +132,7 @@ var _ = Describe("ValkeyCluster Chaos", Label("chaos"), Ordered, func() { replicas = envIntOrDefault("CHAOS_REPLICAS", 1, 0 /* min */) numKeys = envIntOrDefault("CHAOS_NUM_KEYS", 100000, 0 /* min */) dataSize = envIntOrDefault("CHAOS_DATA_SIZE", 3, 1 /* min */) - targetShard = envOneOfOrInt("CHAOS_TARGET_SHARD", "random", []string{"random"}) + targetShards = envOrDefault("CHAOS_TARGET_SHARDS", "random") mode = envOneOf("CHAOS_MODE", "random", []string{"random", "sequential"}) recoveryTimeout = envDurationOrDefault("CHAOS_RECOVERY_TIMEOUT", 5*time.Minute) tolerationSec = envIntOrDefault("CHAOS_TOLERATION_SECONDS", 0, 0 /* min */) @@ -155,7 +155,7 @@ var _ = Describe("ValkeyCluster Chaos", Label("chaos"), Ordered, func() { _, _ = fmt.Fprintf(GinkgoWriter, " Replicas: %d\n", replicas) _, _ = fmt.Fprintf(GinkgoWriter, " NumKeys: %d\n", numKeys) _, _ = fmt.Fprintf(GinkgoWriter, " DataSize: %d\n", dataSize) - _, _ = fmt.Fprintf(GinkgoWriter, " TargetShard: %s\n", targetShard) + _, _ = fmt.Fprintf(GinkgoWriter, " TargetShards: %s\n", targetShards) _, _ = fmt.Fprintf(GinkgoWriter, " RecoveryTimeout: %s\n", recoveryTimeout) if tolerationSec > 0 { _, _ = 
fmt.Fprintf(GinkgoWriter, " Tolerations: not-ready=%ds, unreachable=%ds (default: 300s)\n", tolerationSec, tolerationSec) @@ -305,11 +305,21 @@ spec: scenario = scenarios[rnd.Intn(len(scenarios))] } - shard := 0 - if targetShard == "random" { - shard = rnd.Intn(shards) - } else { - shard, _ = strconv.Atoi(targetShard) + var targetShardsForIteration []int + switch targetShards { + case "random": + targetShardsForIteration = []int{rnd.Intn(shards)} + case "all": + targetShardsForIteration = make([]int, shards) + for i := range shards { + targetShardsForIteration[i] = i + } + default: + for _, s := range strings.Split(targetShards, ",") { + if v, err := strconv.Atoi(strings.TrimSpace(s)); err == nil { + targetShardsForIteration = append(targetShardsForIteration, v) + } + } } _, _ = fmt.Fprintf(GinkgoWriter, "\n--- Iteration %d: scenario=%s ---\n", @@ -339,7 +349,7 @@ spec: ClusterName: clusterName, Namespace: "default", WorkloadType: workloadType, - TargetShard: shard, + TargetShards: targetShardsForIteration, Shards: shards, MinShards: minShards, MaxShards: maxShards, @@ -384,8 +394,8 @@ spec: err = utils.VerifyClusterHealth(clusterName, "default", shards, replicas) g.Expect(err).NotTo(HaveOccurred(), "cluster health: %v", err) }, recoveryTimeout, 5*time.Second).Should(Succeed(), - fmt.Sprintf("Iteration %d: cluster did not recover after %s (scenario=%s, shard=%d, seed=%d)", - iteration, recoveryTimeout, scenario.Name, shard, seed)) + fmt.Sprintf("Iteration %d: cluster did not recover after %s (scenario=%s, shards=%v, seed=%d)", + iteration, recoveryTimeout, scenario.Name, targetShardsForIteration, seed)) // Remove CPU pressure after recovery utils.UnthrottleNodes(throttledNodes) @@ -426,71 +436,53 @@ spec: // Fault scenario implementations func deletePrimaryPod(ctx *ChaosContext) error { - pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) - if err != nil { - return err + for _, shard := range ctx.TargetShards { + pod, err := 
utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, shard) + if err != nil { + return err + } + _, _ = fmt.Fprintf(GinkgoWriter, " Deleting primary pod: %s (shard %d)\n", pod, shard) + if err := utils.DeletePod(pod, ctx.Namespace); err != nil { + return err + } } - _, _ = fmt.Fprintf(GinkgoWriter, " Deleting primary pod: %s\n", pod) - return utils.DeletePod(pod, ctx.Namespace) + return nil } func deleteReplicaPod(ctx *ChaosContext) error { if ctx.Replicas == 0 { return fmt.Errorf("skip: no replicas configured") } - pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) - if err != nil { - return fmt.Errorf("skip: %w", err) + for _, shard := range ctx.TargetShards { + pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, shard) + if err != nil { + return fmt.Errorf("skip: %w", err) + } + _, _ = fmt.Fprintf(GinkgoWriter, " Deleting replica pod: %s (shard %d)\n", pod, shard) + if err := utils.DeletePod(pod, ctx.Namespace); err != nil { + return err + } } - _, _ = fmt.Fprintf(GinkgoWriter, " Deleting replica pod: %s\n", pod) - return utils.DeletePod(pod, ctx.Namespace) + return nil } func deleteShardPods(ctx *ChaosContext) error { - if ctx.Replicas == 0 { - return fmt.Errorf("skip: no replicas configured") - } - primary, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) - if err != nil { - return err - } - replica, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) - if err != nil { - return fmt.Errorf("skip: %w", err) - } - _, _ = fmt.Fprintf(GinkgoWriter, " Deleting primary pod %s and replica pod %s simultaneously\n", primary, replica) - // Delete both without waiting so they're removed at the same time - if err := utils.DeletePod(primary, ctx.Namespace); err != nil { - return err - } - return utils.DeletePod(replica, ctx.Namespace) -} + _, _ = fmt.Fprintf(GinkgoWriter, " Deleting all pods for shards %v\n", ctx.TargetShards) -func 
deleteMultipleShardPods(ctx *ChaosContext) error { - if ctx.Replicas == 0 { - return fmt.Errorf("skip: no replicas configured") - } - // Pick a random number of shards to kill: [1, ctx.Shards] - count := ctx.Rand.Intn(ctx.Shards) + 1 - // Pick which shards to target - shardIndices := ctx.Rand.Perm(ctx.Shards)[:count] - _, _ = fmt.Fprintf(GinkgoWriter, " Deleting all pods for %d shards of %d %v\n", count, ctx.Shards, shardIndices) - - // Collect all pod names first, before deleting any var pods []string - for _, shard := range shardIndices { - primary, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, shard) + for _, shard := range ctx.TargetShards { + cmd := exec.Command("kubectl", "get", "pods", "-n", ctx.Namespace, + "-l", fmt.Sprintf("valkey.io/cluster=%s,valkey.io/shard-index=%d", ctx.ClusterName, shard), + "-o", "jsonpath={range .items[*]}{.metadata.name}{\"\\n\"}{end}") + output, err := utils.Run(cmd) if err != nil { return err } - replica, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, shard) - if err != nil { - return fmt.Errorf("skip: %w", err) + for _, pod := range utils.GetNonEmptyLines(output) { + pods = append(pods, pod) } - pods = append(pods, primary, replica) } - // Delete all collected pods for _, pod := range pods { if err := utils.DeletePod(pod, ctx.Namespace); err != nil { return err @@ -500,43 +492,45 @@ func deleteMultipleShardPods(ctx *ChaosContext) error { } func deletePrimaryWorkload(ctx *ChaosContext) error { - pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) - if err != nil { - return err - } - workload, err := utils.GetWorkloadForPod(pod, ctx.Namespace, ctx.WorkloadType) - if err != nil { - return err + for _, shard := range ctx.TargetShards { + pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, shard) + if err != nil { + return err + } + workload, err := utils.GetWorkloadForPod(pod, ctx.Namespace, ctx.WorkloadType) + if err != nil { + return err + } + 
_, _ = fmt.Fprintf(GinkgoWriter, " Deleting primary %s: %s (shard %d)\n", ctx.WorkloadType, workload, shard) + if err := utils.DeleteWorkload(workload, ctx.Namespace, ctx.WorkloadType); err != nil { + return err + } } - _, _ = fmt.Fprintf(GinkgoWriter, " Deleting primary %s: %s\n", ctx.WorkloadType, workload) - return utils.DeleteWorkload(workload, ctx.Namespace, ctx.WorkloadType) + return nil } func deleteReplicaWorkload(ctx *ChaosContext) error { if ctx.Replicas == 0 { return fmt.Errorf("skip: no replicas configured") } - pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) - if err != nil { - return fmt.Errorf("skip: %w", err) - } - workload, err := utils.GetWorkloadForPod(pod, ctx.Namespace, ctx.WorkloadType) - if err != nil { - return err + for _, shard := range ctx.TargetShards { + pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, shard) + if err != nil { + return fmt.Errorf("skip: %w", err) + } + workload, err := utils.GetWorkloadForPod(pod, ctx.Namespace, ctx.WorkloadType) + if err != nil { + return err + } + _, _ = fmt.Fprintf(GinkgoWriter, " Deleting replica %s: %s (shard %d)\n", ctx.WorkloadType, workload, shard) + if err := utils.DeleteWorkload(workload, ctx.Namespace, ctx.WorkloadType); err != nil { + return err + } } - _, _ = fmt.Fprintf(GinkgoWriter, " Deleting replica %s: %s\n", ctx.WorkloadType, workload) - return utils.DeleteWorkload(workload, ctx.Namespace, ctx.WorkloadType) + return nil } func networkPartitionPrimary(ctx *ChaosContext) error { - pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) - if err != nil { - return err - } - nodeName, err := utils.GetPodNodeName(pod, ctx.Namespace) - if err != nil { - return err - } // Without tolerations: 3-5s, enough to trigger failover. // With tolerations: extends up to eviction threshold + 20s to also test pod rescheduling. 
maxDuration := 5 * time.Second @@ -545,28 +539,43 @@ func networkPartitionPrimary(ctx *ChaosContext) error { maxDuration = evictionThreshold + 20*time.Second } duration := randomDuration(ctx.Rand, 3*time.Second, maxDuration) - _, _ = fmt.Fprintf(GinkgoWriter, " Partitioning node %s (primary pod: %s) for %s\n", nodeName, pod, duration.Truncate(time.Millisecond)) - logIfControllerNode(nodeName) - if err := utils.PartitionNode(nodeName); err != nil { - return err + var nodes []string + for _, shard := range ctx.TargetShards { + pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, shard) + if err != nil { + return err + } + nodeName, err := utils.GetPodNodeName(pod, ctx.Namespace) + if err != nil { + return err + } + if slices.Contains(nodes, nodeName) { + continue + } + _, _ = fmt.Fprintf(GinkgoWriter, " Will partition node %s (primary pod: %s, shard %d)\n", nodeName, pod, shard) + logIfControllerNode(nodeName) + nodes = append(nodes, nodeName) + } + _, _ = fmt.Fprintf(GinkgoWriter, " Partitioning %d node(s) for %s\n", len(nodes), duration.Truncate(time.Millisecond)) + for _, nodeName := range nodes { + if err := utils.PartitionNode(nodeName); err != nil { + return err + } } time.Sleep(duration) - _, _ = fmt.Fprintf(GinkgoWriter, " Healing node %s\n", nodeName) - return utils.HealNode(nodeName) + for _, nodeName := range nodes { + _, _ = fmt.Fprintf(GinkgoWriter, " Healing node %s\n", nodeName) + if err := utils.HealNode(nodeName); err != nil { + return err + } + } + return nil } func networkPartitionReplica(ctx *ChaosContext) error { if ctx.Replicas == 0 { return fmt.Errorf("skip: no replicas configured") } - pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) - if err != nil { - return fmt.Errorf("skip: %w", err) - } - nodeName, err := utils.GetPodNodeName(pod, ctx.Namespace) - if err != nil { - return err - } // Without tolerations: 3-5s, enough to trigger failover. 
// With tolerations: extends up to eviction threshold + 20s to also test pod rescheduling. maxDuration := 5 * time.Second @@ -575,75 +584,124 @@ func networkPartitionReplica(ctx *ChaosContext) error { maxDuration = evictionThreshold + 20*time.Second } duration := randomDuration(ctx.Rand, 3*time.Second, maxDuration) - _, _ = fmt.Fprintf(GinkgoWriter, " Partitioning node %s (replica pod: %s) for %s\n", nodeName, pod, duration.Truncate(time.Millisecond)) - logIfControllerNode(nodeName) - if err := utils.PartitionNode(nodeName); err != nil { - return err + var nodes []string + for _, shard := range ctx.TargetShards { + pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, shard) + if err != nil { + return fmt.Errorf("skip: %w", err) + } + nodeName, err := utils.GetPodNodeName(pod, ctx.Namespace) + if err != nil { + return err + } + if slices.Contains(nodes, nodeName) { + continue + } + _, _ = fmt.Fprintf(GinkgoWriter, " Will partition node %s (replica pod: %s, shard %d)\n", nodeName, pod, shard) + logIfControllerNode(nodeName) + nodes = append(nodes, nodeName) + } + _, _ = fmt.Fprintf(GinkgoWriter, " Partitioning %d node(s) for %s\n", len(nodes), duration.Truncate(time.Millisecond)) + for _, nodeName := range nodes { + if err := utils.PartitionNode(nodeName); err != nil { + return err + } } time.Sleep(duration) - _, _ = fmt.Fprintf(GinkgoWriter, " Healing node %s\n", nodeName) - return utils.HealNode(nodeName) + for _, nodeName := range nodes { + _, _ = fmt.Fprintf(GinkgoWriter, " Healing node %s\n", nodeName) + if err := utils.HealNode(nodeName); err != nil { + return err + } + } + return nil } func pausePrimaryContainer(ctx *ChaosContext) error { - pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) - if err != nil { - return err - } // 1-5s covers both non-failover (<2s timeout) and failover (>2s) cases duration := randomDuration(ctx.Rand, 1*time.Second, 5*time.Second) - _, _ = fmt.Fprintf(GinkgoWriter, " Pausing 
primary container in pod: %s for %s\n", pod, duration.Truncate(time.Millisecond)) - if err := utils.PauseContainer(pod, ctx.Namespace); err != nil { - return err + var paused []string + for _, shard := range ctx.TargetShards { + pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, shard) + if err != nil { + return err + } + _, _ = fmt.Fprintf(GinkgoWriter, " Pausing primary container in pod: %s (shard %d) for %s\n", pod, shard, duration.Truncate(time.Millisecond)) + if err := utils.PauseContainer(pod, ctx.Namespace); err != nil { + return err + } + paused = append(paused, pod) } time.Sleep(duration) - _, _ = fmt.Fprintf(GinkgoWriter, " Unpausing primary container in pod: %s\n", pod) - return utils.UnpauseContainer(pod, ctx.Namespace) + for _, pod := range paused { + _, _ = fmt.Fprintf(GinkgoWriter, " Unpausing primary container in pod: %s\n", pod) + if err := utils.UnpauseContainer(pod, ctx.Namespace); err != nil { + return err + } + } + return nil } func pauseReplicaContainer(ctx *ChaosContext) error { if ctx.Replicas == 0 { return fmt.Errorf("skip: no replicas configured") } - pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) - if err != nil { - return fmt.Errorf("skip: %w", err) - } // 1-5s covers both non-failover (<2s timeout) and failover (>2s) cases duration := randomDuration(ctx.Rand, 1*time.Second, 5*time.Second) - _, _ = fmt.Fprintf(GinkgoWriter, " Pausing replica container in pod: %s for %s\n", pod, duration.Truncate(time.Millisecond)) - if err := utils.PauseContainer(pod, ctx.Namespace); err != nil { - return err + var paused []string + for _, shard := range ctx.TargetShards { + pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, shard) + if err != nil { + return fmt.Errorf("skip: %w", err) + } + _, _ = fmt.Fprintf(GinkgoWriter, " Pausing replica container in pod: %s (shard %d) for %s\n", pod, shard, duration.Truncate(time.Millisecond)) + if err := utils.PauseContainer(pod, 
ctx.Namespace); err != nil { + return err + } + paused = append(paused, pod) } time.Sleep(duration) - _, _ = fmt.Fprintf(GinkgoWriter, " Unpausing replica container in pod: %s\n", pod) - return utils.UnpauseContainer(pod, ctx.Namespace) + for _, pod := range paused { + _, _ = fmt.Fprintf(GinkgoWriter, " Unpausing replica container in pod: %s\n", pod) + if err := utils.UnpauseContainer(pod, ctx.Namespace); err != nil { + return err + } + } + return nil } func pauseWorkerNode(ctx *ChaosContext) error { - pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, ctx.TargetShard) - if err != nil { - return err - } - nodeName, err := utils.GetPodNodeName(pod, ctx.Namespace) - if err != nil { - return err - } // Eviction threshold: 40s (node-monitor-grace) + tolerationSeconds. // Range spans below and above threshold to cover both eviction and non-eviction cases. evictionThreshold := 40*time.Second + time.Duration(ctx.TolerationSec)*time.Second duration := randomDuration(ctx.Rand, 3*time.Second, evictionThreshold+30*time.Second) - _, _ = fmt.Fprintf(GinkgoWriter, " Pausing Kind node %s (primary pod: %s) for %s\n", nodeName, pod, duration.Truncate(time.Second)) - logIfControllerNode(nodeName) - cmd := exec.Command("docker", "pause", nodeName) - if _, err := utils.Run(cmd); err != nil { - return err + var paused []string + for _, shard := range ctx.TargetShards { + pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, shard) + if err != nil { + return err + } + nodeName, err := utils.GetPodNodeName(pod, ctx.Namespace) + if err != nil { + return err + } + _, _ = fmt.Fprintf(GinkgoWriter, " Pausing Kind node %s (primary pod: %s, shard %d) for %s\n", nodeName, pod, shard, duration.Truncate(time.Second)) + logIfControllerNode(nodeName) + cmd := exec.Command("docker", "pause", nodeName) + if _, err := utils.Run(cmd); err != nil { + return err + } + paused = append(paused, nodeName) } time.Sleep(duration) - _, _ = fmt.Fprintf(GinkgoWriter, " Unpausing 
Kind node %s\n", nodeName) - cmd = exec.Command("docker", "unpause", nodeName) - _, err = utils.Run(cmd) - return err + for _, nodeName := range paused { + _, _ = fmt.Fprintf(GinkgoWriter, " Unpausing Kind node %s\n", nodeName) + cmd := exec.Command("docker", "unpause", nodeName) + if _, err := utils.Run(cmd); err != nil { + return err + } + } + return nil } func scaleShards(ctx *ChaosContext) error { From c5bb02e0c884565ecd7cb8efe19946542dbbfd38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Wed, 10 Jun 2026 14:49:23 +0200 Subject: [PATCH 04/11] fixup: random multi-shard targeting and collect-then-delete for chaos scenarios MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- docs/chaos-testing.md | 18 +++++++++++++++++- test/chaos/chaos_test.go | 33 +++++++++++++++++++-------------- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/docs/chaos-testing.md b/docs/chaos-testing.md index 41b467e0..5f41d6f1 100644 --- a/docs/chaos-testing.md +++ b/docs/chaos-testing.md @@ -53,7 +53,7 @@ All configuration is via environment variables: | `CHAOS_SCENARIOS` | all except disabled | Comma-separated list of scenarios to run | | `CHAOS_SEED` | current time | Random seed for reproducibility | | `CHAOS_MODE` | `random` | `random` or `sequential` scenario selection | -| `CHAOS_TARGET_SHARDS` | `random` | Shards to target: `random`, `all`, or comma-separated indices (e.g. `0,2`) | +| `CHAOS_TARGET_SHARDS` | `random` | Shards to target: `random` (1 to N shards each iteration), `all`, or comma-separated indices (e.g. 
`0,2`) | | `CHAOS_RECOVERY_TIMEOUT` | `5m` | Max time to wait for cluster recovery | | `CHAOS_TOLERATION_SECONDS` | `0` | Pod toleration seconds for not-ready/unreachable (0 = not set) | | `CHAOS_NUM_KEYS` | `100000` | Number of keys to seed | @@ -144,3 +144,19 @@ CHAOS_SHARDS=3 CHAOS_REPLICAS=1 CHAOS_TARGET_SHARDS=0,1 \ # Delete all pods in all shards CHAOS_TARGET_SHARDS=all CHAOS_SCENARIOS=delete-shard-pods make test-chaos ``` + +## Running Multiple Tests in Parallel + +The chaos test uses a dedicated Kind cluster (`valkey-operator-test-chaos`) that is separate from the e2e test cluster. +To run two chaos tests simultaneously with different configurations, use separate Kind clusters and kubeconfigs: + +```bash +# Terminal 1: default config +make test-chaos 2>&1 | tee --ignore-interrupts chaos-run1.log + +# Terminal 2: different cluster name and kubeconfig to avoid conflicts +KIND_CLUSTER_CHAOS=chaos-2 KUBECONFIG=/tmp/chaos-2.conf \ + make test-chaos 2>&1 | tee --ignore-interrupts chaos-run2.log +``` + +Each run creates its own Kind cluster, so they don't interfere with each other. diff --git a/test/chaos/chaos_test.go b/test/chaos/chaos_test.go index 7563ecda..f0db5e53 100644 --- a/test/chaos/chaos_test.go +++ b/test/chaos/chaos_test.go @@ -19,19 +19,7 @@ limitations under the License. package chaos -//---------------------------------------------------------- -// Example: -// -// CHAOS_SCENARIOS=delete-recreate-cluster \ -// CHAOS_WORKLOAD_TYPE=Deployment \ -// make test-chaos 2>&1 | tee --ignore-interrupts chaos.log -// -// To run in parallel with e2e tests (separate Kind cluster), -// set KUBECONFIG to avoid context conflicts: -// -// KUBECONFIG=/tmp/chaos-kube.conf make test-chaos -// -//---------------------------------------------------------- +// Chaos test suite. See docs/chaos-testing.md for configuration and usage. 
import ( "fmt" @@ -308,7 +296,8 @@ spec: var targetShardsForIteration []int switch targetShards { case "random": - targetShardsForIteration = []int{rnd.Intn(shards)} + count := rnd.Intn(shards) + 1 + targetShardsForIteration = rnd.Perm(shards)[:count] case "all": targetShardsForIteration = make([]int, shards) for i := range shards { @@ -436,12 +425,16 @@ spec: // Fault scenario implementations func deletePrimaryPod(ctx *ChaosContext) error { + var pods []string for _, shard := range ctx.TargetShards { pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, shard) if err != nil { return err } _, _ = fmt.Fprintf(GinkgoWriter, " Deleting primary pod: %s (shard %d)\n", pod, shard) + pods = append(pods, pod) + } + for _, pod := range pods { if err := utils.DeletePod(pod, ctx.Namespace); err != nil { return err } @@ -453,12 +446,16 @@ func deleteReplicaPod(ctx *ChaosContext) error { if ctx.Replicas == 0 { return fmt.Errorf("skip: no replicas configured") } + var pods []string for _, shard := range ctx.TargetShards { pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, shard) if err != nil { return fmt.Errorf("skip: %w", err) } _, _ = fmt.Fprintf(GinkgoWriter, " Deleting replica pod: %s (shard %d)\n", pod, shard) + pods = append(pods, pod) + } + for _, pod := range pods { if err := utils.DeletePod(pod, ctx.Namespace); err != nil { return err } @@ -492,6 +489,7 @@ func deleteShardPods(ctx *ChaosContext) error { } func deletePrimaryWorkload(ctx *ChaosContext) error { + var workloads []string for _, shard := range ctx.TargetShards { pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, shard) if err != nil { @@ -502,6 +500,9 @@ func deletePrimaryWorkload(ctx *ChaosContext) error { return err } _, _ = fmt.Fprintf(GinkgoWriter, " Deleting primary %s: %s (shard %d)\n", ctx.WorkloadType, workload, shard) + workloads = append(workloads, workload) + } + for _, workload := range workloads { if err := utils.DeleteWorkload(workload, 
ctx.Namespace, ctx.WorkloadType); err != nil { return err } @@ -513,6 +514,7 @@ func deleteReplicaWorkload(ctx *ChaosContext) error { if ctx.Replicas == 0 { return fmt.Errorf("skip: no replicas configured") } + var workloads []string for _, shard := range ctx.TargetShards { pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, shard) if err != nil { @@ -523,6 +525,9 @@ func deleteReplicaWorkload(ctx *ChaosContext) error { return err } _, _ = fmt.Fprintf(GinkgoWriter, " Deleting replica %s: %s (shard %d)\n", ctx.WorkloadType, workload, shard) + workloads = append(workloads, workload) + } + for _, workload := range workloads { if err := utils.DeleteWorkload(workload, ctx.Namespace, ctx.WorkloadType); err != nil { return err } From 867473ac57fa7c31ff2eeb331e608d804850e920 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Wed, 10 Jun 2026 22:32:21 +0200 Subject: [PATCH 05/11] fixup: collect pods before pausing and unthrottle CPU in AfterSuite MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- test/chaos/chaos_suite_test.go | 3 +++ test/chaos/chaos_test.go | 16 ++++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/test/chaos/chaos_suite_test.go b/test/chaos/chaos_suite_test.go index 8edea5c1..d2542c5f 100644 --- a/test/chaos/chaos_suite_test.go +++ b/test/chaos/chaos_suite_test.go @@ -84,6 +84,9 @@ var _ = BeforeSuite(func() { }) var _ = AfterSuite(func() { + // Reset any CPU pressure before teardown. 
+ utils.UnthrottleNodes(utils.GetWorkerNodes()) + teardownCertManager() By("undeploying the controller-manager") diff --git a/test/chaos/chaos_test.go b/test/chaos/chaos_test.go index f0db5e53..1e56ad11 100644 --- a/test/chaos/chaos_test.go +++ b/test/chaos/chaos_test.go @@ -625,20 +625,22 @@ func networkPartitionReplica(ctx *ChaosContext) error { func pausePrimaryContainer(ctx *ChaosContext) error { // 1-5s covers both non-failover (<2s timeout) and failover (>2s) cases duration := randomDuration(ctx.Rand, 1*time.Second, 5*time.Second) - var paused []string + var pods []string for _, shard := range ctx.TargetShards { pod, err := utils.GetShardPrimaryPod(ctx.ClusterName, ctx.Namespace, shard) if err != nil { return err } _, _ = fmt.Fprintf(GinkgoWriter, " Pausing primary container in pod: %s (shard %d) for %s\n", pod, shard, duration.Truncate(time.Millisecond)) + pods = append(pods, pod) + } + for _, pod := range pods { if err := utils.PauseContainer(pod, ctx.Namespace); err != nil { return err } - paused = append(paused, pod) } time.Sleep(duration) - for _, pod := range paused { + for _, pod := range pods { _, _ = fmt.Fprintf(GinkgoWriter, " Unpausing primary container in pod: %s\n", pod) if err := utils.UnpauseContainer(pod, ctx.Namespace); err != nil { return err @@ -653,20 +655,22 @@ func pauseReplicaContainer(ctx *ChaosContext) error { } // 1-5s covers both non-failover (<2s timeout) and failover (>2s) cases duration := randomDuration(ctx.Rand, 1*time.Second, 5*time.Second) - var paused []string + var pods []string for _, shard := range ctx.TargetShards { pod, err := utils.GetShardReplicaPod(ctx.ClusterName, ctx.Namespace, shard) if err != nil { return fmt.Errorf("skip: %w", err) } _, _ = fmt.Fprintf(GinkgoWriter, " Pausing replica container in pod: %s (shard %d) for %s\n", pod, shard, duration.Truncate(time.Millisecond)) + pods = append(pods, pod) + } + for _, pod := range pods { if err := utils.PauseContainer(pod, ctx.Namespace); err != nil { return err } - 
paused = append(paused, pod) } time.Sleep(duration) - for _, pod := range paused { + for _, pod := range pods { _, _ = fmt.Fprintf(GinkgoWriter, " Unpausing replica container in pod: %s\n", pod) if err := utils.UnpauseContainer(pod, ctx.Namespace); err != nil { return err From 7d89698432a85925349feb187391c698c99d6310 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Fri, 12 Jun 2026 09:29:30 +0200 Subject: [PATCH 06/11] chaos: support large clusters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- docs/chaos-testing.md | 2 +- test/chaos/chaos_suite_test.go | 6 ++++++ test/chaos/chaos_test.go | 15 ++++++++++++++- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/docs/chaos-testing.md b/docs/chaos-testing.md index 5f41d6f1..1bbbc39c 100644 --- a/docs/chaos-testing.md +++ b/docs/chaos-testing.md @@ -54,7 +54,7 @@ All configuration is via environment variables: | `CHAOS_SEED` | current time | Random seed for reproducibility | | `CHAOS_MODE` | `random` | `random` or `sequential` scenario selection | | `CHAOS_TARGET_SHARDS` | `random` | Shards to target: `random` (1 to N shards each iteration), `all`, or comma-separated indices (e.g. 
`0,2`) | -| `CHAOS_RECOVERY_TIMEOUT` | `5m` | Max time to wait for cluster recovery | +| `CHAOS_RECOVERY_TIMEOUT` | `15s × pods` (min 5m) | Max time to wait for cluster recovery | | `CHAOS_TOLERATION_SECONDS` | `0` | Pod toleration seconds for not-ready/unreachable (0 = not set) | | `CHAOS_NUM_KEYS` | `100000` | Number of keys to seed | | `CHAOS_DATA_SIZE` | `3` | Size of each key's value in bytes | diff --git a/test/chaos/chaos_suite_test.go b/test/chaos/chaos_suite_test.go index d2542c5f..4f11f107 100644 --- a/test/chaos/chaos_suite_test.go +++ b/test/chaos/chaos_suite_test.go @@ -81,6 +81,12 @@ var _ = BeforeSuite(func() { cmd = exec.Command("make", "deploy", fmt.Sprintf("IMG=%s", managerImage)) _, err = utils.Run(cmd) Expect(err).NotTo(HaveOccurred(), "Failed to deploy the controller-manager") + + By("increasing controller-manager memory limit for large clusters") + cmd = exec.Command("kubectl", "set", "resources", "deployment/valkey-operator-controller-manager", + "-n", namespace, "--limits=memory=512Mi", "--requests=memory=128Mi") + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred(), "Failed to set controller-manager memory limits") }) var _ = AfterSuite(func() { diff --git a/test/chaos/chaos_test.go b/test/chaos/chaos_test.go index 1e56ad11..efb223dc 100644 --- a/test/chaos/chaos_test.go +++ b/test/chaos/chaos_test.go @@ -122,7 +122,12 @@ var _ = Describe("ValkeyCluster Chaos", Label("chaos"), Ordered, func() { dataSize = envIntOrDefault("CHAOS_DATA_SIZE", 3, 1 /* min */) targetShards = envOrDefault("CHAOS_TARGET_SHARDS", "random") mode = envOneOf("CHAOS_MODE", "random", []string{"random", "sequential"}) - recoveryTimeout = envDurationOrDefault("CHAOS_RECOVERY_TIMEOUT", 5*time.Minute) + // Scale timeout with cluster size: 15s per pod, minimum 5 minutes. 
+ defaultTimeout := time.Duration(shards*(replicas+1)) * 15 * time.Second + if defaultTimeout < 5*time.Minute { + defaultTimeout = 5 * time.Minute + } + recoveryTimeout = envDurationOrDefault("CHAOS_RECOVERY_TIMEOUT", defaultTimeout) tolerationSec = envIntOrDefault("CHAOS_TOLERATION_SECONDS", 0, 0 /* min */) seed = envInt64OrDefault("CHAOS_SEED", time.Now().UnixNano()) scenarios = filterScenarios(allScenarios, envOrDefault("CHAOS_SCENARIOS", "")) @@ -372,9 +377,17 @@ spec: // Wait until CR is Ready, all pods are Running, and cluster health is ok. By(fmt.Sprintf("Iteration %d: waiting for cluster recovery", iteration)) + recoveryStart := time.Now() + lastStatus := recoveryStart Eventually(func(g Gomega) { cr, err := utils.GetValkeyClusterStatus(clusterName) g.Expect(err).NotTo(HaveOccurred()) + if time.Since(lastStatus) >= time.Minute { + remaining := recoveryTimeout - time.Since(recoveryStart) + _, _ = fmt.Fprintf(GinkgoWriter, " recovery status: state=%s reason=%s readyShards=%d/%d (timeout in %s)\n", + cr.Status.State, cr.Status.Reason, cr.Status.ReadyShards, shards, remaining.Truncate(time.Second)) + lastStatus = time.Now() + } g.Expect(cr.Status.State).To(Equal(valkeyiov1alpha1.ClusterStateReady), fmt.Sprintf("cluster state: %s, reason: %s", cr.Status.State, cr.Status.Reason)) g.Expect(cr.Status.ReadyShards).To(Equal(int32(shards))) From a1843fdfc59f8581ded44a625dd54b7e38e59520 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Fri, 12 Jun 2026 10:07:28 +0200 Subject: [PATCH 07/11] chaos: add scenario to scale replicas MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- docs/chaos-testing.md | 4 ++++ test/chaos/chaos_test.go | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/docs/chaos-testing.md b/docs/chaos-testing.md index 1bbbc39c..fa04a09b 100644 --- a/docs/chaos-testing.md +++ b/docs/chaos-testing.md @@ -74,6 +74,7 @@ All configuration is via 
environment variables: | `pause-primary-container` | Pauses the primary container | | `pause-replica-container` | Pauses a replica container | | `scale-shards` | Scales shards up or down randomly | +| `scale-replicas` | Scales replicas up or down randomly | | `rolling-update` | Changes cluster config to trigger a rolling update | | `delete-recreate-cluster` | Deletes and recreates the ValkeyCluster | | `delete-controller-pod` | Kills the operator controller pod | @@ -134,6 +135,9 @@ CHAOS_SCENARIOS=network-partition-primary,pause-worker-node \ # Large cluster with more workers CHAOS_SHARDS=7 CHAOS_REPLICAS=2 KIND_WORKERS=3 make test-chaos +# Test scaling shards and replicas together +CHAOS_SHARDS=5 CHAOS_REPLICAS=2 CHAOS_SCENARIOS=scale-shards,scale-replicas make test-chaos + # Run scenarios in sequence instead of random CHAOS_SCENARIOS=delete-primary-pod,scale-shards CHAOS_MODE=sequential make test-chaos diff --git a/test/chaos/chaos_test.go b/test/chaos/chaos_test.go index efb223dc..d87b2a19 100644 --- a/test/chaos/chaos_test.go +++ b/test/chaos/chaos_test.go @@ -50,6 +50,7 @@ type ChaosContext struct { MinShards int MaxShards int Replicas int + MaxReplicas int TolerationSec int Rand *rand.Rand } @@ -76,6 +77,7 @@ var allScenarios = []Scenario{ {Name: "pause-primary-container", Inject: pausePrimaryContainer}, {Name: "pause-replica-container", Inject: pauseReplicaContainer}, {Name: "scale-shards", Inject: scaleShards}, + {Name: "scale-replicas", Inject: scaleReplicas}, {Name: "rolling-update", Inject: rollingUpdate}, {Name: "delete-recreate-cluster", LosesData: true, Inject: deleteRecreateCluster}, {Name: "delete-controller-pod", Inject: deleteControllerPod}, @@ -348,6 +350,7 @@ spec: MinShards: minShards, MaxShards: maxShards, Replicas: replicas, + MaxReplicas: replicas + 2, TolerationSec: tolerationSec, Rand: rnd, } @@ -747,6 +750,23 @@ func scaleShards(ctx *ChaosContext) error { return nil } +func scaleReplicas(ctx *ChaosContext) error { + // Pick a random 
replica count in [0, MaxReplicas], excluding current. + newReplicas := ctx.Rand.Intn(ctx.MaxReplicas) + if newReplicas >= ctx.Replicas { + newReplicas++ + } + _, _ = fmt.Fprintf(GinkgoWriter, " Scaling replicas from %d to %d\n", ctx.Replicas, newReplicas) + cmd := exec.Command("kubectl", "patch", "valkeycluster", ctx.ClusterName, + "-n", ctx.Namespace, "--type=merge", + "-p", fmt.Sprintf(`{"spec":{"replicas":%d}}`, newReplicas)) + if _, err := utils.Run(cmd); err != nil { + return err + } + ctx.Replicas = newReplicas + return nil +} + func deleteRecreateCluster(ctx *ChaosContext) error { // Capture the current spec before deleting cmd := exec.Command("kubectl", "get", "valkeycluster", ctx.ClusterName, From d4b1cf7603b38d0ecad1fee2ddb12583305be2d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Fri, 12 Jun 2026 17:38:51 +0200 Subject: [PATCH 08/11] Add compound scenarios, like CHAOS_SCENARIOS=scale-shards+delete-primary-pod MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- docs/chaos-testing.md | 18 ++++++++++ test/chaos/chaos_test.go | 73 ++++++++++++++++++++++++++++++++++------ 2 files changed, 81 insertions(+), 10 deletions(-) diff --git a/docs/chaos-testing.md b/docs/chaos-testing.md index fa04a09b..a57c3a14 100644 --- a/docs/chaos-testing.md +++ b/docs/chaos-testing.md @@ -87,6 +87,24 @@ Scenarios that target replicas are skipped when `CHAOS_REPLICAS=0`. Scenarios marked "disabled by default" are excluded unless explicitly listed in `CHAOS_SCENARIOS`. They require `CHAOS_TOLERATION_SECONDS` to be set for meaningful eviction testing. +### Compound Scenarios + +Use the `+` separator in `CHAOS_SCENARIOS` to inject multiple faults back-to-back within a single iteration. +Each sub-scenario runs sequentially with a random 0-9s delay between them, testing that the operator handles overlapping faults mid-reconciliation. 
+ +```bash +# Scale shards while killing a primary during the operation +CHAOS_SCENARIOS=scale-shards+delete-primary-pod make test-chaos + +# Multiple compounds, randomly selected each iteration +CHAOS_SCENARIOS=scale-shards+delete-primary-pod,scale-replicas+delete-replica-pod make test-chaos + +# Triple fault: scale + kill primary + kill replica +CHAOS_SCENARIOS=scale-shards+delete-primary-pod+delete-replica-pod make test-chaos +``` + +Compound scenarios are always marked as potentially losing data; the test logs a warning showing how many keys were lost and re-seeds after each iteration. + ## Reading the Output Each iteration logs: diff --git a/test/chaos/chaos_test.go b/test/chaos/chaos_test.go index d87b2a19..efe2061d 100644 --- a/test/chaos/chaos_test.go +++ b/test/chaos/chaos_test.go @@ -415,6 +415,10 @@ spec: }, 60*time.Second).Should(Succeed(), fmt.Sprintf("Iteration %d: data integrity check failed (seed=%d)", iteration, seed)) } else { + By(fmt.Sprintf("Iteration %d: checking for data loss (scenario may lose data)", iteration)) + if err := utils.VerifyTestData(clusterName, "default", seededKeys); err != nil { + _, _ = fmt.Fprintf(GinkgoWriter, " WARNING: data lost (expected): %s\n", err) + } By(fmt.Sprintf("Iteration %d: re-seeding test data after data-loss scenario", iteration)) err := utils.FlushAll(clusterName, "default") Expect(err).NotTo(HaveOccurred(), "Failed to flush data") @@ -1001,21 +1005,70 @@ func filterScenarios(all []Scenario, filter string) []Scenario { } return result } - requested := make(map[string]bool) + var result []Scenario for _, name := range strings.Split(filter, ",") { - requested[strings.TrimSpace(name)] = true + name = strings.TrimSpace(name) + if strings.Contains(name, "+") { + // Ad-hoc compound: "scale-shards+delete-primary-pod" + parts := strings.Split(name, "+") + group := make([]string, len(parts)) + for i, p := range parts { + group[i] = strings.TrimSpace(p) + } + result = append(result, Scenario{ + Name: name, + 
LosesData: true, // compound scenarios may lose data due to overlapping faults + Inject: makeCompoundInject(group), + }) + continue + } + found := false + for _, s := range all { + if s.Name == name { + result = append(result, s) + found = true + break + } + } + if !found { + Fail(fmt.Sprintf("CHAOS_SCENARIOS contains unknown scenario: %q", name)) + } } - var result []Scenario - for _, s := range all { - if requested[s.Name] { - result = append(result, s) - delete(requested, s.Name) + return result +} + +func scenarioByName(name string) *Scenario { + for i := range allScenarios { + if allScenarios[i].Name == name { + return &allScenarios[i] } } - for name := range requested { - Fail(fmt.Sprintf("CHAOS_SCENARIOS contains unknown scenario: %q", name)) + return nil +} + +func makeCompoundInject(group []string) func(*ChaosContext) error { + return func(ctx *ChaosContext) error { + _, _ = fmt.Fprintf(GinkgoWriter, " Compound test: %s\n", strings.Join(group, " + ")) + for i, n := range group { + if i > 0 { + delay := time.Duration(ctx.Rand.Intn(10)) * time.Second + _, _ = fmt.Fprintf(GinkgoWriter, " delay %s before %s\n", delay, n) + time.Sleep(delay) + } + s := scenarioByName(n) + if s == nil { + return fmt.Errorf("unknown scenario in compound: %s", n) + } + if err := s.Inject(ctx); err != nil { + if strings.Contains(err.Error(), "skip:") { + _, _ = fmt.Fprintf(GinkgoWriter, " %s skipped: %s\n", n, err) + continue + } + return err + } + } + return nil } - return result } func envFloat64OrDefault(key string, defaultVal float64, minVal float64) float64 { From eb804e035e97da4abd14dc0f604a220e7dd9c148 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Sun, 14 Jun 2026 00:15:23 +0200 Subject: [PATCH 09/11] chaos: add randomX to CHAOS_TARGET_SHARDS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- docs/chaos-testing.md | 2 +- test/chaos/chaos_test.go | 20 +++++++++++++++++--- 2 files 
changed, 18 insertions(+), 4 deletions(-) diff --git a/docs/chaos-testing.md b/docs/chaos-testing.md index a57c3a14..bee11e71 100644 --- a/docs/chaos-testing.md +++ b/docs/chaos-testing.md @@ -53,7 +53,7 @@ All configuration is via environment variables: | `CHAOS_SCENARIOS` | all except disabled | Comma-separated list of scenarios to run | | `CHAOS_SEED` | current time | Random seed for reproducibility | | `CHAOS_MODE` | `random` | `random` or `sequential` scenario selection | -| `CHAOS_TARGET_SHARDS` | `random` | Shards to target: `random` (1 to N shards each iteration), `all`, or comma-separated indices (e.g. `0,2`) | +| `CHAOS_TARGET_SHARDS` | `random` | Shards to target: `random` (1 to N shards each iteration), `randomX` (exactly X random shards), `all`, or comma-separated indices (e.g. `0,2`) | | `CHAOS_RECOVERY_TIMEOUT` | `15s × pods` (min 5m) | Max time to wait for cluster recovery | | `CHAOS_TOLERATION_SECONDS` | `0` | Pod toleration seconds for not-ready/unreachable (0 = not set) | | `CHAOS_NUM_KEYS` | `100000` | Number of keys to seed | diff --git a/test/chaos/chaos_test.go b/test/chaos/chaos_test.go index efe2061d..bc20df89 100644 --- a/test/chaos/chaos_test.go +++ b/test/chaos/chaos_test.go @@ -165,13 +165,20 @@ var _ = Describe("ValkeyCluster Chaos", Label("chaos"), Ordered, func() { for _, s := range scenarios { enabledNames[s.Name] = true } + allNames := make(map[string]bool) for _, s := range allScenarios { + allNames[s.Name] = true status := "enabled" if !enabledNames[s.Name] { status = "disabled" } _, _ = fmt.Fprintf(GinkgoWriter, " - %-30s [%s]\n", s.Name, status) } + for _, s := range scenarios { + if !allNames[s.Name] { + _, _ = fmt.Fprintf(GinkgoWriter, " - %-30s [enabled]\n", s.Name) + } + } _, _ = fmt.Fprintf(GinkgoWriter, "================================\n") // Create a cluster @@ -301,15 +308,22 @@ spec: } var targetShardsForIteration []int - switch targetShards { - case "random": + switch { + case targetShards == "random": count := 
rnd.Intn(shards) + 1 targetShardsForIteration = rnd.Perm(shards)[:count] - case "all": + case targetShards == "all": targetShardsForIteration = make([]int, shards) for i := range shards { targetShardsForIteration[i] = i } + case strings.HasPrefix(targetShards, "random"): + count, err := strconv.Atoi(strings.TrimPrefix(targetShards, "random")) + if err != nil || count < 1 { + Fail(fmt.Sprintf("CHAOS_TARGET_SHARDS=%q: expected 'randomN' where N >= 1", targetShards)) + } + count = min(count, shards) + targetShardsForIteration = rnd.Perm(shards)[:count] default: for _, s := range strings.Split(targetShards, ",") { if v, err := strconv.Atoi(strings.TrimSpace(s)); err == nil { From 679893fe503095074af26b0a3ce0a755ce5720db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Sun, 14 Jun 2026 21:10:34 +0200 Subject: [PATCH 10/11] chaos: replace valkey-benchmark with custom Go client for seeding and writes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a lightweight Go client (test/chaos/client/) that seeds keys deterministically and optionally maintains continuous writes at a configurable rate (CHAOS_WRITE_RPS). This replaces valkey-benchmark for both seeding and background writes because valkey-benchmark's {tag} rewriting produces non-deterministic key names, making exact key count verification impossible. 
The custom client: - Seeds keys sequentially (key:000000000000 to key:000000099999) - Overwrites the same keys in a loop (no extra keys created) - Uses valkey-go cluster mode (auto-routing, auto-reconnect) - Reports writes/errors every 5s via pod logs - Supports configurable value size (CHAOS_DATA_SIZE) Signed-off-by: Björn Svensson --- docs/chaos-testing.md | 25 ++++++++- test/chaos/chaos_suite_test.go | 9 ++++ test/chaos/chaos_test.go | 7 ++- test/chaos/client/Dockerfile | 10 ++++ test/chaos/client/go.mod | 7 +++ test/chaos/client/go.sum | 14 ++++++ test/chaos/client/main.go | 92 ++++++++++++++++++++++++++++++++++ test/utils/chaos.go | 88 ++++++++++++++++++++++++-------- 8 files changed, 227 insertions(+), 25 deletions(-) create mode 100644 test/chaos/client/Dockerfile create mode 100644 test/chaos/client/go.mod create mode 100644 test/chaos/client/go.sum create mode 100644 test/chaos/client/main.go diff --git a/docs/chaos-testing.md b/docs/chaos-testing.md index bee11e71..bb972266 100644 --- a/docs/chaos-testing.md +++ b/docs/chaos-testing.md @@ -56,8 +56,9 @@ All configuration is via environment variables: | `CHAOS_TARGET_SHARDS` | `random` | Shards to target: `random` (1 to N shards each iteration), `randomX` (exactly X random shards), `all`, or comma-separated indices (e.g. 
`0,2`) | | `CHAOS_RECOVERY_TIMEOUT` | `15s × pods` (min 5m) | Max time to wait for cluster recovery | | `CHAOS_TOLERATION_SECONDS` | `0` | Pod toleration seconds for not-ready/unreachable (0 = not set) | -| `CHAOS_NUM_KEYS` | `100000` | Number of keys to seed | +| `CHAOS_NUM_KEYS` | `100000` | Number of keys | | `CHAOS_DATA_SIZE` | `3` | Size of each key's value in bytes | +| `CHAOS_WRITE_RPS` | `20` | Continuous write rate after seeding (0 = seed only, client exits) | | `CHAOS_CPU_PRESSURE` | `false` | Throttle Kind worker node CPUs each iteration | | `CHAOS_CPU_MIN` | `0.3` | Minimum CPU limit when throttling | | `CHAOS_CPU_MAX` | `1.0` | Maximum CPU limit when throttling | @@ -87,6 +88,28 @@ Scenarios that target replicas are skipped when `CHAOS_REPLICAS=0`. Scenarios marked "disabled by default" are excluded unless explicitly listed in `CHAOS_SCENARIOS`. They require `CHAOS_TOLERATION_SECONDS` to be set for meaningful eviction testing. +### Client + +A custom Go client (`test/chaos/client/`) handles seeding and continuous writes. +It is built and loaded into Kind automatically during `BeforeSuite`. + +**Seeding:** Writes `CHAOS_NUM_KEYS` keys sequentially (`key:000000000000` to `key:000000099999`) +with a fixed value of `CHAOS_DATA_SIZE` bytes. + +**Continuous writes:** After seeding, the client overwrites the same keys at `CHAOS_WRITE_RPS` rate. +This generates replication traffic on all shards, simulating a real workload during fault injection. +Set `CHAOS_WRITE_RPS=0` to disable (client seeds and exits). + +**Stats output** (printed at test end and in pod logs): +``` +2026/06/14 15:50:53 writes=5721 errors=21 rps=20.0 +``` +- `writes` — successful SET commands since seeding completed +- `errors` — failed SET commands (cluster unavailable, CLUSTERDOWN, timeout) +- `rps` — configured target rate + +Errors during faults are expected. Sustained errors after recovery indicate an operator bug. 
+ ### Compound Scenarios Use the `+` separator in `CHAOS_SCENARIOS` to inject multiple faults back-to-back within a single iteration. diff --git a/test/chaos/chaos_suite_test.go b/test/chaos/chaos_suite_test.go index 4f11f107..3f04cbef 100644 --- a/test/chaos/chaos_suite_test.go +++ b/test/chaos/chaos_suite_test.go @@ -57,6 +57,15 @@ var _ = BeforeSuite(func() { err = utils.LoadImageToKindClusterWithName(managerImage) ExpectWithOffset(1, err).NotTo(HaveOccurred(), "Failed to load the manager image into Kind") + By("building the chaos client image") + cmd = exec.Command("docker", "build", "-t", "chaos-client:v0.0.1", "test/chaos/client/") + _, err = utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred(), "Failed to build the chaos client image") + + By("loading the chaos client image on Kind") + err = utils.LoadImageToKindClusterWithName("chaos-client:v0.0.1") + ExpectWithOffset(1, err).NotTo(HaveOccurred(), "Failed to load the chaos client image into Kind") + setupCertManager() SetDefaultEventuallyTimeout(2 * time.Minute) diff --git a/test/chaos/chaos_test.go b/test/chaos/chaos_test.go index bc20df89..37259f1b 100644 --- a/test/chaos/chaos_test.go +++ b/test/chaos/chaos_test.go @@ -110,6 +110,7 @@ var _ = Describe("ValkeyCluster Chaos", Label("chaos"), Ordered, func() { cpuMax float64 throttledNodes []string workerNodes []string + writeRPS int ) BeforeAll(func() { @@ -136,6 +137,7 @@ var _ = Describe("ValkeyCluster Chaos", Label("chaos"), Ordered, func() { cpuPressure = envBool("CHAOS_CPU_PRESSURE", false) cpuMin = envFloat64OrDefault("CHAOS_CPU_MIN", 0.3, 0.1) cpuMax = envFloat64OrDefault("CHAOS_CPU_MAX", 1.0, cpuMin) + writeRPS = envIntOrDefault("CHAOS_WRITE_RPS", 20, 0 /* min */) if cpuPressure { workerNodes = utils.GetWorkerNodes() } @@ -233,7 +235,7 @@ spec: }, recoveryTimeout, 5*time.Second).Should(Succeed()) By("seeding test data") - seededKeys, err = utils.SeedTestData(clusterName, "default", numKeys, dataSize, seed) + seededKeys, err = 
utils.StartBackgroundClient(clusterName, "default", numKeys, dataSize, writeRPS) Expect(err).NotTo(HaveOccurred(), "Failed to seed test data") _, _ = fmt.Fprintf(GinkgoWriter, " Seeded keys: %d\n", seededKeys) }) @@ -291,6 +293,7 @@ spec: }) AfterAll(func() { + utils.StopBackgroundClient("default") By("cleaning up chaos cluster") cmd := exec.Command("kubectl", "delete", "valkeycluster", clusterName, "--ignore-not-found=true") _, _ = utils.Run(cmd) @@ -436,7 +439,7 @@ spec: By(fmt.Sprintf("Iteration %d: re-seeding test data after data-loss scenario", iteration)) err := utils.FlushAll(clusterName, "default") Expect(err).NotTo(HaveOccurred(), "Failed to flush data") - seededKeys, err = utils.SeedTestData(clusterName, "default", numKeys, dataSize, seed) + seededKeys, err = utils.StartBackgroundClient(clusterName, "default", numKeys, dataSize, writeRPS) Expect(err).NotTo(HaveOccurred(), "Failed to re-seed test data") _, _ = fmt.Fprintf(GinkgoWriter, " Seeded keys: %d\n", seededKeys) } diff --git a/test/chaos/client/Dockerfile b/test/chaos/client/Dockerfile new file mode 100644 index 00000000..aaa5fd20 --- /dev/null +++ b/test/chaos/client/Dockerfile @@ -0,0 +1,10 @@ +FROM golang:1.26-alpine AS builder +WORKDIR /app +COPY go.mod go.sum ./ +RUN go mod download +COPY main.go . +RUN CGO_ENABLED=0 go build -o /chaos-client . 
+ +FROM scratch +COPY --from=builder /chaos-client /chaos-client +ENTRYPOINT ["/chaos-client"] diff --git a/test/chaos/client/go.mod b/test/chaos/client/go.mod new file mode 100644 index 00000000..1b761100 --- /dev/null +++ b/test/chaos/client/go.mod @@ -0,0 +1,7 @@ +module valkey.io/valkey-operator/test/chaos/client + +go 1.25 + +require github.com/valkey-io/valkey-go v1.0.68 + +require golang.org/x/sys v0.31.0 // indirect diff --git a/test/chaos/client/go.sum b/test/chaos/client/go.sum new file mode 100644 index 00000000..fdaacb99 --- /dev/null +++ b/test/chaos/client/go.sum @@ -0,0 +1,14 @@ +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= +github.com/valkey-io/valkey-go v1.0.68 h1:bTbfonp49b41DqrF30q+y2JL3gcbjd2IiacFAtO4JBA= +github.com/valkey-io/valkey-go v1.0.68/go.mod h1:bHmwjIEOrGq/ubOJfh5uMRs7Xj6mV3mQ/ZXUbmqpjqY= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/test/chaos/client/main.go b/test/chaos/client/main.go new file mode 100644 index 00000000..22c38a45 --- /dev/null +++ b/test/chaos/client/main.go @@ -0,0 +1,92 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "strconv" + 
"strings" + "sync/atomic" + "time" + + "github.com/valkey-io/valkey-go" +) + +func main() { + addr := os.Getenv("VALKEY_ADDR") + if addr == "" { + log.Fatal("VALKEY_ADDR not set") + } + numKeys, _ := strconv.Atoi(os.Getenv("NUM_KEYS")) + if numKeys <= 0 { + numKeys = 100000 + } + dataSize, _ := strconv.Atoi(os.Getenv("DATA_SIZE")) + if dataSize <= 0 { + dataSize = 3 + } + rps, _ := strconv.Atoi(os.Getenv("RPS")) + if rps <= 0 { + rps = 20 + } + value := strings.Repeat("x", dataSize) + + log.Printf("Connecting to %s...\n", addr) + client, err := valkey.NewClient(valkey.ClientOption{ + InitAddress: []string{addr}, + }) + if err != nil { + log.Fatalf("connect failed: %v\n", err) + } + defer client.Close() + + ctx := context.Background() + + // Phase 1: Seed all keys + log.Printf("SEEDING %d keys...\n", numKeys) + seeded := 0 + for i := range numKeys { + key := fmt.Sprintf("key:%012d", i) + err := client.Do(ctx, client.B().Set().Key(key).Value(value).Build()).Error() + if err != nil { + log.Printf("seed error at key %d: %v\n", i, err) + continue + } + seeded++ + } + log.Printf("SEEDED %d\n", seeded) + + // Phase 2: Continuous updates at target RPS (or exit if RPS=0) + if rps <= 0 { + log.Println("RPS=0, seed-only mode, exiting") + return + } + + var writes, errors atomic.Int64 + interval := time.Second / time.Duration(rps) + ticker := time.NewTicker(interval) + defer ticker.Stop() + + // Print stats every 5 seconds + go func() { + for range time.Tick(5 * time.Second) { + w := writes.Load() + e := errors.Load() + log.Printf("writes=%d errors=%d rps=%.1f\n", w, e, float64(rps)) + } + }() + + keyIdx := 0 + for range ticker.C { + key := fmt.Sprintf("key:%012d", keyIdx%numKeys) + keyIdx++ + + err := client.Do(ctx, client.B().Set().Key(key).Value(value).Build()).Error() + if err != nil { + errors.Add(1) + continue + } + writes.Add(1) + } +} diff --git a/test/utils/chaos.go b/test/utils/chaos.go index 4c633c44..5aaee699 100644 --- a/test/utils/chaos.go +++ 
b/test/utils/chaos.go @@ -25,6 +25,7 @@ import ( "os/exec" "strconv" "strings" + "time" ) // DeletePod deletes a pod by name in the given namespace. @@ -330,28 +331,6 @@ func FlushAll(clusterName, namespace string) error { return nil } -// SeedTestData uses valkey-benchmark to write keys across the cluster. -// Returns the actual number of keys stored (from INFO keyspace). -func SeedTestData(clusterName, namespace string, numKeys, dataSize int, seed int64) (int, error) { - anyPod, err := GetPodNameByLabels(namespace, map[string]string{ - "valkey.io/cluster": clusterName, - }) - if err != nil { - return 0, err - } - cmd := exec.Command("kubectl", "exec", anyPod, "-n", namespace, "-c", "server", "--", - "valkey-benchmark", "-n", strconv.Itoa(numKeys), - "--cluster", "-r", strconv.Itoa(numKeys), - "-d", strconv.Itoa(dataSize), "-q", - "--seed", strconv.FormatInt(seed, 10), - "SET", "key:{tag}:__rand_int__", "__data__") - if _, err := Run(cmd); err != nil { - return 0, fmt.Errorf("valkey-benchmark failed: %w", err) - } - total, _, err := GetTotalKeyCount(clusterName, namespace) - return total, err -} - // VerifyTestData checks that the total key count across all primaries matches expected. func VerifyTestData(clusterName, namespace string, seededKeys int) error { totalKeys, perShard, err := GetTotalKeyCount(clusterName, namespace) @@ -657,3 +636,68 @@ func ThrottleRandomWorkerNodes(rnd *rand.Rand, nodes []string, cpuMin, cpuMax fl } return throttled } + +const backgroundClientPod = "chaos-background-client" + +// StartBackgroundClient deploys a pod running a custom Go client that seeds +// all keys and then continuously overwrites them, keeping replication offsets +// active on all shards. Waits for seeding to complete before returning. +// Returns the number of keys seeded. 
+func StartBackgroundClient(clusterName, namespace string, numKeys, dataSize, rps int) (int, error) { + // Delete any leftover client pod + StopBackgroundClient(namespace) + + svcHost := fmt.Sprintf("valkey-%s.%s.svc.cluster.local:6379", clusterName, namespace) + cmd := exec.Command("kubectl", "run", backgroundClientPod, + "-n", namespace, + "--image=chaos-client:v0.0.1", + "--restart=Always", + "--image-pull-policy=Never", + "--env=VALKEY_ADDR="+svcHost, + "--env=NUM_KEYS="+strconv.Itoa(numKeys), + "--env=DATA_SIZE="+strconv.Itoa(dataSize), + "--env=RPS="+strconv.Itoa(rps), + ) + if _, err := Run(cmd); err != nil { + return 0, err + } + + // Wait for "SEEDED N" in logs + var seeded int + for attempts := 0; attempts < 240; attempts++ { + time.Sleep(1 * time.Second) + cmd = exec.Command("kubectl", "logs", backgroundClientPod, "-n", namespace) + output, err := Run(cmd) + if err != nil { + continue + } + for _, line := range strings.Split(output, "\n") { + if idx := strings.Index(line, "SEEDED "); idx >= 0 { + if n, err := fmt.Sscanf(line[idx:], "SEEDED %d", &seeded); n == 1 && err == nil { + return seeded, nil + } + } + } + } + // Print pod logs on timeout to help debug + cmd = exec.Command("kubectl", "logs", backgroundClientPod, "-n", namespace, "--tail=50") + if output, err := Run(cmd); err == nil { + fmt.Printf(" Background client logs at timeout:\n%s\n", output) + } + cmd = exec.Command("kubectl", "logs", backgroundClientPod, "-n", namespace, "--previous", "--tail=50") + if output, err := Run(cmd); err == nil && output != "" { + fmt.Printf(" Background client previous logs:\n%s\n", output) + } + return 0, fmt.Errorf("background client did not finish seeding within 240s") +} + +// StopBackgroundClient prints stats and deletes the background client pod. 
+func StopBackgroundClient(namespace string) { + cmd := exec.Command("kubectl", "logs", backgroundClientPod, "-n", namespace, "--tail=20") + if output, err := Run(cmd); err == nil && output != "" { + fmt.Printf(" Background client output:\n%s\n", output) + } + cmd = exec.Command("kubectl", "delete", "pod", backgroundClientPod, + "-n", namespace, "--ignore-not-found=true", "--grace-period=0", "--force") + _, _ = Run(cmd) +} From 10c60ac9008f867c4356617755679e253cc5abd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 16 Jun 2026 01:36:37 +0200 Subject: [PATCH 11/11] chaos: validate config hash, add test stability fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- test/chaos/chaos_test.go | 23 ++++++++++++++------ test/utils/chaos.go | 46 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 59 insertions(+), 10 deletions(-) diff --git a/test/chaos/chaos_test.go b/test/chaos/chaos_test.go index 37259f1b..b4f4511c 100644 --- a/test/chaos/chaos_test.go +++ b/test/chaos/chaos_test.go @@ -78,7 +78,7 @@ var allScenarios = []Scenario{ {Name: "pause-replica-container", Inject: pauseReplicaContainer}, {Name: "scale-shards", Inject: scaleShards}, {Name: "scale-replicas", Inject: scaleReplicas}, - {Name: "rolling-update", Inject: rollingUpdate}, + {Name: "rolling-update", LosesDataIfNoReplica: true, Inject: rollingUpdate}, {Name: "delete-recreate-cluster", LosesData: true, Inject: deleteRecreateCluster}, {Name: "delete-controller-pod", Inject: deleteControllerPod}, {Name: "pause-worker-node", DisabledByDefault: true, LosesData: true, Inject: pauseWorkerNode}, @@ -125,12 +125,7 @@ var _ = Describe("ValkeyCluster Chaos", Label("chaos"), Ordered, func() { dataSize = envIntOrDefault("CHAOS_DATA_SIZE", 3, 1 /* min */) targetShards = envOrDefault("CHAOS_TARGET_SHARDS", "random") mode = envOneOf("CHAOS_MODE", "random", []string{"random", "sequential"}) - // Scale timeout with 
cluster size: 15s per pod, minimum 5 minutes. - defaultTimeout := time.Duration(shards*(replicas+1)) * 15 * time.Second - if defaultTimeout < 5*time.Minute { - defaultTimeout = 5 * time.Minute - } - recoveryTimeout = envDurationOrDefault("CHAOS_RECOVERY_TIMEOUT", defaultTimeout) + recoveryTimeout = envDurationOrDefault("CHAOS_RECOVERY_TIMEOUT", calcTimeout(shards, replicas)) tolerationSec = envIntOrDefault("CHAOS_TOLERATION_SECONDS", 0, 0 /* min */) seed = envInt64OrDefault("CHAOS_SEED", time.Now().UnixNano()) scenarios = filterScenarios(allScenarios, envOrDefault("CHAOS_SCENARIOS", "")) @@ -395,6 +390,9 @@ spec: shards = ctx.Shards replicas = ctx.Replicas + // Recalculate timeout based on current cluster size (may have scaled) + recoveryTimeout = envDurationOrDefault("CHAOS_RECOVERY_TIMEOUT", calcTimeout(shards, replicas)) + // Wait until CR is Ready, all pods are Running, and cluster health is ok. By(fmt.Sprintf("Iteration %d: waiting for cluster recovery", iteration)) recoveryStart := time.Now() @@ -415,6 +413,8 @@ spec: g.Expect(err).NotTo(HaveOccurred(), "K8s resources not ready: %v", err) err = utils.VerifyClusterHealth(clusterName, "default", shards, replicas) g.Expect(err).NotTo(HaveOccurred(), "cluster health: %v", err) + err = utils.VerifyConfigHashConsistency(clusterName, "default") + g.Expect(err).NotTo(HaveOccurred(), "config hash: %v", err) }, recoveryTimeout, 5*time.Second).Should(Succeed(), fmt.Sprintf("Iteration %d: cluster did not recover after %s (scenario=%s, shards=%v, seed=%d)", iteration, recoveryTimeout, scenario.Name, targetShardsForIteration, seed)) @@ -1000,6 +1000,15 @@ func envInt64OrDefault(key string, defaultVal int64) int64 { return i } +// calcTimeout returns a recovery timeout scaled to the cluster size: 15s per pod, minimum 5 minutes. 
+func calcTimeout(shards, replicas int) time.Duration { + t := time.Duration(shards*(replicas+1)) * 15 * time.Second + if t < 5*time.Minute { + t = 5 * time.Minute + } + return t +} + func envDurationOrDefault(key string, defaultVal time.Duration) time.Duration { v := os.Getenv(key) if v == "" { diff --git a/test/utils/chaos.go b/test/utils/chaos.go index 5aaee699..f7b7f62f 100644 --- a/test/utils/chaos.go +++ b/test/utils/chaos.go @@ -35,16 +35,31 @@ func DeletePod(name, namespace string) error { return err } -// GetPodNameByLabels returns the first pod name matching the given labels. +// GetPodNameByLabels returns a Ready pod name matching the given labels, +// falling back to any matching pod if none are Ready. func GetPodNameByLabels(namespace string, labels map[string]string) (string, error) { selector := labelsToSelector(labels) cmd := exec.Command("kubectl", "get", "pods", "-n", namespace, - "-l", selector, "-o", "jsonpath={.items[0].metadata.name}") + "-l", selector, + "-o", `jsonpath={range .items[*]}{.metadata.name}{" "}{.status.containerStatuses[0].ready}{"\n"}{end}`) output, err := Run(cmd) if err != nil { return "", err } - return strings.TrimSpace(output), nil + var fallback string + for _, line := range strings.Split(output, "\n") { + parts := strings.SplitN(strings.TrimSpace(line), " ", 2) + if len(parts) == 2 && parts[1] == "true" { + return parts[0], nil + } + if fallback == "" { + fallback = parts[0] + } + } + if fallback != "" { + return fallback, nil + } + return "", fmt.Errorf("no pods found with labels %v", labels) } // DeleteWorkload deletes a StatefulSet or Deployment by name. @@ -701,3 +716,28 @@ func StopBackgroundClient(namespace string) { "-n", namespace, "--ignore-not-found=true", "--grace-period=0", "--force") _, _ = Run(cmd) } + +// VerifyConfigHashConsistency checks that all ValkeyNodes in the cluster have +// the same spec.serverConfigHash. 
A mismatch means the rolling update hasn't +// finished — some nodes still have the old config. +func VerifyConfigHashConsistency(clusterName, namespace string) error { + cmd := exec.Command("kubectl", "get", "valkeynodes", "-n", namespace, + "-l", fmt.Sprintf("valkey.io/cluster=%s", clusterName), + "-o", "jsonpath={range .items[*]}{.spec.serverConfigHash}{\"\\n\"}{end}") + output, err := Run(cmd) + if err != nil { + return fmt.Errorf("failed to get config hashes: %w", err) + } + var expected string + for _, hash := range strings.Split(strings.TrimSpace(output), "\n") { + if hash == "" { + continue + } + if expected == "" { + expected = hash + } else if hash != expected { + return fmt.Errorf("config hash mismatch: expected %s, got %s", expected, hash) + } + } + return nil +}