Skip to content

Commit 3935805

Browse files
committed
feat: add an async observer wrapper
1 parent eeb6fde commit 3935805

File tree

5 files changed

+272
-14
lines changed

5 files changed

+272
-14
lines changed

async_routine.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ type AsyncRoutine interface {
4343
id() string
4444
}
4545

46+
type routineData map[string]string
47+
4648
type asyncRoutine struct {
4749
routineId string
4850
name string
@@ -56,7 +58,7 @@ type asyncRoutine struct {
5658
status RoutineStatus
5759
ctx context.Context
5860
originatorOpId string
59-
data map[string]string
61+
data routineData
6062
}
6163

6264
func (r *asyncRoutine) Name() string {

async_routine_manager.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ func (arm *asyncRoutineManager) AddObserver(observer RoutinesObserver) string {
5353

5454
// RemoveObserver removes the given RoutineObserver from the list of observers
5555
func (arm *asyncRoutineManager) RemoveObserver(observerId string) {
56+
observer, ok := arm.observers.Get(observerId)
57+
if !ok {
58+
return
59+
}
60+
61+
if obs, ok := observer.(*asyncRoutineObserver); ok {
62+
obs.stopObserving()
63+
}
64+
5665
arm.observers.Remove(observerId)
5766
}
5867

async_routine_observer.go

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
package async
2+
3+
import (
4+
"context"
5+
"sync"
6+
"sync/atomic"
7+
"time"
8+
)
9+
10+
// Event types for routine lifecycle and snapshotting
11+
type routineLifeCycleEventType = int
12+
type routineSnapshottingEventType = int
13+
14+
const (
15+
routineStarted routineLifeCycleEventType = iota
16+
routineEnded
17+
routineTimeboxExceeded
18+
)
19+
20+
const (
21+
routineCount routineSnapshottingEventType = iota
22+
routineByNameCount
23+
)
24+
25+
type asyncRoutineObserverOption func(o *asyncRoutineObserver)
26+
27+
// WithObserverTimeout sets a timeout for the observer routine.
28+
func WithObserverTimeout(timeout time.Duration) func(o *asyncRoutineObserver) {
29+
return func(o *asyncRoutineObserver) {
30+
o.timeout = &timeout
31+
}
32+
}
33+
34+
// WithObserverRoutineData adds custom data to the observer routine.
35+
func WithObserverRoutineData(key string, value string) func(o *asyncRoutineObserver) {
36+
return func(o *asyncRoutineObserver) {
37+
if o.observerNotifierRoutineData == nil {
38+
o.observerNotifierRoutineData = make(routineData)
39+
}
40+
o.observerNotifierRoutineData[key] = value
41+
}
42+
}
43+
44+
// WithChannelSize sets the buffer size for the async observer's event channel.
45+
// Use this as an option when creating the async observer.
46+
func WithChannelSize(size int) func(o *asyncRoutineObserver) {
47+
return func(o *asyncRoutineObserver) {
48+
if o.channel != nil {
49+
close(o.channel)
50+
}
51+
o.channel = make(chan asyncRoutineEvent, size)
52+
}
53+
}
54+
55+
// WithGuaranteedDelivery enables guaranteed delivery mode for the async observer's events.
56+
// When enabled, the observer will not drop events if the channel is full, but will block until space is available.
57+
// Use this as an option when creating the observer.
58+
func WithGuaranteedDelivery() func(o *asyncRoutineObserver) {
59+
return func(o *asyncRoutineObserver) {
60+
o.guaranteedDelivery = true
61+
}
62+
}
63+
64+
func NewAsyncRoutineObserver(delegate RoutinesObserver, options ...asyncRoutineObserverOption) RoutinesObserver {
65+
asyncObserver := asyncRoutineObserver{
66+
delegate: delegate,
67+
guaranteedDelivery: false,
68+
}
69+
70+
asyncObserver.channelIsOpen.Store(true)
71+
72+
for _, option := range options {
73+
option(&asyncObserver)
74+
}
75+
76+
if asyncObserver.channel == nil {
77+
asyncObserver.channel = make(chan asyncRoutineEvent, 16)
78+
}
79+
80+
asyncObserver.startObserving()
81+
return &asyncObserver
82+
}
83+
84+
var _ RoutinesObserver = (*asyncRoutineObserver)(nil)
85+
86+
type asyncRoutineObserver struct {
87+
delegate RoutinesObserver
88+
channel chan asyncRoutineEvent
89+
channelIsOpen atomic.Bool
90+
timeout *time.Duration
91+
observerNotifierRoutineData routineData
92+
closeOnce sync.Once
93+
guaranteedDelivery bool
94+
}
95+
96+
func (a *asyncRoutineObserver) RoutineStarted(routine AsyncRoutine) {
97+
a.notify(&routineLifecycleEvent{
98+
evtType: routineStarted,
99+
routine: routine,
100+
})
101+
}
102+
103+
func (a *asyncRoutineObserver) RoutineFinished(routine AsyncRoutine) {
104+
a.notify(&routineLifecycleEvent{
105+
evtType: routineEnded,
106+
routine: routine,
107+
})
108+
}
109+
110+
func (a *asyncRoutineObserver) RoutineExceededTimebox(routine AsyncRoutine) {
111+
a.notify(&routineLifecycleEvent{
112+
evtType: routineTimeboxExceeded,
113+
routine: routine,
114+
})
115+
}
116+
117+
func (a *asyncRoutineObserver) RunningRoutineCount(count int) {
118+
a.notify(&routineCountEvent{
119+
count: count,
120+
})
121+
}
122+
123+
func (a *asyncRoutineObserver) RunningRoutineByNameCount(name string, count int) {
124+
a.notify(&routineByNameCountEvent{
125+
routineName: name,
126+
count: count,
127+
})
128+
}
129+
130+
func (a *asyncRoutineObserver) manageLifecycleEvent(evt *routineLifecycleEvent) {
131+
switch evt.evtType {
132+
case routineStarted:
133+
a.delegate.RoutineStarted(evt.routine)
134+
case routineEnded:
135+
a.delegate.RoutineFinished(evt.routine)
136+
case routineTimeboxExceeded:
137+
a.delegate.RoutineExceededTimebox(evt.routine)
138+
}
139+
}
140+
141+
func (a *asyncRoutineObserver) startObserving() {
142+
routine := NewAsyncRoutine("async-observer-notifier", context.Background(), func() {
143+
for evt := range a.channel {
144+
switch evt.(type) {
145+
case *routineLifecycleEvent:
146+
a.manageLifecycleEvent(evt.(*routineLifecycleEvent))
147+
case *routineCountEvent:
148+
a.delegate.RunningRoutineCount(evt.(*routineCountEvent).count)
149+
case *routineByNameCountEvent:
150+
evt := evt.(*routineByNameCountEvent)
151+
a.delegate.RunningRoutineByNameCount(evt.routineName, evt.count)
152+
}
153+
}
154+
})
155+
156+
for key, value := range a.observerNotifierRoutineData {
157+
routine = routine.WithData(key, value)
158+
}
159+
160+
if a.timeout != nil {
161+
routine = routine.Timebox(*a.timeout)
162+
}
163+
164+
routine.Run()
165+
}
166+
167+
func (a *asyncRoutineObserver) stopObserving() {
168+
a.closeOnce.Do(func() {
169+
a.channelIsOpen.Store(false)
170+
close(a.channel)
171+
})
172+
}
173+
174+
// notify sends an event to the observer channel, non-blocking if closed.
175+
func (a *asyncRoutineObserver) notify(evt asyncRoutineEvent) {
176+
if a.guaranteedDelivery {
177+
if a.channelIsOpen.Load() {
178+
a.channel <- evt
179+
}
180+
// channel is closed. Event is lost. This happens when `stopObserving` is called.
181+
return
182+
}
183+
184+
select {
185+
case a.channel <- evt:
186+
default:
187+
// Event dropped
188+
}
189+
}
190+
191+
// --- Event types and interfaces ---
192+
193+
type asyncRoutineEvent interface {
194+
getEventType() int
195+
}
196+
197+
var _ asyncRoutineEvent = (*routineLifecycleEvent)(nil)
198+
var _ asyncRoutineEvent = (*routineCountEvent)(nil)
199+
var _ asyncRoutineEvent = (*routineByNameCountEvent)(nil)
200+
201+
type routineLifecycleEvent struct {
202+
evtType routineLifeCycleEventType
203+
routine AsyncRoutine
204+
}
205+
206+
func (r *routineLifecycleEvent) getEventType() int {
207+
return r.evtType
208+
}
209+
210+
type routineCountEvent struct {
211+
count int
212+
}
213+
214+
func (r routineCountEvent) getEventType() int {
215+
return routineCount
216+
}
217+
218+
type routineByNameCountEvent struct {
219+
routineName string
220+
count int
221+
}
222+
223+
func (r routineByNameCountEvent) getEventType() int {
224+
return routineByNameCount
225+
}

async_routine_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@ package async
22

33
import (
44
"context"
5+
. "github.com/onsi/ginkgo/v2"
56
"sync"
67
"time"
78

89
"go.uber.org/mock/gomock"
910

10-
. "github.com/onsi/ginkgo/v2/dsl/core"
1111
. "github.com/onsi/gomega"
1212
)
1313

1414
var _ = Describe("AsyncRoutine", func() {
15-
It("Run Async Routine", func() {
15+
DescribeTable("Run Async Routine", func(asyncObserver bool) {
1616
mockCtrl := gomock.NewController(GinkgoT())
1717
routineRan := false
1818
var wg sync.WaitGroup
@@ -22,7 +22,12 @@ var _ = Describe("AsyncRoutine", func() {
2222
observer := NewMockRoutinesObserver(mockCtrl)
2323

2424
manager := newAsyncRoutineManager()
25-
_ = manager.AddObserver(observer)
25+
26+
if asyncObserver {
27+
_ = manager.AddObserver(NewAsyncRoutineObserver(observer))
28+
} else {
29+
_ = manager.AddObserver(observer)
30+
}
2631

2732
observer.EXPECT().RoutineStarted(gomock.Any()).AnyTimes()
2833
observer.EXPECT().RoutineFinished(gomock.Any()).AnyTimes().Do(
@@ -46,5 +51,8 @@ var _ = Describe("AsyncRoutine", func() {
4651
Expect(routine.FinishedAt()).ToNot(BeNil())
4752
Expect(routine.FinishedAt().After(*routine.StartedAt())).To(BeTrue())
4853
Expect(routine.Status()).To(Equal(RoutineStatusFinished))
49-
})
54+
},
55+
Entry("with sync observer", false),
56+
Entry("with async observer", true),
57+
)
5058
})

async_test.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,18 @@ import (
66

77
"go.uber.org/mock/gomock"
88

9-
"github.com/onsi/ginkgo/v2"
10-
. "github.com/onsi/ginkgo/v2/dsl/core"
9+
. "github.com/onsi/ginkgo/v2"
1110
. "github.com/onsi/gomega"
1211

1312
"github.com/openshift-online/async-routine/opid"
1413
)
1514

16-
var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
15+
var _ = Describe("Async Routine Monitor", Ordered, func() {
1716
var mockCtrl *gomock.Controller
1817
BeforeEach(func() {
1918
mockCtrl = gomock.NewController(GinkgoT())
2019
})
21-
It("Track async routine execution", func() {
20+
DescribeTable("Track async routine execution", func(asyncObserver bool) {
2221
manager := newAsyncRoutineManager()
2322
defer manager.Monitor().Stop()
2423

@@ -64,7 +63,13 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
6463
wg.Done()
6564
})
6665

67-
observerId := manager.AddObserver(observer)
66+
var observerId string
67+
if asyncObserver {
68+
observerId = manager.AddObserver(NewAsyncRoutineObserver(observer))
69+
} else {
70+
observerId = manager.AddObserver(observer)
71+
}
72+
6873
manager.Monitor().Start()
6974
defer manager.RemoveObserver(observerId)
7075

@@ -89,8 +94,10 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
8994
To(ConsistOf("count up to 9", "count up to 9", "count up to 4"))
9095
Expect(executionLog["RoutineFinished"]).
9196
To(ConsistOf("count up to 9", "count up to 9", "count up to 4"))
92-
})
93-
It("Snapshotting", func() {
97+
},
98+
Entry("with sync observer", false),
99+
Entry("with async observer", true))
100+
DescribeTable("Snapshotting", func(asyncObserver bool) {
94101

95102
manager := newAsyncRoutineManager(WithSnapshottingInterval(time.Second))
96103
Expect(manager.Monitor().IsSnapshottingEnabled()).To(BeTrue())
@@ -155,7 +162,12 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
155162
AnyTimes().
156163
Do(func(name string, count int) { methodCalled("RunningRoutineByNameCount") })
157164

158-
_ = manager.AddObserver(observer)
165+
if asyncObserver {
166+
_ = manager.AddObserver(NewAsyncRoutineObserver(observer))
167+
} else {
168+
_ = manager.AddObserver(observer)
169+
}
170+
159171
manager.Monitor().Start()
160172

161173
NewAsyncRoutine("count up to 4 - 1", ctx, func() {
@@ -209,5 +221,7 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
209221
manager.Monitor().Stop()
210222
snapshot = manager.GetSnapshot()
211223
Expect(snapshot.GetTotalRoutineCount()).To(Equal(0))
212-
})
224+
},
225+
Entry("with sync observer", false),
226+
Entry("with async observer", true))
213227
})

0 commit comments

Comments
 (0)