Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (p *CPUPressureSuppression) evictPodsByReclaimMetrics(now time.Time, filter
) ([]*v1alpha1.EvictPod, error) {
totalCPURequest := resource.Quantity{}
for _, pod := range filteredPods {
totalCPURequest.Add(native.CPUQuantityGetter()(native.SumUpPodRequestResources(pod)))
totalCPURequest.Add(native.CPUQuantityGetter().Get(native.SumUpPodRequestResources(pod)))
}

general.InfoS("info", "reclaim cpu request", totalCPURequest.String(), "reclaimMetrics", reclaimMetrics)
Expand Down Expand Up @@ -248,7 +248,7 @@ func (p *CPUPressureSuppression) evictPodsByReclaimMetrics(now time.Time, filter
}
}
evictPods = append(evictPods, evictPod)
totalCPURequest.Sub(native.CPUQuantityGetter()(native.SumUpPodRequestResources(pod)))
totalCPURequest.Sub(native.CPUQuantityGetter().Get(native.SumUpPodRequestResources(pod)))
}
} else {
p.lastToleranceTime.Delete(key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (o *resourcePackageHintOptimizer) getResourcePackageAllocatable(resourcePac
}

// Use the native package to get CPU quantity safely
cpuQuantity := native.CPUQuantityGetter()(*pkg.Allocatable)
cpuQuantity := native.CPUQuantityGetter().Get(*pkg.Allocatable)
if cpuQuantity.IsZero() {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func (p *NativePolicy) getContainerRequestedCores(allocationInfo *state.Allocati
return 0
}

cpuQuantity := native.CPUQuantityGetter()(container.Resources.Requests)
cpuQuantity := native.CPUQuantityGetter().Get(container.Resources.Requests)
allocationInfo.RequestQuantity = general.MaxFloat64(float64(cpuQuantity.MilliValue())/1000, 0)
general.Infof("get cpu request quantity: %.3f for pod: %s/%s container: %s from podWatcher",
allocationInfo.RequestQuantity, allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/cpu/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func GetContainerRequestedCores(metaServer *metaserver.MetaServer, allocationInf
return allocationInfo.RequestQuantity
}

cpuQuantity := native.CPUQuantityGetter()(container.Resources.Requests)
cpuQuantity := native.CPUQuantityGetter().Get(container.Resources.Requests)
metaValue := general.MaxFloat64(float64(cpuQuantity.MilliValue())/1000.0, 0)

// optimize this logic someday:
Expand Down
71 changes: 42 additions & 29 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
apiconsts.PodAnnotationQoSLevelSystemCores: policyImplement.systemCoresHintHandler,
}

for _, resourceName := range policyImplement.extraResourceNames {
native.AddMemoryQuantityResource(v1.ResourceName(resourceName))
}

policyImplement.asyncLimitedWorkersMap = map[string]*asyncworker.AsyncLimitedWorkers{
memoryPluginAsyncWorkTopicMovePage: asyncworker.NewAsyncLimitedWorkers(memoryPluginAsyncWorkTopicMovePage, movePagesWorkLimit, wrappedEmitter),
}
Expand Down Expand Up @@ -1259,7 +1263,7 @@ func (p *DynamicPolicy) getPodSpecAggregatedMemoryRequestBytes(podUID string) (u

requestBytes := uint64(0)
for _, container := range pod.Spec.Containers {
containerMemoryQuantity := native.MemoryQuantityGetter()(container.Resources.Requests)
containerMemoryQuantity := native.MemoryQuantityGetter().Get(container.Resources.Requests)
requestBytes += uint64(general.Max(int(containerMemoryQuantity.Value()), 0))
}

Expand All @@ -1276,7 +1280,7 @@ func (p *DynamicPolicy) getContainerSpecMemoryRequestBytes(podUID, containerName
return 0, errors.New("container not found")
}

memoryQuantity := native.MemoryQuantityGetter()(container.Resources.Requests)
memoryQuantity := native.MemoryQuantityGetter().Get(container.Resources.Requests)
requestBytes := uint64(general.Max(int(memoryQuantity.Value()), 0))

return requestBytes, nil
Expand All @@ -1298,46 +1302,55 @@ func (p *DynamicPolicy) hasLastLevelEnhancementKey(lastLevelEnhancementKey strin
}

func (p *DynamicPolicy) checkNonBindingShareCoresMemoryResource(req *pluginapi.ResourceRequest) (bool, error) {
reqInt, _, err := util.GetPodAggregatedRequestResource(req)
reqResources, _, err := util.GetPodAggregatedRequestResourceMap(req)
if err != nil {
return false, fmt.Errorf("GetQuantityMapFromResourceReq failed with error: %v", err)
return false, fmt.Errorf("GetPodAggregatedRequestResourceMap failed with error: %v", err)
}

shareCoresAllocated := uint64(reqInt)
podEntries := p.state.GetPodResourceEntries()[v1.ResourceMemory]
for podUid, containerEntries := range podEntries {
for _, containerAllocation := range containerEntries {
// skip the current pod
if podUid == req.PodUid {
continue
}
// shareCoresAllocated should involve both main and sidecar containers
if containerAllocation.CheckDedicated() && !containerAllocation.CheckNUMABinding() {
shareCoresAllocated += p.getContainerRequestedMemoryBytes(containerAllocation)
podResourceEntries := p.state.GetPodResourceEntries()
machineState := p.state.GetMachineState()

for resourceName, reqInt := range reqResources {
shareCoresAllocated := uint64(reqInt)
podEntries := podResourceEntries[resourceName]
for podUid, containerEntries := range podEntries {
for _, containerAllocation := range containerEntries {
// skip the current pod
if podUid == req.PodUid {
continue
}
// shareCoresAllocated should involve both main and sidecar containers
if containerAllocation.CheckDedicated() && !containerAllocation.CheckNUMABinding() {
shareCoresAllocated += p.getContainerRequestedMemoryBytes(containerAllocation)
}
}
}
}

machineState := p.state.GetMachineState()
resourceState := machineState[v1.ResourceMemory]
numaWithoutNUMABindingPods := resourceState.GetNUMANodesWithoutSharedOrDedicatedNUMABindingPods()
numaAllocatableWithoutNUMABindingPods := uint64(0)
for _, numaID := range numaWithoutNUMABindingPods.ToSliceInt() {
numaAllocatableWithoutNUMABindingPods += resourceState[numaID].Allocatable
}
resourceState := machineState[resourceName]
if resourceState == nil {
return false, fmt.Errorf("resourceState is nil for resource: %s", resourceName)
}

numaWithoutNUMABindingPods := resourceState.GetNUMANodesWithoutSharedOrDedicatedNUMABindingPods()
numaAllocatableWithoutNUMABindingPods := uint64(0)
for _, numaID := range numaWithoutNUMABindingPods.ToSliceInt() {
numaAllocatableWithoutNUMABindingPods += resourceState[numaID].Allocatable
}

general.Infof("[checkNonBindingShareCoresMemoryResource] node memory allocated: %d, allocatable: %d", shareCoresAllocated, numaAllocatableWithoutNUMABindingPods)
if shareCoresAllocated > numaAllocatableWithoutNUMABindingPods {
general.Warningf("[checkNonBindingShareCoresMemoryResource] no enough memory resource for non-binding share cores pod: %s/%s, container: %s (allocated: %d, allocatable: %d)",
req.PodNamespace, req.PodName, req.ContainerName, shareCoresAllocated, numaAllocatableWithoutNUMABindingPods)
return false, nil
general.Infof("[checkNonBindingShareCoresMemoryResource] resource: %s allocated: %d, allocatable: %d",
resourceName, shareCoresAllocated, numaAllocatableWithoutNUMABindingPods)
if shareCoresAllocated > numaAllocatableWithoutNUMABindingPods {
general.Warningf("[checkNonBindingShareCoresMemoryResource] no enough resource for non-binding share cores pod: %s/%s, container: %s, resource: %s (allocated: %d, allocatable: %d)",
req.PodNamespace, req.PodName, req.ContainerName, resourceName, shareCoresAllocated, numaAllocatableWithoutNUMABindingPods)
return false, nil
}
}

general.InfoS("checkNonBindingShareCoresMemoryResource memory successfully",
"podNamespace", req.PodNamespace,
"podName", req.PodName,
"containerName", req.ContainerName,
"reqInt", reqInt)
"reqResources", reqResources)

return true, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ import (

var errNoAvailableMemoryHints = fmt.Errorf("no available memory hints")

func noPreferenceResourceHints(req *pluginapi.ResourceRequest) map[string]*pluginapi.ListOfTopologyHints {
resourceHints := make(map[string]*pluginapi.ListOfTopologyHints, len(req.ResourceRequests))
for resourceName := range req.ResourceRequests {
resourceHints[resourceName] = nil
}

if len(resourceHints) == 0 {
resourceHints[string(v1.ResourceMemory)] = nil
}

return resourceHints
}

func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context,
req *pluginapi.ResourceRequest,
) (*pluginapi.ResourceHintsResponse, error) {
Expand Down Expand Up @@ -71,10 +84,7 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context,
}
}

return util.PackResourceHintsResponse(req, string(v1.ResourceMemory),
map[string]*pluginapi.ListOfTopologyHints{
string(v1.ResourceMemory): nil, // indicates that there is no numa preference
})
return util.PackResourceHintsResponse(req, string(v1.ResourceMemory), noPreferenceResourceHints(req))
}

func (p *DynamicPolicy) systemCoresHintHandler(_ context.Context, req *pluginapi.ResourceRequest) (*pluginapi.ResourceHintsResponse, error) {
Expand Down Expand Up @@ -133,10 +143,7 @@ func (p *DynamicPolicy) numaBindingHintHandler(_ context.Context,
// currently, we set cpuset of sidecar to the cpuset of its main container,
// so there is no numa preference here.
if req.ContainerType == pluginapi.ContainerType_SIDECAR {
return util.PackResourceHintsResponse(req, string(v1.ResourceMemory),
map[string]*pluginapi.ListOfTopologyHints{
string(v1.ResourceMemory): nil,
})
return util.PackResourceHintsResponse(req, string(v1.ResourceMemory), noPreferenceResourceHints(req))
}

requestedResources, _, err := util.GetPodAggregatedRequestResourceMap(req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func TestDynamicPolicy_numaBindingHintHandler(t *testing.T) {
PodName: "pod1",
ContainerName: "container1",
ContainerType: pluginapi.ContainerType_SIDECAR,
ResourceRequests: map[string]float64{
string(v1.ResourceMemory): 1024 * 1024 * 1024,
"hugepages-2Mi": 2 * 1024 * 1024 * 1024,
},
},
},
wantErr: false,
Expand All @@ -78,6 +82,7 @@ func TestDynamicPolicy_numaBindingHintHandler(t *testing.T) {
ResourceName: string(v1.ResourceMemory),
ResourceHints: map[string]*pluginapi.ListOfTopologyHints{
string(v1.ResourceMemory): nil,
"hugepages-2Mi": nil,
},
},
},
Expand Down Expand Up @@ -628,3 +633,90 @@ func TestDynamicPolicy_numaBindingHintHandler(t *testing.T) {
})
}
}

func TestDynamicPolicy_sharedCoresHintHandlerNoPreferenceIncludesRequestedResources(t *testing.T) {
t.Parallel()

tmpDir, err := os.MkdirTemp("", "checkpoint-TestSharedCoresHintHandlerNoPreference")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4)
require.NoError(t, err)

machineInfo := &info.MachineInfo{
Topology: []info.Node{
{
Id: 0,
Memory: 100 * 1024 * 1024 * 1024,
HugePages: []info.HugePagesInfo{
{
PageSize: 2 * 1024,
NumPages: 1024,
},
},
},
{
Id: 1,
Memory: 100 * 1024 * 1024 * 1024,
HugePages: []info.HugePagesInfo{
{
PageSize: 2 * 1024,
NumPages: 1024,
},
},
},
{
Id: 2,
Memory: 100 * 1024 * 1024 * 1024,
HugePages: []info.HugePagesInfo{
{
PageSize: 2 * 1024,
NumPages: 1024,
},
},
},
{
Id: 3,
Memory: 100 * 1024 * 1024 * 1024,
HugePages: []info.HugePagesInfo{
{
PageSize: 2 * 1024,
NumPages: 1024,
},
},
},
},
}

policy, err := getTestDynamicPolicyWithExtraResourcesWithInitialization(cpuTopology, machineInfo, tmpDir)
require.NoError(t, err)

req := &pluginapi.ResourceRequest{
PodUid: "pod1_uid",
PodName: "pod1",
ContainerName: "container1",
ContainerType: pluginapi.ContainerType_MAIN,
ResourceName: string(v1.ResourceMemory),
ResourceRequests: map[string]float64{
string(v1.ResourceMemory): 1024 * 1024 * 1024,
"hugepages-2Mi": 2 * 1024 * 1024 * 1024,
},
Annotations: map[string]string{
consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores,
},
}

resp, err := policy.sharedCoresHintHandler(context.Background(), req)
require.NoError(t, err)
require.NotNil(t, resp)
require.Equal(t, map[string]*pluginapi.ListOfTopologyHints{
string(v1.ResourceMemory): nil,
"hugepages-2Mi": nil,
}, resp.ResourceHints)

req.ResourceRequests["hugepages-2Mi"] = 10 * 1024 * 1024 * 1024
resp, err = policy.sharedCoresHintHandler(context.Background(), req)
require.ErrorIs(t, err, errNoAvailableMemoryHints)
require.Nil(t, resp)
}
Loading
Loading