Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion async_routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type AsyncRoutine interface {
id() string
}

type routineData map[string]string

type asyncRoutine struct {
routineId string
name string
Expand All @@ -56,7 +58,7 @@ type asyncRoutine struct {
status RoutineStatus
ctx context.Context
originatorOpId string
data map[string]string
data routineData
}

func (r *asyncRoutine) Name() string {
Expand Down
9 changes: 9 additions & 0 deletions async_routine_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ func (arm *asyncRoutineManager) AddObserver(observer RoutinesObserver) string {

// RemoveObserver removes the given RoutineObserver from the list of observers
func (arm *asyncRoutineManager) RemoveObserver(observerId string) {
observer, ok := arm.observers.Get(observerId)
if !ok {
return
}

if obs, ok := observer.(*asyncRoutineObserver); ok {
obs.stopObserving()
}

arm.observers.Remove(observerId)
}

Expand Down
230 changes: 230 additions & 0 deletions async_routine_observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package async

import (
"context"
"sync"
"sync/atomic"
"time"
)

// Event types for routine lifecycle and snapshotting
type routineLifeCycleEventType = int
type routineSnapshottingEventType = int

const (
routineStarted routineLifeCycleEventType = iota
routineEnded
routineTimeboxExceeded
)

const (
routineCount routineSnapshottingEventType = iota
routineByNameCount
)

// DefaultAsyncObserverBufferSize defines the default buffer size for the event channel
// used by async routine observers. This value determines how many events can be queued
// before send operations block or, if not guaranteed, events are dropped.
const DefaultAsyncObserverBufferSize = 16

type asyncRoutineObserverOption func(o *asyncRoutineObserver)

// WithObserverTimeout sets a timeout for the observer routine.
func WithObserverTimeout(timeout time.Duration) func(o *asyncRoutineObserver) {
return func(o *asyncRoutineObserver) {
o.timeout = &timeout
}
}

// WithObserverRoutineData adds custom data to the observer routine.
func WithObserverRoutineData(key string, value string) func(o *asyncRoutineObserver) {
return func(o *asyncRoutineObserver) {
if o.observerNotifierRoutineData == nil {
o.observerNotifierRoutineData = make(routineData)
}
o.observerNotifierRoutineData[key] = value
}
}

// WithChannelSize sets the buffer size for the async observer's event channel.
// Use this as an option when creating the async observer.
func WithChannelSize(size int) func(o *asyncRoutineObserver) {
return func(o *asyncRoutineObserver) {
if o.channel != nil {
close(o.channel)
}
o.channel = make(chan asyncRoutineEvent, size)
}
}

// WithGuaranteedDelivery enables guaranteed delivery mode for the async observer's events.
// When enabled, the observer will not drop events if the channel is full, but will block until space is available.
// Use this as an option when creating the observer.
func WithGuaranteedDelivery() func(o *asyncRoutineObserver) {
return func(o *asyncRoutineObserver) {
o.guaranteedDelivery = true
}
}

func NewAsyncRoutineObserver(delegate RoutinesObserver, options ...asyncRoutineObserverOption) RoutinesObserver {
asyncObserver := asyncRoutineObserver{
delegate: delegate,
guaranteedDelivery: false,
}

asyncObserver.channelIsOpen.Store(true)

for _, option := range options {
option(&asyncObserver)
}

if asyncObserver.channel == nil {
asyncObserver.channel = make(chan asyncRoutineEvent, DefaultAsyncObserverBufferSize)
}

asyncObserver.startObserving()
return &asyncObserver
}

var _ RoutinesObserver = (*asyncRoutineObserver)(nil)

type asyncRoutineObserver struct {
delegate RoutinesObserver
channel chan asyncRoutineEvent
channelIsOpen atomic.Bool
timeout *time.Duration
observerNotifierRoutineData routineData
closeOnce sync.Once
guaranteedDelivery bool
}

func (a *asyncRoutineObserver) RoutineStarted(routine AsyncRoutine) {
a.notify(&routineLifecycleEvent{
evtType: routineStarted,
routine: routine,
})
}

func (a *asyncRoutineObserver) RoutineFinished(routine AsyncRoutine) {
a.notify(&routineLifecycleEvent{
evtType: routineEnded,
routine: routine,
})
}

func (a *asyncRoutineObserver) RoutineExceededTimebox(routine AsyncRoutine) {
a.notify(&routineLifecycleEvent{
evtType: routineTimeboxExceeded,
routine: routine,
})
}

func (a *asyncRoutineObserver) RunningRoutineCount(count int) {
a.notify(&routineCountEvent{
count: count,
})
}

func (a *asyncRoutineObserver) RunningRoutineByNameCount(name string, count int) {
a.notify(&routineByNameCountEvent{
routineName: name,
count: count,
})
}

func (a *asyncRoutineObserver) manageLifecycleEvent(evt *routineLifecycleEvent) {
switch evt.evtType {
case routineStarted:
a.delegate.RoutineStarted(evt.routine)
case routineEnded:
a.delegate.RoutineFinished(evt.routine)
case routineTimeboxExceeded:
a.delegate.RoutineExceededTimebox(evt.routine)
}
}

func (a *asyncRoutineObserver) startObserving() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this be getting ctx as a parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What usage of the context are you thinking about?

routine := NewAsyncRoutine("async-observer-notifier", context.Background(), func() {
for evt := range a.channel {
switch evt.(type) {
case *routineLifecycleEvent:
a.manageLifecycleEvent(evt.(*routineLifecycleEvent))
case *routineCountEvent:
a.delegate.RunningRoutineCount(evt.(*routineCountEvent).count)
case *routineByNameCountEvent:
evt := evt.(*routineByNameCountEvent)
a.delegate.RunningRoutineByNameCount(evt.routineName, evt.count)
}
}
})

for key, value := range a.observerNotifierRoutineData {
routine = routine.WithData(key, value)
}

if a.timeout != nil {
routine = routine.Timebox(*a.timeout)
}

routine.Run()
}

func (a *asyncRoutineObserver) stopObserving() {
a.closeOnce.Do(func() {
a.channelIsOpen.Store(false)
close(a.channel)
})
}

// notify sends an event to the observer channel, non-blocking if closed.
func (a *asyncRoutineObserver) notify(evt asyncRoutineEvent) {
if a.guaranteedDelivery {
if a.channelIsOpen.Load() {
a.channel <- evt
}
// channel is closed. Event is lost. This happens when `stopObserving` is called.
return
}

select {
case a.channel <- evt:
default:
// Event dropped
}
}

// --- Event types and interfaces ---

type asyncRoutineEvent interface {
getEventType() int
}

var _ asyncRoutineEvent = (*routineLifecycleEvent)(nil)
var _ asyncRoutineEvent = (*routineCountEvent)(nil)
var _ asyncRoutineEvent = (*routineByNameCountEvent)(nil)

type routineLifecycleEvent struct {
evtType routineLifeCycleEventType
routine AsyncRoutine
}

func (r *routineLifecycleEvent) getEventType() int {
return r.evtType
}

type routineCountEvent struct {
count int
}

func (r routineCountEvent) getEventType() int {
return routineCount
}

type routineByNameCountEvent struct {
routineName string
count int
}

func (r routineByNameCountEvent) getEventType() int {
return routineByNameCount
}
16 changes: 12 additions & 4 deletions async_routine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package async

import (
"context"
. "github.com/onsi/ginkgo/v2"
"sync"
"time"

"go.uber.org/mock/gomock"

. "github.com/onsi/ginkgo/v2/dsl/core"
. "github.com/onsi/gomega"
)

var _ = Describe("AsyncRoutine", func() {
It("Run Async Routine", func() {
DescribeTable("Run Async Routine", func(asyncObserver bool) {
mockCtrl := gomock.NewController(GinkgoT())
routineRan := false
var wg sync.WaitGroup
Expand All @@ -22,7 +22,12 @@ var _ = Describe("AsyncRoutine", func() {
observer := NewMockRoutinesObserver(mockCtrl)

manager := newAsyncRoutineManager()
_ = manager.AddObserver(observer)

if asyncObserver {
_ = manager.AddObserver(NewAsyncRoutineObserver(observer))
} else {
_ = manager.AddObserver(observer)
}

observer.EXPECT().RoutineStarted(gomock.Any()).AnyTimes()
observer.EXPECT().RoutineFinished(gomock.Any()).AnyTimes().Do(
Expand All @@ -46,5 +51,8 @@ var _ = Describe("AsyncRoutine", func() {
Expect(routine.FinishedAt()).ToNot(BeNil())
Expect(routine.FinishedAt().After(*routine.StartedAt())).To(BeTrue())
Expect(routine.Status()).To(Equal(RoutineStatusFinished))
})
},
Entry("with sync observer", false),
Entry("with async observer", true),
)
})
32 changes: 23 additions & 9 deletions async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@ import (

"go.uber.org/mock/gomock"

"github.com/onsi/ginkgo/v2"
. "github.com/onsi/ginkgo/v2/dsl/core"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/openshift-online/async-routine/opid"
)

var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
var _ = Describe("Async Routine Monitor", Ordered, func() {
var mockCtrl *gomock.Controller
BeforeEach(func() {
mockCtrl = gomock.NewController(GinkgoT())
})
It("Track async routine execution", func() {
DescribeTable("Track async routine execution", func(asyncObserver bool) {
manager := newAsyncRoutineManager()
defer manager.Monitor().Stop()

Expand Down Expand Up @@ -64,7 +63,13 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
wg.Done()
})

observerId := manager.AddObserver(observer)
var observerId string
if asyncObserver {
observerId = manager.AddObserver(NewAsyncRoutineObserver(observer))
} else {
observerId = manager.AddObserver(observer)
}

manager.Monitor().Start()
defer manager.RemoveObserver(observerId)

Expand All @@ -89,8 +94,10 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
To(ConsistOf("count up to 9", "count up to 9", "count up to 4"))
Expect(executionLog["RoutineFinished"]).
To(ConsistOf("count up to 9", "count up to 9", "count up to 4"))
})
It("Snapshotting", func() {
},
Entry("with sync observer", false),
Entry("with async observer", true))
DescribeTable("Snapshotting", func(asyncObserver bool) {

manager := newAsyncRoutineManager(WithSnapshottingInterval(time.Second))
Expect(manager.Monitor().IsSnapshottingEnabled()).To(BeTrue())
Expand Down Expand Up @@ -155,7 +162,12 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
AnyTimes().
Do(func(name string, count int) { methodCalled("RunningRoutineByNameCount") })

_ = manager.AddObserver(observer)
if asyncObserver {
_ = manager.AddObserver(NewAsyncRoutineObserver(observer))
} else {
_ = manager.AddObserver(observer)
}

manager.Monitor().Start()

NewAsyncRoutine("count up to 4 - 1", ctx, func() {
Expand Down Expand Up @@ -209,5 +221,7 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() {
manager.Monitor().Stop()
snapshot = manager.GetSnapshot()
Expect(snapshot.GetTotalRoutineCount()).To(Equal(0))
})
},
Entry("with sync observer", false),
Entry("with async observer", true))
})