diff --git a/async_routine.go b/async_routine.go index ec1b577..bbe1768 100644 --- a/async_routine.go +++ b/async_routine.go @@ -43,6 +43,8 @@ type AsyncRoutine interface { id() string } +type routineData map[string]string + type asyncRoutine struct { routineId string name string @@ -56,7 +58,7 @@ type asyncRoutine struct { status RoutineStatus ctx context.Context originatorOpId string - data map[string]string + data routineData } func (r *asyncRoutine) Name() string { diff --git a/async_routine_manager.go b/async_routine_manager.go index c3b9a97..225e43c 100644 --- a/async_routine_manager.go +++ b/async_routine_manager.go @@ -53,6 +53,15 @@ func (arm *asyncRoutineManager) AddObserver(observer RoutinesObserver) string { // RemoveObserver removes the given RoutineObserver from the list of observers func (arm *asyncRoutineManager) RemoveObserver(observerId string) { + observer, ok := arm.observers.Get(observerId) + if !ok { + return + } + + if obs, ok := observer.(*asyncRoutineObserver); ok { + obs.stopObserving() + } + arm.observers.Remove(observerId) } diff --git a/async_routine_observer.go b/async_routine_observer.go new file mode 100644 index 0000000..1dc9416 --- /dev/null +++ b/async_routine_observer.go @@ -0,0 +1,230 @@ +package async + +import ( + "context" + "sync" + "sync/atomic" + "time" +) + +// Event types for routine lifecycle and snapshotting +type routineLifeCycleEventType = int +type routineSnapshottingEventType = int + +const ( + routineStarted routineLifeCycleEventType = iota + routineEnded + routineTimeboxExceeded +) + +const ( + routineCount routineSnapshottingEventType = iota + routineByNameCount +) + +// DefaultAsyncObserverBufferSize defines the default buffer size for the event channel +// used by async routine observers. This value determines how many events can be queued +// before send operations block or, if not guaranteed, events are dropped. +const DefaultAsyncObserverBufferSize = 16 + +type asyncRoutineObserverOption func(o *asyncRoutineObserver) + +// WithObserverTimeout sets a timeout for the observer routine. +func WithObserverTimeout(timeout time.Duration) func(o *asyncRoutineObserver) { + return func(o *asyncRoutineObserver) { + o.timeout = &timeout + } +} + +// WithObserverRoutineData adds custom data to the observer routine. +func WithObserverRoutineData(key string, value string) func(o *asyncRoutineObserver) { + return func(o *asyncRoutineObserver) { + if o.observerNotifierRoutineData == nil { + o.observerNotifierRoutineData = make(routineData) + } + o.observerNotifierRoutineData[key] = value + } +} + +// WithChannelSize sets the buffer size for the async observer's event channel. +// Use this as an option when creating the async observer. +func WithChannelSize(size int) func(o *asyncRoutineObserver) { + return func(o *asyncRoutineObserver) { + if o.channel != nil { + close(o.channel) + } + o.channel = make(chan asyncRoutineEvent, size) + } +} + +// WithGuaranteedDelivery enables guaranteed delivery mode for the async observer's events. +// When enabled, the observer will not drop events if the channel is full, but will block until space is available. +// Use this as an option when creating the observer. +func WithGuaranteedDelivery() func(o *asyncRoutineObserver) { + return func(o *asyncRoutineObserver) { + o.guaranteedDelivery = true + } +} + +func NewAsyncRoutineObserver(delegate RoutinesObserver, options ...asyncRoutineObserverOption) RoutinesObserver { + asyncObserver := asyncRoutineObserver{ + delegate: delegate, + guaranteedDelivery: false, + } + + asyncObserver.channelIsOpen.Store(true) + + for _, option := range options { + option(&asyncObserver) + } + + if asyncObserver.channel == nil { + asyncObserver.channel = make(chan asyncRoutineEvent, DefaultAsyncObserverBufferSize) + } + + asyncObserver.startObserving() + return &asyncObserver +} + +var _ RoutinesObserver = (*asyncRoutineObserver)(nil) + +type asyncRoutineObserver struct { + delegate RoutinesObserver + channel chan asyncRoutineEvent + channelIsOpen atomic.Bool + timeout *time.Duration + observerNotifierRoutineData routineData + closeOnce sync.Once + guaranteedDelivery bool +} + +func (a *asyncRoutineObserver) RoutineStarted(routine AsyncRoutine) { + a.notify(&routineLifecycleEvent{ + evtType: routineStarted, + routine: routine, + }) +} + +func (a *asyncRoutineObserver) RoutineFinished(routine AsyncRoutine) { + a.notify(&routineLifecycleEvent{ + evtType: routineEnded, + routine: routine, + }) +} + +func (a *asyncRoutineObserver) RoutineExceededTimebox(routine AsyncRoutine) { + a.notify(&routineLifecycleEvent{ + evtType: routineTimeboxExceeded, + routine: routine, + }) +} + +func (a *asyncRoutineObserver) RunningRoutineCount(count int) { + a.notify(&routineCountEvent{ + count: count, + }) +} + +func (a *asyncRoutineObserver) RunningRoutineByNameCount(name string, count int) { + a.notify(&routineByNameCountEvent{ + routineName: name, + count: count, + }) +} + +func (a *asyncRoutineObserver) manageLifecycleEvent(evt *routineLifecycleEvent) { + switch evt.evtType { + case routineStarted: + a.delegate.RoutineStarted(evt.routine) + case routineEnded: + a.delegate.RoutineFinished(evt.routine) + case routineTimeboxExceeded: + a.delegate.RoutineExceededTimebox(evt.routine) + } +} + +func (a *asyncRoutineObserver) startObserving() { + routine := NewAsyncRoutine("async-observer-notifier", context.Background(), func() { + for evt := range a.channel { + switch evt.(type) { + case *routineLifecycleEvent: + a.manageLifecycleEvent(evt.(*routineLifecycleEvent)) + case *routineCountEvent: + a.delegate.RunningRoutineCount(evt.(*routineCountEvent).count) + case *routineByNameCountEvent: + evt := evt.(*routineByNameCountEvent) + a.delegate.RunningRoutineByNameCount(evt.routineName, evt.count) + } + } + }) + + for key, value := range a.observerNotifierRoutineData { + routine = routine.WithData(key, value) + } + + if a.timeout != nil { + routine = routine.Timebox(*a.timeout) + } + + routine.Run() +} + +func (a *asyncRoutineObserver) stopObserving() { + a.closeOnce.Do(func() { + a.channelIsOpen.Store(false) + close(a.channel) + }) +} + +// notify sends an event to the observer channel, non-blocking if closed. +func (a *asyncRoutineObserver) notify(evt asyncRoutineEvent) { + if a.guaranteedDelivery { + if a.channelIsOpen.Load() { + a.channel <- evt + } + // channel is closed. Event is lost. This happens when `stopObserving` is called. + return + } + + select { + case a.channel <- evt: + default: + // Event dropped + } +} + +// --- Event types and interfaces --- + +type asyncRoutineEvent interface { + getEventType() int +} + +var _ asyncRoutineEvent = (*routineLifecycleEvent)(nil) +var _ asyncRoutineEvent = (*routineCountEvent)(nil) +var _ asyncRoutineEvent = (*routineByNameCountEvent)(nil) + +type routineLifecycleEvent struct { + evtType routineLifeCycleEventType + routine AsyncRoutine +} + +func (r *routineLifecycleEvent) getEventType() int { + return r.evtType +} + +type routineCountEvent struct { + count int +} + +func (r routineCountEvent) getEventType() int { + return routineCount +} + +type routineByNameCountEvent struct { + routineName string + count int +} + +func (r routineByNameCountEvent) getEventType() int { + return routineByNameCount +} diff --git a/async_routine_test.go b/async_routine_test.go index a330c5c..f109a33 100644 --- a/async_routine_test.go +++ b/async_routine_test.go @@ -2,17 +2,17 @@ package async import ( "context" + . "github.com/onsi/ginkgo/v2" "sync" "time" "go.uber.org/mock/gomock" - . "github.com/onsi/ginkgo/v2/dsl/core" . "github.com/onsi/gomega" ) var _ = Describe("AsyncRoutine", func() { - It("Run Async Routine", func() { + DescribeTable("Run Async Routine", func(asyncObserver bool) { mockCtrl := gomock.NewController(GinkgoT()) routineRan := false var wg sync.WaitGroup @@ -22,7 +22,12 @@ var _ = Describe("AsyncRoutine", func() { observer := NewMockRoutinesObserver(mockCtrl) manager := newAsyncRoutineManager() - _ = manager.AddObserver(observer) + + if asyncObserver { + _ = manager.AddObserver(NewAsyncRoutineObserver(observer)) + } else { + _ = manager.AddObserver(observer) + } observer.EXPECT().RoutineStarted(gomock.Any()).AnyTimes() observer.EXPECT().RoutineFinished(gomock.Any()).AnyTimes().Do( @@ -46,5 +51,8 @@ var _ = Describe("AsyncRoutine", func() { Expect(routine.FinishedAt()).ToNot(BeNil()) Expect(routine.FinishedAt().After(*routine.StartedAt())).To(BeTrue()) Expect(routine.Status()).To(Equal(RoutineStatusFinished)) - }) + }, + Entry("with sync observer", false), + Entry("with async observer", true), + ) }) diff --git a/async_test.go b/async_test.go index e179d26..62a3570 100644 --- a/async_test.go +++ b/async_test.go @@ -6,19 +6,18 @@ import ( "go.uber.org/mock/gomock" - "github.com/onsi/ginkgo/v2" - . "github.com/onsi/ginkgo/v2/dsl/core" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/openshift-online/async-routine/opid" ) -var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() { +var _ = Describe("Async Routine Monitor", Ordered, func() { var mockCtrl *gomock.Controller BeforeEach(func() { mockCtrl = gomock.NewController(GinkgoT()) }) - It("Track async routine execution", func() { + DescribeTable("Track async routine execution", func(asyncObserver bool) { manager := newAsyncRoutineManager() defer manager.Monitor().Stop() @@ -64,7 +63,13 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() { wg.Done() }) - observerId := manager.AddObserver(observer) + var observerId string + if asyncObserver { + observerId = manager.AddObserver(NewAsyncRoutineObserver(observer)) + } else { + observerId = manager.AddObserver(observer) + } + manager.Monitor().Start() defer manager.RemoveObserver(observerId) @@ -89,8 +94,10 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() { To(ConsistOf("count up to 9", "count up to 9", "count up to 4")) Expect(executionLog["RoutineFinished"]). To(ConsistOf("count up to 9", "count up to 9", "count up to 4")) - }) - It("Snapshotting", func() { + }, + Entry("with sync observer", false), + Entry("with async observer", true)) + DescribeTable("Snapshotting", func(asyncObserver bool) { manager := newAsyncRoutineManager(WithSnapshottingInterval(time.Second)) Expect(manager.Monitor().IsSnapshottingEnabled()).To(BeTrue()) @@ -155,7 +162,12 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() { AnyTimes(). Do(func(name string, count int) { methodCalled("RunningRoutineByNameCount") }) - _ = manager.AddObserver(observer) + if asyncObserver { + _ = manager.AddObserver(NewAsyncRoutineObserver(observer)) + } else { + _ = manager.AddObserver(observer) + } + manager.Monitor().Start() NewAsyncRoutine("count up to 4 - 1", ctx, func() { @@ -209,5 +221,7 @@ var _ = Describe("Async Routine Monitor", ginkgo.Ordered, func() { manager.Monitor().Stop() snapshot = manager.GetSnapshot() Expect(snapshot.GetTotalRoutineCount()).To(Equal(0)) - }) + }, + Entry("with sync observer", false), + Entry("with async observer", true)) })