Skip to content

Commit 522f385

Browse files
committed
feat: debounce notifications
1 parent f4ddb2b commit 522f385

File tree

5 files changed

+171
-0
lines changed

5 files changed

+171
-0
lines changed

src/debouncer.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
//! # Debouncer for notifications.
2+
//!
3+
//! Sometimes the client application may be reinstalled
4+
//! while keeping the notification token.
5+
//! In this case the same token is stored twice
6+
//! for the same mailbox on a chatmail relay
7+
//! and is notified twice for the same message.
8+
//! Since it is not possible for the chatmail relay
9+
//! to deduplicate the tokens in this case
10+
//! as only the notification gateway
11+
//! can decrypt them, notification gateway needs
12+
//! to debounce notifications to the same token.
13+
14+
use std::cmp::Reverse;
15+
use std::collections::{BinaryHeap, HashSet};
16+
use std::sync::Mutex;
17+
use std::time::{Duration, Instant};
18+
19+
#[derive(Default)]
20+
pub(crate) struct Debouncer {
21+
state: Mutex<DebouncerState>,
22+
}
23+
24+
#[derive(Default)]
25+
struct DebouncerState {
26+
/// Set of recently notified tokens.
27+
///
28+
/// The tokens are stored in plaintext,
29+
/// not hashed or encrypted.
30+
/// No token is stored for a long time anyway.
31+
tokens: HashSet<String>,
32+
33+
/// Binary heap storing tokens
34+
/// sorted by the timestamp of the recent notifications.
35+
///
36+
/// `Reverse` is used to turn max-heap into min-heap.
37+
heap: BinaryHeap<Reverse<(Instant, String)>>,
38+
}
39+
40+
impl DebouncerState {
41+
/// Removes old entries for tokens that can be notified again.
42+
fn cleanup(&mut self, now: Instant) {
43+
loop {
44+
let Some(Reverse((timestamp, token))) = self.heap.pop() else {
45+
break;
46+
};
47+
48+
if now.duration_since(timestamp) < Duration::from_secs(1) {
49+
self.heap.push(Reverse((timestamp, token)));
50+
break;
51+
}
52+
53+
self.tokens.remove(&token);
54+
}
55+
}
56+
57+
fn is_debounced(&mut self, now: Instant, token: &String) -> bool {
58+
self.cleanup(now);
59+
self.tokens.contains(token)
60+
}
61+
62+
fn notify(&mut self, now: Instant, token: String) {
63+
if self.tokens.insert(token.clone()) {
64+
self.heap.push(Reverse((now, token)));
65+
}
66+
}
67+
68+
fn count(&self) -> usize {
69+
let res = self.tokens.len();
70+
debug_assert_eq!(res, self.heap.len());
71+
res
72+
}
73+
}
74+
75+
impl Debouncer {
76+
/// Returns true if the token was notified recently
77+
/// and should not be notified again.
78+
pub(crate) fn is_debounced(&self, now: Instant, token: &String) -> bool {
79+
let mut state = self.state.lock().unwrap();
80+
state.is_debounced(now, token)
81+
}
82+
83+
pub(crate) fn notify(&self, now: Instant, token: String) {
84+
self.state.lock().unwrap().notify(now, token);
85+
}
86+
87+
/// Returns number of currently debounced notification tokens.
88+
///
89+
/// This is used for metrics to display the size of the set.
90+
///
91+
/// This function does not remove expired tokens.
92+
pub(crate) fn count(&self) -> usize {
93+
self.state.lock().unwrap().count()
94+
}
95+
}
96+
97+
#[cfg(test)]
98+
mod tests {
99+
use super::*;
100+
101+
#[test]
102+
fn test_debouncer() {
103+
let mut now = Instant::now();
104+
105+
let debouncer = Debouncer::default();
106+
107+
let token1 = "foobar".to_string();
108+
let token2 = "barbaz".to_string();
109+
110+
assert!(!debouncer.is_debounced(now, &token1));
111+
assert!(!debouncer.is_debounced(now, &token2));
112+
assert_eq!(debouncer.count(), 0);
113+
114+
debouncer.notify(now, token1.clone());
115+
116+
assert!(debouncer.is_debounced(now, &token1));
117+
assert!(!debouncer.is_debounced(now, &token2));
118+
assert_eq!(debouncer.count(), 1);
119+
120+
now += Duration::from_secs(5);
121+
122+
assert!(!debouncer.is_debounced(now, &token1));
123+
assert!(!debouncer.is_debounced(now, &token2));
124+
assert_eq!(debouncer.count(), 0);
125+
}
126+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod debouncer;
12
pub mod metrics;
23
pub mod notifier;
34
mod openpgp;

src/metrics.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ pub struct Metrics {
3030
/// Number of successfully sent visible UBports notifications.
3131
pub ubports_notifications_total: Counter,
3232

33+
/// Number of debounced notifications.
34+
pub debounced_notifications_total: Counter,
35+
36+
/// Number of tokens notified recently.
37+
pub debounced_set_size: Gauge<i64, AtomicI64>,
38+
3339
/// Number of successfully sent heartbeat notifications.
3440
pub heartbeat_notifications_total: Counter,
3541

@@ -68,6 +74,20 @@ impl Metrics {
6874
ubports_notifications_total.clone(),
6975
);
7076

77+
let debounced_notifications_total = Counter::default();
78+
registry.register(
79+
"debounced_notifications",
80+
"Number of debounced notifications",
81+
debounced_notifications_total.clone(),
82+
);
83+
84+
let debounced_set_size = Gauge::<i64, AtomicI64>::default();
85+
registry.register(
86+
"debounced_set_size",
87+
"Number of tokens notified recently.",
88+
debounced_set_size.clone(),
89+
);
90+
7191
let heartbeat_notifications_total = Counter::default();
7292
registry.register(
7393
"heartbeat_notifications",
@@ -101,6 +121,8 @@ impl Metrics {
101121
direct_notifications_total,
102122
fcm_notifications_total,
103123
ubports_notifications_total,
124+
debounced_notifications_total,
125+
debounced_set_size,
104126
heartbeat_notifications_total,
105127
heartbeat_registrations_total,
106128
heartbeat_tokens,

src/server.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use chrono::{Local, TimeDelta};
1010
use log::*;
1111
use serde::Deserialize;
1212
use std::str::FromStr;
13+
use std::time::Instant;
1314

1415
use crate::metrics::Metrics;
1516
use crate::state::State;
@@ -293,6 +294,19 @@ async fn notify_device(
293294
}
294295

295296
info!("Got direct notification for {device_token}.");
297+
let now = Instant::now();
298+
if state.debouncer().is_debounced(now, &device_token) {
299+
let metrics = state.metrics();
300+
metrics.debounced_notifications_total.inc();
301+
metrics
302+
.debounced_set_size
303+
.set(state.debouncer().count() as i64);
304+
return Ok(StatusCode::OK);
305+
}
306+
state.debouncer().notify(now, device_token.clone());
307+
state.metrics()
308+
.debounced_set_size
309+
.set(state.debouncer().count() as i64);
296310
let device_token: NotificationToken = device_token.as_str().parse()?;
297311

298312
let status_code = match device_token {

src/state.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::time::Duration;
66
use a2::{Client, Endpoint};
77
use anyhow::{Context as _, Result};
88

9+
use crate::debouncer::Debouncer;
910
use crate::metrics::Metrics;
1011
use crate::openpgp::PgpDecryptor;
1112
use crate::schedule::Schedule;
@@ -36,6 +37,8 @@ pub struct InnerState {
3637
/// Decryptor for incoming tokens
3738
/// storing the secret keyring inside.
3839
openpgp_decryptor: PgpDecryptor,
40+
41+
debouncer: Debouncer,
3942
}
4043

4144
impl State {
@@ -88,6 +91,7 @@ impl State {
8891
interval,
8992
fcm_authenticator,
9093
openpgp_decryptor,
94+
debouncer: Default::default(),
9195
}),
9296
})
9397
}
@@ -134,4 +138,8 @@ impl State {
134138
pub fn openpgp_decryptor(&self) -> &PgpDecryptor {
135139
&self.inner.openpgp_decryptor
136140
}
141+
142+
pub(crate) fn debouncer(&self) -> &Debouncer {
143+
&self.inner.debouncer
144+
}
137145
}

0 commit comments

Comments
 (0)