Skip to content

Commit 307e517

Browse files
committed
req_neighbours
1 parent dea2f74 commit 307e517

File tree

6 files changed

+155
-6
lines changed

6 files changed

+155
-6
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "meshcore"
7-
version = "2.1.23"
7+
version = "2.1.24"
88
authors = [
99
{ name="Florent de Lamotte", email="[email protected]" },
1010
{ name="Alex Wolden", email="[email protected]" },

src/meshcore/commands/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ async def send(
163163
return Event(EventType.OK, {})
164164

165165
# attached at base because its a common method
166-
async def send_binary_req(self, dst: DestinationType, request_type: BinaryReqType, data: Optional[bytes] = None, timeout=None, min_timeout=0) -> Event:
166+
async def send_binary_req(self, dst: DestinationType, request_type: BinaryReqType, data: Optional[bytes] = None, context={}, timeout=None, min_timeout=0) -> Event:
167167
dst_bytes = _validate_destination(dst, prefix_length=32)
168168
pubkey_prefix = _validate_destination(dst, prefix_length=6)
169169
logger.debug(f"Binary request to {dst_bytes.hex()}")
@@ -180,6 +180,6 @@ async def send_binary_req(self, dst: DestinationType, request_type: BinaryReqTyp
180180
# Use provided timeout or fallback to suggested timeout (with 5s default)
181181
actual_timeout = timeout if timeout is not None and timeout > 0 else result.payload.get("suggested_timeout", 4000) / 800.0
182182
actual_timeout = min_timeout if actual_timeout < min_timeout else actual_timeout
183-
self._reader.register_binary_request(pubkey_prefix.hex(), exp_tag, request_type, actual_timeout)
183+
self._reader.register_binary_request(pubkey_prefix.hex(), exp_tag, request_type, actual_timeout, context=context)
184184

185185
return result

src/meshcore/commands/binary.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import asyncio
12
import logging
3+
import random
24

35
from .base import CommandHandlerBase
46
from ..events import EventType
@@ -131,3 +133,112 @@ async def req_acl_sync(self, contact, timeout=0, min_timeout=0):
131133
)
132134

133135
return acl_event.payload["acl_data"] if acl_event else None
136+
137+
async def req_neighbours_async(self,
138+
contact,
139+
count=255,
140+
offset=0,
141+
order_by=0,
142+
pubkey_prefix_length=4,
143+
timeout=0,
144+
min_timeout=0
145+
):
146+
req = (b"\x00" # version : 0
147+
+ count.to_bytes(1, "little", signed=False)
148+
+ offset.to_bytes(2, "little", signed=False)
149+
+ order_by.to_bytes(1, "little", signed=False)
150+
+ pubkey_prefix_length.to_bytes(1, "little", signed=False)
151+
+ random.randint(1, 0xFFFFFFFF).to_bytes(4, "little", signed=False)
152+
)
153+
154+
logger.debug(f"Sending binary neighbours req, count: {count}, offset: {offset} {req.hex()}")
155+
156+
return await self.send_binary_req (
157+
contact,
158+
BinaryReqType.NEIGHBOURS,
159+
data=req,
160+
timeout=timeout,
161+
context={"pubkey_prefix_length": pubkey_prefix_length}
162+
)
163+
164+
async def req_neighbours_sync(self,
165+
contact,
166+
count=255,
167+
offset=0,
168+
order_by=0,
169+
pubkey_prefix_length=4,
170+
timeout=0,
171+
min_timeout=0
172+
):
173+
174+
res = await self.req_neighbours_async(contact,
175+
count=count,
176+
offset=offset,
177+
order_by=order_by,
178+
pubkey_prefix_length=pubkey_prefix_length,
179+
timeout=timeout,
180+
min_timeout=min_timeout)
181+
182+
if res is None or res.type == EventType.ERROR:
183+
return None
184+
185+
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
186+
timeout = timeout if min_timeout < timeout else min_timeout
187+
188+
if self.dispatcher is None:
189+
return None
190+
191+
# Listen for NEIGHBOUR_RESPONSE
192+
neighbours_event = await self.dispatcher.wait_for_event(
193+
EventType.NEIGHBOURS_RESPONSE,
194+
attribute_filters={"tag": res.payload["expected_ack"].hex()},
195+
timeout=timeout,
196+
)
197+
198+
return neighbours_event.payload if neighbours_event else None
199+
200+
# do several queries if not all neighbours have been obtained
201+
async def fetch_all_neighbours(self,
202+
contact,
203+
order_by=0,
204+
pubkey_prefix_length=4,
205+
timeout=0,
206+
min_timeout=0
207+
):
208+
209+
# Initial request
210+
res = await self.req_neighbours_sync(contact,
211+
count=255,
212+
offset=0,
213+
order_by=order_by,
214+
pubkey_prefix_length=pubkey_prefix_length,
215+
timeout=timeout,
216+
min_timeout=min_timeout)
217+
218+
if res is None:
219+
return None
220+
221+
neighbours_count = res["neighbours_count"] # total neighbours
222+
results_count = res["results_count"] # obtained neighbours
223+
224+
del res["tag"]
225+
226+
while results_count < neighbours_count:
227+
#await asyncio.sleep(2) # wait 2s before next fetch
228+
next_res = await self.req_neighbours_sync(contact,
229+
count=255,
230+
offset=results_count,
231+
order_by=order_by,
232+
pubkey_prefix_length=pubkey_prefix_length,
233+
timeout=timeout,
234+
min_timeout=min_timeout+5) # requests are close, so let's have some more timeout
235+
236+
if next_res is None :
237+
return res # caller should check it has everything
238+
239+
results_count = results_count + next_res["results_count"]
240+
241+
res["results_count"] = results_count
242+
res["neighbours"] += next_res["neighbours"]
243+
244+
return res

src/meshcore/events.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class EventType(Enum):
4646
DISABLED = "disabled"
4747
CONTROL_DATA = "control_data"
4848
DISCOVER_RESPONSE = "discover_response"
49+
NEIGHBOURS_RESPONSE = "neighbours_response"
4950

5051
# Command response types
5152
OK = "command_ok"

src/meshcore/packets.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ class BinaryReqType(Enum):
66
TELEMETRY = 0x03
77
MMA = 0x04
88
ACL = 0x05
9+
NEIGHBOURS = 0x06
910

1011
class ControlType(Enum):
1112
NODE_DISCOVER_REQ = 0x80

src/meshcore/reader.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def __init__(self, dispatcher: EventDispatcher):
2323
# Track pending binary requests by tag for proper response parsing
2424
self.pending_binary_requests: Dict[str, Dict[str, Any]] = {} # tag -> {request_type, expires_at}
2525

26-
def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReqType, timeout_seconds: float):
26+
def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReqType, timeout_seconds: float, context={}):
2727
"""Register a pending binary request for proper response parsing"""
2828
# Clean up expired requests before adding new one
2929
self.cleanup_expired_requests()
@@ -32,7 +32,8 @@ def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReq
3232
self.pending_binary_requests[tag] = {
3333
"request_type": request_type,
3434
"pubkey_prefix": prefix,
35-
"expires_at": expires_at
35+
"expires_at": expires_at,
36+
"context": context # optional info we want to keep from req to resp
3637
}
3738
logger.debug(f"Registered binary request: tag={tag}, type={request_type}, expires in {timeout_seconds}s")
3839

@@ -519,6 +520,7 @@ async def handle_rx(self, data: bytearray):
519520
if tag in self.pending_binary_requests:
520521
request_type = self.pending_binary_requests[tag]["request_type"]
521522
pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"]
523+
context = self.pending_binary_requests[tag]["context"]
522524
del self.pending_binary_requests[tag]
523525
logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}")
524526

@@ -558,6 +560,40 @@ async def handle_rx(self, data: bytearray):
558560
)
559561
except Exception as e:
560562
logger.error(f"Error parsing binary ACL response: {e}")
563+
564+
elif request_type == BinaryReqType.NEIGHBOURS:
565+
try:
566+
pk_plen = context["pubkey_prefix_length"]
567+
bbuf = io.BytesIO(response_data)
568+
569+
res = {
570+
"pubkey_prefix": pubkey_prefix,
571+
"tag": tag
572+
}
573+
res.update(context) # add context in result
574+
575+
res["neighbours_count"] = int.from_bytes(bbuf.read(2), "little", signed=True)
576+
results_count = int.from_bytes(bbuf.read(2), "little", signed=True)
577+
res["results_count"] = results_count
578+
579+
neighbours_list = []
580+
581+
for _ in range (results_count):
582+
neighb = {}
583+
neighb["pubkey"] = bbuf.read(pk_plen).hex()
584+
neighb["secs_ago"] = int.from_bytes(bbuf.read(4), "little", signed=True)
585+
neighb["snr"] = int.from_bytes(bbuf.read(1), "little", signed=True) / 4
586+
neighbours_list.append(neighb)
587+
588+
res["neighbours"] = neighbours_list
589+
590+
await self.dispatcher.dispatch(
591+
Event(EventType.NEIGHBOURS_RESPONSE, res, {"tag": tag, "pubkey_prefix": pubkey_prefix})
592+
)
593+
594+
except Exception as e:
595+
logger.error(f"Error parsing binary NEIGHBOURS response: {e}")
596+
561597
else:
562598
logger.debug(f"No tracked request found for binary response tag {tag}")
563599

@@ -623,7 +659,7 @@ async def handle_rx(self, data: bytearray):
623659
if len(pubkey) < 32:
624660
pubkey = pubkey[0:8]
625661
else:
626-
pubkey = pubkey[0:32]
662+
pubkey = pubkey[0:32]
627663

628664
ndr["pubkey"] = pubkey.hex()
629665

0 commit comments

Comments
 (0)