Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #222: fixed Findings creating spuriois Elements #223

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 55 additions & 39 deletions pytm/pytm.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 242,16 @@ def __ne__(self, other):
def __str__(self):
return ", ".join(sorted(set(d.name for d in self)))


class varControls(var):
def __set__(self, instance, value):
if not isinstance(value, Controls):
raise ValueError(
"expecting an Controls "
"value, got a {}".format(type(value))
"expecting an Controls " "value, got a {}".format(type(value))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"expecting an Controls " "value, got a {}".format(type(value))
"expecting an Controls value, got a {}".format(type(value))

)
super().__set__(instance, value)


class Action(Enum):
"""Action taken when validating a threat model."""

Expand Down Expand Up @@ -442,18 443,25 @@ def _apply_defaults(flows, data):
e._safeset("dstPort", e.sink.port)
if hasattr(e.sink.controls, "isEncrypted"):
e.controls._safeset("isEncrypted", e.sink.controls.isEncrypted)
e.controls._safeset("authenticatesDestination", e.source.controls.authenticatesDestination)
e.controls._safeset("checksDestinationRevocation", e.source.controls.checksDestinationRevocation)
e.controls._safeset(
"authenticatesDestination", e.source.controls.authenticatesDestination
)
e.controls._safeset(
"checksDestinationRevocation", e.source.controls.checksDestinationRevocation
)

for d in e.data:
if d.isStored:
if hasattr(e.sink.controls, "isEncryptedAtRest"):
for d in e.data:
d._safeset("isDestEncryptedAtRest", e.sink.controls.isEncryptedAtRest)
d._safeset(
"isDestEncryptedAtRest", e.sink.controls.isEncryptedAtRest
)
if hasattr(e.source, "isEncryptedAtRest"):
for d in e.data:
d._safeset(
"isSourceEncryptedAtRest", e.source.controls.isEncryptedAtRest
"isSourceEncryptedAtRest",
e.source.controls.isEncryptedAtRest,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change appears to go against the convention you have for other changes in this set - from a single line _safeset() to 2-line, but here you have 3-lines. Try to be consistent.

)
if d.credentialsLife != Lifetime.NONE and not d.isCredentials:
d._safeset("isCredentials", True)
Expand Down Expand Up @@ -522,28 530,26 @@ def _describe_classes(classes):

def _list_elements():
"""List all elements which can be used in a threat model with the corresponding description"""

def all_subclasses(cls):
"""Get all sub classes of a class"""
subclasses = set(cls.__subclasses__())
return subclasses.union(
(s for c in subclasses for s in all_subclasses(c)))
return subclasses.union((s for c in subclasses for s in all_subclasses(c)))

def print_components(cls_list):
elements = sorted(cls_list, key=lambda c: c.__name__)
max_len = max((len(e.__name__) for e in elements))
for sc in elements:
doc = sc.__doc__ if sc.__doc__ is not None else ''
print(f'{sc.__name__:<{max_len}} -- {doc}')
#print all elements
print('Elements:')
doc = sc.__doc__ if sc.__doc__ is not None else ""
print(f"{sc.__name__:<{max_len}} -- {doc}")

# print all elements
print("Elements:")
print_components(all_subclasses(Element))

# Print Attributes
print('\nAtributes:')
print_components(
all_subclasses(OrderedEnum) | {Data, Action, Lifetime}
)

print("\nAtributes:")
print_components(all_subclasses(OrderedEnum) | {Data, Action, Lifetime})


def _get_elements_and_boundaries(flows):
Expand Down Expand Up @@ -661,7 667,7 @@ def __init__(
if args:
element = args[0]
else:
element = kwargs.pop("element", Element("invalid"))
element = kwargs.pop("element", Element("created from Finding"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the Element() is used here as a default value only to avoid checking later if element is None. If that's the case, it's better to explicitly check for it in this function, than check for a reserved name elsewhere in the code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In class Finding, element is set to required. If Finding can be connected to the tm rather than a specific Element, it seems this might address the problem. A dummy element is being created just to satisfy this constraint.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Furthermore, Finding also has 2 attributes on associating with Elements:

  • element - required Element object
  • target - optional - string?

Maybe we need to remove one?


self.target = element.name
self.element = element
Expand All @@ -683,6 689,7 @@ def __init__(

threat_id = kwargs.get("threat_id", None)
for f in element.overrides:
# we override on threat_id, which is
if f.threat_id != threat_id:
continue
for i in dir(f.__class__):
Expand Down Expand Up @@ -729,7 736,14 @@ class TM:
_data = []
_threatsExcluded = []
_sf = None
_duplicate_ignored_attrs = "name", "note", "order", "response", "responseTo", "controls"
_duplicate_ignored_attrs = (
"name",
"note",
"order",
"response",
"responseTo",
"controls",
)
name = varString("", required=True, doc="Model name")
description = varString("", required=True, doc="Model description")
threatsFile = varString(
Expand Down Expand Up @@ -877,15 891,14 @@ def _check_duplicates(self, flows):

left_controls_attrs = left.controls._attr_values()
right_controls_attrs = right.controls._attr_values()
#for a in self._duplicate_ignored_attrs:
# for a in self._duplicate_ignored_attrs:
# del left_controls_attrs[a], right_controls_attrs[a]
if left_controls_attrs != right_controls_attrs:
continue
if self.onDuplicates == Action.IGNORE:
right._is_drawn = True
continue


raise ValueError(
"Duplicate Dataflow found between {} and {}: "
"{} is same as {}".format(
Expand Down Expand Up @@ -1087,7 1100,6 @@ def _stale(self, days):
print(f"Checking for code {days} days older than this model.")

for e in TM._elements:

for src in e.sourceFiles:
try:
src_mtime = datetime.fromtimestamp(
Expand Down Expand Up @@ -1164,6 1176,7 @@ def get_table(self, db, klass):
]
return db.define_table(name, fields)


class Controls:
"""Controls implemented by/on and Element"""

Expand Down Expand Up @@ -1256,15 1269,13 @@ def _attr_values(self):
result[i] = value
return result


def _safeset(self, attr, value):
try:
setattr(self, attr, value)
except ValueError:
pass



class Element:
"""A generic element"""

Expand Down Expand Up @@ -1303,7 1314,8 @@ def __init__(self, name, **kwargs):
self.controls = Controls()
self.uuid = uuid.UUID(int=random.getrandbits(128))
self._is_drawn = False
TM._elements.append(self)
if self.name != "created from Finding":
TM._elements.append(self)

def __repr__(self):
return "<{0}.{1}({2}) at {3}>".format(
Expand Down Expand Up @@ -1607,7 1619,7 @@ class Datastore(Asset):
* FILE_SYSTEM - files on a file system
* SQL - A SQL Database
* LDAP - An LDAP Server
* AWS_S3 - An S3 Bucket within AWS"""
* AWS_S3 - An S3 Bucket within AWS""",
)

def __init__(self, name, **kwargs):
Expand Down Expand Up @@ -1874,27 1886,29 @@ def serialize(obj, nested=False):
result[i.lstrip("_")] = value
return result


def encode_element_threat_data(obj):
"""Used to html encode threat data from a list of Elements"""
encoded_elements = []
if (type(obj) is not list):
raise ValueError("expecting a list value, got a {}".format(type(value)))
if type(obj) is not list:
raise ValueError("expecting a list value, got a {}".format(type(obj)))

for o in obj:
c = copy.deepcopy(o)
for a in o._attr_values():
if (a == "findings"):
encoded_findings = encode_threat_data(o.findings)
c._safeset("findings", encoded_findings)
c = copy.deepcopy(o)
for a in o._attr_values():
if a == "findings":
encoded_findings = encode_threat_data(o.findings)
c._safeset("findings", encoded_findings)
else:
v = getattr(o, a)
if (type(v) is not list or (type(v) is list and len(v) != 0)):
c._safeset(a, v)
encoded_elements.append(c)
v = getattr(o, a)
if type(v) is not list or (type(v) is list and len(v) != 0):
c._safeset(a, v)

encoded_elements.append(c)

return encoded_elements


def encode_threat_data(obj):
"""Used to html encode threat data from a list of threats or findings"""
encoded_threat_data = []
Expand Down Expand Up @@ -1955,7 1969,9 @@ def get_args():
"--describe", help="describe the properties available for a given element"
)
_parser.add_argument(
"--list-elements", action="store_true", help="list all elements which can be part of a threat model"
"--list-elements",
action="store_true",
help="list all elements which can be part of a threat model",
)
_parser.add_argument("--json", help="output a JSON file")
_parser.add_argument(
Expand Down