1 change: 1 addition & 0 deletions api/v1/gpu_types.go
@@ -22,6 +22,7 @@ import (
)

// GPUStatus defines the observed state of GPU.
// NOTE: When new fields are added, remember to update syncGPUMetadataAndStatusFromCluster
type GPUStatus struct {
// +kubebuilder:default=Pending
Phase TensorFusionGPUPhase `json:"phase"`
4 changes: 3 additions & 1 deletion charts/tensor-fusion/crds/tensor-fusion.ai_gpus.yaml
@@ -65,7 +65,9 @@ spec:
metadata:
type: object
status:
description: GPUStatus defines the observed state of GPU.
description: |-
GPUStatus defines the observed state of GPU.
NOTE: When new fields are added, remember to update syncGPUMetadataAndStatusFromCluster
properties:
available:
properties:
4 changes: 3 additions & 1 deletion config/crd/bases/tensor-fusion.ai_gpus.yaml
@@ -65,7 +65,9 @@ spec:
metadata:
type: object
status:
description: GPUStatus defines the observed state of GPU.
description: |-
GPUStatus defines the observed state of GPU.
NOTE: When new fields are added, remember to update syncGPUMetadataAndStatusFromCluster
properties:
available:
properties:
8 changes: 6 additions & 2 deletions internal/gpuallocator/gpuallocator.go
@@ -1063,6 +1063,9 @@ func syncGPUMetadataAndStatusFromCluster(old *tfv1.GPU, gpu *tfv1.GPU) {
old.Status.NodeSelector = gpu.Status.NodeSelector
old.Status.GPUModel = gpu.Status.GPUModel
old.Status.UsedBy = gpu.Status.UsedBy
old.Status.Vendor = gpu.Status.Vendor
old.Status.NUMANode = gpu.Status.NUMANode
old.Status.Index = gpu.Status.Index
}

func (s *GpuAllocator) handleGPUUpdateCapacityDiff(old, gpu *tfv1.GPU) {
@@ -1437,13 +1440,14 @@ func removeRunningApp(ctx context.Context, gpu *tfv1.GPU, workloadNameNamespace
}

func (s *GpuAllocator) ComposeAllocationRequest(pod *v1.Pod) (*tfv1.AllocRequest, string, error) {
// allow Pods with no requests/limits to use TensorFusion; the Pod webhook ensures at least one request/limit is set
gpuRequestResource, err := utils.GetGPUResource(pod, true)
if err != nil {
return &tfv1.AllocRequest{}, "invalid gpu request annotation", err
log.FromContext(s.ctx).Error(err, "Invalid gpu request annotation", "pod", pod.Name, "namespace", pod.Namespace)
}
gpuLimitResource, err := utils.GetGPUResource(pod, false)
if err != nil {
return &tfv1.AllocRequest{}, "invalid gpu limit annotation", err
log.FromContext(s.ctx).Error(err, "Invalid gpu limit annotation", "pod", pod.Name, "namespace", pod.Namespace)
}

count := 1
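The NOTE added to GPUStatus exists because syncGPUMetadataAndStatusFromCluster copies status fields one by one; a field added to the struct but not to this helper silently never reaches the allocator's cached copy. Below is a minimal, self-contained sketch of that pattern — the GPU/GPUStatus types are reduced stand-ins with simplified field types, not the real tfv1 definitions.

```go
package main

import "fmt"

// Reduced stand-ins for tfv1.GPU / tfv1.GPUStatus, limited to the fields this
// diff touches; the real types carry more fields and richer types.
type GPUStatus struct {
	NodeSelector map[string]string
	GPUModel     string
	UsedBy       string
	Vendor       string
	NUMANode     int
	Index        int
}

type GPU struct{ Status GPUStatus }

// syncStatus mirrors syncGPUMetadataAndStatusFromCluster: each field is copied
// explicitly, which is why new GPUStatus fields must also be added here.
func syncStatus(old, fresh *GPU) {
	old.Status.NodeSelector = fresh.Status.NodeSelector
	old.Status.GPUModel = fresh.Status.GPUModel
	old.Status.UsedBy = fresh.Status.UsedBy
	old.Status.Vendor = fresh.Status.Vendor     // added in this change
	old.Status.NUMANode = fresh.Status.NUMANode // added in this change
	old.Status.Index = fresh.Status.Index       // added in this change
}

func main() {
	cached := &GPU{}
	fromCluster := &GPU{Status: GPUStatus{GPUModel: "A100", Vendor: "NVIDIA", NUMANode: 1, Index: 0}}
	syncStatus(cached, fromCluster)
	fmt.Println(cached.Status.Vendor, cached.Status.NUMANode) // NVIDIA 1
}
```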
29 changes: 20 additions & 9 deletions internal/utils/compose.go
@@ -99,27 +99,37 @@ func AddOrOverrideTFClientMissingAnnotationsBeforePatch(pod *v1.Pod, tfInfo Tens
}

// add full annotations
pod.Annotations[constants.TFLOPSLimitAnnotation] = tfInfo.Profile.Resources.Limits.Tflops.String()
pod.Annotations[constants.VRAMLimitAnnotation] = tfInfo.Profile.Resources.Limits.Vram.String()
if tfInfo.Profile.Qos == "" {
pod.Annotations[constants.QoSLevelAnnotation] = string(tfv1.QoSMedium)
} else {
pod.Annotations[constants.QoSLevelAnnotation] = string(tfInfo.Profile.Qos)
if !tfInfo.Profile.Resources.Limits.Tflops.IsZero() {
pod.Annotations[constants.TFLOPSLimitAnnotation] = tfInfo.Profile.Resources.Limits.Tflops.String()
}
if !tfInfo.Profile.Resources.Limits.Vram.IsZero() {
pod.Annotations[constants.VRAMLimitAnnotation] = tfInfo.Profile.Resources.Limits.Vram.String()
}
if !tfInfo.Profile.Resources.Requests.Tflops.IsZero() {
pod.Annotations[constants.TFLOPSRequestAnnotation] = tfInfo.Profile.Resources.Requests.Tflops.String()
}
if !tfInfo.Profile.Resources.Requests.Vram.IsZero() {
pod.Annotations[constants.VRAMRequestAnnotation] = tfInfo.Profile.Resources.Requests.Vram.String()
}
pod.Annotations[constants.TFLOPSRequestAnnotation] = tfInfo.Profile.Resources.Requests.Tflops.String()
pod.Annotations[constants.VRAMRequestAnnotation] = tfInfo.Profile.Resources.Requests.Vram.String()

if !tfInfo.Profile.Resources.Requests.ComputePercent.IsZero() {
pod.Annotations[constants.ComputeRequestAnnotation] = tfInfo.Profile.Resources.Requests.ComputePercent.String()
}
if !tfInfo.Profile.Resources.Limits.ComputePercent.IsZero() {
pod.Annotations[constants.ComputeLimitAnnotation] = tfInfo.Profile.Resources.Limits.ComputePercent.String()
}
if tfInfo.Profile.Qos == "" {
pod.Annotations[constants.QoSLevelAnnotation] = string(tfv1.QoSMedium)
} else {
pod.Annotations[constants.QoSLevelAnnotation] = string(tfInfo.Profile.Qos)
}
pod.Annotations[constants.GpuCountAnnotation] = fmt.Sprintf("%d", tfInfo.Profile.GPUCount)
pod.Annotations[constants.GpuPoolKey] = tfInfo.Profile.PoolName
if tfInfo.Profile.GPUModel != "" {
pod.Annotations[constants.GPUModelAnnotation] = tfInfo.Profile.GPUModel
}
if tfInfo.Profile.GPUVendor != "" {
pod.Annotations[constants.GpuVendorAnnotation] = tfInfo.Profile.GPUVendor
}
pod.Annotations[constants.IsLocalGPUAnnotation] = strconv.FormatBool(tfInfo.Profile.IsLocalGPU)
pod.Annotations[constants.SidecarWorkerAnnotation] = strconv.FormatBool(tfInfo.Profile.SidecarWorker)
// add the inject-container annotation for the client Pod, in case the user doesn't specify it
Expand Down Expand Up @@ -185,6 +195,7 @@ func AppendTFWorkerLabelsAndAnnotationsAfterTemplate(
return strconv.Itoa(int(index))
}), ",")
}
annotations[constants.ComputingIsolationModeAnnotation] = string(workload.Spec.ComputeIsolation)
return labels, annotations
}

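The compose.go change stops writing the TFLOPS/VRAM annotations unconditionally and instead writes them only when the profile carries a non-zero quantity, adding the compute-percent pair under the same rule. A small, self-contained sketch of that guard follows; the helper name and the literal annotation keys are for illustration only (the real code references the project's constants package, e.g. constants.TFLOPSLimitAnnotation).

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// setIfNonZero writes an annotation only when the quantity is actually set,
// so unset profile values no longer show up as "0" annotations on the Pod.
func setIfNonZero(annotations map[string]string, key string, q resource.Quantity) {
	if q.IsZero() {
		return
	}
	annotations[key] = q.String()
}

func main() {
	annotations := map[string]string{}
	tflopsLimit := resource.MustParse("20")
	var vramLimit resource.Quantity // zero value: not set in the profile

	setIfNonZero(annotations, "tensor-fusion.ai/tflops-limit", tflopsLimit)
	setIfNonZero(annotations, "tensor-fusion.ai/vram-limit", vramLimit)

	fmt.Println(annotations) // map[tensor-fusion.ai/tflops-limit:20]
}
```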
27 changes: 9 additions & 18 deletions internal/webhook/v1/pod_webhook.go
@@ -125,8 +125,9 @@ func (m *TensorFusionPodMutator) Handle(ctx context.Context, req admission.Reque
if err := m.Client.Get(ctx, client.ObjectKey{Name: tfInfo.Profile.PoolName}, pool); err != nil {
return admission.Errored(http.StatusInternalServerError, fmt.Errorf("gpu pool(%s) does not exist", tfInfo.Profile.PoolName))
}
tfInfo.Profile.Qos = calculateQoSLevel(tfInfo.Profile, pool)

if workload, err := m.createOrUpdateWorkload(ctx, pod, &tfInfo, pool); err != nil {
if workload, err := m.createOrUpdateWorkload(ctx, pod, &tfInfo); err != nil {
return admission.Errored(http.StatusInternalServerError, fmt.Errorf("create tf workload: %w", err))
} else {
// Pod mutating webhook can not get Pod UID,
@@ -159,13 +160,12 @@ func (m *TensorFusionPodMutator) Handle(ctx context.Context, req admission.Reque
}

// Check if final profile is valid and contains valid GPU resource requests
if tfInfo.Profile.Resources.Limits.Tflops.IsZero() && tfInfo.Profile.Resources.Limits.ComputePercent.IsZero() {
if tfInfo.Profile.Resources.Requests.Tflops.IsZero() &&
tfInfo.Profile.Resources.Requests.ComputePercent.IsZero() &&
tfInfo.Profile.Resources.Requests.Vram.IsZero() {
return admission.Errored(http.StatusInternalServerError,
fmt.Errorf("tflops limit is not set, please set tensor-fusion.ai/tflops-limit or tensor-fusion.ai/compute-percent-limit annotation on Pod"))
}
if tfInfo.Profile.Resources.Limits.Vram.IsZero() {
return admission.Errored(http.StatusInternalServerError,
fmt.Errorf("vram limit is not set, please set tensor-fusion.ai/vram-limit annotation on Pod"))
fmt.Errorf("tflops request is not set, please set tensor-fusion.ai/tflops-request or/and tensor-fusion.ai/compute-percent-request"+
" or/and tensor-fusion.ai/vram-request annotation on Pod"))
}

// Add defaults and tensor-fusion injection logic
@@ -213,18 +213,9 @@ func (m *TensorFusionPodMutator) createOrUpdateWorkload(
ctx context.Context,
pod *corev1.Pod,
tfInfo *utils.TensorFusionInfo,
pool *tfv1.GPUPool) (*tfv1.TensorFusionWorkload, error) {
) (*tfv1.TensorFusionWorkload, error) {
// Create the desired spec for comparison
desiredSpec := tfv1.WorkloadProfileSpec{
Replicas: nil,
PoolName: tfInfo.Profile.PoolName,
Resources: tfInfo.Profile.Resources,
Qos: calculateQoSLevel(tfInfo.Profile, pool),
IsLocalGPU: tfInfo.Profile.IsLocalGPU,
GPUCount: tfInfo.Profile.GPUCount,
GPUModel: tfInfo.Profile.GPUModel,
AutoScalingConfig: tfInfo.Profile.AutoScalingConfig,
}
desiredSpec := *tfInfo.Profile.DeepCopy()

workload := &tfv1.TensorFusionWorkload{}
err := m.Client.Get(ctx, client.ObjectKey{Name: tfInfo.WorkloadName, Namespace: pod.Namespace}, workload)
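In pod_webhook.go the validity check is relaxed: instead of demanding both a TFLOPS (or compute-percent) limit and a VRAM limit, the webhook now rejects the Pod only when the tflops, compute-percent and vram requests are all zero. A minimal sketch of that predicate, using a reduced stand-in for the profile's request resources rather than the real tfv1 struct:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// Requests is a reduced stand-in for the profile's request resources, kept to
// the three fields the new admission check inspects.
type Requests struct {
	Tflops         resource.Quantity
	ComputePercent resource.Quantity
	Vram           resource.Quantity
}

// hasGPURequest reports whether any usable GPU request is present; the webhook
// now returns an admission error only when this is false.
func hasGPURequest(r Requests) bool {
	return !r.Tflops.IsZero() || !r.ComputePercent.IsZero() || !r.Vram.IsZero()
}

func main() {
	onlyVram := Requests{Vram: resource.MustParse("4Gi")}
	empty := Requests{}

	fmt.Println(hasGPURequest(onlyVram)) // true: a vram-request alone is now enough
	fmt.Println(hasGPURequest(empty))    // false: the Pod is rejected by the webhook
}
```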
55 changes: 55 additions & 0 deletions internal/webhook/v1/pod_webhook_test.go
@@ -45,6 +45,7 @@ func isAddOperation(patch jsonpatch.JsonPatchOperation) bool {
return patch.Operation == "add"
}

// nolint:goconst
var _ = Describe("TensorFusionPodMutator", func() {
var (
mutator *TensorFusionPodMutator
@@ -359,6 +360,60 @@ var _ = Describe("TensorFusionPodMutator", func() {
Expect(op.Value).To(Equal("3"))
})

It("should handle gpu resource limits with only one label", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod-local-gpu",
Namespace: "default",
Labels: map[string]string{
constants.TensorFusionEnabledLabelKey: "true",
},
Annotations: map[string]string{
constants.GpuPoolKey: "mock",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "main",
Image: "test-image",
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceName(constants.NvidiaGPUKey): resource.MustParse("3"),
},
},
},
},
},
}
podBytes, err := json.Marshal(pod)
Expect(err).NotTo(HaveOccurred())
req := admission.Request{
AdmissionRequest: admissionv1.AdmissionRequest{
Object: runtime.RawExtension{
Raw: podBytes,
},
Operation: admissionv1.Create,
Namespace: "default",
},
}
resp := mutator.Handle(ctx, req)
Expect(resp.Allowed).To(BeTrue())

op, found := lo.Find(resp.Patches, func(patch jsonpatch.JsonPatchOperation) bool {
return isAddOperation(patch) &&
patch.Path == "/metadata/annotations/tensor-fusion.ai~1compute-percent-request"
})
Expect(found).To(BeTrue())
Expect(op.Value).To(Equal("100"))
op, found = lo.Find(resp.Patches, func(patch jsonpatch.JsonPatchOperation) bool {
return isAddOperation(patch) &&
patch.Path == "/metadata/annotations/tensor-fusion.ai~1gpu-count"
})
Expect(found).To(BeTrue())
Expect(op.Value).To(Equal("3"))
})

It("should handle ignore gpu resource limits when annotation is present", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
87 changes: 47 additions & 40 deletions internal/webhook/v1/tf_parser.go
@@ -181,32 +181,44 @@ func parseAutoScalingAnnotations(pod *corev1.Pod, workloadProfile *tfv1.Workload
}

func parseGPUResourcesAnnotations(pod *corev1.Pod, workloadProfile *tfv1.WorkloadProfile) error {
// extract GPU count limits from any container and set the annotation
isMigratedFromContainerLimits := false
gpuCount, hasValue := pod.Annotations[constants.GpuCountAnnotation]
if hasValue {
val, err := strconv.ParseInt(gpuCount, 10, 32)
if err != nil {
return fmt.Errorf("invalid gpuCount value: %w", err)
}
workloadProfile.Spec.GPUCount = uint32(val)
} else if workloadProfile.Spec.GPUCount == 0 {
for _, container := range pod.Spec.Containers {
if quantity, ok := container.Resources.Limits[constants.NvidiaGPUKey]; ok {
gpuNumber, err := strconv.Atoi(quantity.String())
if err != nil || gpuNumber <= 0 {
ctrl.Log.Error(err, "unrecognized nvidia.com/gpu in resources, not a valid number", "pod", pod.Name, "container", container.Name)
} else {
workloadProfile.Spec.GPUCount = uint32(gpuNumber)
// For seamless migration when only the tensor-fusion.ai/enabled label
// and a tensor-fusion.ai/vram-limit annotation are set, convert the container limit to 100% compute-percent
workloadProfile.Spec.Resources.Limits.ComputePercent = resource.MustParse("100")
isMigratedFromContainerLimits = true
break
}
}
}
}

if tflopsLimit, hasValue := parseResourceQuantity(pod, constants.TFLOPSLimitAnnotation); hasValue {
workloadProfile.Spec.Resources.Limits.Tflops = tflopsLimit
// clear the compute percent limit when a tflops limit is set via annotation
if isMigratedFromContainerLimits {
workloadProfile.Spec.Resources.Limits.ComputePercent = resource.Quantity{}
}
}
if vramLimit, hasValue := parseResourceQuantity(pod, constants.VRAMLimitAnnotation); hasValue {
workloadProfile.Spec.Resources.Limits.Vram = vramLimit
}

computeRequest, hasValue := parseResourceQuantity(pod, constants.ComputeLimitAnnotation)
if hasValue {
workloadProfile.Spec.Resources.Limits.ComputePercent = computeRequest
}
computeLimit, hasValue := parseResourceQuantity(pod, constants.ComputeRequestAnnotation)
if hasValue {
workloadProfile.Spec.Resources.Requests.ComputePercent = computeLimit
} else {
workloadProfile.Spec.Resources.Requests.ComputePercent = workloadProfile.Spec.Resources.Limits.ComputePercent
}

// tflops and computePercent are mutually exclusive
if !workloadProfile.Spec.Resources.Requests.Tflops.IsZero() && !workloadProfile.Spec.Resources.Requests.ComputePercent.IsZero() {
return fmt.Errorf("tflops and computePercent request are mutually exclusive, please specify only one")
}
if !workloadProfile.Spec.Resources.Limits.Tflops.IsZero() && !workloadProfile.Spec.Resources.Limits.ComputePercent.IsZero() {
return fmt.Errorf("tflops and computePercent limit are mutually exclusive, please specify only one")
}

if tflopsRequest, hasValue := parseResourceQuantity(pod, constants.TFLOPSRequestAnnotation); hasValue {
workloadProfile.Spec.Resources.Requests.Tflops = tflopsRequest
} else if workloadProfile.Spec.Resources.Requests.Tflops.IsZero() {
@@ -218,31 +230,26 @@ func parseGPUResourcesAnnotations(pod *corev1.Pod, workloadProfile *tfv1.Workloa
workloadProfile.Spec.Resources.Requests.Vram = workloadProfile.Spec.Resources.Limits.Vram
}

qosLevel, hasValue := pod.Annotations[constants.QoSLevelAnnotation]
// Percentage-based way to specify GPU resources; not recommended, use TFLOPS instead
computeLimit, hasValue := parseResourceQuantity(pod, constants.ComputeLimitAnnotation)
if hasValue {
workloadProfile.Spec.Qos = tfv1.QoSLevel(qosLevel)
workloadProfile.Spec.Resources.Limits.ComputePercent = computeLimit
}
computeRequest, hasValue := parseResourceQuantity(pod, constants.ComputeRequestAnnotation)
if hasValue {
workloadProfile.Spec.Resources.Requests.ComputePercent = computeRequest
} else if workloadProfile.Spec.Resources.Requests.Tflops.IsZero() && workloadProfile.Spec.Resources.Requests.ComputePercent.IsZero() {
workloadProfile.Spec.Resources.Requests.ComputePercent = workloadProfile.Spec.Resources.Limits.ComputePercent
}

// extract GPU count limits from any container and set the annotation
gpuCount, hasValue := pod.Annotations[constants.GpuCountAnnotation]
// tflops and computePercent are mutually exclusive
if !workloadProfile.Spec.Resources.Requests.Tflops.IsZero() && !workloadProfile.Spec.Resources.Requests.ComputePercent.IsZero() {
return fmt.Errorf("tflops and computePercent request are mutually exclusive, please specify only one")
}

qosLevel, hasValue := pod.Annotations[constants.QoSLevelAnnotation]
if hasValue {
val, err := strconv.ParseInt(gpuCount, 10, 32)
if err != nil {
return fmt.Errorf("invalid gpuCount value: %w", err)
}
workloadProfile.Spec.GPUCount = uint32(val)
} else if workloadProfile.Spec.GPUCount == 0 {
for _, container := range pod.Spec.Containers {
if quantity, ok := container.Resources.Limits[constants.NvidiaGPUKey]; ok {
gpuNumber, err := strconv.Atoi(quantity.String())
if err != nil || gpuNumber <= 0 {
ctrl.Log.Error(err, "unrecognized nvidia.com/gpu in resources, not a valid number", "pod", pod.Name, "container", container.Name)
} else {
workloadProfile.Spec.GPUCount = uint32(gpuNumber)
break
}
}
}
workloadProfile.Spec.Qos = tfv1.QoSLevel(qosLevel)
}

gpuVendor, hasValue := pod.Annotations[constants.GpuVendorAnnotation]
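The parser changes in tf_parser.go center on the seamless-migration path for Pods that only carry the tensor-fusion.ai/enabled label plus an nvidia.com/gpu container limit: the container limit supplies the GPU count and a 100% compute-percent limit, and that migrated default is cleared again if an explicit tflops-limit annotation is present, since the two are mutually exclusive. The test added in pod_webhook_test.go exercises exactly this (a 3-GPU limit becomes compute-percent-request "100" and gpu-count "3"). A self-contained sketch of that flow, with the function name and return shape chosen for illustration rather than taken from the PR:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// migrateFromContainerLimits derives a GPU count and a default compute-percent
// limit from the first container carrying an nvidia.com/gpu limit, mirroring
// the migration branch added to parseGPUResourcesAnnotations.
func migrateFromContainerLimits(pod *corev1.Pod, hasTflopsLimitAnnotation bool) (uint32, resource.Quantity) {
	var gpuCount uint32
	var computePercent resource.Quantity
	for _, c := range pod.Spec.Containers {
		q, ok := c.Resources.Limits["nvidia.com/gpu"]
		if !ok || q.Value() <= 0 {
			continue
		}
		gpuCount = uint32(q.Value())
		computePercent = resource.MustParse("100")
		break
	}
	if hasTflopsLimitAnnotation {
		// tflops and compute-percent limits are mutually exclusive, so the
		// migrated 100% default is dropped when a tflops limit is set explicitly.
		computePercent = resource.Quantity{}
	}
	return gpuCount, computePercent
}

func main() {
	pod := &corev1.Pod{Spec: corev1.PodSpec{Containers: []corev1.Container{{
		Name: "main",
		Resources: corev1.ResourceRequirements{
			Limits: corev1.ResourceList{"nvidia.com/gpu": resource.MustParse("3")},
		},
	}}}}

	count, percent := migrateFromContainerLimits(pod, false)
	fmt.Println(count, percent.String()) // 3 100
}
```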