|
6 | 6 |
|
7 | 7 | import logging |
8 | 8 | import os |
9 | | -from typing import Any, Dict, List, Optional, Union |
| 9 | +from typing import Any, Dict, List, Optional, Set, Tuple, Union |
10 | 10 | from urllib import parse |
11 | 11 |
|
12 | 12 | from securesystemslib import util as sslib_util |
@@ -110,7 +110,10 @@ def get_one_valid_targetinfo( |
110 | 110 | RepositoryError: Metadata failed to verify in some way |
111 | 111 | TODO: download-related errors |
112 | 112 | """ |
113 | | - return self._preorder_depth_first_walk(target_path) |
| 113 | + targetinfo, dummy = self._preorder_depth_first_walk( |
| 114 | + target_path, set(), ("targets", "root"), self.config.max_delegations |
| 115 | + ) |
| 116 | + return targetinfo |
114 | 117 |
|
115 | 118 | @staticmethod |
116 | 119 | def updated_targets( |
@@ -307,77 +310,63 @@ def _load_targets(self, role: str, parent_role: str) -> None: |
307 | 310 | self._persist_metadata(role, data) |
308 | 311 |
|
309 | 312 | def _preorder_depth_first_walk( |
310 | | - self, target_filepath: str |
311 | | - ) -> Union[Dict[str, Any], None]: |
| 313 | + self, |
| 314 | + target_filepath: str, |
| 315 | + visited_role_names: Set[str], |
| 316 | + current_role_pair: List[Tuple[str, ...]], |
| 317 | + number_of_delegations: int, |
| 318 | + ) -> Tuple[Union[Dict[str, Any], None], bool]: |
312 | 319 | """ |
313 | 320 | Interrogates the tree of target delegations in order of appearance |
314 | 321 | (which implicitly order trustworthiness), and returns the matching |
315 | 322 | target found in the most trusted role. |
316 | 323 | """ |
317 | | - |
318 | | - role_names = [("targets", "root")] |
319 | | - visited_role_names = set() |
320 | | - number_of_delegations = self.config.max_delegations |
321 | | - |
| 324 | + targetinfo = None |
| 325 | + terminated = False |
322 | 326 | # Preorder depth-first traversal of the graph of target delegations. |
323 | | - while number_of_delegations > 0 and len(role_names) > 0: |
324 | | - |
325 | | - # Pop the role name from the top of the stack. |
326 | | - role_name, parent_role = role_names.pop(-1) |
327 | | - |
328 | | - # Skip any visited current role to prevent cycles. |
329 | | - if (role_name, parent_role) in visited_role_names: |
330 | | - logger.debug("Skipping visited current role %s", role_name) |
331 | | - continue |
332 | | - |
333 | | - # The metadata for 'role_name' must be downloaded/updated before |
334 | | - # its targets, delegations, and child roles can be inspected. |
335 | | - self._load_targets(role_name, parent_role) |
336 | | - |
337 | | - role_metadata: Targets = self._trusted_set[role_name].signed |
338 | | - target = role_metadata.targets.get(target_filepath) |
339 | | - |
340 | | - if target is not None: |
341 | | - logger.debug("Found target in current role %s", role_name) |
342 | | - return {"filepath": target_filepath, "fileinfo": target} |
343 | | - |
344 | | - # After preorder check, add current role to set of visited roles. |
345 | | - visited_role_names.add((role_name, parent_role)) |
346 | | - |
347 | | - # And also decrement number of visited roles. |
348 | | - number_of_delegations -= 1 |
349 | | - |
350 | | - if role_metadata.delegations is not None: |
351 | | - child_roles_to_visit = [] |
352 | | - # NOTE: This may be a slow operation if there are many |
353 | | - # delegated roles. |
354 | | - for child_role in role_metadata.delegations.roles: |
355 | | - if child_role.is_in_trusted_paths(target_filepath): |
356 | | - logger.debug("Adding child role %s", child_role.name) |
357 | | - |
358 | | - child_roles_to_visit.append( |
359 | | - (child_role.name, role_name) |
360 | | - ) |
361 | | - if child_role.terminating: |
362 | | - logger.debug("Not backtracking to other roles.") |
363 | | - role_names = [] |
364 | | - break |
365 | | - # Push 'child_roles_to_visit' in reverse order of appearance |
366 | | - # onto 'role_names'. Roles are popped from the end of |
367 | | - # the 'role_names' list. |
368 | | - child_roles_to_visit.reverse() |
369 | | - role_names.extend(child_roles_to_visit) |
370 | | - |
371 | | - if number_of_delegations == 0 and len(role_names) > 0: |
372 | | - logger.debug( |
373 | | - "%d roles left to visit, but allowed to " |
374 | | - "visit at most %d delegations.", |
375 | | - len(role_names), |
376 | | - self.config.max_delegations, |
377 | | - ) |
378 | | - |
379 | | - # If this point is reached then target is not found, return None |
380 | | - return None |
| 327 | + if number_of_delegations <= 0: |
| 328 | + return targetinfo, terminated |
| 329 | + |
| 330 | + # Pop the role name from the top of the stack. |
| 331 | + role_name, parent_role = current_role_pair |
| 332 | + |
| 333 | + # The metadata for 'role_name' must be downloaded/updated before |
| 334 | + # its targets, delegations, and child roles can be inspected. |
| 335 | + self._load_targets(role_name, parent_role) |
| 336 | + role_metadata: Targets = self._trusted_set[role_name].signed |
| 337 | + target = role_metadata.targets.get(target_filepath) |
| 338 | + |
| 339 | + if target is not None: |
| 340 | + logger.debug("Found target in current role %s", role_name) |
| 341 | + targetinfo = {"filepath": target_filepath, "fileinfo": target} |
| 342 | + return targetinfo, terminated |
| 343 | + |
| 344 | + # After preorder check, add current role to set of visited roles. |
| 345 | + visited_role_names.add((role_name, parent_role)) |
| 346 | + |
| 347 | + # And also decrement number of visited roles. |
| 348 | + number_of_delegations -= 1 |
| 349 | + if role_metadata.delegations is not None: |
| 350 | + for child_role in role_metadata.delegations.roles: |
| 351 | + # Skip any visited current role to prevent cycles. |
| 352 | + if (child_role.name, parent_role) in visited_role_names: |
| 353 | + continue |
| 354 | + |
| 355 | + if child_role.is_in_trusted_paths(target_filepath): |
| 356 | + |
| 357 | + targetinfo, terminated = self._preorder_depth_first_walk( |
| 358 | + target_filepath, |
| 359 | + visited_role_names, |
| 360 | + (child_role.name, role_name), |
| 361 | + number_of_delegations, |
| 362 | + ) |
| 363 | + |
| 364 | + if child_role.terminating or terminated: |
| 365 | + terminated = True |
| 366 | + logger.debug("Not backtracking to other roles.") |
| 367 | + break |
| 368 | + |
| 369 | + return targetinfo, terminated |
381 | 370 |
|
382 | 371 |
|
383 | 372 | def _ensure_trailing_slash(url: str): |
|
0 commit comments