Skip to content

Commit 1cae789

Browse files
authored
Merge pull request kube-rs#1838 from doxxx93/predicate-cache-ttl
feat(predicate): add configurable cache TTL for predicate filtering
2 parents 7c63f56 + dbd0c51 commit 1cae789

File tree

8 files changed

+147
-20
lines changed

8 files changed

+147
-20
lines changed

examples/node_reflector.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ async fn main() -> anyhow::Result<()> {
2424
.default_backoff()
2525
.reflect(writer)
2626
.applied_objects()
27-
.predicate_filter(predicates::labels.combine(predicates::annotations));
27+
.predicate_filter(
28+
predicates::labels.combine(predicates::annotations),
29+
Default::default(),
30+
);
2831
let mut stream = pin!(stream);
2932

3033
// Periodically read our state in the background

examples/pod_reflector.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> {
4141
})
4242
.reflect(writer)
4343
.applied_objects()
44-
.predicate_filter(predicates::resource_version);
44+
.predicate_filter(predicates::resource_version, Default::default());
4545
let mut stream = pin!(stream);
4646

4747
while let Some(pod) = stream.try_next().await? {

examples/shared_stream_controllers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
6262
let filtered = subscriber
6363
.clone()
6464
.map(|r| Ok(r.deref().clone()))
65-
.predicate_filter(predicates::resource_version)
65+
.predicate_filter(predicates::resource_version, Default::default())
6666
.filter_map(|r| future::ready(r.ok().map(Arc::new)));
6767

6868
// Reflect a stream of pod watch events into the store and apply a backoff. For subscribers to

kube-runtime/src/controller/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -730,7 +730,7 @@ where
730730
/// .default_backoff()
731731
/// .reflect(writer)
732732
/// .applied_objects()
733-
/// .predicate_filter(predicates::generation);
733+
/// .predicate_filter(predicates::generation, Default::default());
734734
///
735735
/// Controller::for_stream(deploys, reader)
736736
/// .run(reconcile, error_policy, Arc::new(()))
@@ -993,7 +993,7 @@ where
993993
/// # async fn doc(client: kube::Client) {
994994
/// let sts_stream = metadata_watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
995995
/// .touched_objects()
996-
/// .predicate_filter(predicates::generation);
996+
/// .predicate_filter(predicates::generation, Default::default());
997997
///
998998
/// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
999999
/// .owns_stream(sts_stream)
@@ -1271,7 +1271,7 @@ where
12711271
/// let cr: Api<CustomResource> = Api::all(client.clone());
12721272
/// let daemons = watcher(api, watcher::Config::default())
12731273
/// .touched_objects()
1274-
/// .predicate_filter(predicates::generation);
1274+
/// .predicate_filter(predicates::generation, Default::default());
12751275
///
12761276
/// Controller::new(cr, watcher::Config::default())
12771277
/// .watches_stream(daemons, mapper)

kube-runtime/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,5 @@ pub use scheduler::scheduler;
3737
pub use utils::WatchStreamExt;
3838
pub use watcher::{metadata_watcher, watcher};
3939

40-
pub use utils::{predicates, Predicate};
40+
pub use utils::{predicates, Predicate, PredicateConfig};
4141
pub use wait::conditions;

kube-runtime/src/utils/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ mod watch_ext;
1212
pub use backoff_reset_timer::{Backoff, ResetTimerBackoff};
1313
pub use event_decode::EventDecode;
1414
pub use event_modify::EventModify;
15-
pub use predicate::{predicates, Predicate, PredicateFilter};
15+
pub use predicate::{predicates, Config as PredicateConfig, Predicate, PredicateFilter};
1616
pub use reflect::Reflect;
1717
pub use stream_backoff::StreamBackoff;
1818
pub use watch_ext::WatchStreamExt;

kube-runtime/src/utils/predicate.rs

Lines changed: 128 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::{
1010
collections::{hash_map::DefaultHasher, HashMap},
1111
hash::{Hash, Hasher},
1212
marker::PhantomData,
13+
time::{Duration, Instant},
1314
};
1415

1516
fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
@@ -114,6 +115,44 @@ where
114115
}
115116
}
116117

118+
/// Configuration for predicate filtering with cache TTL
119+
#[derive(Debug, Clone)]
120+
pub struct Config {
121+
/// Time-to-live for cache entries
122+
///
123+
/// Entries not seen for at least this long will be evicted from the cache.
124+
/// Default is 1 hour.
125+
ttl: Duration,
126+
}
127+
128+
impl Config {
129+
/// Set the time-to-live for cache entries
130+
///
131+
/// Entries not seen for at least this long will be evicted from the cache.
132+
#[must_use]
133+
pub fn ttl(mut self, ttl: Duration) -> Self {
134+
self.ttl = ttl;
135+
self
136+
}
137+
}
138+
139+
impl Default for Config {
140+
fn default() -> Self {
141+
Self {
142+
// Default to 1 hour TTL - long enough to avoid unnecessary reconciles
143+
// but short enough to prevent unbounded memory growth
144+
ttl: Duration::from_secs(3600),
145+
}
146+
}
147+
}
148+
149+
/// Cache entry storing predicate hash and last access time
150+
#[derive(Debug, Clone)]
151+
struct CacheEntry {
152+
hash: u64,
153+
last_seen: Instant,
154+
}
155+
117156
#[allow(clippy::pedantic)]
118157
#[pin_project]
119158
/// Stream returned by the [`predicate_filter`](super::WatchStreamExt::predicate_filter) method.
@@ -122,7 +161,8 @@ pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
122161
#[pin]
123162
stream: St,
124163
predicate: P,
125-
cache: HashMap<PredicateCacheKey, u64>,
164+
cache: HashMap<PredicateCacheKey, CacheEntry>,
165+
config: Config,
126166
// K: Resource necessary to get .meta() of an object during polling
127167
_phantom: PhantomData<K>,
128168
}
@@ -132,11 +172,12 @@ where
132172
K: Resource,
133173
P: Predicate<K>,
134174
{
135-
pub(super) fn new(stream: St, predicate: P) -> Self {
175+
pub(super) fn new(stream: St, predicate: P, config: Config) -> Self {
136176
Self {
137177
stream,
138178
predicate,
139179
cache: HashMap::new(),
180+
config,
140181
_phantom: PhantomData,
141182
}
142183
}
@@ -152,13 +193,29 @@ where
152193

153194
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
154195
let mut me = self.project();
196+
197+
// Evict expired entries before processing new events
198+
let now = Instant::now();
199+
let ttl = me.config.ttl;
200+
me.cache
201+
.retain(|_, entry| now.duration_since(entry.last_seen) < ttl);
202+
155203
Poll::Ready(loop {
156204
break match ready!(me.stream.as_mut().poll_next(cx)) {
157205
Some(Ok(obj)) => {
158206
if let Some(val) = me.predicate.hash_property(&obj) {
159207
let key = PredicateCacheKey::from(obj.meta());
160-
let changed = me.cache.get(&key) != Some(&val);
161-
me.cache.insert(key, val);
208+
let now = Instant::now();
209+
210+
// Check if the predicate value changed or entry doesn't exist
211+
let changed = me.cache.get(&key).map(|entry| entry.hash) != Some(val);
212+
213+
// Upsert the cache entry with new hash and timestamp
214+
me.cache.insert(key, CacheEntry {
215+
hash: val,
216+
last_seen: now,
217+
});
218+
162219
if changed {
163220
Some(Ok(obj))
164221
} else {
@@ -216,7 +273,7 @@ pub mod predicates {
216273
pub(crate) mod tests {
217274
use std::{pin::pin, task::Poll};
218275

219-
use super::{predicates, Error, PredicateFilter};
276+
use super::{predicates, Config, Error, PredicateFilter};
220277
use futures::{poll, stream, FutureExt, StreamExt};
221278
use kube_client::Resource;
222279
use serde_json::json;
@@ -248,7 +305,11 @@ pub(crate) mod tests {
248305
Ok(mkobj(1)),
249306
Ok(mkobj(2)),
250307
]);
251-
let mut rx = pin!(PredicateFilter::new(data, predicates::generation));
308+
let mut rx = pin!(PredicateFilter::new(
309+
data,
310+
predicates::generation,
311+
Config::default()
312+
));
252313

253314
// mkobj(1) passed through
254315
let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
@@ -299,7 +360,11 @@ pub(crate) mod tests {
299360
Ok(mkobj(1, "uid-2")),
300361
Ok(mkobj(2, "uid-3")),
301362
]);
302-
let mut rx = pin!(PredicateFilter::new(data, predicates::generation));
363+
let mut rx = pin!(PredicateFilter::new(
364+
data,
365+
predicates::generation,
366+
Config::default()
367+
));
303368

304369
// mkobj(1, uid-1) passed through
305370
let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
@@ -319,4 +384,60 @@ pub(crate) mod tests {
319384

320385
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
321386
}
387+
388+
#[tokio::test]
389+
async fn predicate_cache_ttl_evicts_expired_entries() {
390+
use futures::{channel::mpsc, SinkExt};
391+
use k8s_openapi::api::core::v1::Pod;
392+
use std::time::Duration;
393+
394+
let mkobj = |g: i32, uid: &str| {
395+
let p: Pod = serde_json::from_value(json!({
396+
"apiVersion": "v1",
397+
"kind": "Pod",
398+
"metadata": {
399+
"name": "blog",
400+
"namespace": "default",
401+
"generation": Some(g),
402+
"uid": uid,
403+
},
404+
"spec": {
405+
"containers": [{
406+
"name": "blog",
407+
"image": "clux/blog:0.1.0"
408+
}],
409+
}
410+
}))
411+
.unwrap();
412+
p
413+
};
414+
415+
// Use a very short TTL for testing
416+
let config = Config::default().ttl(Duration::from_millis(50));
417+
418+
// Use a channel so we can send items with delays
419+
let (mut tx, rx) = mpsc::unbounded();
420+
let mut filtered = pin!(PredicateFilter::new(
421+
rx.map(Ok::<_, Error>),
422+
predicates::generation,
423+
config
424+
));
425+
426+
// Send first object
427+
tx.send(mkobj(1, "uid-1")).await.unwrap();
428+
let first = filtered.next().now_or_never().unwrap().unwrap().unwrap();
429+
assert_eq!(first.meta().generation, Some(1));
430+
431+
// Send same object immediately - should be filtered
432+
tx.send(mkobj(1, "uid-1")).await.unwrap();
433+
assert!(matches!(poll!(filtered.next()), Poll::Pending));
434+
435+
// Wait for TTL to expire
436+
tokio::time::sleep(Duration::from_millis(100)).await;
437+
438+
// Send same object after TTL - should pass through due to eviction
439+
tx.send(mkobj(1, "uid-1")).await.unwrap();
440+
let second = filtered.next().now_or_never().unwrap().unwrap().unwrap();
441+
assert_eq!(second.meta().generation, Some(1));
442+
}
322443
}

kube-runtime/src/utils/watch_ext.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::{
22
utils::{
33
event_decode::EventDecode,
44
event_modify::EventModify,
5-
predicate::{Predicate, PredicateFilter},
5+
predicate::{Config as PredicateConfig, Predicate, PredicateFilter},
66
stream_backoff::StreamBackoff,
77
},
88
watcher,
@@ -99,6 +99,9 @@ pub trait WatchStreamExt: Stream {
9999
/// Common use case for this is to avoid repeat events for status updates
100100
/// by filtering on [`predicates::generation`](crate::predicates::generation).
101101
///
102+
/// The cache entries have a configurable time-to-live (TTL) to prevent unbounded
103+
/// memory growth. By default, entries expire after 1 hour.
104+
///
102105
/// ## Usage
103106
/// ```no_run
104107
/// # use std::pin::pin;
@@ -111,21 +114,21 @@ pub trait WatchStreamExt: Stream {
111114
/// let deploys: Api<Deployment> = Api::default_namespaced(client);
112115
/// let mut changed_deploys = pin!(watcher(deploys, watcher::Config::default())
113116
/// .applied_objects()
114-
/// .predicate_filter(predicates::generation));
117+
/// .predicate_filter(predicates::generation, Default::default()));
115118
///
116119
/// while let Some(d) = changed_deploys.try_next().await? {
117120
/// println!("saw Deployment '{} with hitherto unseen generation", d.name_any());
118121
/// }
119122
/// # Ok(())
120123
/// # }
121124
/// ```
122-
fn predicate_filter<K, P>(self, predicate: P) -> PredicateFilter<Self, K, P>
125+
fn predicate_filter<K, P>(self, predicate: P, config: PredicateConfig) -> PredicateFilter<Self, K, P>
123126
where
124127
Self: Stream<Item = Result<K, watcher::Error>> + Sized,
125128
K: Resource + 'static,
126129
P: Predicate<K> + 'static,
127130
{
128-
PredicateFilter::new(self, predicate)
131+
PredicateFilter::new(self, predicate, config)
129132
}
130133

131134
/// Reflect a [`watcher()`] stream into a [`Store`] through a [`Writer`]
@@ -301,7 +304,7 @@ pub(crate) mod tests {
301304
fn test_watcher_stream_type_drift() {
302305
let pred_watch = watcher(compile_type::<Api<Pod>>(), Default::default())
303306
.touched_objects()
304-
.predicate_filter(predicates::generation)
307+
.predicate_filter(predicates::generation, Default::default())
305308
.boxed();
306309
assert_stream(pred_watch);
307310
}

0 commit comments

Comments
 (0)