diff --git a/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs b/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs
index d9a0104f..a2c87bd2 100644
--- a/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs
+++ b/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs
@@ -80,7 +80,7 @@ public void WhenNewItemsAreAddedTheyArePromotedBasedOnFrequency()
             LogLru();
 
             for (int k = 0; k < 2; k++)
-            {
+            {
                 for (int j = 0; j < 6; j++)
                 {
                     for (int i = 0; i < 15; i++)
@@ -264,6 +264,74 @@ public void WriteUpdatesProtectedLruOrder()
             cache.TryGet(7, out var _).Should().BeTrue();
         }
 
+        [Fact]
+        public void WhenHitRateChangesWindowSizeIsAdapted()
+        {
+            cache = new ConcurrentLfu<int, int>(1, 20, new NullScheduler());
+
+            // First completely fill the cache, push entries into protected
+            for (int i = 0; i < 20; i++)
+            {
+                cache.GetOrAdd(i, k => k);
+            }
+
+            // W [19] Protected [] Probation [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18]
+            cache.PendingMaintenance();
+            LogLru();
+
+            for (int i = 0; i < 15; i++)
+            {
+                cache.GetOrAdd(i, k => k);
+            }
+
+            // W [19] Protected [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14] Probation [15,16,17,18]
+            cache.PendingMaintenance();
+            LogLru();
+
+            // The reset sample size is 200, so do 200 cache hits
+            // W [19] Protected [12,13,14,15,16,17,18,0,1,2,3,4,5,6,7] Probation [8,9,10,11]
+            for (int j = 0; j < 10; j++)
+                for (int i = 0; i < 20; i++)
+                {
+                    cache.GetOrAdd(i, k => k);
+                }
+
+            cache.PendingMaintenance();
+            LogLru();
+
+            // then miss 200 times
+            // W [300] Protected [12,13,14,15,16,17,18,0,1,2,3,4,5,6,7] Probation [9,10,11,227]
+            for (int i = 0; i < 201; i++)
+            {
+                cache.GetOrAdd(i + 100, k => k);
+            }
+
+            cache.PendingMaintenance();
+            LogLru();
+
+            // then miss 200 more times (window adaptation adds 1 window slot)
+            // W [399,400] Protected [14,15,16,17,18,0,1,2,3,4,5,6,7,227] Probation [9,10,11,12]
+            for (int i = 0; i < 201; i++)
+            {
+                cache.GetOrAdd(i + 200, k => k);
+            }
+
+            cache.PendingMaintenance();
+            LogLru();
+
+            // make 2 requests to new keys; if the window size is now 2, both will exist:
+            cache.GetOrAdd(666, k => k);
+            cache.GetOrAdd(667, k => k);
+
+            cache.PendingMaintenance();
+            LogLru();
+
+            cache.TryGet(666, out var _).Should().BeTrue();
+            cache.TryGet(667, out var _).Should().BeTrue();
+
+            this.output.WriteLine($"Scheduler ran {cache.Scheduler.RunCount} times.");
+        }
+
         [Fact]
         public void ReadSchedulesMaintenanceWhenBufferIsFull()
         {
diff --git a/BitFaster.Caching.UnitTests/Lfu/LfuCapacityPartitionTests.cs b/BitFaster.Caching.UnitTests/Lfu/LfuCapacityPartitionTests.cs
index cdce6fe9..08e1ba62 100644
--- a/BitFaster.Caching.UnitTests/Lfu/LfuCapacityPartitionTests.cs
+++ b/BitFaster.Caching.UnitTests/Lfu/LfuCapacityPartitionTests.cs
@@ -6,11 +6,19 @@
 using BitFaster.Caching.Lfu;
 using FluentAssertions;
 using Xunit;
+using Xunit.Abstractions;
 
 namespace BitFaster.Caching.UnitTests.Lfu
 {
     public class LfuCapacityPartitionTests
     {
+        private readonly ITestOutputHelper output;
+
+        public LfuCapacityPartitionTests(ITestOutputHelper output)
+        {
+            this.output = output;
+        }
+
         [Fact]
         public void WhenCapacityIsLessThan3CtorThrows()
         {
@@ -37,5 +45,151 @@ public void CtorSetsExpectedCapacity(int capacity, int expectedWindow, int expec
             partition.Protected.Should().Be(expectedProtected);
             partition.Probation.Should().Be(expectedProbation);
         }
+
+        [Fact]
+        public void WhenHitRateKeepsDecreasingWindowIsCappedAt80Percent()
+        {
+            int max = 100;
+            var partition = new LfuCapacityPartition(max);
+            var metrics = new TestMetrics();
+
+            SetHitRate(partition, metrics, max, 0.9);
+
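+            // keep reporting a low hit rate; the climber should keep growing the window until it reaches the 80% cap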
+            for (int i = 0; i < 20; i++)
+            {
+                SetHitRate(partition, metrics, max, 0.1);
+            }
+
+            partition.Window.Should().Be(80);
+            partition.Protected.Should().Be(16);
+        }
+
+        [Fact]
+        public void WhenHitRateIsStableWindowConverges()
+        {
+            int max = 100;
+            var partition = new LfuCapacityPartition(max);
+            var metrics = new TestMetrics();
+
+            // start by causing some window adaptation so that the steady state is not window = 1
+            SetHitRate(partition, metrics, max, 0.9);
+
+            for (int i = 0; i < 5; i++)
+            {
+                SetHitRate(partition, metrics, max, 0.1);
+            }
+
+            this.output.WriteLine("Decrease hit rate");
+            SetHitRate(partition, metrics, max, 0.0);
+            // window is now larger
+
+            // go into steady state with small up/down fluctuations in hit rate
+            List<int> windowSizes = new List<int>(200);
+            this.output.WriteLine("Stable hit rate");
+
+            double inc = 0.01;
+            for (int i = 0; i < 200; i++)
+            {
+                double c = i % 2 == 0 ? inc : -inc;
+                SetHitRate(partition, metrics, max, 0.9 + c);
+
+                windowSizes.Add(partition.Window);
+            }
+
+            // verify that the window size has converged: the last 50 samples have low variance
+            var last50 = windowSizes.Skip(150).Take(50).ToArray();
+
+            var minWindow = last50.Min();
+            var maxWindow = last50.Max();
+
+            (maxWindow - minWindow).Should().BeLessThanOrEqualTo(1);
+        }
+
+        [Fact]
+        public void WhenHitRateFluctuatesWindowIsAdapted()
+        {
+            int max = 100;
+            var partition = new LfuCapacityPartition(max);
+            var metrics = new TestMetrics();
+
+            var snapshot = new WindowSnapshot();
+
+            // steady state, window stays at 1 initially
+            SetHitRate(partition, metrics, max, 0.9);
+            SetHitRate(partition, metrics, max, 0.9);
+            snapshot.Capture(partition);
+
+            // Decrease hit rate, verify window increases each time
+            this.output.WriteLine("1. Decrease hit rate");
+            SetHitRate(partition, metrics, max, 0.1);
+            snapshot.AssertWindowIncreased(partition);
+            SetHitRate(partition, metrics, max, 0.1);
+            snapshot.AssertWindowIncreased(partition);
+
+            // Increase hit rate, verify window continues to increase
+            this.output.WriteLine("2. Increase hit rate");
+            SetHitRate(partition, metrics, max, 0.9);
+            snapshot.AssertWindowIncreased(partition);
+
+            // Decrease hit rate, verify window decreases
+            this.output.WriteLine("3. Decrease hit rate");
+            SetHitRate(partition, metrics, max, 0.1);
+            snapshot.AssertWindowDecreased(partition);
+
+            // Increase hit rate, verify window continues to decrease
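+            // (the climber keeps stepping in the same direction while the hit rate is improving)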
Increase hit rate"); + SetHitRate(partition, metrics, max, 0.9); + snapshot.AssertWindowDecreased(partition); + SetHitRate(partition, metrics, max, 0.9); + snapshot.AssertWindowDecreased(partition); + } + + private void SetHitRate(LfuCapacityPartition p, TestMetrics m, int max, double hitRate) + { + int total = max * 10; + m.Hits += (long)(total * hitRate); + m.Misses += total - (long)(total * hitRate); + + p.OptimizePartitioning(m, total); + + this.output.WriteLine($"W: {p.Window} P: {p.Protected}"); + } + + private class WindowSnapshot + { + private int prev; + + public void Capture(LfuCapacityPartition p) + { + prev = p.Window; + } + + public void AssertWindowIncreased(LfuCapacityPartition p) + { + p.Window.Should().BeGreaterThan(prev); + prev = p.Window; + } + + public void AssertWindowDecreased(LfuCapacityPartition p) + { + p.Window.Should().BeLessThan(prev); + prev = p.Window; + } + } + + private class TestMetrics : ICacheMetrics + { + public double HitRatio => (double)Hits / (double)Total; + + public long Total => Hits + Misses; + + public long Hits { get; set; } + + public long Misses { get; set; } + + public long Evicted { get; set; } + + public long Updated { get; set; } + } } } diff --git a/BitFaster.Caching/Lfu/ConcurrentLfu.cs b/BitFaster.Caching/Lfu/ConcurrentLfu.cs index d26f078f..259a91c8 100644 --- a/BitFaster.Caching/Lfu/ConcurrentLfu.cs +++ b/BitFaster.Caching/Lfu/ConcurrentLfu.cs @@ -22,6 +22,7 @@ using BitFaster.Caching.Buffers; using BitFaster.Caching.Lru; using BitFaster.Caching.Scheduler; +using static BitFaster.Caching.Lfu.LfuCapacityPartition; namespace BitFaster.Caching.Lfu { @@ -75,6 +76,8 @@ public ConcurrentLfu(int concurrencyLevel, int capacity, IScheduler scheduler) this.dictionary = new ConcurrentDictionary>(concurrencyLevel, capacity, comparer); this.readBuffer = new StripedBuffer>(concurrencyLevel, BufferSize); + + // TODO: how big should this be in total? We shouldn't allow more than some capacity % of writes in the buffer this.writeBuffer = new StripedBuffer>(concurrencyLevel, BufferSize); this.cmSketch = new CmSketch(1, comparer); @@ -396,8 +399,9 @@ private bool Maintenance() ArrayPool>.Shared.Return(localDrainBuffer); #endif - // TODO: hill climb EvictEntries(); + this.capacity.OptimizePartitioning(this.metrics, this.cmSketch.ResetSampleSize); + ReFitProtected(); // Reset to idle if either // 1. We drained both input buffers (all work done) @@ -521,13 +525,13 @@ private int EvictFromWindow() private void EvictFromMain(int candidates) { - //var victimQueue = Position.Probation; + // var victimQueue = Position.Probation; var victim = this.probationLru.First; var candidate = this.probationLru.Last; while (this.windowLru.Count + this.probationLru.Count + this.protectedLru.Count > this.Capacity) { - // TODO: is this logic reachable? + // TODO: this logic is only reachable if entries have time expiry, and are removed early. 
                 // Search the admission window for additional candidates
                 //if (candidates == 0)
                 //{
@@ -555,7 +559,7 @@
                 //    break;
                 //}
 
-                //// Evict immediately if only one of the entries is present
+                // Evict immediately if only one of the entries is present
                 //if (victim == null)
                 //{
                 //    var previous = candidate.Previous;
@@ -581,13 +585,17 @@
                 if (AdmitCandidate(candidate.Key, victim.Key))
                 {
                     var evictee = victim;
-                    victim = victim.Previous;
+
+                    // victim is initialized to first, and iterates forwards
+                    victim = victim.Next;
 
                     Evict(evictee);
                 }
                 else
                 {
                     var evictee = candidate;
+
+                    // candidate is initialized to last, and iterates backwards
                     candidate = candidate.Previous;
 
                     Evict(evictee);
@@ -611,6 +619,20 @@ private void Evict(LfuNode<K, V> evictee)
             this.metrics.evictedCount++;
         }
 
+        private void ReFitProtected()
+        {
+            // If hill climbing decreased protected, there may be too many items
+            // - demote overflow to probation.
+            while (this.protectedLru.Count > this.capacity.Protected)
+            {
+                var demoted = this.protectedLru.First;
+                this.protectedLru.RemoveFirst();
+
+                demoted.Position = Position.Probation;
+                this.probationLru.AddLast(demoted);
+            }
+        }
+
         [DebuggerDisplay("{Format()}")]
         private class DrainStatus
         {
diff --git a/BitFaster.Caching/Lfu/LfuCapacityPartition.cs b/BitFaster.Caching/Lfu/LfuCapacityPartition.cs
index d5e76b66..c0248a51 100644
--- a/BitFaster.Caching/Lfu/LfuCapacityPartition.cs
+++ b/BitFaster.Caching/Lfu/LfuCapacityPartition.cs
@@ -1,19 +1,40 @@
 using System;
 using System.Collections.Generic;
-using System.Drawing;
 using System.Text;
 
 namespace BitFaster.Caching.Lfu
 {
     public class LfuCapacityPartition
     {
-        private readonly int windowCapacity;
-        private readonly int protectedCapacity;
-        private readonly int probationCapacity;
+        private readonly int max;
+
+        private int windowCapacity;
+        private int protectedCapacity;
+        private int probationCapacity;
+
+        private double previousHitRate;
+        private long previousHitCount;
+        private long previousMissCount;
+
+        private double mainRatio = DefaultMainPercentage;
+        private double stepSize;
+
+        private const double HillClimberRestartThreshold = 0.05d;
+        private const double HillClimberStepPercent = 0.0625d;
+        private const double HillClimberStepDecayRate = 0.98d;
+
+        private const double DefaultMainPercentage = 0.99d;
+
+        private const double MaxMainPercentage = 0.999d;
+        private const double MinMainPercentage = 0.2d;
 
         public LfuCapacityPartition(int totalCapacity)
         {
-            (windowCapacity, protectedCapacity, probationCapacity) = ComputeQueueCapacity(totalCapacity);
+            this.max = totalCapacity;
+            (windowCapacity, protectedCapacity, probationCapacity) = ComputeQueueCapacity(totalCapacity, DefaultMainPercentage);
+            InitializeStepSize(totalCapacity);
+
+            previousHitRate = 1.0;
         }
 
         public int Window => this.windowCapacity;
@@ -22,16 +43,62 @@ public LfuCapacityPartition(int totalCapacity)
 
         public int Probation => this.probationCapacity;
 
-        public int Capacity => this.windowCapacity + this.protectedCapacity + this.probationCapacity;
+        public int Capacity => this.max;
+
+        // Apply changes to the ratio of window to main, window = recency-biased, main = frequency-biased.
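+        // Hill climbing: once a full sample of requests has been observed, compare the hit rate to the
+        // previous sample; keep stepping in the same direction while it improves, reverse when it degrades,
+        // decay the step size on small changes and reset it after a large swing.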
+        public void OptimizePartitioning(ICacheMetrics metrics, int sampleThreshold)
+        {
+            long newHits = metrics.Hits;
+            long newMisses = metrics.Misses;
+
+            long sampleHits = newHits - previousHitCount;
+            long sampleMisses = newMisses - previousMissCount;
+            long sampleCount = sampleHits + sampleMisses;
+
+            if (sampleCount < sampleThreshold)
+            {
+                return;
+            }
+
+            double sampleHitRate = (double)sampleHits / sampleCount;
+
+            double hitRateChange = sampleHitRate - previousHitRate;
+            double amount = (hitRateChange >= 0) ? stepSize : -stepSize;
+
+            double nextStepSize = (Math.Abs(hitRateChange) >= HillClimberRestartThreshold)
+                ? HillClimberStepPercent * (amount >= 0 ? 1 : -1)
+                : HillClimberStepDecayRate * amount;
+
+            stepSize = nextStepSize;
+
+            previousHitCount = newHits;
+            previousMissCount = newMisses;
+            previousHitRate = sampleHitRate;
+
+            mainRatio -= amount;
+            mainRatio = Clamp(mainRatio, MinMainPercentage, MaxMainPercentage);
+
+            (windowCapacity, protectedCapacity, probationCapacity) = ComputeQueueCapacity(max, mainRatio);
+        }
+
+        private void InitializeStepSize(int cacheSize)
+        {
+            stepSize = HillClimberStepPercent;
+        }
+
+        private double Clamp(double input, double min, double max)
+        {
+            return Math.Max(min, Math.Min(input, max));
+        }
 
-        private static (int window, int mainProtected, int mainProbation) ComputeQueueCapacity(int capacity)
+        private static (int window, int mainProtected, int mainProbation) ComputeQueueCapacity(int capacity, double mainPercentage)
         {
             if (capacity < 3)
             {
                 throw new ArgumentOutOfRangeException(nameof(capacity), "Capacity must be greater than or equal to 3.");
             }
 
-            int window = capacity - (int)(0.99 * capacity);
+            int window = capacity - (int)(mainPercentage * capacity);
             int mainProtected = (int)(0.8 * (capacity - window));
             int mainProbation = capacity - window - mainProtected;