Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,34 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr
accumulator.add(fiveSecondsSeven) shouldBe empty // second 0:35
}

"not align snapshot when optimistic tick alignment is false" in {
// When the kamon.metric.optimistic-tick-alignment is false
// If accumulating over 11 seconds, the snapshots should be generated at 00:00:11, 00:00:22, 00:00:33 and so on.
// Thus the snapshot next tick is never aligned
applyConfig("kamon.metric.optimistic-tick-alignment = no")

val accumulator = newAccumulator(11, 0)
// as the first add snapshot.to determines the first ticker use zero second
// to send snapshot at the seconds that are multiples of the duration
accumulator.add(zeroSecond) shouldBe empty // second 0:0
accumulator.add(fiveSecondsOne) shouldBe empty // second 0:5
accumulator.add(fiveSecondsTwo) shouldBe empty // second 0:10

val s15 = accumulator.add(fiveSecondsThree).value // second 0:15
s15.from shouldBe(zeroSecond.from)
s15.to shouldBe(fiveSecondsThree.to)

accumulator.add(fiveSecondsFour) shouldBe empty // second 0:20
val s25 = accumulator.add(fiveSecondsFive).value // second 0:25
s25.from shouldBe(fiveSecondsFour.from)
s25.to shouldBe(fiveSecondsFive.to)

accumulator.add(fiveSecondsSix) shouldBe empty // second 0:30
}

"do best effort to align when snapshots themselves are not aligned" in {
applyConfig("kamon.metric.optimistic-tick-alignment = yes")

val accumulator = newAccumulator(30, 0)
accumulator.add(tenSecondsOne) shouldBe empty // second 0:13
accumulator.add(tenSecondsTwo) shouldBe empty // second 0:23
Expand Down Expand Up @@ -139,6 +166,8 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr
}

"produce a snapshot when enough data has been accumulated" in {
applyConfig("kamon.metric.optimistic-tick-alignment = yes")

val accumulator = newAccumulator(15, 1)
accumulator.add(fiveSecondsOne) shouldBe empty
accumulator.add(fiveSecondsTwo) shouldBe empty
Expand Down Expand Up @@ -167,6 +196,8 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr
val alignedZeroTime = Clock.nextAlignedInstant(Kamon.clock().instant(), Duration.ofSeconds(60)).minusSeconds(60)
val unAlignedZeroTime = alignedZeroTime.plusSeconds(3)

val zeroSecond = createPeriodSnapshot(alignedZeroTime, alignedZeroTime, 13)

// Aligned snapshots, every 5 seconds from second 00.
val fiveSecondsOne = createPeriodSnapshot(alignedZeroTime, alignedZeroTime.plusSeconds(5), 22)
val fiveSecondsTwo = createPeriodSnapshot(alignedZeroTime.plusSeconds(5), alignedZeroTime.plusSeconds(10), 33)
Expand Down Expand Up @@ -221,6 +252,7 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr
)

override protected def beforeAll(): Unit = {
applyConfig("kamon.metric.optimistic-tick-alignment = yes")
applyConfig("kamon.metric.tick-interval = 10 seconds")
}
}
21 changes: 16 additions & 5 deletions core/kamon-core/src/main/scala/kamon/metric/PeriodSnapshot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ object PeriodSnapshot {
private var _accumulatingFrom: Option[Instant] = None

def add(periodSnapshot: PeriodSnapshot): Option[PeriodSnapshot] = {

// Initialize the next tick based on incoming snapshots.
if(_nextTick == Instant.EPOCH)
_nextTick = Clock.nextAlignedInstant(periodSnapshot.to, period)
if (_nextTick == Instant.EPOCH) {
_nextTick = nextInstant(periodSnapshot.to)
}

// short-circuit if there is no need to accumulate (e.g. when metrics tick-interval is the same as duration or the
// snapshots have a longer period than the duration).
Expand All @@ -116,10 +116,9 @@ object PeriodSnapshot {

for(from <- _accumulatingFrom if isAroundNextTick(periodSnapshot.to)) yield {
val accumulatedPeriodSnapshot = buildPeriodSnapshot(from, periodSnapshot.to, resetState = true)
_nextTick = Clock.nextAlignedInstant(_nextTick, period)
_nextTick = nextInstant(_nextTick)
_accumulatingFrom = None
clearAccumulatedData()

accumulatedPeriodSnapshot
}
}
Expand All @@ -133,6 +132,18 @@ object PeriodSnapshot {
Duration.between(instant, _nextTick.minus(margin)).toMillis() <= 0
}

private def nextInstant(from: Instant): Instant = {
if (isOptimisticAlignmentEnabled()) {
Clock.nextAlignedInstant(from, period)
} else {
Instant.ofEpochMilli(from.toEpochMilli + period.toMillis)
}
}

private def isOptimisticAlignmentEnabled(): Boolean = {
Kamon.config().getBoolean("kamon.metric.optimistic-tick-alignment")
}

private def isSameDurationAsTickInterval(): Boolean = {
Kamon.config().getDuration("kamon.metric.tick-interval") == period
}
Expand Down