Skip to content
This repository was archived by the owner on Mar 26, 2026. It is now read-only.

Commit 4335ab7

Browse files
committed
chore: Add missing source file
1 parent 5800a41 commit 4335ab7

1 file changed

Lines changed: 216 additions & 0 deletions

File tree

pkg/operator/group.go

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
package operator
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/go-logr/logr"
9+
"k8s.io/client-go/rest"
10+
"sigs.k8s.io/controller-runtime/pkg/client"
11+
"sigs.k8s.io/controller-runtime/pkg/config"
12+
runtimeManager "sigs.k8s.io/controller-runtime/pkg/manager"
13+
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
14+
15+
opv1a1 "github.com/l7mp/dcontroller/pkg/api/operator/v1alpha1"
16+
viewv1a1 "github.com/l7mp/dcontroller/pkg/api/view/v1alpha1"
17+
"github.com/l7mp/dcontroller/pkg/apiserver"
18+
"github.com/l7mp/dcontroller/pkg/composite"
19+
"github.com/l7mp/dcontroller/pkg/manager"
20+
)
21+
22+
// StatusChannelBufferSize defines the longest backlog on the status channel.
23+
const StatusChannelBufferSize = 64
24+
25+
type opEntry struct {
26+
op *Operator
27+
errorChan chan error
28+
cancel context.CancelFunc
29+
}
30+
31+
// Group is a set of operators that share the same lifecycle and a common APIServer
32+
type Group struct {
33+
operators map[string]*opEntry
34+
config *rest.Config
35+
clientMpx composite.ClientMultiplexer
36+
mu sync.Mutex
37+
apiServer *apiserver.APIServer
38+
errorChan chan error
39+
ctx context.Context
40+
started bool
41+
logger, log logr.Logger
42+
}
43+
44+
// NewController creates a new Kubernetes controller that handles the Operator CRDs.
45+
func NewGroup(config *rest.Config, logger logr.Logger) *Group {
46+
if logger.GetSink() == nil {
47+
logger = logr.Discard()
48+
}
49+
50+
return &Group{
51+
clientMpx: composite.NewClientMultiplexer(logger),
52+
config: config,
53+
operators: make(map[string]*opEntry),
54+
errorChan: make(chan error),
55+
logger: logger,
56+
log: logger.WithName("op-group"),
57+
}
58+
}
59+
60+
// GetClient returns a controller runtime client that multiplexes all registered operator clients.
61+
func (g *Group) GetClient() client.Client {
62+
return g.clientMpx
63+
}
64+
65+
// SetAPIServer allows to set the embedded API server shared by the operatos for this group. The
66+
// API server lifecycle is not managed by the operator; make sure to run apiServer.Start before
67+
// calling Start on the oparator group.
68+
func (g *Group) SetAPIServer(apiServer *apiserver.APIServer) {
69+
g.apiServer = apiServer
70+
for _, e := range g.operators {
71+
e.op.SetAPIServer(apiServer)
72+
}
73+
}
74+
75+
// GetErrorChannel returns the error channel the group uses to surface errors.
76+
func (g *Group) GetErrorChannel() chan error {
77+
return g.errorChan
78+
}
79+
80+
// GetOperator returns the operator with the given name.
81+
func (g *Group) GetOperator(name string) *Operator {
82+
g.mu.Lock()
83+
op := g.operators[name]
84+
g.mu.Unlock()
85+
return op.op
86+
}
87+
88+
// AddOperator registers an operator with the group.
89+
func (g *Group) AddOperator(name string, spec *opv1a1.OperatorSpec) (*Operator, error) {
90+
g.log.V(4).Info("adding operator", "name", name)
91+
92+
// disable leader-election, health-check and the metrics server on the embedded manager
93+
off := true
94+
opts := runtimeManager.Options{
95+
LeaderElection: false,
96+
HealthProbeBindAddress: "0",
97+
// Disable global controller name uniquness test
98+
Controller: config.Controller{
99+
SkipNameValidation: &off,
100+
},
101+
Metrics: metricsserver.Options{
102+
BindAddress: "0",
103+
},
104+
Logger: g.logger,
105+
}
106+
107+
// First create a manager for this operator
108+
mgr, err := manager.New(g.config, name, manager.Options{Options: opts})
109+
if err != nil {
110+
return nil, fmt.Errorf("failed to create manager for operator %s: %w",
111+
name, err)
112+
}
113+
114+
errorChan := make(chan error, StatusChannelBufferSize)
115+
operator := New(name, mgr, spec, Options{
116+
APIServer: g.apiServer,
117+
ErrorChannel: errorChan,
118+
Logger: g.logger,
119+
})
120+
121+
e := &opEntry{op: operator, errorChan: errorChan}
122+
g.mu.Lock()
123+
g.operators[name] = e
124+
g.mu.Unlock()
125+
126+
// register the operator in the client multiplexer
127+
if err := g.clientMpx.RegisterClient(viewv1a1.Group(name), mgr.GetClient()); err != nil {
128+
g.log.Error(err, "failed to register operator in the multiplex client", "operator", name)
129+
}
130+
131+
// start the new operator if we are already running
132+
if g.started {
133+
g.startOp(e)
134+
}
135+
136+
return operator, nil
137+
}
138+
139+
func (g *Group) UpsertOperator(name string, spec *opv1a1.OperatorSpec) (*Operator, error) {
140+
g.log.V(4).Info("upserting operator", "name", name)
141+
142+
// if this is a modification event we first remove old operator and create a new one
143+
if e := g.getOperatorEntry(name); e != nil {
144+
g.DeleteOperator(name)
145+
}
146+
147+
return g.AddOperator(name, spec)
148+
}
149+
150+
func (g *Group) DeleteOperator(name string) {
151+
g.log.V(4).Info("deleting operator", "name", name)
152+
153+
e := g.getOperatorEntry(name)
154+
if e == nil {
155+
return
156+
}
157+
158+
if e.cancel != nil {
159+
e.cancel()
160+
}
161+
162+
// unregister the operator in the client multiplexer
163+
if err := g.clientMpx.UnregisterClient(viewv1a1.Group(e.op.name)); err != nil {
164+
g.log.Error(err, "failed to unregister operator in the multiplex client", "name", e.op.name)
165+
}
166+
167+
g.mu.Lock()
168+
delete(g.operators, name)
169+
g.mu.Unlock()
170+
}
171+
172+
// Start starts the operators registered with the group. It blocks
173+
func (g *Group) Start(ctx context.Context) error {
174+
defer close(g.errorChan)
175+
g.log.V(2).Info("starting operator group", "num-operators", len(g.operators),
176+
"APIServer", fmt.Sprintf("%t", g.apiServer == nil))
177+
178+
g.mu.Lock()
179+
es := []*opEntry{}
180+
for _, e := range g.operators {
181+
es = append(es, e)
182+
}
183+
g.ctx = ctx
184+
g.started = true
185+
g.mu.Unlock()
186+
187+
for _, e := range es {
188+
g.startOp(e)
189+
}
190+
191+
<-ctx.Done()
192+
193+
return nil
194+
}
195+
196+
func (g *Group) startOp(e *opEntry) {
197+
// Store the cancel function into the operator entry
198+
ctx, cancel := context.WithCancel(g.ctx)
199+
e.cancel = cancel
200+
201+
// pass the errors on to our caller
202+
go func() {
203+
for err := range e.errorChan {
204+
g.errorChan <- err
205+
}
206+
}()
207+
208+
go e.op.Start(ctx) //nolint:errcheck
209+
}
210+
211+
func (g *Group) getOperatorEntry(name string) *opEntry {
212+
g.mu.Lock()
213+
op := g.operators[name]
214+
g.mu.Unlock()
215+
return op
216+
}

0 commit comments

Comments
 (0)