From 82bc106bcd30ad78c1a28c51949068618b5b342d Mon Sep 17 00:00:00 2001 From: Nick Ozmore Date: Fri, 1 Oct 2021 21:48:27 -0400 Subject: [PATCH 1/7] Add feature to define findings manually in the model, similar to overrides --- docs/template.md | 21 +++++++++++++++++++++ pytm/pytm.py | 8 ++++++++ tm.py | 40 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+) diff --git a/docs/template.md b/docs/template.md index 55a7a03..2d04dcc 100644 --- a/docs/template.md +++ b/docs/template.md @@ -54,3 +54,24 @@ Name|Description|Classification   }| + +## Custom Threats + +|{custom_findings:repeat: +
+ {{item.id}} -- {{item.description}} +
Targeted Element
+

{{item.target}}

+
Severity
+

{{item.severity}}

+
Example Instances
+

{{item.example}}

+
Mitigations
+

{{item.mitigations}}

+
References
+

{{item.references}}

+   +   +   +
+}| diff --git a/pytm/pytm.py b/pytm/pytm.py index 9791b7a..5e84639 100644 --- a/pytm/pytm.py +++ b/pytm/pytm.py @@ -715,6 +715,7 @@ class TM: mergeResponses = varBool(False, doc="Merge response edges in DFDs") ignoreUnused = varBool(False, doc="Ignore elements not used in any Dataflow") findings = varFindings([], doc="threats found for elements of this model") + custom_findings = varFindings([], doc="custom threats manually added to elements of this model") onDuplicates = varAction( Action.NO_ACTION, doc="""How to handle duplicate Dataflow @@ -754,11 +755,15 @@ def _add_threats(self): def resolve(self): finding_count = 0 findings = [] + custom_findings = [] elements = defaultdict(list) for e in TM._elements: if not e.inScope: continue + if (len(e.custom_findings) > 0): + custom_findings.extend(e.custom_findings) + override_ids = set(f.threat_id for f in e.overrides) # if element is a dataflow filter out overrides from source and sink # because they will be always applied there anyway @@ -779,6 +784,7 @@ def resolve(self): findings.append(f) elements[e].append(f) self.findings = findings + self.custom_findings = custom_findings for e, findings in elements.items(): e.findings = findings @@ -965,6 +971,7 @@ def report(self, template_path): "dataflows": TM._flows, "threats": threats, "findings": findings, + "custom_findings": self.custom_findings, "elements": TM._elements, "assets": TM._assets, "actors": TM._actors, @@ -1131,6 +1138,7 @@ class Element: doc="""Minimum TLS version required.""", ) findings = varFindings([], doc="Threats that apply to this element") + custom_findings = varFindings([], doc="custom threats manually added to this element") overrides = varFindings( [], doc="""Overrides to findings, allowing to set diff --git a/tm.py b/tm.py index 8c59864..c6172e0 100755 --- a/tm.py +++ b/tm.py @@ -10,6 +10,7 @@ Datastore, Lambda, Server, + Finding, ) tm = TM("my test tm") @@ -33,6 +34,20 @@ web.encodesOutput = True web.authorizesSource = False web.sourceFiles = ["pytm/json.py", "docs/template.md"] +web.custom_findings = [ + Finding(web, + id = "foo_id_3", + description = "foo_description", + details = "foo_details", + severity = "HIGH", + mitigations = "foo_mitigations", + example = "foo_example", + threat_id = "foo_threat_id_3", + references = "foo_references", + response = "accepted", + ), +] + db = Datastore("SQL Database") db.OS = "CentOS" @@ -108,6 +123,31 @@ my_lambda_to_db.protocol = "MySQL" my_lambda_to_db.dstPort = 3306 my_lambda_to_db.data = clear_op +my_lambda_to_db.custom_findings = [ + Finding(my_lambda_to_db, + id = "foo_id_1", + description = "foo_description", + details = "foo_details", + severity = "HIGH", + mitigations = "foo_mitigations", + example = "foo_example", + threat_id = "foo_threat_id_1", + references = "foo_references", + response = "accepted", + ), + Finding(my_lambda_to_db, + id = "foo_id_2", + description = "foo_description", + details = "foo_details", + severity = "HIGH", + mitigations = "foo_mitigations", + example = "foo_example", + threat_id = "foo_threat_id_2", + references = "foo_references", + response = "accepted", + ) + +] userIdToken = Data( name="User ID Token", From 9e0fce534de9478f74f9ffef2814e05ce27395e4 Mon Sep 17 00:00:00 2001 From: Nick Ozmore Date: Mon, 4 Oct 2021 21:43:10 -0400 Subject: [PATCH 2/7] rename from 'custom' to 'manual' findings. --- docs/template.md | 2 +- pytm/pytm.py | 14 +++++++------- tm.py | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/template.md b/docs/template.md index 2d04dcc..94ca795 100644 --- a/docs/template.md +++ b/docs/template.md @@ -57,7 +57,7 @@ Name|Description|Classification ## Custom Threats -|{custom_findings:repeat: +|{manual_findings:repeat:
{{item.id}} -- {{item.description}}
Targeted Element
diff --git a/pytm/pytm.py b/pytm/pytm.py index 5e84639..7c9047a 100644 --- a/pytm/pytm.py +++ b/pytm/pytm.py @@ -715,7 +715,7 @@ class TM: mergeResponses = varBool(False, doc="Merge response edges in DFDs") ignoreUnused = varBool(False, doc="Ignore elements not used in any Dataflow") findings = varFindings([], doc="threats found for elements of this model") - custom_findings = varFindings([], doc="custom threats manually added to elements of this model") + manual_findings = varFindings([], doc="Findings manually added to elements of this model") onDuplicates = varAction( Action.NO_ACTION, doc="""How to handle duplicate Dataflow @@ -755,14 +755,14 @@ def _add_threats(self): def resolve(self): finding_count = 0 findings = [] - custom_findings = [] + manual_findings = [] elements = defaultdict(list) for e in TM._elements: if not e.inScope: continue - if (len(e.custom_findings) > 0): - custom_findings.extend(e.custom_findings) + if (len(e.manual_findings) > 0): + manual_findings.extend(e.manual_findings) override_ids = set(f.threat_id for f in e.overrides) # if element is a dataflow filter out overrides from source and sink @@ -784,7 +784,7 @@ def resolve(self): findings.append(f) elements[e].append(f) self.findings = findings - self.custom_findings = custom_findings + self.manual_findings = manual_findings for e, findings in elements.items(): e.findings = findings @@ -971,7 +971,7 @@ def report(self, template_path): "dataflows": TM._flows, "threats": threats, "findings": findings, - "custom_findings": self.custom_findings, + "manual_findings": self.manual_findings, "elements": TM._elements, "assets": TM._assets, "actors": TM._actors, @@ -1138,7 +1138,7 @@ class Element: doc="""Minimum TLS version required.""", ) findings = varFindings([], doc="Threats that apply to this element") - custom_findings = varFindings([], doc="custom threats manually added to this element") + manual_findings = varFindings([], doc="Findings manually added to this element") overrides = varFindings( [], doc="""Overrides to findings, allowing to set diff --git a/tm.py b/tm.py index c6172e0..5ff6e77 100755 --- a/tm.py +++ b/tm.py @@ -34,7 +34,7 @@ web.encodesOutput = True web.authorizesSource = False web.sourceFiles = ["pytm/json.py", "docs/template.md"] -web.custom_findings = [ +web.manual_findings = [ Finding(web, id = "foo_id_3", description = "foo_description", @@ -123,7 +123,7 @@ my_lambda_to_db.protocol = "MySQL" my_lambda_to_db.dstPort = 3306 my_lambda_to_db.data = clear_op -my_lambda_to_db.custom_findings = [ +my_lambda_to_db.manual_findings = [ Finding(my_lambda_to_db, id = "foo_id_1", description = "foo_description", From 1eaeded77787cdccdb3862c83cb6883dc204f7dd Mon Sep 17 00:00:00 2001 From: Nick Ozmore Date: Mon, 4 Oct 2021 23:04:16 -0400 Subject: [PATCH 3/7] Added source element to Findings, merged manual findings into main and per element finding lists, updated json test data. --- docs/template.md | 22 ++-------------------- pytm/pytm.py | 15 ++++++++++----- tests/output.json | 20 +++++++++++++++++++- tm.py | 7 ++++--- 4 files changed, 35 insertions(+), 29 deletions(-) diff --git a/docs/template.md b/docs/template.md index 94ca795..10ea583 100644 --- a/docs/template.md +++ b/docs/template.md @@ -49,29 +49,11 @@ Name|Description|Classification

{{item.mitigations}}

References

{{item.references}}

+
Source
+

{{item.source}}

     
}| -## Custom Threats - -|{manual_findings:repeat: -
- {{item.id}} -- {{item.description}} -
Targeted Element
-

{{item.target}}

-
Severity
-

{{item.severity}}

-
Example Instances
-

{{item.example}}

-
Mitigations
-

{{item.mitigations}}

-
References
-

{{item.references}}

-   -   -   -
-}| diff --git a/pytm/pytm.py b/pytm/pytm.py index 7c9047a..a5858da 100644 --- a/pytm/pytm.py +++ b/pytm/pytm.py @@ -626,6 +626,7 @@ class Finding: """, ) cvss = varString("", required=False, doc="The CVSS score and/or vector") + source = varString("manual", required=False, doc="The source of the Finding.") def __init__( self, @@ -715,7 +716,6 @@ class TM: mergeResponses = varBool(False, doc="Merge response edges in DFDs") ignoreUnused = varBool(False, doc="Ignore elements not used in any Dataflow") findings = varFindings([], doc="threats found for elements of this model") - manual_findings = varFindings([], doc="Findings manually added to elements of this model") onDuplicates = varAction( Action.NO_ACTION, doc="""How to handle duplicate Dataflow @@ -762,8 +762,13 @@ def resolve(self): continue if (len(e.manual_findings) > 0): - manual_findings.extend(e.manual_findings) + for f in e.manual_findings: + finding_count += 1 + f._safeset("id", str(finding_count)); + findings.append(f) + elements[e].append(f) + override_ids = set(f.threat_id for f in e.overrides) # if element is a dataflow filter out overrides from source and sink # because they will be always applied there anyway @@ -779,12 +784,12 @@ def resolve(self): continue finding_count += 1 - f = Finding(e, id=str(finding_count), threat=t) + f = Finding(e, id=str(finding_count), threat=t, source="pytm") logger.debug(f"new finding: {f}") findings.append(f) elements[e].append(f) + self.findings = findings - self.manual_findings = manual_findings for e, findings in elements.items(): e.findings = findings @@ -971,7 +976,6 @@ def report(self, template_path): "dataflows": TM._flows, "threats": threats, "findings": findings, - "manual_findings": self.manual_findings, "elements": TM._elements, "assets": TM._assets, "actors": TM._actors, @@ -1862,6 +1866,7 @@ def encode_threat_data(obj): "threat_id", "references", "condition", + "source", ] if type(obj) is Finding or (len(obj) != 0 and type(obj[0]) is Finding): diff --git a/tests/output.json b/tests/output.json index 0adee11..a5de779 100644 --- a/tests/output.json +++ b/tests/output.json @@ -16,6 +16,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "User", @@ -68,6 +69,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Web Server", @@ -127,6 +129,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Lambda func", @@ -181,6 +184,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Task queue worker", @@ -241,6 +245,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "SQL Database", @@ -273,6 +278,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Internet", @@ -287,6 +293,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Server/DB", @@ -333,6 +340,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "User", @@ -383,6 +391,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Web Server", @@ -442,6 +451,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Lambda func", @@ -496,6 +506,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Task queue worker", @@ -556,6 +567,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "SQL Database", @@ -601,6 +613,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "User enters comments (*)", @@ -636,6 +649,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Insert query with comments", @@ -671,6 +685,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Call func", @@ -706,6 +721,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Retrieve comments", @@ -741,6 +757,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Show comments (*)", @@ -776,6 +793,7 @@ "levels": [ 0 ], + "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Query for tasks", @@ -801,4 +819,4 @@ "onDuplicates": "Action.NO_ACTION", "threatsExcluded": [], "threatsFile": "pytm/threatlib/threats.json" -} +} \ No newline at end of file diff --git a/tm.py b/tm.py index 5ff6e77..62c22f0 100755 --- a/tm.py +++ b/tm.py @@ -36,7 +36,7 @@ web.sourceFiles = ["pytm/json.py", "docs/template.md"] web.manual_findings = [ Finding(web, - id = "foo_id_3", + #id = "foo_id_3", description = "foo_description", details = "foo_details", severity = "HIGH", @@ -125,7 +125,7 @@ my_lambda_to_db.data = clear_op my_lambda_to_db.manual_findings = [ Finding(my_lambda_to_db, - id = "foo_id_1", + # id = "foo_id_1", description = "foo_description", details = "foo_details", severity = "HIGH", @@ -136,7 +136,7 @@ response = "accepted", ), Finding(my_lambda_to_db, - id = "foo_id_2", +# id = "foo_id_2", description = "foo_description", details = "foo_details", severity = "HIGH", @@ -145,6 +145,7 @@ threat_id = "foo_threat_id_2", references = "foo_references", response = "accepted", + source = "Product Security" ) ] From 16408991a9dd13544f84d7c309846ef143a5ff80 Mon Sep 17 00:00:00 2001 From: Nick Ozmore Date: Tue, 5 Oct 2021 00:58:59 -0400 Subject: [PATCH 4/7] Removed duplicate element reference when assigning a manual finding to an element in a model. --- pytm/pytm.py | 32 ++++++++++++++++++++------------ tm.py | 21 ++++++--------------- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/pytm/pytm.py b/pytm/pytm.py index a5858da..705eaf0 100644 --- a/pytm/pytm.py +++ b/pytm/pytm.py @@ -59,12 +59,18 @@ def __set__(self, instance, value): # called when x.d = val # instance = x # value = val + if instance in self.data: - raise ValueError( - "cannot overwrite {}.{} value with {}, already set to {}".format( - instance, self.__class__.__name__, value, self.data[instance] + if (not isinstance(instance, Finding) + or (isinstance(instance, Finding) and isinstance(self,varString) and self.data[instance] != "invalid") + or (isinstance(instance, Finding) and isinstance(self,varElement) and self.data[instance].name != "invalid") + ): + raise ValueError( + "cannot overwrite {}.{} value with {}, already set to {}".format( + instance, self.__class__.__name__, value, self.data[instance] + ) ) - ) + self.data[instance] = value if self.onSet is not None: self.onSet(instance, value) @@ -603,17 +609,17 @@ class Finding: """Represents a Finding - the element in question and a description of the finding""" - element = varElement(None, required=True, doc="Element this finding applies to") + element = varElement(None, required=False, doc="Element this finding applies to") target = varString("", doc="Name of the element this finding applies to") description = varString("", required=True, doc="Threat description") details = varString("", required=True, doc="Threat details") - severity = varString("", required=True, doc="Threat severity") - mitigations = varString("", required=True, doc="Threat mitigations") - example = varString("", required=True, doc="Threat example") + severity = varString("", required=False, doc="Threat severity") + mitigations = varString("", required=False, doc="Threat mitigations") + example = varString("", required=False, doc="Threat example") id = varString("", required=True, doc="Finding ID") - threat_id = varString("", required=True, doc="Threat ID") - references = varString("", required=True, doc="Threat references") - condition = varString("", required=True, doc="Threat condition") + threat_id = varString("", required=False, doc="Threat ID") + references = varString("", required=False, doc="Threat references") + condition = varString("", required=False, doc="Threat condition") response = varString( "", required=False, @@ -765,7 +771,9 @@ def resolve(self): for f in e.manual_findings: finding_count += 1 - f._safeset("id", str(finding_count)); + f._safeset("id", str(finding_count)) + f._safeset("element", e) + f._safeset("target", e.name) findings.append(f) elements[e].append(f) diff --git a/tm.py b/tm.py index 62c22f0..0fd6504 100755 --- a/tm.py +++ b/tm.py @@ -35,16 +35,9 @@ web.authorizesSource = False web.sourceFiles = ["pytm/json.py", "docs/template.md"] web.manual_findings = [ - Finding(web, - #id = "foo_id_3", - description = "foo_description", + Finding( + description = "foo_description_3", details = "foo_details", - severity = "HIGH", - mitigations = "foo_mitigations", - example = "foo_example", - threat_id = "foo_threat_id_3", - references = "foo_references", - response = "accepted", ), ] @@ -124,9 +117,8 @@ my_lambda_to_db.dstPort = 3306 my_lambda_to_db.data = clear_op my_lambda_to_db.manual_findings = [ - Finding(my_lambda_to_db, - # id = "foo_id_1", - description = "foo_description", + Finding( + description = "foo_description_1", details = "foo_details", severity = "HIGH", mitigations = "foo_mitigations", @@ -135,9 +127,8 @@ references = "foo_references", response = "accepted", ), - Finding(my_lambda_to_db, -# id = "foo_id_2", - description = "foo_description", + Finding( + description = "foo_description_2", details = "foo_details", severity = "HIGH", mitigations = "foo_mitigations", From 389ee110935f4e6ed0ca2a98a1eaea3852d8d248 Mon Sep 17 00:00:00 2001 From: Nick Ozmore Date: Tue, 5 Oct 2021 15:51:43 -0400 Subject: [PATCH 5/7] remove manual_findings attribute and re-use findings. Updated sample tm, docs and json test data. --- docs/pytm/index.html | 78 ++++++++++++++++++++++++++++++++++++-------- pytm/pytm.py | 8 ++--- tests/output.json | 18 ---------- tm.py | 4 +-- 4 files changed, 69 insertions(+), 39 deletions(-) diff --git a/docs/pytm/index.html b/docs/pytm/index.html index 30cb0a5..8501e23 100644 --- a/docs/pytm/index.html +++ b/docs/pytm/index.html @@ -3,7 +3,7 @@ - + pytm API documentation @@ -2134,17 +2134,17 @@

Instance variables

"""Represents a Finding - the element in question and a description of the finding""" - element = varElement(None, required=True, doc="Element this finding applies to") + element = varElement(None, required=False, doc="Element this finding applies to") target = varString("", doc="Name of the element this finding applies to") description = varString("", required=True, doc="Threat description") details = varString("", required=True, doc="Threat details") - severity = varString("", required=True, doc="Threat severity") - mitigations = varString("", required=True, doc="Threat mitigations") - example = varString("", required=True, doc="Threat example") + severity = varString("", required=False, doc="Threat severity") + mitigations = varString("", required=False, doc="Threat mitigations") + example = varString("", required=False, doc="Threat example") id = varString("", required=True, doc="Finding ID") - threat_id = varString("", required=True, doc="Threat ID") - references = varString("", required=True, doc="Threat references") - condition = varString("", required=True, doc="Threat condition") + threat_id = varString("", required=False, doc="Threat ID") + references = varString("", required=False, doc="Threat references") + condition = varString("", required=False, doc="Threat condition") response = varString( "", required=False, @@ -2157,6 +2157,7 @@

Instance variables

""", ) cvss = varString("", required=False, doc="The CVSS score and/or vector") + source = varString("manual", required=False, doc="The source of the Finding.") def __init__( self, @@ -2218,7 +2219,7 @@

Instance variables

) def __str__(self): - return f"{self.target}: {self.description}\n{self.details}\n{self.severity}" + return f"'{self.target}': {self.description}\n{self.details}\n{self.severity}"

Instance variables

@@ -2403,6 +2404,22 @@

Instance variables

return self.data.get(instance, self.default) +
var source
+
+

The source of the Finding.

+
+ +Expand source code + +
def __get__(self, instance, owner):
+    # when x.d is called we get here
+    # instance = x
+    # owner = type(x)
+    if instance is None:
+        return self
+    return self.data.get(instance, self.default)
+
+
var target

Name of the element this finding applies to

@@ -3587,6 +3604,16 @@

Class variables

if not e.inScope: continue + if (len(e.findings) > 0): + + for f in e.findings: + finding_count += 1 + f._safeset("id", str(finding_count)) + f._safeset("element", e) + f._safeset("target", e.name) + findings.append(f) + elements[e].append(f) + override_ids = set(f.threat_id for f in e.overrides) # if element is a dataflow filter out overrides from source and sink # because they will be always applied there anyway @@ -3602,12 +3629,15 @@

Class variables

continue finding_count += 1 - f = Finding(e, id=str(finding_count), threat=t) + f = Finding(e, id=str(finding_count), threat=t, source="pytm") + logger.debug(f"new finding: {f}") findings.append(f) elements[e].append(f) + self.findings = findings for e, findings in elements.items(): - e.findings = findings + e._safeset("findings", findings) + #e.findings = findings def check(self): if self.description is None: @@ -3839,6 +3869,9 @@

Class variables

if result.describe is not None: _describe_classes(result.describe.split()) + if result.list_elements: + _list_elements() + if result.list is True: [print("{} - {}".format(t.id, t.description)) for t in TM._threats] @@ -4160,6 +4193,9 @@

Methods

if result.describe is not None: _describe_classes(result.describe.split()) + if result.list_elements: + _list_elements() + if result.list is True: [print("{} - {}".format(t.id, t.description)) for t in TM._threats] @@ -4215,6 +4251,16 @@

Methods

if not e.inScope: continue + if (len(e.findings) > 0): + + for f in e.findings: + finding_count += 1 + f._safeset("id", str(finding_count)) + f._safeset("element", e) + f._safeset("target", e.name) + findings.append(f) + elements[e].append(f) + override_ids = set(f.threat_id for f in e.overrides) # if element is a dataflow filter out overrides from source and sink # because they will be always applied there anyway @@ -4230,12 +4276,15 @@

Methods

continue finding_count += 1 - f = Finding(e, id=str(finding_count), threat=t) + f = Finding(e, id=str(finding_count), threat=t, source="pytm") + logger.debug(f"new finding: {f}") findings.append(f) elements[e].append(f) + self.findings = findings for e, findings in elements.items(): - e.findings = findings + e._safeset("findings", findings) + #e.findings = findings
@@ -4673,6 +4722,7 @@

Finding

  • references
  • response
  • severity
  • +
  • source
  • target
  • threat_id
  • @@ -4805,7 +4855,7 @@

    Threat

    \ No newline at end of file diff --git a/pytm/pytm.py b/pytm/pytm.py index 705eaf0..fe06695 100644 --- a/pytm/pytm.py +++ b/pytm/pytm.py @@ -761,15 +761,14 @@ def _add_threats(self): def resolve(self): finding_count = 0 findings = [] - manual_findings = [] elements = defaultdict(list) for e in TM._elements: if not e.inScope: continue - if (len(e.manual_findings) > 0): + if (len(e.findings) > 0): - for f in e.manual_findings: + for f in e.findings: finding_count += 1 f._safeset("id", str(finding_count)) f._safeset("element", e) @@ -799,7 +798,7 @@ def resolve(self): self.findings = findings for e, findings in elements.items(): - e.findings = findings + e._safeset("findings", findings) def check(self): if self.description is None: @@ -1150,7 +1149,6 @@ class Element: doc="""Minimum TLS version required.""", ) findings = varFindings([], doc="Threats that apply to this element") - manual_findings = varFindings([], doc="Findings manually added to this element") overrides = varFindings( [], doc="""Overrides to findings, allowing to set diff --git a/tests/output.json b/tests/output.json index a5de779..601acc1 100644 --- a/tests/output.json +++ b/tests/output.json @@ -16,7 +16,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "User", @@ -69,7 +68,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Web Server", @@ -129,7 +127,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Lambda func", @@ -184,7 +181,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Task queue worker", @@ -245,7 +241,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "SQL Database", @@ -278,7 +273,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Internet", @@ -293,7 +287,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Server/DB", @@ -340,7 +333,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "User", @@ -391,7 +383,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Web Server", @@ -451,7 +442,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Lambda func", @@ -506,7 +496,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Task queue worker", @@ -567,7 +556,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "SQL Database", @@ -613,7 +601,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "User enters comments (*)", @@ -649,7 +636,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Insert query with comments", @@ -685,7 +671,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Call func", @@ -721,7 +706,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Retrieve comments", @@ -757,7 +741,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Show comments (*)", @@ -793,7 +776,6 @@ "levels": [ 0 ], - "manual_findings": [], "maxClassification": "Classification.UNKNOWN", "minTLSVersion": "TLSVersion.NONE", "name": "Query for tasks", diff --git a/tm.py b/tm.py index 0fd6504..ef44464 100755 --- a/tm.py +++ b/tm.py @@ -34,7 +34,7 @@ web.encodesOutput = True web.authorizesSource = False web.sourceFiles = ["pytm/json.py", "docs/template.md"] -web.manual_findings = [ +web.findings = [ Finding( description = "foo_description_3", details = "foo_details", @@ -116,7 +116,7 @@ my_lambda_to_db.protocol = "MySQL" my_lambda_to_db.dstPort = 3306 my_lambda_to_db.data = clear_op -my_lambda_to_db.manual_findings = [ +my_lambda_to_db.findings = [ Finding( description = "foo_description_1", details = "foo_details", From 05f6dfdaf2e906357d971642772b9bc5e82a962e Mon Sep 17 00:00:00 2001 From: Nick Ozmore Date: Tue, 5 Oct 2021 19:41:10 -0400 Subject: [PATCH 6/7] review comments. In addition to supporting Findings added to an element's findings, added Finding added to the model with element as arg. --- pytm/pytm.py | 62 +++++++++++++++++++++++++++++++++------------------- tm.py | 8 ++++++- 2 files changed, 47 insertions(+), 23 deletions(-) diff --git a/pytm/pytm.py b/pytm/pytm.py index fe06695..8c6b46a 100644 --- a/pytm/pytm.py +++ b/pytm/pytm.py @@ -61,10 +61,7 @@ def __set__(self, instance, value): # value = val if instance in self.data: - if (not isinstance(instance, Finding) - or (isinstance(instance, Finding) and isinstance(self,varString) and self.data[instance] != "invalid") - or (isinstance(instance, Finding) and isinstance(self,varElement) and self.data[instance].name != "invalid") - ): + if (not isinstance(instance, Finding)): raise ValueError( "cannot overwrite {}.{} value with {}, already set to {}".format( instance, self.__class__.__name__, value, self.data[instance] @@ -642,10 +639,12 @@ def __init__( if args: element = args[0] else: - element = kwargs.pop("element", Element("invalid")) + element = kwargs.pop("element", None) + + if (element): + self.target = element.name + self.element = element - self.target = element.name - self.element = element attrs = [ "description", "details", @@ -663,25 +662,30 @@ def __init__( kwargs[a] = getattr(threat, a) threat_id = kwargs.get("threat_id", None) - for f in element.overrides: - if f.threat_id != threat_id: - continue - for i in dir(f.__class__): - attr = getattr(f.__class__, i) - if ( - i in ("element", "target") - or i.startswith("_") - or callable(attr) - or not isinstance(attr, var) - ): + + if (element): + for f in element.overrides: + if f.threat_id != threat_id: continue - if f in attr.data: - kwargs[i] = attr.data[f] - break - + for i in dir(f.__class__): + attr = getattr(f.__class__, i) + if ( + i in ("element", "target") + or i.startswith("_") + or callable(attr) + or not isinstance(attr, var) + ): + continue + if f in attr.data: + kwargs[i] = attr.data[f] + break + for k, v in kwargs.items(): setattr(self, k, v) + TM._findings.append(self) + + def _safeset(self, attr, value): try: setattr(self, attr, value) @@ -711,6 +715,7 @@ class TM: _threatsExcluded = [] _sf = None _duplicate_ignored_attrs = "name", "note", "order", "response", "responseTo" + _findings = [] name = varString("", required=True, doc="Model name") description = varString("", required=True, doc="Model description") threatsFile = varString( @@ -746,6 +751,7 @@ def reset(cls): cls._threats = [] cls._boundaries = [] cls._data = [] + cls._findings = [] def _init_threats(self): TM._threats = [] @@ -762,10 +768,20 @@ def resolve(self): finding_count = 0 findings = [] elements = defaultdict(list) + + #Manually added findings with element as arg to Finding object + for f in TM._findings: + if (f.element): + finding_count += 1 + f._safeset("id", str(finding_count)) + findings.append(f) + elements[f.element].append(f) + for e in TM._elements: if not e.inScope: continue + #Manually added findings, added to an element's finding attribute if (len(e.findings) > 0): for f in e.findings: @@ -786,6 +802,7 @@ def resolve(self): except AttributeError: pass + #Findings added by pytm using threatlib for t in TM._threats: if not t.apply(e) and t.id not in override_ids: continue @@ -797,6 +814,7 @@ def resolve(self): elements[e].append(f) self.findings = findings + for e, findings in elements.items(): e._safeset("findings", findings) diff --git a/tm.py b/tm.py index ef44464..3bfca1b 100755 --- a/tm.py +++ b/tm.py @@ -37,9 +37,15 @@ web.findings = [ Finding( description = "foo_description_3", - details = "foo_details", + details = "foo_details_3", ), ] + +Finding(web, + description = "foo_description_4", + details = "foo_details_4", + ) + db = Datastore("SQL Database") From 34959b68ce51d5771d1caae26a08ed6e975b8c0f Mon Sep 17 00:00:00 2001 From: Nick Ozmore Date: Tue, 12 Oct 2021 11:54:04 -0400 Subject: [PATCH 7/7] Code cleanup, removed unnecessary condition. --- pytm/pytm.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pytm/pytm.py b/pytm/pytm.py index 8c6b46a..586508a 100644 --- a/pytm/pytm.py +++ b/pytm/pytm.py @@ -59,15 +59,12 @@ def __set__(self, instance, value): # called when x.d = val # instance = x # value = val - if instance in self.data: - if (not isinstance(instance, Finding)): - raise ValueError( - "cannot overwrite {}.{} value with {}, already set to {}".format( - instance, self.__class__.__name__, value, self.data[instance] - ) + raise ValueError( + "cannot overwrite {}.{} value with {}, already set to {}".format( + instance, self.__class__.__name__, value, self.data[instance] ) - + ) self.data[instance] = value if self.onSet is not None: self.onSet(instance, value)