From 8d0602f41e3520cf94108b7079a5ee85575ec053 Mon Sep 17 00:00:00 2001 From: Rob C Date: Mon, 19 Jul 2021 11:18:13 -0700 Subject: [PATCH] Refactored aggregated dfds into one dfd function --- fluentm.py | 133 ++++++++++++++--------------------------------- requirements.txt | 1 + 2 files changed, 41 insertions(+), 93 deletions(-) diff --git a/fluentm.py b/fluentm.py index eb7e87b..4a3b860 100644 --- a/fluentm.py +++ b/fluentm.py @@ -484,7 +484,6 @@ def get(name): # DataFlow is _NOT_ an Asset class DataFlow(object): - def __init__( self, pitcher: Union[Actor, Process], @@ -541,86 +540,7 @@ def renderDfd(graph: Digraph, title: str, outputDir: str): print(graph) return f"{title}-dfd.png" -def aggregatedDfd(scenes: dict): - print("### Starting Aggregated DFD ###") - graph = Digraph("Aggregated DFD") - graph.attr(rankdir="LR", color="blue") - graph.attr("node", fontname="Arial", fontsize="14") - - clusterAttr = { - "fontname": "Arial", - "fontsize": "12", - "color": "red", - "line": "dotted", - } - - boundaryClusters = {} - - # Track which nodes should be placed in which clusters but place neither until we've built the subgraph structure. - placements = {} - edges = {} - - # Gather the boundaries and understand how they're nested (but don't nest the graphviz objects ,yet) - # Graphviz subgraphs can't have nodes added, so you need to populate a graph with nodes first, then subgraph it under another graph - for scene in scenes: - # print(f"scene: {scene}") - for flow in scenes[scene]: - # print(flow) - for e in (flow.pitcher, flow.catcher): - if e.name in placements: - continue # skip to next loop - - ptr = e - while hasattr(ptr, "boundary"): - if ptr.boundary not in boundaryClusters: - boundaryClusters[ptr.boundary] = Digraph( - name=f"cluster_{ptr.boundary.name}", - graph_attr=clusterAttr | {"label": ptr.boundary.name}, - ) - ptr = ptr.boundary - - if hasattr(e, "boundary"): - placements[e.name] = boundaryClusters[e.boundary] - else: - placements[e.name] = graph - - # Figure out which edges we need to draw (We want double ended lines) - for flow in scenes[scene]: - # Look to see if the reverse flow is already there in reverse - # If it is, update the line description to say it should go both ways - f = (flow.pitcher.name, flow.catcher.name) # Directional flow - revf = (f[1],f[0]) # Reverse that directional flow - - # First, check if the flow is already in there but the other way around - if revf in edges: - edges[revf] = "BOTH" - elif f not in edges: - edges[f] = "LR" - - # Place nodes in Graphs, ready for subgraphing - print(placements) - for n in placements: - placements[n].node(n) - - # Subgraph the nodes - for c in boundaryClusters: - if hasattr(c, "boundary"): - boundaryClusters[c.boundary].subgraph(boundaryClusters[c]) - else: - graph.subgraph(boundaryClusters[c]) - - for edge in edges: - print(edge) - if edges[edge] == "LR": - graph.edge_attr.update(dir='forward') - graph.edge(edge[0],edge[1]) - elif edges[edge] == "BOTH": - graph.edge_attr.update(dir='both') - graph.edge(edge[0], edge[1]) - - return graph - -def dfd(scenes: dict, title: str, dfdLabels=True, render=False): +def dfd(scenes: dict, title: str, dfdLabels=True, render=False, simplified=False): graph = Digraph(title) graph.attr(rankdir="LR", color="blue") graph.attr("node", fontname="Arial", fontsize="14") @@ -671,15 +591,31 @@ def dfd(scenes: dict, title: str, dfdLabels=True, render=False): graph.subgraph(boundaryClusters[c]) # Add the edges - flowCounter = 1 - for flow in scenes[title]: - if dfdLabels is True: - graph.edge( - flow.pitcher.name, flow.catcher.name, f"({flowCounter}) {flow.name}" - ) - else: - graph.edge(flow.pitcher.name, flow.catcher.name, f"({flowCounter})") - flowCounter += 1 + + if simplified is True: + edges = {} # Map the edges and figure out if we need to be double or single ended + for flow in scenes[title]: + # This edge is flow.pitcher.name -> flow.catcher.name + # If we don't have this edge, first check to see if we have it the other way + if (flow.pitcher.name, flow.catcher.name) not in edges and (flow.catcher.name, flow.pitcher.name) not in edges: + edges[(flow.pitcher.name, flow.catcher.name)] = "forward" + elif (flow.pitcher.name, flow.catcher.name) not in edges and (flow.catcher.name, flow.pitcher.name) in edges: + edges[(flow.catcher.name, flow.pitcher.name)] = "both" + + for edge in edges: + print(edge) + graph.edge(edge[0], edge[1], dir=edges[edge]) + + else: #simplified is False + flowCounter = 1 + for flow in scenes[title]: + if dfdLabels is True: + graph.edge( + flow.pitcher.name, flow.catcher.name, f"({flowCounter}) {flow.name}" + ) + else: + graph.edge(flow.pitcher.name, flow.catcher.name, f"({flowCounter})") + flowCounter += 1 return graph @@ -727,18 +663,29 @@ def report(scenes: dict, outputDir: str, select=None, dfdLabels=True): "dfdImage": renderDfd(graph, key, outputDir=outputDir), "dataFlowTable": dataFlowTable(scenes, key), } - - agg = aggregatedDfd(scenes) + + compoundFlows = [] + for flow in scenes.values(): + compoundFlows = compoundFlows + flow + + agg = dfd({'all':compoundFlows}, 'all', simplified=True) aggDfd = { "graph": agg, "dfdImage": renderDfd(agg, "AggregatedDfd", outputDir=outputDir), } + templateLoader = FileSystemLoader(searchpath="./") templateEnv = Environment(loader=templateLoader) template = templateEnv.get_template("reportTemplate.html") with open(f"{outputDir}/ThreatModel.html", "w") as f: f.write( - template.render({"title": "Threat Models", "sceneReports": sceneReports, "aggregatedDfd":aggDfd}) + template.render( + { + "title": "Threat Models", + "sceneReports": sceneReports, + "aggregatedDfd": aggDfd, + } + ) ) diff --git a/requirements.txt b/requirements.txt index 63b5d50..d10eb08 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ graphviz==0.16 Jinja2==3.0.1 +black==21.7b0 \ No newline at end of file