Skip to content

Commit ee98aa3

Browse files
author
Jussi Kukkonen
committed
Metadata API: Move "typed constructors" to Signed
Change the way Metadata[Root] and other typed Metadata are constructed. Before: root_md = Metadata._from_file(filename, signed_type=Root) after: root_md = Root.metadata_from_file(filename) This nicely removes the requirement for an extra argument. This unfortunately adds a lot of lines to metadata.py but actual LOC count does not really change -- it's just 2 additional abstract methods and 4 one-liner implementations for each of those. The only things these implementations do are: * annotates returns type as e.g. Metadata[Root] * raises if the signed type in input is incorrect Signed-off-by: Jussi Kukkonen <jkukkonen@vmware.com>
1 parent c79c7c6 commit ee98aa3

2 files changed

Lines changed: 170 additions & 29 deletions

File tree

tests/test_api.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -140,15 +140,15 @@ def test_typed_read(self):
140140
data = f.read()
141141

142142
# Loading a root file as "Metadata[Root]" succeeds
143-
md = Metadata.from_bytes(data, signed_type=Root)
144-
md2 = Metadata.from_file(path, signed_type=Root)
143+
md = Root.metadata_from_bytes(data)
144+
md2 = Root.metadata_from_file(path)
145145

146146
# Loading the file fails with non-"Root" type constraints
147147
for expected_type in [Timestamp, Snapshot, Targets]:
148148
with self.assertRaises(DeserializationError):
149-
Metadata.from_bytes(data, signed_type=expected_type)
149+
expected_type.metadata_from_bytes(data)
150150
with self.assertRaises(DeserializationError):
151-
Metadata.from_file(path, signed_type=expected_type)
151+
expected_type.metadata_from_file(path)
152152

153153

154154
def test_compact_json(self):
@@ -177,7 +177,7 @@ def test_read_write_read_compare(self):
177177

178178
def test_sign_verify(self):
179179
root_path = os.path.join(self.repo_dir, 'metadata', 'root.json')
180-
root = Metadata.from_file(root_path, signed_type=Root).signed
180+
root = Root.metadata_from_file(root_path).signed
181181

182182
# Locate the public keys we need from root
183183
targets_keyid = next(iter(root.roles["targets"].keyids))
@@ -189,7 +189,7 @@ def test_sign_verify(self):
189189

190190
# Load sample metadata (targets) and assert ...
191191
path = os.path.join(self.repo_dir, 'metadata', 'targets.json')
192-
metadata_obj = Metadata.from_file(path, signed_type=Targets)
192+
metadata_obj = Targets.metadata_from_file(path)
193193

194194
# ... it has a single existing signature,
195195
self.assertEqual(len(metadata_obj.signatures), 1)
@@ -306,7 +306,7 @@ def test_targetfile_class(self):
306306
def test_metadata_snapshot(self):
307307
snapshot_path = os.path.join(
308308
self.repo_dir, 'metadata', 'snapshot.json')
309-
snapshot = Metadata.from_file(snapshot_path, signed_type=Snapshot)
309+
snapshot = Snapshot.metadata_from_file(snapshot_path)
310310

311311
# Create a MetaFile instance representing what we expect
312312
# the updated data to be.
@@ -332,7 +332,7 @@ def test_metadata_snapshot(self):
332332
def test_metadata_timestamp(self):
333333
timestamp_path = os.path.join(
334334
self.repo_dir, 'metadata', 'timestamp.json')
335-
timestamp = Metadata.from_file(timestamp_path, signed_type=Timestamp)
335+
timestamp = Timestamp.metadata_from_file(timestamp_path)
336336

337337
self.assertEqual(timestamp.signed.version, 1)
338338
timestamp.signed.bump_version()
@@ -441,8 +441,7 @@ def test_role_class(self):
441441
def test_metadata_root(self):
442442
root_path = os.path.join(
443443
self.repo_dir, 'metadata', 'root.json')
444-
root = Metadata.from_file(root_path, signed_type=Root)
445-
444+
root = Root.metadata_from_file(root_path)
446445
# Add a second key to root role
447446
root_key2 = import_ed25519_publickey_from_file(
448447
os.path.join(self.keystore_dir, 'root_key2.pub'))
@@ -579,7 +578,7 @@ def test_delegation_class(self):
579578
def test_metadata_targets(self):
580579
targets_path = os.path.join(
581580
self.repo_dir, 'metadata', 'targets.json')
582-
targets = Metadata.from_file(targets_path, signed_type=Targets)
581+
targets = Targets.metadata_from_file(targets_path)
583582

584583
# Create a fileinfo dict representing what we expect the updated data to be
585584
filename = 'file2.txt'
@@ -668,7 +667,7 @@ def test_length_and_hash_validation(self):
668667
# for untrusted metadata file to verify.
669668
timestamp_path = os.path.join(
670669
self.repo_dir, 'metadata', 'timestamp.json')
671-
timestamp = Metadata.from_file(timestamp_path, signed_type=Timestamp)
670+
timestamp = Timestamp.metadata_from_file(timestamp_path)
672671
snapshot_metafile = timestamp.signed.meta["snapshot.json"]
673672

674673
snapshot_path = os.path.join(
@@ -702,7 +701,7 @@ def test_length_and_hash_validation(self):
702701
# Test target files' hash and length verification
703702
targets_path = os.path.join(
704703
self.repo_dir, 'metadata', 'targets.json')
705-
targets = Metadata.from_file(targets_path, signed_type=Targets)
704+
targets = Targets.metadata_from_file(targets_path)
706705
file1_targetfile = targets.signed.targets['file1.txt']
707706
filepath = os.path.join(
708707
self.repo_dir, 'targets', 'file1.txt')

tuf/api/metadata.py

Lines changed: 158 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,11 @@ class Metadata(Generic[T]):
7373
[Root, Timestamp, Snapshot, Targets]. The purpose of this is to allow
7474
type checking of the signed attribute in code using Metadata::
7575
76-
root_md = Metadata.from_file("root.json", signed_type=Root)
77-
# root_md type is now Metadata[Root]. This means signed and its
76+
root_md = Root.metadata_from_file("root.json")
77+
print(root_md.signed.consistent_snapshot)
78+
# root_md type is now Metadata[Root]. This means root_md.signed and its
7879
# attributes like consistent_snapshot are now statically typed and the
7980
# types can be verified by static type checkers and shown by IDEs
80-
print(root_md.signed.consistent_snapshot)
81-
82-
Using the signed_type argument in factory constructors is not required but
83-
not doing so means T is not a specific type so static typing cannot happen.
8481
8582
Attributes:
8683
signed: A subclass of Signed, which has the actual metadata payload,
@@ -147,7 +144,6 @@ def from_file(
147144
filename: str,
148145
deserializer: Optional[MetadataDeserializer] = None,
149146
storage_backend: Optional[StorageBackendInterface] = None,
150-
signed_type: Optional[Type[T]] = None,
151147
) -> "Metadata[T]":
152148
"""Loads TUF metadata from file storage.
153149
@@ -159,7 +155,6 @@ def from_file(
159155
storage_backend: An object that implements
160156
securesystemslib.storage.StorageBackendInterface. Per default
161157
a (local) FilesystemBackend is used.
162-
signed_type: Optional; Expected type of deserialized signed object.
163158
164159
Raises:
165160
securesystemslib.exceptions.StorageError: The file cannot be read.
@@ -174,21 +169,19 @@ def from_file(
174169
storage_backend = FilesystemBackend()
175170

176171
with storage_backend.get(filename) as f:
177-
return Metadata.from_bytes(f.read(), deserializer, signed_type)
172+
return Metadata.from_bytes(f.read(), deserializer)
178173

179174
@staticmethod
180175
def from_bytes(
181176
data: bytes,
182177
deserializer: Optional[MetadataDeserializer] = None,
183-
signed_type: Optional[Type[T]] = None,
184178
) -> "Metadata[T]":
185179
"""Loads TUF metadata from raw data.
186180
187181
Arguments:
188182
data: metadata content as bytes.
189183
deserializer: Optional; A MetadataDeserializer instance that
190184
implements deserialization. Default is JSONDeserializer.
191-
signed_type: Optional; Expected type of deserialized signed object.
192185
193186
Raises:
194187
tuf.api.serialization.DeserializationError:
@@ -207,11 +200,6 @@ def from_bytes(
207200

208201
md = deserializer.deserialize(data)
209202

210-
# Ensure deserialized signed type matches the requested type
211-
if signed_type is not None and signed_type != type(md.signed):
212-
raise DeserializationError(
213-
f"Expected {signed_type}, got {type(md.signed)}"
214-
)
215203
return md
216204

217205
def to_dict(self) -> Dict[str, Any]:
@@ -364,6 +352,60 @@ def to_dict(self) -> Dict[str, Any]:
364352
"""Serialization helper that returns dict representation of self"""
365353
raise NotImplementedError
366354

355+
@classmethod
356+
@abc.abstractmethod
357+
def metadata_from_bytes(
358+
cls,
359+
data: bytes,
360+
deserializer: Optional[MetadataDeserializer] = None,
361+
) -> Metadata:
362+
"""Loads a Metadata object from bytes.
363+
364+
Like Metadata.from_bytes() but also raises DeserializationError if
365+
bytes does not contain the correct metadata type."""
366+
raise NotImplementedError
367+
368+
@classmethod
369+
@abc.abstractmethod
370+
def metadata_from_file(
371+
cls,
372+
filename: str,
373+
deserializer: Optional[MetadataDeserializer] = None,
374+
storage_backend: Optional[StorageBackendInterface] = None,
375+
) -> Metadata:
376+
"""Loads a Metadata object from file.
377+
378+
Like Metadata.from_file() but also raises DeserializationError if
379+
file does not contain the correct metadata type."""
380+
raise NotImplementedError
381+
382+
@classmethod
383+
def _metadata_from_bytes(
384+
cls, data: bytes, deserializer: Optional[MetadataDeserializer]
385+
) -> Metadata:
386+
"""Like Metadata.from_bytes() but raises on wrong type"""
387+
metadata = Metadata.from_bytes(data, deserializer)
388+
if not isinstance(metadata.signed, cls):
389+
raise DeserializationError(
390+
f"Expected {cls}, got {type(metadata.signed)}"
391+
)
392+
return metadata
393+
394+
@classmethod
395+
def _metadata_from_file(
396+
cls,
397+
filename: str,
398+
deserializer: Optional[MetadataDeserializer],
399+
storage_backend: Optional[StorageBackendInterface],
400+
) -> Metadata:
401+
"""Like Metadata.from_file() but raises on wrong type"""
402+
metadata = Metadata.from_file(filename, deserializer, storage_backend)
403+
if not isinstance(metadata.signed, cls):
404+
raise DeserializationError(
405+
f"Expected {cls}, got {type(metadata.signed)}"
406+
)
407+
return metadata
408+
367409
@classmethod
368410
@abc.abstractmethod
369411
def from_dict(cls, signed_dict: Dict[str, Any]) -> "Signed":
@@ -632,6 +674,31 @@ def __init__(
632674
self.keys = keys
633675
self.roles = roles
634676

677+
@classmethod
678+
def metadata_from_bytes(
679+
cls,
680+
data: bytes,
681+
deserializer: Optional[MetadataDeserializer] = None,
682+
) -> Metadata["Root"]:
683+
"""Loads a Metadata[Root] from raw data.
684+
685+
Like Metadata.from_bytes() but also raises DeserializationError if
686+
bytes does not contain root metadata."""
687+
return cls._metadata_from_bytes(data, deserializer)
688+
689+
@classmethod
690+
def metadata_from_file(
691+
cls,
692+
filename: str,
693+
deserializer: Optional[MetadataDeserializer] = None,
694+
storage_backend: Optional[StorageBackendInterface] = None,
695+
) -> Metadata["Root"]:
696+
"""Loads a Metadata[Root] from file.
697+
698+
Like Metadata.from_file() but also raises DeserializationError if file
699+
does not contain root metadata."""
700+
return cls._metadata_from_file(filename, deserializer, storage_backend)
701+
635702
@classmethod
636703
def from_dict(cls, signed_dict: Dict[str, Any]) -> "Root":
637704
"""Creates Root object from its dict representation."""
@@ -846,6 +913,31 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Timestamp":
846913
# All fields left in the timestamp_dict are unrecognized.
847914
return cls(*common_args, meta, signed_dict)
848915

916+
@classmethod
917+
def metadata_from_bytes(
918+
cls,
919+
data: bytes,
920+
deserializer: Optional[MetadataDeserializer] = None,
921+
) -> Metadata["Timestamp"]:
922+
"""Loads a Metadata[Timestamp] from raw data.
923+
924+
Like Metadata.from_bytes() but also raises DeserializationError if
925+
bytes does not contain timestamp metadata."""
926+
return cls._metadata_from_bytes(data, deserializer)
927+
928+
@classmethod
929+
def metadata_from_file(
930+
cls,
931+
filename: str,
932+
deserializer: Optional[MetadataDeserializer] = None,
933+
storage_backend: Optional[StorageBackendInterface] = None,
934+
) -> Metadata["Timestamp"]:
935+
"""Loads a Metadata[Timestamp] from file.
936+
937+
Like Metadata.from_file() but also raises DeserializationError if file
938+
does not contain timestamp metadata."""
939+
return cls._metadata_from_file(filename, deserializer, storage_backend)
940+
849941
def to_dict(self) -> Dict[str, Any]:
850942
"""Returns the dict representation of self."""
851943
res_dict = self._common_fields_to_dict()
@@ -898,6 +990,31 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Snapshot":
898990
# All fields left in the snapshot_dict are unrecognized.
899991
return cls(*common_args, meta, signed_dict)
900992

993+
@classmethod
994+
def metadata_from_bytes(
995+
cls,
996+
data: bytes,
997+
deserializer: Optional[MetadataDeserializer] = None,
998+
) -> Metadata["Snapshot"]:
999+
"""Loads a Metadata[Snapshot] from raw data.
1000+
1001+
Like Metadata.from_bytes() but also raises DeserializationError if
1002+
bytes does not contain snapshot metadata."""
1003+
return cls._metadata_from_bytes(data, deserializer)
1004+
1005+
@classmethod
1006+
def metadata_from_file(
1007+
cls,
1008+
filename: str,
1009+
deserializer: Optional[MetadataDeserializer] = None,
1010+
storage_backend: Optional[StorageBackendInterface] = None,
1011+
) -> Metadata["Snapshot"]:
1012+
"""Loads a Metadata[Snapshot] from file.
1013+
1014+
Like Metadata.from_file() but also raises DeserializationError if file
1015+
does not contain snapshot metadata."""
1016+
return cls._metadata_from_file(filename, deserializer, storage_backend)
1017+
9011018
def to_dict(self) -> Dict[str, Any]:
9021019
"""Returns the dict representation of self."""
9031020
snapshot_dict = self._common_fields_to_dict()
@@ -1161,6 +1278,31 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets":
11611278
# All fields left in the targets_dict are unrecognized.
11621279
return cls(*common_args, res_targets, delegations, signed_dict)
11631280

1281+
@classmethod
1282+
def metadata_from_bytes(
1283+
cls,
1284+
data: bytes,
1285+
deserializer: Optional[MetadataDeserializer] = None,
1286+
) -> Metadata["Targets"]:
1287+
"""Loads a Metadata[Targets] from raw data.
1288+
1289+
Like Metadata.from_bytes() but also raises DeserializationError if
1290+
bytes does not contain targets metadata."""
1291+
return cls._metadata_from_bytes(data, deserializer)
1292+
1293+
@classmethod
1294+
def metadata_from_file(
1295+
cls,
1296+
filename: str,
1297+
deserializer: Optional[MetadataDeserializer] = None,
1298+
storage_backend: Optional[StorageBackendInterface] = None,
1299+
) -> Metadata["Targets"]:
1300+
"""Loads a Metadata[Targets] from file.
1301+
1302+
Like Metadata.from_file() but also raises DeserializationError if file
1303+
does not contain targets metadata."""
1304+
return cls._metadata_from_file(filename, deserializer, storage_backend)
1305+
11641306
def to_dict(self) -> Dict[str, Any]:
11651307
"""Returns the dict representation of self."""
11661308
targets_dict = self._common_fields_to_dict()

0 commit comments

Comments
 (0)