|
71 | 71 | from typing import Dict, Iterator, Optional |
72 | 72 |
|
73 | 73 | from tuf import exceptions |
74 | | -from tuf.api.metadata import Metadata, Root, Targets |
| 74 | +from tuf.api.metadata import Metadata |
75 | 75 | from tuf.api.serialization import DeserializationError |
76 | 76 |
|
77 | 77 | logger = logging.getLogger(__name__) |
78 | 78 |
|
79 | | -# This is a placeholder until ... |
80 | | -# TODO issue 1306: implement this in Metadata API |
81 | | -def verify_with_threshold( |
82 | | - delegator: Metadata, role_name: str, unverified: Metadata |
83 | | -) -> bool: |
84 | | - """Verify 'unverified' with keys and threshold defined in delegator""" |
85 | | - role = None |
86 | | - keys = {} |
87 | | - if isinstance(delegator.signed, Root): |
88 | | - keys = delegator.signed.keys |
89 | | - role = delegator.signed.roles.get(role_name) |
90 | | - elif isinstance(delegator.signed, Targets): |
91 | | - if delegator.signed.delegations: |
92 | | - keys = delegator.signed.delegations.keys |
93 | | - # role names are unique: first match is enough |
94 | | - roles = delegator.signed.delegations.roles |
95 | | - role = next((r for r in roles if r.name == role_name), None) |
96 | | - else: |
97 | | - raise ValueError("Call is valid only on delegator metadata") |
98 | | - |
99 | | - if role is None: |
100 | | - raise ValueError(f"Delegated role {role_name} not found") |
101 | | - |
102 | | - # verify that delegate is signed by correct threshold of unique keys |
103 | | - unique_keys = set() |
104 | | - for keyid in role.keyids: |
105 | | - key = keys[keyid] |
106 | | - try: |
107 | | - key.verify_signature(unverified) |
108 | | - unique_keys.add(key.keyval["public"]) |
109 | | - except Exception as e: # pylint: disable=broad-except |
110 | | - # TODO specify the Exceptions (see issue #1351) |
111 | | - logger.info("verify failed: %s", e) |
112 | | - |
113 | | - return len(unique_keys) >= role.threshold |
114 | | - |
115 | 79 |
|
116 | 80 | class TrustedMetadataSet(abc.Mapping): |
117 | 81 | """Internal class to keep track of trusted metadata in Updater |
@@ -207,20 +171,14 @@ def update_root(self, data: bytes): |
207 | 171 |
|
208 | 172 | if self.root is not None: |
209 | 173 | # We are not loading initial trusted root: verify the new one |
210 | | - if not verify_with_threshold(self.root, "root", new_root): |
211 | | - raise exceptions.UnsignedMetadataError( |
212 | | - "New root is not signed by root", new_root.signed |
213 | | - ) |
| 174 | + self.root.verify_delegate("root", new_root) |
214 | 175 |
|
215 | 176 | if new_root.signed.version != self.root.signed.version + 1: |
216 | 177 | raise exceptions.ReplayedMetadataError( |
217 | 178 | "root", new_root.signed.version, self.root.signed.version |
218 | 179 | ) |
219 | 180 |
|
220 | | - if not verify_with_threshold(new_root, "root", new_root): |
221 | | - raise exceptions.UnsignedMetadataError( |
222 | | - "New root is not signed by itself", new_root.signed |
223 | | - ) |
| 181 | + new_root.verify_delegate("root", new_root) |
224 | 182 |
|
225 | 183 | self._trusted_set["root"] = new_root |
226 | 184 | logger.debug("Updated root") |
@@ -270,10 +228,7 @@ def update_timestamp(self, data: bytes): |
270 | 228 | f"Expected 'timestamp', got '{new_timestamp.signed.type}'" |
271 | 229 | ) |
272 | 230 |
|
273 | | - if not verify_with_threshold(self.root, "timestamp", new_timestamp): |
274 | | - raise exceptions.UnsignedMetadataError( |
275 | | - "New timestamp is not signed by root", new_timestamp.signed |
276 | | - ) |
| 231 | + self.root.verify_delegate("timestamp", new_timestamp) |
277 | 232 |
|
278 | 233 | # If an existing trusted timestamp is updated, |
279 | 234 | # check for a rollback attack |
@@ -339,10 +294,7 @@ def update_snapshot(self, data: bytes): |
339 | 294 | f"Expected 'snapshot', got '{new_snapshot.signed.type}'" |
340 | 295 | ) |
341 | 296 |
|
342 | | - if not verify_with_threshold(self.root, "snapshot", new_snapshot): |
343 | | - raise exceptions.UnsignedMetadataError( |
344 | | - "New snapshot is not signed by root", new_snapshot.signed |
345 | | - ) |
| 297 | + self.root.verify_delegate("snapshot", new_snapshot) |
346 | 298 |
|
347 | 299 | if ( |
348 | 300 | new_snapshot.signed.version |
@@ -408,7 +360,7 @@ def update_delegated_targets( |
408 | 360 | if self.snapshot is None: |
409 | 361 | raise RuntimeError("Cannot load targets before snapshot") |
410 | 362 |
|
411 | | - delegator = self.get(delegator_name) |
| 363 | + delegator: Optional[Metadata] = self.get(delegator_name) |
412 | 364 | if delegator is None: |
413 | 365 | raise RuntimeError("Cannot load targets before delegator") |
414 | 366 |
|
@@ -438,11 +390,7 @@ def update_delegated_targets( |
438 | 390 | f"Expected 'targets', got '{new_delegate.signed.type}'" |
439 | 391 | ) |
440 | 392 |
|
441 | | - if not verify_with_threshold(delegator, role_name, new_delegate): |
442 | | - raise exceptions.UnsignedMetadataError( |
443 | | - f"New {role_name} is not signed by {delegator_name}", |
444 | | - new_delegate, |
445 | | - ) |
| 393 | + delegator.verify_delegate(role_name, new_delegate) |
446 | 394 |
|
447 | 395 | if new_delegate.signed.version != meta.version: |
448 | 396 | raise exceptions.BadVersionNumberError( |
|
0 commit comments