Skip to content

Commit c256ac6

Browse files
committed
fix sending partial messages to peers not supporting them
1 parent 2220ae4 commit c256ac6

File tree

3 files changed

+135
-167
lines changed

3 files changed

+135
-167
lines changed

protocols/gossipsub/src/behaviour.rs

Lines changed: 127 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,13 @@ where
658658

659659
let topic_hash = raw_message.topic.clone();
660660

661-
let recipient_peers = self.get_publish_peers(&topic_hash, true);
661+
#[cfg(feature = "partial_messages")]
662+
let recipient_peers = self.get_publish_peers(&topic_hash, |_, peer| {
663+
!peer.partial_only_topics.contains(&topic_hash)
664+
});
665+
666+
#[cfg(not(feature = "partial_messages"))]
667+
let recipient_peers = self.get_publish_peers(&topic_hash, |_, _| true);
662668

663669
// If the message isn't a duplicate and we have sent it to some peers add it to the
664670
// duplicate cache and memcache.
@@ -715,129 +721,101 @@ where
715721
Ok(msg_id)
716722
}
717723

718-
// Get Peers from the mesh or fanout to publish a message to.
719-
// If `exclude_partial_only` set, filter out peers who only want partial messages for the topic.
724+
// Get Peers from the mesh or fanout to publish a message to
725+
// further filtered by the provided `f` callback.
720726
fn get_publish_peers(
721727
&mut self,
722728
topic_hash: &TopicHash,
723-
exclude_partial_only: bool,
729+
f: impl Fn(&PeerId, &PeerDetails) -> bool,
724730
) -> HashSet<PeerId> {
725-
let mesh_n = self.config.mesh_n_for_topic(topic_hash);
726-
727731
let peers_on_topic = self
728732
.connected_peers
729733
.iter()
730-
.filter(|(_, peer)| {
731-
#[cfg(feature = "partial_messages")]
732-
{
733-
if exclude_partial_only && peer.partial_only_topics.contains(topic_hash) {
734-
return false;
735-
}
736-
}
737-
let _ = peer;
738-
true
739-
})
740-
.map(|(peer_id, _)| peer_id)
741-
.peekable();
742-
743-
let mut recipient_peers = HashSet::new();
744-
if self.config.flood_publish() {
745-
// Forward to all peers above score and all explicit peers
746-
recipient_peers.extend(peers_on_topic.filter(|p| {
747-
self.explicit_peers.contains(*p)
734+
.filter(|(_, peer)| peer.topics.contains(topic_hash))
735+
.filter(|(peer_id, _)| {
736+
self.explicit_peers.contains(*peer_id)
748737
|| !self
749738
.peer_score
750-
.below_threshold(p, |ts| ts.publish_threshold)
739+
.below_threshold(peer_id, |ts| ts.publish_threshold)
751740
.0
752-
}));
753-
} else {
754-
match self.mesh.get(topic_hash) {
755-
// Mesh peers
756-
Some(mesh_peers) => {
757-
// We have a mesh set. We want to make sure to publish to at least `mesh_n`
758-
// peers (if possible).
759-
let needed_extra_peers = mesh_n.saturating_sub(mesh_peers.len());
760-
761-
if needed_extra_peers > 0 {
762-
// We don't have `mesh_n` peers in our mesh, we will randomly select extras
763-
// and publish to them.
764-
765-
// Get a random set of peers that are appropriate to send messages too.
766-
let peer_list = get_random_peers(
767-
&self.connected_peers,
768-
topic_hash,
769-
needed_extra_peers,
770-
exclude_partial_only,
771-
|peer| {
772-
!mesh_peers.contains(peer)
773-
&& !self.explicit_peers.contains(peer)
774-
&& !self
775-
.peer_score
776-
.below_threshold(peer, |ts| ts.publish_threshold)
777-
.0
778-
},
779-
);
780-
recipient_peers.extend(peer_list);
781-
}
741+
});
782742

783-
recipient_peers.extend(mesh_peers);
784-
}
785-
// Gossipsub peers
786-
None => {
787-
tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
788-
// `fanout_peers` is always non-empty if it's `Some`.
789-
let fanout_peers = self
790-
.fanout
791-
.get(topic_hash)
792-
.filter(|peers| !peers.is_empty());
793-
// If we have fanout peers add them to the map.
794-
if let Some(peers) = fanout_peers {
795-
for peer in peers {
796-
recipient_peers.insert(*peer);
797-
}
798-
} else {
799-
// We have no fanout peers, select mesh_n of them and add them to the fanout
800-
let new_peers = get_random_peers(
801-
&self.connected_peers,
802-
topic_hash,
803-
mesh_n,
804-
exclude_partial_only,
805-
|p| {
806-
!self.explicit_peers.contains(p)
807-
&& !self
808-
.peer_score
809-
.below_threshold(p, |ts| ts.publish_threshold)
810-
.0
811-
},
812-
);
813-
// Add the new peers to the fanout and recipient peers
814-
self.fanout.insert(topic_hash.clone(), new_peers.clone());
815-
for peer in new_peers {
816-
tracing::debug!(%peer, "Peer added to fanout");
817-
recipient_peers.insert(peer);
818-
}
819-
}
820-
// We are publishing to fanout peers - update the time we published
821-
self.fanout_last_pub
822-
.insert(topic_hash.clone(), Instant::now());
743+
// Forward to all peers above score and all explicit peers
744+
if self.config.flood_publish() {
745+
return peers_on_topic
746+
.filter(|(peer_id, peer_details)| f(peer_id, peer_details))
747+
.map(|(peer_id, _)| *peer_id)
748+
.collect();
749+
}
750+
751+
let mesh_n = self.config.mesh_n_for_topic(topic_hash);
752+
let mut recipient_peers = HashSet::new();
753+
// Explicit peers that are part of the topic and Floodsub peers.
754+
recipient_peers.extend(
755+
peers_on_topic
756+
.clone()
757+
.filter(|(peer_id, peer)| {
758+
self.explicit_peers.contains(peer_id) || peer.kind == PeerKind::Floodsub
759+
})
760+
.map(|(peer_id, _)| *peer_id),
761+
);
762+
763+
match self.mesh.get(topic_hash) {
764+
// Mesh peers
765+
Some(mesh_peers) => {
766+
// We have a mesh set. We want to make sure to publish to at least `mesh_n`
767+
// peers (if possible).
768+
let mesh_peers = peers_on_topic
769+
.clone()
770+
.filter_map(|(peer_id, _)| mesh_peers.get(peer_id))
771+
.copied()
772+
.collect::<Vec<PeerId>>();
773+
774+
let needed_extra_peers = mesh_n.saturating_sub(mesh_peers.len());
775+
if needed_extra_peers > 0 {
776+
// We don't have `mesh_n` peers in our mesh, we will randomly select extras
777+
// and publish to them.
778+
779+
// Get a random set of peers that are appropriate to send messages to.
780+
let peer_list =
781+
get_random_peers(peers_on_topic, topic_hash, needed_extra_peers, |_, _| {
782+
true
783+
});
784+
recipient_peers.extend(peer_list);
823785
}
824-
}
825786

826-
// Explicit peers that are part of the topic
827-
recipient_peers
828-
.extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));
787+
recipient_peers.extend(mesh_peers);
788+
}
789+
// Gossipsub peers
790+
None => {
791+
tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
792+
let fanout_peers = peers_on_topic
793+
.clone()
794+
.filter_map(|(peer_id, _)| {
795+
self.fanout
796+
.get(topic_hash)
797+
.and_then(|fanout| fanout.get(peer_id))
798+
})
799+
.copied()
800+
.collect::<Vec<PeerId>>();
829801

830-
// Floodsub peers
831-
for (peer, connections) in &self.connected_peers {
832-
if connections.kind == PeerKind::Floodsub
833-
&& connections.topics.contains(topic_hash)
834-
&& !self
835-
.peer_score
836-
.below_threshold(peer, |ts| ts.publish_threshold)
837-
.0
838-
{
839-
recipient_peers.insert(*peer);
802+
// If we have fanout peers add them to the map.
803+
if !fanout_peers.is_empty() {
804+
recipient_peers.extend(fanout_peers);
805+
} else {
806+
// We have no fanout peers, select mesh_n of them and add them to the fanout
807+
let new_peers =
808+
get_random_peers(peers_on_topic, topic_hash, mesh_n, |_, _| true);
809+
// Add the new peers to the fanout and recipient peers
810+
self.fanout.insert(topic_hash.clone(), new_peers.clone());
811+
for peer in new_peers {
812+
tracing::debug!(%peer, "Peer added to fanout");
813+
recipient_peers.insert(peer);
814+
}
840815
}
816+
// We are publishing to fanout peers - update the time we published
817+
self.fanout_last_pub
818+
.insert(topic_hash.clone(), Instant::now());
841819
}
842820
}
843821

@@ -854,7 +832,9 @@ where
854832

855833
let group_id = partial_message.group_id().as_ref().to_vec();
856834

857-
let recipient_peers = self.get_publish_peers(&topic_hash, false);
835+
let recipient_peers = self.get_publish_peers(&topic_hash, |_, peer| {
836+
peer.partial_only_topics.contains(&topic_hash)
837+
});
858838
let metadata = partial_message.parts_metadata().as_ref().to_vec();
859839
for peer_id in recipient_peers.iter() {
860840
// TODO: this can be optimized, we are going to get the peer again on `send_message`
@@ -1151,12 +1131,11 @@ where
11511131
&self.connected_peers,
11521132
topic_hash,
11531133
mesh_n - added_peers.len(),
1154-
true,
1155-
|peer| {
1156-
!added_peers.contains(peer)
1157-
&& !self.explicit_peers.contains(peer)
1158-
&& !self.peer_score.below_threshold(peer, |_| 0.0).0
1159-
&& !self.backoffs.is_backoff_with_slack(topic_hash, peer)
1134+
|peer_id, _| {
1135+
!added_peers.contains(peer_id)
1136+
&& !self.explicit_peers.contains(peer_id)
1137+
&& !self.peer_score.below_threshold(peer_id, |_| 0.0).0
1138+
&& !self.backoffs.is_backoff_with_slack(topic_hash, peer_id)
11601139
},
11611140
);
11621141

@@ -1246,8 +1225,9 @@ where
12461225
&self.connected_peers,
12471226
topic_hash,
12481227
self.config.prune_peers(),
1249-
true,
1250-
|p| p != peer && !self.peer_score.below_threshold(p, |_| 0.0).0,
1228+
|peer_id, _| {
1229+
peer_id != peer && !self.peer_score.below_threshold(peer_id, |_| 0.0).0
1230+
},
12511231
)
12521232
.into_iter()
12531233
.map(|p| PeerInfo { peer_id: Some(p) })
@@ -2419,12 +2399,11 @@ where
24192399
&self.connected_peers,
24202400
topic_hash,
24212401
desired_peers,
2422-
true,
2423-
|peer| {
2424-
!peers.contains(peer)
2425-
&& !explicit_peers.contains(peer)
2426-
&& !backoffs.is_backoff_with_slack(topic_hash, peer)
2427-
&& scores.get(peer).map(|r| r.score).unwrap_or_default() >= 0.0
2402+
|peer_id, _| {
2403+
!peers.contains(peer_id)
2404+
&& !explicit_peers.contains(peer_id)
2405+
&& !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2406+
&& scores.get(peer_id).map(|r| r.score).unwrap_or_default() >= 0.0
24282407
},
24292408
);
24302409
for peer in &peer_list {
@@ -2527,8 +2506,7 @@ where
25272506
&self.connected_peers,
25282507
topic_hash,
25292508
needed,
2530-
false,
2531-
|peer_id| {
2509+
|peer_id, _| {
25322510
!peers.contains(peer_id)
25332511
&& !explicit_peers.contains(peer_id)
25342512
&& !backoffs.is_backoff_with_slack(topic_hash, peer_id)
@@ -2604,8 +2582,7 @@ where
26042582
&self.connected_peers,
26052583
topic_hash,
26062584
self.config.opportunistic_graft_peers(),
2607-
false,
2608-
|peer_id| {
2585+
|peer_id, _| {
26092586
!peers.contains(peer_id)
26102587
&& !explicit_peers.contains(peer_id)
26112588
&& !backoffs.is_backoff_with_slack(topic_hash, peer_id)
@@ -2701,8 +2678,7 @@ where
27012678
&self.connected_peers,
27022679
topic_hash,
27032680
needed_peers,
2704-
false,
2705-
|peer_id| {
2681+
|peer_id, _| {
27062682
!peers.contains(peer_id)
27072683
&& !explicit_peers.contains(peer_id)
27082684
&& !self
@@ -2816,15 +2792,19 @@ where
28162792
)
28172793
};
28182794
// get gossip_lazy random peers
2819-
let to_msg_peers =
2820-
get_random_peers_dynamic(&self.connected_peers, topic_hash, false, n_map, |peer| {
2821-
!peers.contains(peer)
2822-
&& !self.explicit_peers.contains(peer)
2795+
let to_msg_peers = get_random_peers_dynamic(
2796+
self.connected_peers.iter(),
2797+
topic_hash,
2798+
n_map,
2799+
|peer_id, _| {
2800+
!peers.contains(peer_id)
2801+
&& !self.explicit_peers.contains(peer_id)
28232802
&& !self
28242803
.peer_score
2825-
.below_threshold(peer, |ts| ts.gossip_threshold)
2804+
.below_threshold(peer_id, |ts| ts.gossip_threshold)
28262805
.0
2827-
});
2806+
},
2807+
);
28282808

28292809
tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
28302810

@@ -3787,28 +3767,17 @@ fn peer_removed_from_mesh(
37873767
/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
37883768
/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
37893769
/// that gets as input the number of filtered peers.
3790-
#[allow(unused, reason = "partial is used with partial_messages feature")]
3791-
fn get_random_peers_dynamic(
3792-
connected_peers: &HashMap<PeerId, PeerDetails>,
3770+
fn get_random_peers_dynamic<'a>(
3771+
peers: impl IntoIterator<Item = (&'a PeerId, &'a PeerDetails)>,
37933772
topic_hash: &TopicHash,
3794-
// If we want to exclude partial only peers.
3795-
exclude_partial: bool,
37963773
// maps the number of total peers to the number of selected peers
37973774
n_map: impl Fn(usize) -> usize,
3798-
mut f: impl FnMut(&PeerId) -> bool,
3775+
f: impl Fn(&PeerId, &PeerDetails) -> bool,
37993776
) -> BTreeSet<PeerId> {
3800-
let mut gossip_peers = connected_peers
3801-
.iter()
3802-
.filter_map(|(peer_id, peer)| {
3803-
#[cfg(feature = "partial_messages")]
3804-
{
3805-
if exclude_partial && peer.partial_only_topics.contains(topic_hash) {
3806-
return None;
3807-
}
3808-
}
3809-
Some((peer_id, peer))
3810-
})
3811-
.filter(|(peer_id, _)| f(peer_id))
3777+
let mut gossip_peers = peers
3778+
.into_iter()
3779+
.filter(|(_, p)| p.topics.contains(topic_hash))
3780+
.filter(|(peer_id, peer_details)| f(peer_id, peer_details))
38123781
.filter(|(_, p)| p.kind.is_gossipsub())
38133782
.map(|(peer_id, _)| *peer_id)
38143783
.collect::<Vec<PeerId>>();
@@ -3831,15 +3800,13 @@ fn get_random_peers_dynamic(
38313800

38323801
/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
38333802
/// filtered by the function `f`.
3834-
#[allow(unused, reason = "partial is used with partial_messages feature")]
3835-
fn get_random_peers(
3836-
connected_peers: &HashMap<PeerId, PeerDetails>,
3803+
fn get_random_peers<'a>(
3804+
peers: impl IntoIterator<Item = (&'a PeerId, &'a PeerDetails)>,
38373805
topic_hash: &TopicHash,
38383806
n: usize,
3839-
exclude_partial: bool,
3840-
f: impl FnMut(&PeerId) -> bool,
3807+
f: impl Fn(&PeerId, &PeerDetails) -> bool,
38413808
) -> BTreeSet<PeerId> {
3842-
get_random_peers_dynamic(connected_peers, topic_hash, exclude_partial, |_| n, f)
3809+
get_random_peers_dynamic(peers, topic_hash, |_| n, f)
38433810
}
38443811

38453812
/// Validates the combination of signing, privacy and message validation to ensure the

0 commit comments

Comments
 (0)