From 7a5ff047f748aebb7a4422f6ecc20f364f008cde Mon Sep 17 00:00:00 2001 From: Robert Clark Date: Mon, 12 Jul 2021 17:50:16 -0700 Subject: [PATCH] Adding optional response parameter to DataFlow to speed up writing A->B->A sequences --- example_OPA_orchestration.py | 31 +++++++++++++++++++++++ fluentm.py | 48 +++++++++++++++++++++++++++++++----- 2 files changed, 73 insertions(+), 6 deletions(-) create mode 100644 example_OPA_orchestration.py diff --git a/example_OPA_orchestration.py b/example_OPA_orchestration.py new file mode 100644 index 0000000..882ab27 --- /dev/null +++ b/example_OPA_orchestration.py @@ -0,0 +1,31 @@ +from fluentm import Actor, Boundary, Process, DataFlow, TLS, HTTP, Internal, GIT, SSH +from fluentm import report + +Process("Policy Repo").inBoundary(Boundary("Version Control")) + +Boundary("New Kubernetes Cluster").inBoundary("New AWS Account") + +scenes={ + "Developer requests a new cluster":[ + DataFlow(Actor("Developer"), Process("Cluster Orchestrator").inBoundary(Boundary("Control Cluster")), TLS(HTTP("Create cluster request"))), + DataFlow(Process("Cluster Orchestrator"), Process("Cluster Orchestrator"), Internal("Validates user")), + DataFlow(Process("Cluster Orchestrator"), Process("IAM").inBoundary("AWS"), TLS(HTTP("Create new account")), response=TLS(HTTP("Account Details"))), + DataFlow(Process("Cluster Orchestrator"), Process("EKS").inBoundary("AWS"), TLS(HTTP("Create new cluster")), response=TLS(HTTP("Cluster Details"))), + DataFlow(Process("Cluster Orchestrator"), Process("k8s API").inBoundary(Boundary("New Kubernetes Cluster")), TLS(HTTP("Add default admission controller"))), + ], + "Developer creates a pod":[ + DataFlow(Actor("Developer"), Process("k8s API"), TLS(HTTP("Create POD"))), + DataFlow(Process("k8s API"), Process("Admission Controller").inBoundary("Cluster Orchestrator"), TLS(HTTP("Validate pod creation"))), + DataFlow(Process("Admission Controller"), Process("OPA").inBoundary("Cluster Orchestrator"), HTTP("Validate pod creation..")), + DataFlow(Process("OPA"), Process("Policy Repo"), SSH(GIT("Get latest policy")), response=SSH(GIT("Latest policy REGO"))), + DataFlow(Process("OPA"), Process("k8s API"), TLS(HTTP("Validation Decision"))), + DataFlow(Process("k8s API"), Actor("Developer"), TLS(HTTP("Approve/Denied"))) + ] + } + +if __name__ == "__main__": + r = report(scenes, outputDir="examples/OPA_orchestration", dfdLabels=True) + + + + diff --git a/fluentm.py b/fluentm.py index d61f45b..e3ce134 100644 --- a/fluentm.py +++ b/fluentm.py @@ -66,11 +66,11 @@ def flatString(self, depth=0, s=None): s = "" if isinstance(self, WrappableProtocol): - s += f"{self.__class__.__name__}(" + s += f"{self.__class__.__name__}( " if isinstance(self.wraps, WrappableProtocol): return self.wraps.flatString(depth=depth + 1, s=s) if isinstance(self.wraps, Data): - s += f"{self.wraps.name}{')'*(depth+1)}" + s += f"{self.wraps.name}{' )'*(depth+1)}" return s else: assert False, "Bad instance type in WrappableProtocol structure" @@ -91,6 +91,17 @@ def __init__(self, toWrap): version=None, ) +class Internal(WrappableProtocol): + def __init__(self, toWrap): + super().__init__( + toWrap, + encrypted=False, + serverAuthenticated=False, + clientAuthenticated=False, + serverCredential=None, # TODO: Replace with a type? Would that be useful? + clientCredential=None, + version=None + ) class IPSEC(WrappableProtocol): @@ -433,7 +444,8 @@ def __init__( catcher: Union[Actor, Process], data: Union[str, WrappableProtocol, Data], label: Union[str, None] = None, - credential: Union[Unset, Credential, None] = Unset() + credential: Union[Unset, Credential, None] = Unset(), + response: Union[WrappableProtocol, None] = None ): assert isinstance( pitcher, (Actor, Process) @@ -465,6 +477,9 @@ def __init__( else: name = wrappedData.getData().name + if response != None: + self.response = response + if name not in DataFlow._instances: self.pitcher = pitcher self.catcher = catcher @@ -485,7 +500,7 @@ def renderDfd(graph: Digraph, title: str, outputDir: str): def dfd(scenes: dict, title: str, dfdLabels=True, render=False): - graph = Digraph(title) + graph = Digraph(title) graph.attr(rankdir="LR", color="blue") graph.attr("node", fontname="Arial", fontsize="14") @@ -501,6 +516,7 @@ def dfd(scenes: dict, title: str, dfdLabels=True, render=False): flowCounter = 1 for flow in scenes[title]: + print(flow) for e in (flow.pitcher, flow.catcher): if e.name in nodes: # We've already placed this node in a previous flow @@ -511,8 +527,9 @@ def dfd(scenes: dict, title: str, dfdLabels=True, render=False): print(f"Walking: {ptr.boundary.name}") if ptr.boundary.name not in boundaryClusters: boundaryClusters[ptr.boundary.name] = Digraph(name=f"cluster_{ptr.boundary.name}", graph_attr=clusterAttr | {"label":ptr.boundary.name}) - - nodes[ptr.name] = boundaryClusters[ptr.boundary.name].node(ptr.name) + + if type(ptr) is not Boundary: + nodes[ptr.name] = boundaryClusters[ptr.boundary.name].node(ptr.name) # This is where nodes get added inside boundaries if hasattr(ptr.boundary, "boundary"): # See if this boundary, is also in a boundary if ptr.boundary.boundary.name not in boundaryClusters: @@ -553,13 +570,32 @@ def dataFlowTable(scenes: dict, key: str): "Data Flow": f.wrappedData.flatString(), } ) + flowCounter += 1 return table +def _mixinResponses(scenes, key): + newFlows = [] + for f in scenes[key]: + newFlows.append(f) + if hasattr(f, "response"): # If there's a response, insert it as a new DataFlow object + newFlows.append( + DataFlow( + f.catcher, + f.pitcher, + f.response + ) + ) + scenes[key][:] = newFlows + + def report(scenes: dict, outputDir: str, select=None, dfdLabels=True): if select is None: select = scenes.keys() + for key in scenes.keys(): + _mixinResponses(scenes, key) + sceneReports = {} for key in select: