Skip to content

Commit 0b28f2d

Browse files
committed
feat(fc): Initial wiring of the flow control layer
This commit introduces the initial integration of the new Flow Control layer into the Endpoint Picker (EPP). The primary goal is to provide a mechanism for more sophisticated request admission control, queuing, and multitenancy management. This new layer is gated by the ENABLE_EXPERIMENTAL_FLOW_CONTROL_LAYER feature flag. When enabled, the Director delegates admission control decisions to the new flowController component. The legacy saturation-based shedding logic is preserved for when the feature is disabled. Comprehensive unit tests for the admitRequest function have been added to validate the behavior of both the legacy and new flow control paths.
1 parent 9286c12 commit 0b28f2d

File tree

4 files changed

+327
-22
lines changed

4 files changed

+327
-22
lines changed

cmd/epp/runner/runner.go

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ import (
5050
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
5151
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
5252
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
53+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol"
54+
fccontroller "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
55+
fcregistry "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
5356
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
5457
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
5558
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
@@ -68,11 +71,25 @@ import (
6871
)
6972

7073
const (
71-
// enableExperimentalDatalayerV2 defines the environment variable
72-
// used as feature flag for the pluggable data layer.
74+
// enableExperimentalDatalayerV2 defines the environment variable used as feature flag for the pluggable data layer.
7375
enableExperimentalDatalayerV2 = "ENABLE_EXPERIMENTAL_DATALAYER_V2"
76+
// enableExperimentalFlowControlLayer defines the environment variable used as a feature flag for the pluggable flow
77+
// control layer. Run reads it with a default of false, so the layer stays disabled unless the variable is set.
78+
enableExperimentalFlowControlLayer = "ENABLE_EXPERIMENTAL_FLOW_CONTROL_LAYER"
7479
)
7580

81+
// flowControlConfig is the Flow Control configuration that Run validates (via ValidateAndApplyDefaults) and uses to
// build the flow registry and flow controller when the experimental Flow Control layer is enabled.
//
// TODO: this is hardcoded for POC only. This needs to be hooked up to our text-based config story.
82+
var flowControlConfig = flowcontrol.Config{
83+
Controller: fccontroller.Config{}, // Use all defaults.
84+
Registry: fcregistry.Config{
85+
// Define domain of accepted priority levels as this field is required. Use defaults for all optional fields.
// Only a single band (priority 0, named "Default") is declared here.
86+
// TODO: this should not be hardcoded.
87+
PriorityBands: []fcregistry.PriorityBandConfig{
88+
{Priority: 0, PriorityName: "Default"},
89+
},
90+
},
91+
}
92+
7693
var (
7794
grpcPort = flag.Int("grpc-port", runserver.DefaultGrpcPort, "The gRPC port used for communicating with Envoy proxy")
7895
grpcHealthPort = flag.Int("grpc-health-port", runserver.DefaultGrpcHealthPort, "The port used for gRPC liveness and readiness probes")
@@ -271,7 +288,46 @@ func (r *Runner) Run(ctx context.Context) error {
271288

272289
saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog)
273290

274-
director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)
291+
// --- Flow Control Initialization (Experimental) ---
292+
293+
enableFlowControl := env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog)
294+
var flowController *fccontroller.FlowController
295+
if enableFlowControl {
296+
setupLog.Info("Initializing experimental Flow Control layer")
297+
cfg, err := flowControlConfig.ValidateAndApplyDefaults()
298+
if err != nil {
299+
setupLog.Error(err, "failed to initialize Flow Control layer")
300+
return fmt.Errorf("invalid Flow Control config: %w", err)
301+
}
302+
303+
registry, err := fcregistry.NewFlowRegistry(cfg.Registry, setupLog)
304+
if err != nil {
305+
return fmt.Errorf("failed to initialize Flow Registry: %w", err)
306+
}
307+
fc, err := fccontroller.NewFlowController(
308+
ctx,
309+
cfg.Controller,
310+
registry,
311+
saturationDetector,
312+
setupLog,
313+
)
314+
if err != nil {
315+
return fmt.Errorf("failed to initialize Flow Controller: %w", err)
316+
}
317+
flowController = fc
318+
319+
go registry.Run(ctx)
320+
} else {
321+
setupLog.Info("Experimental Flow Control layer is disabled")
322+
}
323+
324+
director := requestcontrol.NewDirectorWithConfig(
325+
datastore,
326+
scheduler,
327+
saturationDetector,
328+
flowController,
329+
r.requestControlConfig,
330+
enableFlowControl)
275331

276332
// --- Setup ExtProc Server Runner ---
277333
serverRunner := &runserver.ExtProcServerRunner{

pkg/epp/requestcontrol/director.go

Lines changed: 95 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
3434
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
3535
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
36+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
3637
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
3738
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
3839
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
@@ -59,15 +60,29 @@ type SaturationDetector interface {
5960
IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool
6061
}
6162

63+
// flowController defines the minimal interface required by the Director for flow control.
// It is declared here, at the consumer, so the Director depends only on the one method it calls.
64+
type flowController interface {
65+
// EnqueueAndWait submits req to the flow control layer and returns the resulting queue outcome together with any
// error describing it.
// NOTE(review): presumably blocks until the request is dispatched, rejected, or evicted — confirm against the
// controller implementation.
EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error)
66+
}
67+
6268
// NewDirectorWithConfig creates a new Director instance with all dependencies.
//
// fc is the flow controller that admitRequest delegates to when enableFlowControl is true. config must be non-nil:
// its preRequestPlugins and postResponsePlugins are read unconditionally.
//
// NOTE(review): fc is an interface value, so passing a nil pointer of a concrete controller type produces a non-nil
// interface. Callers that disable flow control should rely on enableFlowControl, not on fc comparing equal to nil.
63-
func NewDirectorWithConfig(datastore Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director {
69+
func NewDirectorWithConfig(
70+
datastore Datastore,
71+
scheduler Scheduler,
72+
saturationDetector SaturationDetector,
73+
fc flowController,
74+
config *Config,
75+
enableFlowControl bool,
76+
) *Director {
6477
return &Director{
6578
datastore: datastore,
6679
scheduler: scheduler,
6780
saturationDetector: saturationDetector,
81+
flowController: fc,
6882
preRequestPlugins: config.preRequestPlugins,
6983
postResponsePlugins: config.postResponsePlugins,
7084
defaultPriority: 0, // define default priority explicitly
85+
enableFlowControl: enableFlowControl,
7186
}
7287
}
7388

@@ -76,12 +91,14 @@ type Director struct {
7691
datastore Datastore
7792
scheduler Scheduler
7893
saturationDetector SaturationDetector
94+
flowController flowController
7995
preRequestPlugins []PreRequest
8096
postResponsePlugins []PostResponse
8197
// we just need a pointer to an int variable since priority is a pointer in InferenceObjective
8298
// no need to set this in the constructor, since the value we want is the default int val
8399
// and value types cannot be nil
84-
defaultPriority int
100+
defaultPriority int
101+
enableFlowControl bool
85102
}
86103

87104
// HandleRequest orchestrates the request lifecycle.
@@ -141,7 +158,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
141158
}
142159

143160
// Admission Control check
144-
if err := d.admitRequest(ctx, candidatePods, *infObjective.Spec.Priority, reqCtx.FairnessID); err != nil {
161+
if err := d.admitRequest(ctx, reqCtx, candidatePods, *infObjective.Spec.Priority); err != nil {
145162
return reqCtx, err
146163
}
147164

@@ -209,27 +226,43 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet
209226

210227
// admitRequest handles admission control to decide whether or not to accept the request
211228
// based on the request priority and saturation state.
212-
func (d *Director) admitRequest(ctx context.Context, candidatePods []backendmetrics.PodMetrics, requestPriority int, fairnessID string) error {
229+
func (d *Director) admitRequest(ctx context.Context, reqCtx *handlers.RequestContext, candidatePods []backendmetrics.PodMetrics, requestPriority int) error {
213230
loggerTrace := log.FromContext(ctx).V(logutil.TRACE)
214231

215-
loggerTrace.Info("Entering Flow Control", "priority", requestPriority, "fairnessID", fairnessID)
232+
loggerTrace.Info("Entering admission control", "priority", requestPriority, "fairnessID", reqCtx.FairnessID, "flowControlEnabled", d.flowController != nil)
216233

217234
// This will be removed in favor of a more robust implementation (Flow Control) in the very near future.
218235
// TODO: Make this a configurable value.
219236
// Tracking issue https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1347
220-
if requestPriority >= 0 {
221-
loggerTrace.Info("Non-sheddable request bypassing saturation check.")
222-
return nil
223-
}
224-
225-
if d.saturationDetector.IsSaturated(ctx, candidatePods) {
237+
isSheddable := requestPriority < 0
238+
if isSheddable && d.saturationDetector.IsSaturated(ctx, candidatePods) {
226239
return errutil.Error{
227240
Code: errutil.InferencePoolResourceExhausted,
228241
Msg: "system saturated, sheddable request dropped",
229242
}
230243
}
231244

232-
return nil
245+
if !d.enableFlowControl {
246+
loggerTrace.Info("Non-sheddable request bypassing saturation check.")
247+
return nil
248+
}
249+
250+
fairnessID := reqCtx.FairnessID
251+
if fairnessID == "" {
252+
fairnessID = "default-flow"
253+
}
254+
255+
fcReq := &flowControlRequest{
256+
ctx: ctx,
257+
requestID: reqCtx.SchedulingRequest.RequestId,
258+
fairnessID: fairnessID,
259+
priority: requestPriority,
260+
requestByteSize: uint64(reqCtx.RequestSize),
261+
candidatePods: candidatePods,
262+
}
263+
264+
outcome, err := d.flowController.EnqueueAndWait(fcReq)
265+
return translateFlowControlOutcome(outcome, err)
233266
}
234267

235268
// prepareRequest populates the RequestContext and calls the registered PreRequest plugins
@@ -323,3 +356,53 @@ func (d *Director) runPostResponsePlugins(ctx context.Context, request *scheduli
323356
loggerDebug.Info("Completed running post-response plugin successfully", "plugin", plugin.TypedName())
324357
}
325358
}
359+
360+
// --- Flow Control Integration ---
361+
362+
// flowControlRequest is an adapter that implements the types.FlowControlRequest interface, wrapping the director's
363+
// internal request context.
364+
type flowControlRequest struct {
365+
ctx context.Context
366+
requestID string
367+
fairnessID string
368+
priority int
369+
requestByteSize uint64
370+
candidatePods []backendmetrics.PodMetrics
371+
}
372+
373+
var _ types.FlowControlRequest = &flowControlRequest{}
374+
375+
func (r *flowControlRequest) Context() context.Context { return r.ctx }
376+
func (r *flowControlRequest) ID() string { return r.requestID }
377+
func (r *flowControlRequest) InitialEffectiveTTL() time.Duration { return 0 } // Use controller default.
378+
func (r *flowControlRequest) ByteSize() uint64 { return r.requestByteSize }
379+
func (r *flowControlRequest) CandidatePodsForScheduling() []backendmetrics.PodMetrics {
380+
return r.candidatePods
381+
}
382+
func (r *flowControlRequest) FlowKey() types.FlowKey {
383+
return types.FlowKey{ID: r.fairnessID, Priority: r.priority}
384+
}
385+
386+
// translateFlowControlOutcome maps the context-rich outcome of the Flow Control layer to the public errutil.Error
387+
// contract.
388+
func translateFlowControlOutcome(outcome types.QueueOutcome, err error) error {
389+
msg := "request rejected by flow control"
390+
if err != nil {
391+
msg = err.Error()
392+
}
393+
394+
switch outcome {
395+
case types.QueueOutcomeDispatched:
396+
return nil
397+
case types.QueueOutcomeRejectedCapacity:
398+
return errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: msg}
399+
case types.QueueOutcomeEvictedTTL:
400+
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "request timed out in queue: " + msg}
401+
case types.QueueOutcomeEvictedContextCancelled:
402+
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "client disconnected: " + msg}
403+
case types.QueueOutcomeRejectedOther, types.QueueOutcomeEvictedOther:
404+
return errutil.Error{Code: errutil.Internal, Msg: msg}
405+
default:
406+
return errutil.Error{Code: errutil.Internal, Msg: "unhandled flow control outcome: " + msg}
407+
}
408+
}

0 commit comments

Comments
 (0)