Skip to content

Commit 623dc69

Browse files
committed
Merge branch 'master' into use-central-flags
2 parents dba5a39 + a0ee591 commit 623dc69

File tree

22 files changed

+968
-166
lines changed

22 files changed

+968
-166
lines changed

cmd/csi-provisioner/csi-provisioner.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ import (
6060
_ "k8s.io/component-base/metrics/prometheus/workqueue" // register work queues in the default legacy registry
6161
csitrans "k8s.io/csi-translation-lib"
6262
"k8s.io/klog/v2"
63-
"sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller"
64-
libmetrics "sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller/metrics"
63+
"sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller"
64+
libmetrics "sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller/metrics"
6565

6666
"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
6767
"github.com/kubernetes-csi/csi-lib-utils/metrics"
@@ -78,6 +78,7 @@ import (
7878
)
7979

8080
var (
81+
master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
8182
volumeNamePrefix = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume.")
8283
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
8384
workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.")
@@ -92,6 +93,9 @@ var (
9293

9394
defaultFSType = flag.String("default-fstype", "", "The default filesystem type of the volume to provision when fstype is unspecified in the StorageClass. If the default is not set and fstype is unset in the StorageClass, then no fstype will be set")
9495

96+
kubeAPICapacityQPS = flag.Float32("kube-api-capacity-qps", 1, "QPS to use for storage capacity updates while communicating with the kubernetes apiserver. Defaults to 1.0.")
97+
kubeAPICapacityBurst = flag.Int("kube-api-capacity-burst", 5, "Burst to use for storage capacity updates while communicating with the kubernetes apiserver. Defaults to 5.")
98+
9599
enableCapacity = flag.Bool("enable-capacity", false, "This enables producing CSIStorageCapacity objects with capacity information from the driver's GetCapacity call.")
96100
capacityImmediateBinding = flag.Bool("capacity-for-immediate-binding", false, "Enables producing capacity information for storage classes with immediate binding. Not needed for the Kubernetes scheduler, maybe useful for other consumers or for debugging.")
97101
capacityPollInterval = flag.Duration("capacity-poll-interval", time.Minute, "How long the external-provisioner waits before checking for storage capacity changes.")
@@ -165,9 +169,9 @@ func main() {
165169
standardflags.Configuration.KubeConfig = kubeconfigEnv
166170
}
167171

168-
if standardflags.Configuration.Master != "" || standardflags.Configuration.KubeConfig != "" {
172+
if *master != "" || standardflags.Configuration.KubeConfig != "" {
169173
klog.Infof("Either master or kubeconfig specified. building kube config from that..")
170-
config, err = clientcmd.BuildConfigFromFlags(standardflags.Configuration.Master, standardflags.Configuration.KubeConfig)
174+
config, err = clientcmd.BuildConfigFromFlags(*master, standardflags.Configuration.KubeConfig)
171175
} else {
172176
klog.Infof("Building kube configs for running in cluster...")
173177
config, err = rest.InClusterConfig()
@@ -387,7 +391,6 @@ func main() {
387391
controller.Threadiness(int(*workerThreads)),
388392
controller.CreateProvisionedPVLimiter(workqueue.DefaultTypedControllerRateLimiter[string]()),
389393
controller.ClaimsInformer(claimInformer),
390-
controller.NodesLister(nodeLister),
391394
controller.RetryIntervalMax(standardflags.Configuration.RetryIntervalMax),
392395
}
393396

@@ -398,6 +401,10 @@ func main() {
398401
if supportsMigrationFromInTreePluginName != "" {
399402
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
400403
}
404+
var pvcNodeStore *ctrl.InMemoryStore
405+
if ctrl.SupportsTopology(pluginCapabilities) {
406+
pvcNodeStore = ctrl.NewInMemoryStore()
407+
}
401408

402409
// Create the provisioner: it implements the Provisioner interface expected by
403410
// the controller
@@ -427,12 +434,15 @@ func main() {
427434
nodeDeployment,
428435
*controllerPublishReadOnly,
429436
*preventVolumeModeConversion,
437+
pvcNodeStore,
430438
)
431439

432440
var capacityController *capacity.Controller
433441
if *enableCapacity {
434442
// Publishing storage capacity information uses its own client
435443
// with separate rate limiting.
444+
config.QPS = *kubeAPICapacityQPS
445+
config.Burst = *kubeAPICapacityBurst
436446
clientset, err := kubernetes.NewForConfig(config)
437447
if err != nil {
438448
klog.Fatalf("Failed to create client: %v", err)

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ require (
3333
github.com/onsi/ginkgo/v2 v2.23.4
3434
github.com/onsi/gomega v1.37.0
3535
k8s.io/kubernetes v1.34.0
36-
sigs.k8s.io/sig-storage-lib-external-provisioner/v12 v12.0.1
36+
sigs.k8s.io/sig-storage-lib-external-provisioner/v13 v13.0.0
3737
)
3838

3939
require (

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,8 +374,8 @@ sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5E
374374
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg=
375375
sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU=
376376
sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY=
377-
sigs.k8s.io/sig-storage-lib-external-provisioner/v12 v12.0.1 h1:rQcMFPcZ12y82+BA7b29gCWyoI/+/gAQUZu/Cw+8bC0=
378-
sigs.k8s.io/sig-storage-lib-external-provisioner/v12 v12.0.1/go.mod h1:kPy4hBso6PNhP9PdlTDdBZqxP1RKg7DFFM7lIR1bA8k=
377+
sigs.k8s.io/sig-storage-lib-external-provisioner/v13 v13.0.0 h1:bqSqBfqtToTDMDz+FEzfqofXAp5ptt6Z7ShR0g05PGA=
378+
sigs.k8s.io/sig-storage-lib-external-provisioner/v13 v13.0.0/go.mod h1:1xSe5kgJcKbrtNdD5WoytKUoByAGDl3wVHlKP0RZIC8=
379379
sigs.k8s.io/structured-merge-diff/v6 v6.3.0 h1:jTijUJbW353oVOd9oTlifJqOGEkUw2jB/fXCbTiQEco=
380380
sigs.k8s.io/structured-merge-diff/v6 v6.3.0/go.mod h1:M3W8sfWvn2HhQDIbGWj3S099YozAsymCo/wrT5ohRUE=
381381
sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs=

pkg/capacity/provision.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"context"
2121

2222
v1 "k8s.io/api/core/v1"
23-
"sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller"
23+
"sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller"
2424
)
2525

2626
type provisionWrapper struct {

pkg/controller/cache.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package controller
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
7+
"k8s.io/apimachinery/pkg/types"
8+
)
9+
10+
// TopologyInfo holds the data for NodeLabels and TopologyKeys
11+
type TopologyInfo struct {
12+
NodeLabels map[string]string
13+
TopologyKeys []string
14+
RequisiteTerms []topologyTerm
15+
}
16+
17+
// TopologyProvider is an interface that defines the behavior for looking up
18+
// a TopologyInfo object by its pvc UID.
19+
type TopologyProvider interface {
20+
GetByPvcUID(pvcUID types.UID) (*TopologyInfo, error)
21+
// The entry is deleted when provision succeeds or returns a final error.
22+
Delete(pvcUID types.UID) error
23+
24+
// Update methods now perform an "upsert" and don't return errors.
25+
UpdateNodeLabels(pvcUID types.UID, newLabels map[string]string)
26+
UpdateTopologyKeys(pvcUID types.UID, newKeys []string)
27+
UpdateRequisiteTerms(pvcUID types.UID, requisiteTerms []topologyTerm)
28+
}
29+
30+
// InMemoryStore is a concrete implementation of TopologyProvider.
31+
// It uses an in-memory map for quick lookups.
32+
type InMemoryStore struct {
33+
// The map key is the object's name.
34+
data map[types.UID]*TopologyInfo
35+
// Adding a mutex for thread-safe access
36+
mutex sync.RWMutex
37+
}
38+
39+
// NewInMemoryStore creates and initializes a new store.
40+
func NewInMemoryStore() *InMemoryStore {
41+
return &InMemoryStore{
42+
data: make(map[types.UID]*TopologyInfo),
43+
}
44+
}
45+
46+
// Delete implements the TopologyProvider interface.
47+
// It uses the built-in delete() function to remove the item from the map.
48+
func (s *InMemoryStore) Delete(pvcUID types.UID) error {
49+
s.mutex.Lock()
50+
defer s.mutex.Unlock()
51+
// First, check if the key exists to provide a helpful error.
52+
_, found := s.data[pvcUID]
53+
if !found {
54+
return nil
55+
}
56+
delete(s.data, pvcUID)
57+
return nil
58+
}
59+
60+
// GetByPvcUID implements the TopologyProvider interface.
61+
func (s *InMemoryStore) GetByPvcUID(pvcUID types.UID) (*TopologyInfo, error) {
62+
if s == nil {
63+
return nil, fmt.Errorf("pvcNodeStore is nil")
64+
}
65+
s.mutex.RLock()
66+
defer s.mutex.RUnlock()
67+
info, found := s.data[pvcUID]
68+
if !found {
69+
return nil, fmt.Errorf("topology object with pvcUID '%s' not found", pvcUID)
70+
}
71+
72+
// Return a deep copy to prevent data races
73+
infoCopy := &TopologyInfo{}
74+
if info.NodeLabels != nil {
75+
infoCopy.NodeLabels = make(map[string]string)
76+
for k, v := range info.NodeLabels {
77+
infoCopy.NodeLabels[k] = v
78+
}
79+
}
80+
81+
if info.TopologyKeys != nil {
82+
infoCopy.TopologyKeys = make([]string, len(info.TopologyKeys))
83+
copy(infoCopy.TopologyKeys, info.TopologyKeys)
84+
}
85+
86+
if info.RequisiteTerms != nil {
87+
infoCopy.RequisiteTerms = make([]topologyTerm, len(info.RequisiteTerms))
88+
for i, term := range info.RequisiteTerms {
89+
newTerm := make(topologyTerm, len(term))
90+
copy(newTerm, term)
91+
infoCopy.RequisiteTerms[i] = newTerm
92+
}
93+
}
94+
95+
return infoCopy, nil
96+
}
97+
98+
// UpdateNodeLabels finds an object by pvcUID and replaces its NodeLabels.
99+
func (s *InMemoryStore) UpdateNodeLabels(pvcUID types.UID, newLabels map[string]string) {
100+
s.mutex.Lock()
101+
defer s.mutex.Unlock()
102+
info, found := s.data[pvcUID]
103+
if !found {
104+
s.data[pvcUID] = &TopologyInfo{NodeLabels: newLabels}
105+
} else {
106+
info.NodeLabels = newLabels
107+
}
108+
}
109+
110+
// UpdateTopologyKeys finds an object by pvcUID and replaces its TopologyKeys.
111+
func (s *InMemoryStore) UpdateTopologyKeys(pvcUID types.UID, newKeys []string) {
112+
s.mutex.Lock()
113+
defer s.mutex.Unlock()
114+
info, found := s.data[pvcUID]
115+
if !found {
116+
s.data[pvcUID] = &TopologyInfo{TopologyKeys: newKeys}
117+
} else {
118+
info.TopologyKeys = newKeys
119+
}
120+
}
121+
122+
// UpdateRequisiteTerms finds an object by pvcUID and replaces its RequisiteTerms.
123+
func (s *InMemoryStore) UpdateRequisiteTerms(pvcUID types.UID, requisiteTerms []topologyTerm) {
124+
s.mutex.Lock()
125+
defer s.mutex.Unlock()
126+
info, found := s.data[pvcUID]
127+
if !found {
128+
s.data[pvcUID] = &TopologyInfo{RequisiteTerms: requisiteTerms}
129+
} else {
130+
info.RequisiteTerms = requisiteTerms
131+
}
132+
}

0 commit comments

Comments
 (0)