Skip to content

Commit 3481892

Browse files
committed
feature: Add draft implementation for @gather
1 parent 4532099 commit 3481892

13 files changed

Lines changed: 890 additions & 315 deletions

File tree

pkg/cache/delta.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ import (
99
"github.com/hsnlab/dcontroller/pkg/object"
1010
)
1111

12-
type DeltaType = toolscache.DeltaType
12+
type DeltaType string
1313

1414
const (
15-
Added = toolscache.Added
16-
Deleted = toolscache.Deleted
17-
Updated = toolscache.Updated
18-
Replaced = toolscache.Replaced
19-
Sync = toolscache.Sync
20-
Upserted = "Upserted" // for events that are either an update/replace or an add
15+
Added = DeltaType(toolscache.Added)
16+
Deleted = DeltaType(toolscache.Deleted)
17+
Updated = DeltaType(toolscache.Updated)
18+
Replaced = DeltaType(toolscache.Replaced)
19+
Sync = DeltaType(toolscache.Sync)
20+
Upserted = DeltaType("Upserted") // for events that are either an update/replace or an add
2121
)
2222

2323
// NilDelta is a placeholder for a delta that means no change.

pkg/expression/expression.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,23 +216,21 @@ func (e *Expression) Evaluate(ctx EvalCtx) (any, error) {
216216
return nil, NewExpressionError(e, err)
217217
}
218218

219-
var ret any
220219
// evaluate expressons
221220
for _, arg := range args {
222221
res, err := arg.Evaluate(ctx)
223222
if err != nil {
224223
return nil, errors.New("failed to evaluate expression")
225224
}
226-
ret, err = object.MergeAny(ret, res)
225+
ctx.Object, err = object.MergeAny(ctx.Object, res)
227226
if err != nil {
228227
return nil, err
229228
}
230-
ctx.Object = ret
231229
}
232230

233-
ctx.Log.V(8).Info("eval ready", "expression", e.String(), "result", ret)
231+
ctx.Log.V(8).Info("eval ready", "expression", e.String(), "result", ctx.Object)
234232

235-
return ret, nil
233+
return ctx.Object, nil
236234

237235
case "@filter":
238236
args, err := AsExpOrExpList(e.Arg)

pkg/expression/expression_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -644,8 +644,8 @@ var _ = Describe("Expressions", func() {
644644
}
645645
})
646646

647-
It("should merge two JSONPath setters", func() {
648-
jsonData := `{"@merge":[{"spec":{"a":"aaa"}},{"$.spec.b.d":12}]}`
647+
It("should merge two setters with a JSONPath setter", func() {
648+
jsonData := `{"@merge":[{"spec":1},{"spec":{"a":"aaa"}},{"$.spec.b.d":12}]}`
649649
var exp Expression
650650
err := json.Unmarshal([]byte(jsonData), &exp)
651651
Expect(err).NotTo(HaveOccurred())

pkg/expression/jsonpath.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func GetJSONPath(ctx EvalCtx, key string) (any, error) {
2929
subject = ctx.Subject
3030
}
3131

32-
ret, err := GetJSONPathExp(key, subject)
32+
ret, err := GetJSONPathRaw(key, subject)
3333
if err != nil {
3434
return nil, err
3535
}
@@ -83,7 +83,7 @@ func SetJSONPath(ctx EvalCtx, key string, value, data any) error {
8383
}
8484

8585
// then call the low-level set util
86-
if err := SetJSONPathExp(key, value, data); err != nil {
86+
if err := SetJSONPathRaw(key, value, data); err != nil {
8787
return fmt.Errorf("JSONPath expression error: cannot set "+
8888
"key %q to value %q: %w", key, value, err)
8989
}
@@ -93,9 +93,9 @@ func SetJSONPath(ctx EvalCtx, key string, value, data any) error {
9393

9494
// low-level utils
9595

96-
// GetJSONPathExp evaluates a JSONPath expression on the specified object and returns the result or
96+
// GetJSONPathRaw evaluates a JSONPath expression on the specified object and returns the result or
9797
// an error.
98-
func GetJSONPathExp(query string, object any) (any, error) {
98+
func GetJSONPathRaw(query string, object any) (any, error) {
9999
je, err := jp.ParseString(query)
100100
if err != nil {
101101
return nil, err
@@ -112,10 +112,10 @@ func GetJSONPathExp(query string, object any) (any, error) {
112112
return values[0], nil
113113
}
114114

115-
// SetJSONPathExp sets a key (possibly represented with a JSONPath expression) to a value (can also
115+
// SetJSONPathRaw sets a key (possibly represented with a JSONPath expression) to a value (can also
116116
// be a JSONPath expression, which will be evaluated using the object argument) in the given data
117117
// structure.
118-
func SetJSONPathExp(key string, value, target any) error {
118+
func SetJSONPathRaw(key string, value, target any) error {
119119
je, err := jp.ParseString(key)
120120
if err != nil {
121121
return err
@@ -124,14 +124,16 @@ func SetJSONPathExp(key string, value, target any) error {
124124
return je.Set(target, value)
125125
}
126126

127-
// lit := []Expression{}
128-
// for _, arg := range args {
129-
// lit = append(lit, Expression{Op: "@any", Literal: arg, Raw: util.Stringify(arg)})
130-
// }
131-
// argList, err := asExpList(exp.Arg)
132-
// Expect(err).NotTo(HaveOccurred())
133-
// argList = append(argList, Expression{
134-
// Op: "@list",
135-
// Literal: lit,
136-
// })
137-
// exp.Arg.Literal = argList
127+
// SetJSONPathRawExp is the same as SetJSONPathRaw but takes the key as a string expression.
128+
func SetJSONPathRawExp(keyExp *Expression, value, data any) error {
129+
key, err := keyExp.GetLiteralString()
130+
if err != nil {
131+
return err
132+
}
133+
134+
if err := SetJSONPathRaw(key, value, data); err != nil {
135+
return err
136+
}
137+
138+
return nil
139+
}

pkg/expression/jsonpath_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,19 +87,19 @@ var _ = Describe("JSONPath", func() {
8787
obj, err := object.NewViewObjectFromNativeObject("Service", input)
8888
Expect(err).NotTo(HaveOccurred())
8989

90-
res, err := GetJSONPathExp(`$.metadata.name`, obj.UnstructuredContent())
90+
res, err := GetJSONPathRaw(`$.metadata.name`, obj.UnstructuredContent())
9191
Expect(err).NotTo(HaveOccurred())
9292
s, err := AsString(res)
9393
Expect(err).NotTo(HaveOccurred())
9494
Expect(s).To(Equal("testservice-ok"))
9595

96-
res, err = GetJSONPathExp(`$["metadata"]["namespace"]`, obj.UnstructuredContent())
96+
res, err = GetJSONPathRaw(`$["metadata"]["namespace"]`, obj.UnstructuredContent())
9797
Expect(err).NotTo(HaveOccurred())
9898
s, err = AsString(res)
9999
Expect(err).NotTo(HaveOccurred())
100100
Expect(s).To(Equal("testnamespace"))
101101

102-
res, err = GetJSONPathExp(`$.metadata`, obj.UnstructuredContent())
102+
res, err = GetJSONPathRaw(`$.metadata`, obj.UnstructuredContent())
103103
Expect(err).NotTo(HaveOccurred())
104104
d, ok := res.(Unstructured)
105105
Expect(ok).To(BeTrue())
@@ -108,13 +108,13 @@ var _ = Describe("JSONPath", func() {
108108
Expect(d).To(HaveKey("name"))
109109
Expect(d["name"]).To(Equal("testservice-ok"))
110110

111-
res, err = GetJSONPathExp(`$.spec.ports[1].port`, obj.UnstructuredContent())
111+
res, err = GetJSONPathRaw(`$.spec.ports[1].port`, obj.UnstructuredContent())
112112
Expect(err).NotTo(HaveOccurred())
113113
i, err := AsInt(res)
114114
Expect(err).NotTo(HaveOccurred())
115115
Expect(i).To(Equal(int64(2)))
116116

117-
res, err = GetJSONPathExp(`$.spec.ports[?(@.name == 'udp-ok')].protocol`,
117+
res, err = GetJSONPathRaw(`$.spec.ports[?(@.name == 'udp-ok')].protocol`,
118118
obj.UnstructuredContent())
119119
Expect(err).NotTo(HaveOccurred())
120120
s, err = AsString(res)
@@ -138,14 +138,14 @@ var _ = Describe("JSONPath", func() {
138138
})
139139

140140
// must use the alternative form
141-
res, err := GetJSONPathExp(`$["metadata"]["annotations"]["kubernetes.io/service-name"]`,
141+
res, err := GetJSONPathRaw(`$["metadata"]["annotations"]["kubernetes.io/service-name"]`,
142142
obj.UnstructuredContent())
143143
Expect(err).NotTo(HaveOccurred())
144144
s, err := AsString(res)
145145
Expect(err).NotTo(HaveOccurred())
146146
Expect(s).To(Equal("example"))
147147

148-
res, err = GetJSONPathExp(`$["metadata"]["annotations"]["kubernetes.io[service-name]"]`,
148+
res, err = GetJSONPathRaw(`$["metadata"]["annotations"]["kubernetes.io[service-name]"]`,
149149
obj.UnstructuredContent())
150150
Expect(err).NotTo(HaveOccurred())
151151
s, err = AsString(res)

pkg/pipeline/aggregation.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const aggregateOp = "@aggregate"
1616
// of objects in a view.
1717
type Aggregation struct {
1818
*opv1a1.Aggregation
19+
Stages []*Stage
1920
engine Engine
2021
}
2122

@@ -24,16 +25,24 @@ func NewAggregation(engine Engine, config *opv1a1.Aggregation) *Aggregation {
2425
if config == nil {
2526
return nil
2627
}
27-
return &Aggregation{
28+
29+
a := &Aggregation{
2830
Aggregation: config,
31+
Stages: make([]*Stage, len(config.Expressions)),
2932
engine: engine,
3033
}
34+
35+
for i, e := range config.Expressions {
36+
a.Stages[i] = NewStage(engine, &e)
37+
}
38+
39+
return a
3140
}
3241

3342
func (a *Aggregation) String() string {
3443
ss := []string{}
35-
for _, e := range a.Expressions {
36-
ss = append(ss, e.String())
44+
for _, s := range a.Stages {
45+
ss = append(ss, s.String())
3746
}
3847
return fmt.Sprintf("%s:[%s]", aggregateOp, strings.Join(ss, ","))
3948
}

0 commit comments

Comments
 (0)