diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_suppression.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_suppression.go index a93009ea79..418f837160 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_suppression.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_suppression.go @@ -212,7 +212,7 @@ func (p *CPUPressureSuppression) evictPodsByReclaimMetrics(now time.Time, filter ) ([]*v1alpha1.EvictPod, error) { totalCPURequest := resource.Quantity{} for _, pod := range filteredPods { - totalCPURequest.Add(native.CPUQuantityGetter()(native.SumUpPodRequestResources(pod))) + totalCPURequest.Add(native.CPUQuantityGetter().Get(native.SumUpPodRequestResources(pod))) } general.InfoS("info", "reclaim cpu request", totalCPURequest.String(), "reclaimMetrics", reclaimMetrics) @@ -248,7 +248,7 @@ func (p *CPUPressureSuppression) evictPodsByReclaimMetrics(now time.Time, filter } } evictPods = append(evictPods, evictPod) - totalCPURequest.Sub(native.CPUQuantityGetter()(native.SumUpPodRequestResources(pod))) + totalCPURequest.Sub(native.CPUQuantityGetter().Get(native.SumUpPodRequestResources(pod))) } } else { p.lastToleranceTime.Delete(key) diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/resourcepackage/optimizer.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/resourcepackage/optimizer.go index 23ca20e774..d0326f85a9 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/resourcepackage/optimizer.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/resourcepackage/optimizer.go @@ -176,7 +176,7 @@ func (o *resourcePackageHintOptimizer) getResourcePackageAllocatable(resourcePac } // Use the native package to get CPU quantity safely - cpuQuantity := native.CPUQuantityGetter()(*pkg.Allocatable) + cpuQuantity := native.CPUQuantityGetter().Get(*pkg.Allocatable) if cpuQuantity.IsZero() { continue } diff --git a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go index 3417f17615..bdac2d6eed 100644 --- a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go +++ b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go @@ -652,7 +652,7 @@ func (p *NativePolicy) getContainerRequestedCores(allocationInfo *state.Allocati return 0 } - cpuQuantity := native.CPUQuantityGetter()(container.Resources.Requests) + cpuQuantity := native.CPUQuantityGetter().Get(container.Resources.Requests) allocationInfo.RequestQuantity = general.MaxFloat64(float64(cpuQuantity.MilliValue())/1000, 0) general.Infof("get cpu request quantity: %.3f for pod: %s/%s container: %s from podWatcher", allocationInfo.RequestQuantity, allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName) diff --git a/pkg/agent/qrm-plugins/cpu/util/util.go b/pkg/agent/qrm-plugins/cpu/util/util.go index 3fcb888430..ed6ddc1827 100644 --- a/pkg/agent/qrm-plugins/cpu/util/util.go +++ b/pkg/agent/qrm-plugins/cpu/util/util.go @@ -228,7 +228,7 @@ func GetContainerRequestedCores(metaServer *metaserver.MetaServer, allocationInf return allocationInfo.RequestQuantity } - cpuQuantity := native.CPUQuantityGetter()(container.Resources.Requests) + cpuQuantity := native.CPUQuantityGetter().Get(container.Resources.Requests) metaValue := general.MaxFloat64(float64(cpuQuantity.MilliValue())/1000.0, 0) // optimize this logic someday: diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go index 8a71493811..69807671c5 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go @@ -257,6 +257,10 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration apiconsts.PodAnnotationQoSLevelSystemCores: policyImplement.systemCoresHintHandler, } + for _, resourceName := range policyImplement.extraResourceNames { + native.AddMemoryQuantityResource(v1.ResourceName(resourceName)) + } + policyImplement.asyncLimitedWorkersMap = map[string]*asyncworker.AsyncLimitedWorkers{ memoryPluginAsyncWorkTopicMovePage: asyncworker.NewAsyncLimitedWorkers(memoryPluginAsyncWorkTopicMovePage, movePagesWorkLimit, wrappedEmitter), } @@ -1259,7 +1263,7 @@ func (p *DynamicPolicy) getPodSpecAggregatedMemoryRequestBytes(podUID string) (u requestBytes := uint64(0) for _, container := range pod.Spec.Containers { - containerMemoryQuantity := native.MemoryQuantityGetter()(container.Resources.Requests) + containerMemoryQuantity := native.MemoryQuantityGetter().Get(container.Resources.Requests) requestBytes += uint64(general.Max(int(containerMemoryQuantity.Value()), 0)) } @@ -1276,7 +1280,7 @@ func (p *DynamicPolicy) getContainerSpecMemoryRequestBytes(podUID, containerName return 0, errors.New("container not found") } - memoryQuantity := native.MemoryQuantityGetter()(container.Resources.Requests) + memoryQuantity := native.MemoryQuantityGetter().Get(container.Resources.Requests) requestBytes := uint64(general.Max(int(memoryQuantity.Value()), 0)) return requestBytes, nil @@ -1298,46 +1302,55 @@ func (p *DynamicPolicy) hasLastLevelEnhancementKey(lastLevelEnhancementKey strin } func (p *DynamicPolicy) checkNonBindingShareCoresMemoryResource(req *pluginapi.ResourceRequest) (bool, error) { - reqInt, _, err := util.GetPodAggregatedRequestResource(req) + reqResources, _, err := util.GetPodAggregatedRequestResourceMap(req) if err != nil { - return false, fmt.Errorf("GetQuantityMapFromResourceReq failed with error: %v", err) + return false, fmt.Errorf("GetPodAggregatedRequestResourceMap failed with error: %v", err) } - shareCoresAllocated := uint64(reqInt) - podEntries := p.state.GetPodResourceEntries()[v1.ResourceMemory] - for podUid, containerEntries := range podEntries { - for _, containerAllocation := range containerEntries { - // skip the current pod - if podUid == req.PodUid { - continue - } - // shareCoresAllocated should involve both main and sidecar containers - if containerAllocation.CheckDedicated() && !containerAllocation.CheckNUMABinding() { - shareCoresAllocated += p.getContainerRequestedMemoryBytes(containerAllocation) + podResourceEntries := p.state.GetPodResourceEntries() + machineState := p.state.GetMachineState() + + for resourceName, reqInt := range reqResources { + shareCoresAllocated := uint64(reqInt) + podEntries := podResourceEntries[resourceName] + for podUid, containerEntries := range podEntries { + for _, containerAllocation := range containerEntries { + // skip the current pod + if podUid == req.PodUid { + continue + } + // shareCoresAllocated should involve both main and sidecar containers + if containerAllocation.CheckDedicated() && !containerAllocation.CheckNUMABinding() { + shareCoresAllocated += p.getContainerRequestedMemoryBytes(containerAllocation) + } } } - } - machineState := p.state.GetMachineState() - resourceState := machineState[v1.ResourceMemory] - numaWithoutNUMABindingPods := resourceState.GetNUMANodesWithoutSharedOrDedicatedNUMABindingPods() - numaAllocatableWithoutNUMABindingPods := uint64(0) - for _, numaID := range numaWithoutNUMABindingPods.ToSliceInt() { - numaAllocatableWithoutNUMABindingPods += resourceState[numaID].Allocatable - } + resourceState := machineState[resourceName] + if resourceState == nil { + return false, fmt.Errorf("resourceState is nil for resource: %s", resourceName) + } + + numaWithoutNUMABindingPods := resourceState.GetNUMANodesWithoutSharedOrDedicatedNUMABindingPods() + numaAllocatableWithoutNUMABindingPods := uint64(0) + for _, numaID := range numaWithoutNUMABindingPods.ToSliceInt() { + numaAllocatableWithoutNUMABindingPods += resourceState[numaID].Allocatable + } - general.Infof("[checkNonBindingShareCoresMemoryResource] node memory allocated: %d, allocatable: %d", shareCoresAllocated, numaAllocatableWithoutNUMABindingPods) - if shareCoresAllocated > numaAllocatableWithoutNUMABindingPods { - general.Warningf("[checkNonBindingShareCoresMemoryResource] no enough memory resource for non-binding share cores pod: %s/%s, container: %s (allocated: %d, allocatable: %d)", - req.PodNamespace, req.PodName, req.ContainerName, shareCoresAllocated, numaAllocatableWithoutNUMABindingPods) - return false, nil + general.Infof("[checkNonBindingShareCoresMemoryResource] resource: %s allocated: %d, allocatable: %d", + resourceName, shareCoresAllocated, numaAllocatableWithoutNUMABindingPods) + if shareCoresAllocated > numaAllocatableWithoutNUMABindingPods { + general.Warningf("[checkNonBindingShareCoresMemoryResource] no enough resource for non-binding share cores pod: %s/%s, container: %s, resource: %s (allocated: %d, allocatable: %d)", + req.PodNamespace, req.PodName, req.ContainerName, resourceName, shareCoresAllocated, numaAllocatableWithoutNUMABindingPods) + return false, nil + } } general.InfoS("checkNonBindingShareCoresMemoryResource memory successfully", "podNamespace", req.PodNamespace, "podName", req.PodName, "containerName", req.ContainerName, - "reqInt", reqInt) + "reqResources", reqResources) return true, nil } diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go index 6cabeffebe..e96405b86e 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go @@ -39,6 +39,19 @@ import ( var errNoAvailableMemoryHints = fmt.Errorf("no available memory hints") +func noPreferenceResourceHints(req *pluginapi.ResourceRequest) map[string]*pluginapi.ListOfTopologyHints { + resourceHints := make(map[string]*pluginapi.ListOfTopologyHints, len(req.ResourceRequests)) + for resourceName := range req.ResourceRequests { + resourceHints[resourceName] = nil + } + + if len(resourceHints) == 0 { + resourceHints[string(v1.ResourceMemory)] = nil + } + + return resourceHints +} + func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context, req *pluginapi.ResourceRequest, ) (*pluginapi.ResourceHintsResponse, error) { @@ -71,10 +84,7 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context, } } - return util.PackResourceHintsResponse(req, string(v1.ResourceMemory), - map[string]*pluginapi.ListOfTopologyHints{ - string(v1.ResourceMemory): nil, // indicates that there is no numa preference - }) + return util.PackResourceHintsResponse(req, string(v1.ResourceMemory), noPreferenceResourceHints(req)) } func (p *DynamicPolicy) systemCoresHintHandler(_ context.Context, req *pluginapi.ResourceRequest) (*pluginapi.ResourceHintsResponse, error) { @@ -133,10 +143,7 @@ func (p *DynamicPolicy) numaBindingHintHandler(_ context.Context, // currently, we set cpuset of sidecar to the cpuset of its main container, // so there is no numa preference here. if req.ContainerType == pluginapi.ContainerType_SIDECAR { - return util.PackResourceHintsResponse(req, string(v1.ResourceMemory), - map[string]*pluginapi.ListOfTopologyHints{ - string(v1.ResourceMemory): nil, - }) + return util.PackResourceHintsResponse(req, string(v1.ResourceMemory), noPreferenceResourceHints(req)) } requestedResources, _, err := util.GetPodAggregatedRequestResourceMap(req) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers_test.go index 53aeb7b0e0..087dd728d1 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers_test.go @@ -67,6 +67,10 @@ func TestDynamicPolicy_numaBindingHintHandler(t *testing.T) { PodName: "pod1", ContainerName: "container1", ContainerType: pluginapi.ContainerType_SIDECAR, + ResourceRequests: map[string]float64{ + string(v1.ResourceMemory): 1024 * 1024 * 1024, + "hugepages-2Mi": 2 * 1024 * 1024 * 1024, + }, }, }, wantErr: false, @@ -78,6 +82,7 @@ func TestDynamicPolicy_numaBindingHintHandler(t *testing.T) { ResourceName: string(v1.ResourceMemory), ResourceHints: map[string]*pluginapi.ListOfTopologyHints{ string(v1.ResourceMemory): nil, + "hugepages-2Mi": nil, }, }, }, @@ -628,3 +633,90 @@ func TestDynamicPolicy_numaBindingHintHandler(t *testing.T) { }) } } + +func TestDynamicPolicy_sharedCoresHintHandlerNoPreferenceIncludesRequestedResources(t *testing.T) { + t.Parallel() + + tmpDir, err := os.MkdirTemp("", "checkpoint-TestSharedCoresHintHandlerNoPreference") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4) + require.NoError(t, err) + + machineInfo := &info.MachineInfo{ + Topology: []info.Node{ + { + Id: 0, + Memory: 100 * 1024 * 1024 * 1024, + HugePages: []info.HugePagesInfo{ + { + PageSize: 2 * 1024, + NumPages: 1024, + }, + }, + }, + { + Id: 1, + Memory: 100 * 1024 * 1024 * 1024, + HugePages: []info.HugePagesInfo{ + { + PageSize: 2 * 1024, + NumPages: 1024, + }, + }, + }, + { + Id: 2, + Memory: 100 * 1024 * 1024 * 1024, + HugePages: []info.HugePagesInfo{ + { + PageSize: 2 * 1024, + NumPages: 1024, + }, + }, + }, + { + Id: 3, + Memory: 100 * 1024 * 1024 * 1024, + HugePages: []info.HugePagesInfo{ + { + PageSize: 2 * 1024, + NumPages: 1024, + }, + }, + }, + }, + } + + policy, err := getTestDynamicPolicyWithExtraResourcesWithInitialization(cpuTopology, machineInfo, tmpDir) + require.NoError(t, err) + + req := &pluginapi.ResourceRequest{ + PodUid: "pod1_uid", + PodName: "pod1", + ContainerName: "container1", + ContainerType: pluginapi.ContainerType_MAIN, + ResourceName: string(v1.ResourceMemory), + ResourceRequests: map[string]float64{ + string(v1.ResourceMemory): 1024 * 1024 * 1024, + "hugepages-2Mi": 2 * 1024 * 1024 * 1024, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + } + + resp, err := policy.sharedCoresHintHandler(context.Background(), req) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, map[string]*pluginapi.ListOfTopologyHints{ + string(v1.ResourceMemory): nil, + "hugepages-2Mi": nil, + }, resp.ResourceHints) + + req.ResourceRequests["hugepages-2Mi"] = 10 * 1024 * 1024 * 1024 + resp, err = policy.sharedCoresHintHandler(context.Background(), req) + require.ErrorIs(t, err, errNoAvailableMemoryHints) + require.Nil(t, resp) +} diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go index 00d75a4534..72f3744f74 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go @@ -52,6 +52,7 @@ import ( maputil "k8s.io/kubernetes/pkg/util/maps" "github.com/kubewharf/katalyst-api/pkg/consts" + apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" katalystbase "github.com/kubewharf/katalyst-core/cmd/base" appagent "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc" @@ -5424,6 +5425,79 @@ func Test_getContainerRequestedMemoryBytes(t *testing.T) { as.Equal(uint64(0), dynamicPolicy.getContainerRequestedMemoryBytes(pod3Container4Allocation)) } +func Test_getContainerRequestedMemoryBytes_EarlyReturns(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + policy *DynamicPolicy + allocationInfo *state.AllocationInfo + expected uint64 + }{ + { + name: "nil allocationInfo returns zero", + policy: &DynamicPolicy{}, + expected: 0, + }, + { + name: "nil metaServer returns aggregated quantity", + policy: &DynamicPolicy{}, + allocationInfo: &state.AllocationInfo{ + AllocationMeta: commonstate.AllocationMeta{ + PodUid: "pod-uid", + PodName: "pod-name", + ContainerName: "container-name", + QoSLevel: consts.PodAnnotationQoSLevelSharedCores, + }, + AggregatedQuantity: 2048, + }, + expected: 2048, + }, + { + name: "aggregated requests annotation returns aggregated quantity", + policy: &DynamicPolicy{metaServer: &metaserver.MetaServer{}}, + allocationInfo: &state.AllocationInfo{ + AllocationMeta: commonstate.AllocationMeta{ + PodUid: "pod-uid", + PodName: "pod-name", + ContainerName: "container-name", + QoSLevel: consts.PodAnnotationQoSLevelSharedCores, + Annotations: map[string]string{ + apiconsts.PodAnnotationAggregatedRequestsKey: "present", + }, + }, + AggregatedQuantity: 4096, + }, + expected: 4096, + }, + { + name: "inplace resize annotation returns aggregated quantity", + policy: &DynamicPolicy{metaServer: &metaserver.MetaServer{}}, + allocationInfo: &state.AllocationInfo{ + AllocationMeta: commonstate.AllocationMeta{ + PodUid: "pod-uid", + PodName: "pod-name", + ContainerName: "container-name", + QoSLevel: consts.PodAnnotationQoSLevelSharedCores, + Annotations: map[string]string{ + apiconsts.PodAnnotationInplaceUpdateResizingKey: "true", + }, + }, + AggregatedQuantity: 8192, + }, + expected: 8192, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tt.expected, tt.policy.getContainerRequestedMemoryBytes(tt.allocationInfo)) + }) + } +} + func Test_adjustAllocationEntries(t *testing.T) { t.Parallel() @@ -5695,6 +5769,89 @@ func Test_adjustAllocationEntries(t *testing.T) { as.Equal(uint64(6979321856), machineStateNew[v1.ResourceMemory][2].Free) } +func TestDynamicPolicy_checkNonBindingShareCoresMemoryResource(t *testing.T) { + t.Parallel() + + tmpDir, err := os.MkdirTemp("", "checkpoint-TestCheckNonBindingShareCoresMemoryResource") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4) + require.NoError(t, err) + + machineInfo := &info.MachineInfo{ + Topology: []info.Node{ + { + Id: 0, + Memory: 100 * 1024 * 1024 * 1024, + HugePages: []info.HugePagesInfo{ + { + PageSize: 2 * 1024, + NumPages: 1024, + }, + }, + }, + { + Id: 1, + Memory: 100 * 1024 * 1024 * 1024, + HugePages: []info.HugePagesInfo{ + { + PageSize: 2 * 1024, + NumPages: 1024, + }, + }, + }, + { + Id: 2, + Memory: 100 * 1024 * 1024 * 1024, + HugePages: []info.HugePagesInfo{ + { + PageSize: 2 * 1024, + NumPages: 1024, + }, + }, + }, + { + Id: 3, + Memory: 100 * 1024 * 1024 * 1024, + HugePages: []info.HugePagesInfo{ + { + PageSize: 2 * 1024, + NumPages: 1024, + }, + }, + }, + }, + } + + dynamicPolicy, err := getTestDynamicPolicyWithExtraResourcesWithInitialization(cpuTopology, machineInfo, tmpDir) + require.NoError(t, err) + + req := &pluginapi.ResourceRequest{ + PodUid: "pod1_uid", + PodName: "pod1", + ContainerName: "container1", + ContainerType: pluginapi.ContainerType_MAIN, + ResourceName: string(v1.ResourceMemory), + ResourceRequests: map[string]float64{ + string(v1.ResourceMemory): 1024 * 1024 * 1024, + "hugepages-2Mi": 2 * 1024 * 1024 * 1024, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + } + + ok, err := dynamicPolicy.checkNonBindingShareCoresMemoryResource(req) + require.NoError(t, err) + require.True(t, ok) + + req.ResourceRequests["hugepages-2Mi"] = 10 * 1024 * 1024 * 1024 + ok, err = dynamicPolicy.checkNonBindingShareCoresMemoryResource(req) + require.NoError(t, err) + require.False(t, ok) +} + type mockMemoryAdvisor struct { mock.Mock advisorsvc.AdvisorServiceServer diff --git a/pkg/agent/resourcemanager/fetcher/kubelet/topology/topology_adapter.go b/pkg/agent/resourcemanager/fetcher/kubelet/topology/topology_adapter.go index 17da950219..58c0f1512f 100644 --- a/pkg/agent/resourcemanager/fetcher/kubelet/topology/topology_adapter.go +++ b/pkg/agent/resourcemanager/fetcher/kubelet/topology/topology_adapter.go @@ -1182,7 +1182,7 @@ func (p *topologyAdapterImpl) addContainerMemoryBandwidth(zoneAllocated map[util return nil, err } - cpuRequest := native.CPUQuantityGetter()(spec.Resources.Requests) + cpuRequest := native.CPUQuantityGetter().Get(spec.Resources.Requests) if cpuRequest.IsZero() { return zoneAllocated, nil } diff --git a/pkg/util/native/pod_sorter.go b/pkg/util/native/pod_sorter.go index d4834976d6..cf845fdd09 100644 --- a/pkg/util/native/pod_sorter.go +++ b/pkg/util/native/pod_sorter.go @@ -61,8 +61,8 @@ func PodCPURequestCmpFunc(i1, i2 interface{}) int { p1Request := SumUpPodRequestResources(i1.(*v1.Pod)) p2Request := SumUpPodRequestResources(i2.(*v1.Pod)) - p1CPUQuantity := CPUQuantityGetter()(p1Request) - p2CPUQuantity := CPUQuantityGetter()(p2Request) + p1CPUQuantity := CPUQuantityGetter().Get(p1Request) + p2CPUQuantity := CPUQuantityGetter().Get(p2Request) return p1CPUQuantity.Cmp(p2CPUQuantity) } diff --git a/pkg/util/native/resources.go b/pkg/util/native/resources.go index 29c9fd8e25..bf0c870d61 100644 --- a/pkg/util/native/resources.go +++ b/pkg/util/native/resources.go @@ -32,52 +32,103 @@ import ( "github.com/kubewharf/katalyst-core/pkg/metrics" ) -type QuantityGetter func(resourceList v1.ResourceList) resource.Quantity +type QuantityResolveFn func(resourceList v1.ResourceList, resourceNames []v1.ResourceName) resource.Quantity + +type QuantityGetter struct { + resolve QuantityResolveFn + resourceNames []v1.ResourceName +} var ( quantityGetterMutex = sync.RWMutex{} - cpuQuantityGetter = DefaultCPUQuantityGetter - memoryQuantityGetter = DefaultMemoryQuantityGetter + cpuQuantityGetter = NewQuantityGetter(DefaultCPUQuantityGetter, v1.ResourceCPU, consts.ReclaimedResourceMilliCPU) + memoryQuantityGetter = NewQuantityGetter(DefaultMemoryQuantityGetter, v1.ResourceMemory, consts.ReclaimedResourceMemory) ) -func CPUQuantityGetter() QuantityGetter { +func NewQuantityGetter(resolve QuantityResolveFn, resourceNames ...v1.ResourceName) *QuantityGetter { + return &QuantityGetter{ + resolve: resolve, + resourceNames: append([]v1.ResourceName(nil), resourceNames...), + } +} + +func (q *QuantityGetter) Get(resourceList v1.ResourceList) resource.Quantity { + if q == nil || q.resolve == nil { + return resource.Quantity{} + } + + return q.resolve(resourceList, q.resourceNames) +} + +func (q *QuantityGetter) WithAddedResource(resourceName v1.ResourceName) *QuantityGetter { + if q == nil { + return NewQuantityGetter(nil, resourceName) + } + + for _, existing := range q.resourceNames { + if existing == resourceName { + return q + } + } + + resourceNames := append([]v1.ResourceName(nil), q.resourceNames...) + resourceNames = append(resourceNames, resourceName) + return NewQuantityGetter(q.resolve, resourceNames...) +} + +func CPUQuantityGetter() *QuantityGetter { quantityGetterMutex.RLock() defer quantityGetterMutex.RUnlock() return cpuQuantityGetter } -func SetCPUQuantityGetter(getter QuantityGetter) { +func SetCPUQuantityGetter(getter *QuantityGetter) { quantityGetterMutex.Lock() defer quantityGetterMutex.Unlock() cpuQuantityGetter = getter } -func MemoryQuantityGetter() QuantityGetter { +func MemoryQuantityGetter() *QuantityGetter { quantityGetterMutex.RLock() defer quantityGetterMutex.RUnlock() return memoryQuantityGetter } -func SetMemoryQuantityGetter(getter QuantityGetter) { +func SetMemoryQuantityGetter(getter *QuantityGetter) { quantityGetterMutex.Lock() defer quantityGetterMutex.Unlock() memoryQuantityGetter = getter } +func AddCPUQuantityResource(resourceName v1.ResourceName) { + quantityGetterMutex.Lock() + defer quantityGetterMutex.Unlock() + + cpuQuantityGetter = cpuQuantityGetter.WithAddedResource(resourceName) +} + +func AddMemoryQuantityResource(resourceName v1.ResourceName) { + quantityGetterMutex.Lock() + defer quantityGetterMutex.Unlock() + + memoryQuantityGetter = memoryQuantityGetter.WithAddedResource(resourceName) +} + // DefaultCPUQuantityGetter returns cpu quantity for resourceList. since we may have // different representations for cpu resource name, the prioritizes will be: // native cpu name -> reclaimed milli cpu name -func DefaultCPUQuantityGetter(resourceList v1.ResourceList) resource.Quantity { - if quantity, ok := resourceList[v1.ResourceCPU]; ok { - return quantity - } - - if quantity, ok := resourceList[consts.ReclaimedResourceMilliCPU]; ok { - return *resource.NewMilliQuantity(quantity.Value(), quantity.Format) +func DefaultCPUQuantityGetter(resourceList v1.ResourceList, resourceNames []v1.ResourceName) resource.Quantity { + for _, resourceName := range resourceNames { + if quantity, ok := resourceList[resourceName]; ok { + if resourceName == consts.ReclaimedResourceMilliCPU { + return *resource.NewMilliQuantity(quantity.Value(), quantity.Format) + } + return quantity + } } return resource.Quantity{} @@ -86,13 +137,11 @@ func DefaultCPUQuantityGetter(resourceList v1.ResourceList) resource.Quantity { // DefaultMemoryQuantityGetter returns memory quantity for resourceList. since we may have // different representations for memory resource name, the prioritizes will be: // native memory name -> reclaimed memory name -func DefaultMemoryQuantityGetter(resourceList v1.ResourceList) resource.Quantity { - if quantity, ok := resourceList[v1.ResourceMemory]; ok { - return quantity - } - - if quantity, ok := resourceList[consts.ReclaimedResourceMemory]; ok { - return quantity +func DefaultMemoryQuantityGetter(resourceList v1.ResourceList, resourceNames []v1.ResourceName) resource.Quantity { + for _, resourceName := range resourceNames { + if quantity, ok := resourceList[resourceName]; ok { + return quantity + } } return resource.Quantity{} diff --git a/pkg/util/native/resources_test.go b/pkg/util/native/resources_test.go index 90102abacb..d624bb983b 100644 --- a/pkg/util/native/resources_test.go +++ b/pkg/util/native/resources_test.go @@ -23,6 +23,8 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kubewharf/katalyst-api/pkg/consts" ) var makePod = func(name string, request, limits v1.ResourceList) *v1.Pod { @@ -43,6 +45,14 @@ var makePod = func(name string, request, limits v1.ResourceList) *v1.Pod { return pod } +func getQuantityGetterResources(getter *QuantityGetter) []v1.ResourceName { + if getter == nil { + return nil + } + + return append([]v1.ResourceName(nil), getter.resourceNames...) +} + func TestNeedUpdateResources(t *testing.T) { t.Parallel() @@ -259,3 +269,61 @@ func TestAggregateAvgQuantities(t *testing.T) { }) } } + +func TestQuantityGetter(t *testing.T) { + t.Parallel() + + orig := NewQuantityGetter(DefaultMemoryQuantityGetter, v1.ResourceMemory) + getter := orig.WithAddedResource("hugepages-2Mi") + + assert.Equal(t, []v1.ResourceName{v1.ResourceMemory}, getQuantityGetterResources(orig)) + assert.Equal(t, []v1.ResourceName{v1.ResourceMemory, "hugepages-2Mi"}, getQuantityGetterResources(getter)) + + got := getter.Get(v1.ResourceList{ + "hugepages-2Mi": resource.MustParse("2Gi"), + }) + assert.True(t, got.Equal(resource.MustParse("2Gi"))) + + clone := NewQuantityGetter(getter.resolve, "hugepages-1Gi") + assert.Equal(t, []v1.ResourceName{"hugepages-1Gi"}, getQuantityGetterResources(clone)) + assert.Equal(t, []v1.ResourceName{v1.ResourceMemory, "hugepages-2Mi"}, getQuantityGetterResources(getter)) +} + +func TestGlobalQuantityGetterResourceMutation(t *testing.T) { + t.Parallel() + + origMemoryGetter := MemoryQuantityGetter() + origCPUGetter := CPUQuantityGetter() + + t.Cleanup(func() { + SetMemoryQuantityGetter(origMemoryGetter) + SetCPUQuantityGetter(origCPUGetter) + }) + + SetMemoryQuantityGetter(NewQuantityGetter(DefaultMemoryQuantityGetter, v1.ResourceMemory, consts.ReclaimedResourceMemory)) + SetCPUQuantityGetter(NewQuantityGetter(DefaultCPUQuantityGetter, v1.ResourceCPU, consts.ReclaimedResourceMilliCPU)) + + beforeMemoryGetter := MemoryQuantityGetter() + AddMemoryQuantityResource("hugepages-2Mi") + AddMemoryQuantityResource("hugepages-2Mi") + afterMemoryGetter := MemoryQuantityGetter() + assert.NotSame(t, beforeMemoryGetter, afterMemoryGetter) + assert.Equal(t, []v1.ResourceName{v1.ResourceMemory, consts.ReclaimedResourceMemory, "hugepages-2Mi"}, getQuantityGetterResources(afterMemoryGetter)) + + memoryQuantity := MemoryQuantityGetter().Get(v1.ResourceList{ + "hugepages-2Mi": resource.MustParse("2Gi"), + }) + assert.True(t, memoryQuantity.Equal(resource.MustParse("2Gi"))) + + beforeCPUGetter := CPUQuantityGetter() + AddCPUQuantityResource("batch.cpu") + AddCPUQuantityResource("batch.cpu") + afterCPUGetter := CPUQuantityGetter() + assert.NotSame(t, beforeCPUGetter, afterCPUGetter) + assert.Equal(t, []v1.ResourceName{v1.ResourceCPU, consts.ReclaimedResourceMilliCPU, "batch.cpu"}, getQuantityGetterResources(afterCPUGetter)) + + cpuQuantity := CPUQuantityGetter().Get(v1.ResourceList{ + consts.ReclaimedResourceMilliCPU: *resource.NewQuantity(2500, resource.DecimalSI), + }) + assert.True(t, cpuQuantity.Equal(*resource.NewMilliQuantity(2500, resource.DecimalSI))) +}