Skip to content

Commit 80fd09b

Browse files
committed
feat: add an async observer wrapper
1 parent eeb6fde commit 80fd09b

File tree

5 files changed

+277
-14
lines changed

5 files changed

+277
-14
lines changed

async_routine.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ type AsyncRoutine interface {
4343
id() string
4444
}
4545

46+
type routineData map[string]string
47+
4648
type asyncRoutine struct {
4749
routineId string
4850
name string
@@ -56,7 +58,7 @@ type asyncRoutine struct {
5658
status RoutineStatus
5759
ctx context.Context
5860
originatorOpId string
59-
data map[string]string
61+
data routineData
6062
}
6163

6264
func (r *asyncRoutine) Name() string {

async_routine_manager.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ func (arm *asyncRoutineManager) AddObserver(observer RoutinesObserver) string {
5353

5454
// RemoveObserver removes the given RoutineObserver from the list of observers
5555
func (arm *asyncRoutineManager) RemoveObserver(observerId string) {
56+
observer, ok := arm.observers.Get(observerId)
57+
if !ok {
58+
return
59+
}
60+
61+
if obs, ok := observer.(*asyncRoutineObserver); ok {
62+
obs.stopObserving()
63+
}
64+
5665
arm.observers.Remove(observerId)
5766
}
5867

async_routine_observer.go

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package async
2+
3+
import (
4+
"context"
5+
"sync"
6+
"sync/atomic"
7+
"time"
8+
)
9+
10+
// Event types for routine lifecycle and snapshotting
11+
type routineLifeCycleEventType = int
12+
type routineSnapshottingEventType = int
13+
14+
const (
15+
routineStarted routineLifeCycleEventType = iota
16+
routineEnded
17+
routineTimeboxExceeded
18+
)
19+
20+
const (
21+
routineCount routineSnapshottingEventType = iota
22+
routineByNameCount
23+
)
24+
25+
// DefaultAsyncObserverBufferSize defines the default buffer size for the event channel
26+
// used by async routine observers. This value determines how many events can be queued
27+
// before send operations block or, if not guaranteed, events are dropped.
28+
const DefaultAsyncObserverBufferSize = 16
29+
30+
type asyncRoutineObserverOption func(o *asyncRoutineObserver)
31+
32+
// WithObserverTimeout sets a timeout for the observer routine.
33+
func WithObserverTimeout(timeout time.Duration) func(o *asyncRoutineObserver) {
34+
return func(o *asyncRoutineObserver) {
35+
o.timeout = &timeout
36+
}
37+
}
38+
39+
// WithObserverRoutineData adds custom data to the observer routine.
40+
func WithObserverRoutineData(key string, value string) func(o *asyncRoutineObserver) {
41+
return func(o *asyncRoutineObserver) {
42+
if o.observerNotifierRoutineData == nil {
43+
o.observerNotifierRoutineData = make(routineData)
44+
}
45+
o.observerNotifierRoutineData[key] = value
46+
}
47+
}
48+
49+
// WithChannelSize sets the buffer size for the async observer's event channel.
50+
// Use this as an option when creating the async observer.
51+
func WithChannelSize(size int) func(o *asyncRoutineObserver) {
52+
return func(o *asyncRoutineObserver) {
53+
if o.channel != nil {
54+
close(o.channel)
55+
}
56+
o.channel = make(chan asyncRoutineEvent, size)
57+
}
58+
}
59+
60+
// WithGuaranteedDelivery enables guaranteed delivery mode for the async observer's events.
61+
// When enabled, the observer will not drop events if the channel is full, but will block until space is available.
62+
// Use this as an option when creating the observer.
63+
func WithGuaranteedDelivery() func(o *asyncRoutineObserver) {
64+
return func(o *asyncRoutineObserver) {
65+
o.guaranteedDelivery = true
66+
}
67+
}
68+
69+
func NewAsyncRoutineObserver(delegate RoutinesObserver, options ...asyncRoutineObserverOption) RoutinesObserver {
70+
asyncObserver := asyncRoutineObserver{
71+
delegate: delegate,
72+
guaranteedDelivery: false,
73+
}
74+
75+
asyncObserver.channelIsOpen.Store(true)
76+
77+
for _, option := range options {
78+
option(&asyncObserver)
79+
}
80+
81+
if asyncObserver.channel == nil {
82+
asyncObserver.channel = make(chan asyncRoutineEvent, DefaultAsyncObserverBufferSize)
83+
}
84+
85+
asyncObserver.startObserving()
86+
return &asyncObserver
87+
}
88+
89+
var _ RoutinesObserver = (*asyncRoutineObserver)(nil)
90+
91+
type asyncRoutineObserver struct {
92+
delegate RoutinesObserver
93+
channel chan asyncRoutineEvent
94+
channelIsOpen atomic.Bool
95+
timeout *time.Duration
96+
observerNotifierRoutineData routineData
97+
closeOnce sync.Once
98+
guaranteedDelivery bool
99+
}
100+
101+
func (a *asyncRoutineObserver) RoutineStarted(routine AsyncRoutine) {
102+
a.notify(&routineLifecycleEvent{
103+
evtType: routineStarted,
104+
routine: routine,
105+
})
106+
}
107+
108+
func (a *asyncRoutineObserver) RoutineFinished(routine AsyncRoutine) {
109+
a.notify(&routineLifecycleEvent{
110+
evtType: routineEnded,
111+
routine: routine,
112+
})
113+
}
114+
115+
func (a *asyncRoutineObserver) RoutineExceededTimebox(routine AsyncRoutine) {
116+
a.notify(&routineLifecycleEvent{
117+
evtType: routineTimeboxExceeded,
118+
routine: routine,
119+
})
120+
}
121+
122+
func (a *asyncRoutineObserver) RunningRoutineCount(count int) {
123+
a.notify(&routineCountEvent{
124+
count: count,
125+
})
126+
}
127+
128+
func (a *asyncRoutineObserver) RunningRoutineByNameCount(name string, count int) {
129+
a.notify(&routineByNameCountEvent{
130+
routineName: name,
131+
count: count,
132+
})
133+
}
134+
135+
func (a *asyncRoutineObserver) manageLifecycleEvent(evt *routineLifecycleEvent) {
136+
switch evt.evtType {
137+
case routineStarted:
138+
a.delegate.RoutineStarted(evt.routine)
139+
case routineEnded:
140+
a.delegate.RoutineFinished(evt.routine)
141+
case routineTimeboxExceeded:
142+
a.delegate.RoutineExceededTimebox(evt.routine)
143+
}
144+
}
145+
146+
func (a *asyncRoutineObserver) startObserving() {
147+
routine := NewAsyncRoutine("async-observer-notifier", context.Background(), func() {
148+
for evt := range a.channel {
149+
switch evt.(type) {
150+
case *routineLifecycleEvent:
151+
a.manageLifecycleEvent(evt.(*routineLifecycleEvent))
152+
case *routineCountEvent:
153+
a.delegate.RunningRoutineCount(evt.(*routineCountEvent).count)
154+
case *routineByNameCountEvent:
155+
evt := evt.(*routineByNameCountEvent)
156+
a.delegate.RunningRoutineByNameCount(evt.routineName, evt.count)
157+
}
158+
}
159+
})
160+
161+
for key, value := range a.observerNotifierRoutineData {
162+
routine = routine.WithData(key, value)
163+
}
164+
165+
if a.timeout != nil {
166+
routine = routine.Timebox(*a.timeout)
167+
}
168+
169+
routine.Run()
170+
}
171+
172+
func (a *asyncRoutineObserver) stopObserving() {
173+
a.closeOnce.Do(func() {
174+
a.channelIsOpen.Store(false)
175+
close(a.channel)
176+
})
177+
}
178+
179+
// notify sends an event to the observer channel, non-blocking if closed.
180+
func (a *asyncRoutineObserver) notify(evt asyncRoutineEvent) {
181+
if a.guaranteedDelivery {
182+
if a.channelIsOpen.Load() {
183+
a.channel <- evt
184+
}
185+
// channel is closed. Event is lost. This happens when `stopObserving` is called.
186+
return
187+
}
188+
189+
select {
190+
case a.channel <- evt:
191+
default:
192+
// Event dropped
193+
}
194+
}
195+
196+
// --- Event types and interfaces ---
197+
198+
type asyncRoutineEvent interface {
199+
getEventType() int
200+
}
201+
202+
var _ asyncRoutineEvent = (*routineLifecycleEvent)(nil)
203+
var _ asyncRoutineEvent = (*routineCountEvent)(nil)
204+
var _ asyncRoutineEvent = (*routineByNameCountEvent)(nil)
205+
206+
type routineLifecycleEvent struct {
207+
evtType routineLifeCycleEventType
208+
routine AsyncRoutine
209+
}
210+
211+
func (r *routineLifecycleEvent) getEventType() int {
212+
return r.evtType
213+
}
214+
215+
type routineCountEvent struct {
216+
count int
217+
}
218+
219+
func (r routineCountEvent) getEventType() int {
220+
return routineCount
221+
}
222+
223+
type routineByNameCountEvent struct {
224+
routineName string
225+
count int
226+
}
227+
228+
func (r routineByNameCountEvent) getEventType() int {
229+
return routineByNameCount
230+
}

async_routine_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@ package async
22

33
import (
44
"context"
5+
. "github.com/onsi/ginkgo/v2"
56
"sync"
67
"time"
78

89
"go.uber.org/mock/gomock"
910

10-
. "github.com/onsi/ginkgo/v2/dsl/core"
1111
. "github.com/onsi/gomega"
1212
)
1313

1414
var _ = Describe("AsyncRoutine", func() {
15-
It("Run Async Routine", func() {
15+
DescribeTable("Run Async Routine", func(asyncObserver bool) {
1616
mockCtrl := gomock.NewController(GinkgoT())
1717
routineRan := false
1818
var wg sync.WaitGroup
@@ -22,7 +22,12 @@ var _ = Describe("AsyncRoutine", func() {
2222
observer := NewMockRoutinesObserver(mockCtrl)
2323

2424
manager := newAsyncRoutineManager()
25-
_ = manager.AddObserver(observer)
25+
26+
if asyncObserver {
27+
_ = manager.AddObserver(NewAsyncRoutineObserver(observer))
28+
} else {
29+
_ = manager.AddObserver(observer)
30+
}
2631

2732
observer.EXPECT().RoutineStarted(gomock.Any()).AnyTimes()
2833
observer.EXPECT().RoutineFinished(gomock.Any()).AnyTimes().Do(
@@ -46,5 +51,8 @@ var _ = Describe("AsyncRoutine", func() {
4651
Expect(routine.FinishedAt()).ToNot(BeNil())
4752
Expect(routine.FinishedAt().After(*routine.StartedAt())).To(BeTrue())
4853
Expect(routine.Status()).To(Equal(RoutineStatusFinished))
49-
})
54+
},
55+
Entry("with sync observer", false),
56+
Entry("with async observer", true),
57+
)
5058
})

async_test.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,18 @@ import (
66

77
"go.uber.org/mock/gomock"
88

9-
"github.com/onsi/ginkgo/v2"
10-
. "github.com/onsi/ginkgo/v2/dsl/core"
9+
. "github.com/onsi/ginkgo/v2"
1110
. "github.com/onsi/gomega"
1211

1312
"github.com/openshift-online/async-routine/opid"
1413
)
1514

16-
var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
15+
var _ = Describe("Async Routine Monitor", Ordered, func() {
1716
var mockCtrl *gomock.Controller
1817
BeforeEach(func() {
1918
mockCtrl = gomock.NewController(GinkgoT())
2019
})
21-
It("Track async routine execution", func() {
20+
DescribeTable("Track async routine execution", func(asyncObserver bool) {
2221
manager := newAsyncRoutineManager()
2322
defer manager.Monitor().Stop()
2423

@@ -64,7 +63,13 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
6463
wg.Done()
6564
})
6665

67-
observerId := manager.AddObserver(observer)
66+
var observerId string
67+
if asyncObserver {
68+
observerId = manager.AddObserver(NewAsyncRoutineObserver(observer))
69+
} else {
70+
observerId = manager.AddObserver(observer)
71+
}
72+
6873
manager.Monitor().Start()
6974
defer manager.RemoveObserver(observerId)
7075

@@ -89,8 +94,10 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
8994
To(ConsistOf("count up to 9", "count up to 9", "count up to 4"))
9095
Expect(executionLog["RoutineFinished"]).
9196
To(ConsistOf("count up to 9", "count up to 9", "count up to 4"))
92-
})
93-
It("Snapshotting", func() {
97+
},
98+
Entry("with sync observer", false),
99+
Entry("with async observer", true))
100+
DescribeTable("Snapshotting", func(asyncObserver bool) {
94101

95102
manager := newAsyncRoutineManager(WithSnapshottingInterval(time.Second))
96103
Expect(manager.Monitor().IsSnapshottingEnabled()).To(BeTrue())
@@ -155,7 +162,12 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
155162
AnyTimes().
156163
Do(func(name string, count int) { methodCalled("RunningRoutineByNameCount") })
157164

158-
_ = manager.AddObserver(observer)
165+
if asyncObserver {
166+
_ = manager.AddObserver(NewAsyncRoutineObserver(observer))
167+
} else {
168+
_ = manager.AddObserver(observer)
169+
}
170+
159171
manager.Monitor().Start()
160172

161173
NewAsyncRoutine("count up to 4 - 1", ctx, func() {
@@ -209,5 +221,7 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
209221
manager.Monitor().Stop()
210222
snapshot = manager.GetSnapshot()
211223
Expect(snapshot.GetTotalRoutineCount()).To(Equal(0))
212-
})
224+
},
225+
Entry("with sync observer", false),
226+
Entry("with async observer", true))
213227
})

0 commit comments

Comments
 (0)