diff --git a/meter.go b/meter.go index 223669b..8808f09 100644 --- a/meter.go +++ b/meter.go @@ -43,7 +43,8 @@ func NewMeter() Meter { arbiter.meters[m] = struct{}{} if !arbiter.started { arbiter.started = true - go arbiter.tick() + arbiter.cancel = make(chan struct{}) + go arbiter.tick(arbiter.ticker, arbiter.cancel) } return m } @@ -146,8 +147,20 @@ func newStandardMeter() *StandardMeter { func (m *StandardMeter) Stop() { if atomic.CompareAndSwapUint32(&m.stopped, 0, 1) { arbiter.Lock() + defer arbiter.Unlock() + delete(arbiter.meters, m) - arbiter.Unlock() + + if len(arbiter.meters) > 0 { + return + } + + // reset the arbiter to a newly intialized state, allows + // background goroutine (arbiter.tick) to terminate cleanly. + arbiter.ticker.Stop() + close(arbiter.cancel) + arbiter.ticker = time.NewTicker(5e9) + arbiter.started = false } } @@ -228,16 +241,19 @@ type meterArbiter struct { started bool meters map[*StandardMeter]struct{} ticker *time.Ticker + cancel chan struct{} } var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})} // Ticks meters on the scheduled interval -func (ma *meterArbiter) tick() { +func (ma *meterArbiter) tick(ticker *time.Ticker, cancel <-chan struct{}) { for { select { - case <-ma.ticker.C: + case <-ticker.C: ma.tickMeters() + case <-cancel: + return } } } diff --git a/meter_test.go b/meter_test.go index ecef37d..eb1d041 100644 --- a/meter_test.go +++ b/meter_test.go @@ -34,7 +34,7 @@ func TestMeterConcurrency(t *testing.T) { } m := newStandardMeter() ma.meters[m] = struct{}{} - go ma.tick() + go ma.tick(ma.ticker, nil) wg := &sync.WaitGroup{} reps := 100 for i := 0; i < reps; i++ { @@ -67,7 +67,7 @@ func TestMeterDecay(t *testing.T) { } m := newStandardMeter() ma.meters[m] = struct{}{} - go ma.tick() + go ma.tick(ma.ticker, nil) m.Mark(1) rateMean := m.RateMean() time.Sleep(100 * time.Millisecond)