Skip to content

Commit

Permalink
chore: rename policy_break to secret
Browse files Browse the repository at this point in the history
  • Loading branch information
gg-mmill committed Dec 20, 2024
1 parent 270d9ba commit 9ba60b0
Show file tree
Hide file tree
Showing 17 changed files with 158 additions and 184 deletions.
2 changes: 1 addition & 1 deletion ggshield/core/text_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"warning": {"fg": "yellow"},
"heading": {"fg": "green"},
"incident_validity": {"fg": "bright_yellow", "bold": True},
"policy_break_type": {"fg": "bright_yellow", "bold": True},
"secret_type": {"fg": "bright_yellow", "bold": True},
"occurrence_count": {"fg": "bright_yellow", "bold": True},
"ignore_sha": {"fg": "bright_yellow", "bold": True},
"iac_vulnerability_critical": {"fg": "red", "bold": True},
Expand Down
2 changes: 1 addition & 1 deletion ggshield/verticals/secret/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def docker_scan_archive(
ui.display_heading(f"Scanning layer {info.diff_id}")
with ui.create_scanner_ui(file_count) as scanner_ui:
layer_results = scanner.scan(files, scanner_ui=scanner_ui)
if not layer_results.has_policy_breaks:
if not layer_results.has_secrets:
layer_id_cache.add(layer_id)
results.extend(layer_results)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@
from .secret_output_handler import SecretOutputHandler


def format_policy_break(policy_break: Secret) -> str:
def format_secret(secret: Secret) -> str:
"""Returns a string with the policy name, validity and a comma-separated,
double-quoted, censored version of all `policy_break` matches.
double-quoted, censored version of all `secret` matches.
Looks like this:
PayPal OAuth2 Keys (Validity: Valid, id="aa*******bb", secret="cc******dd")
"""
match_str = ", ".join(
f'{x.match_type}: "{censor_match(x)}"' for x in policy_break.matches
f'{x.match_type}: "{censor_match(x)}"' for x in secret.matches
)
validity = translate_validity(policy_break.validity)
return f"{policy_break.break_type} (Validity: {validity}, {match_str})"
validity = translate_validity(secret.validity)
return f"{secret.break_type} (Validity: {validity}, {match_str})"


class SecretGitLabWebUIOutputHandler(SecretOutputHandler):
Expand All @@ -33,21 +33,17 @@ class SecretGitLabWebUIOutputHandler(SecretOutputHandler):
def _process_scan_impl(self, scan: SecretScanCollection) -> str:
results = scan.get_all_results()

policy_breaks_to_report = [
policy_break for result in results for policy_break in result.policy_breaks
]
secrets_to_report = [secret for result in results for secret in result.secrets]

# If no secrets or no new secrets were found
if len(policy_breaks_to_report) == 0:
if len(secrets_to_report) == 0:
return ""

# Use a set to ensure we do not report duplicate incidents.
# (can happen when the secret is present in both the old and the new version of
# the document)
formatted_policy_breaks = {
format_policy_break(x) for x in policy_breaks_to_report
}
break_count = len(formatted_policy_breaks)
formatted_secrets = {format_secret(x) for x in secrets_to_report}
break_count = len(formatted_secrets)

if self.ignore_known_secrets:
summary_str = f"{break_count} new {pluralize('incident', break_count)}"
Expand All @@ -58,7 +54,7 @@ def _process_scan_impl(self, scan: SecretScanCollection) -> str:
# do this because of a bug in GitLab Web IDE which causes newline characters to
# be shown as "<br>"
# https://gitlab.com/gitlab-org/gitlab/-/issues/350349
breaks_str = ", ".join(formatted_policy_breaks)
breaks_str = ", ".join(formatted_secrets)

return (
f"GL-HOOK-ERR: ggshield found {summary_str} in these changes: {breaks_str}."
Expand Down
50 changes: 24 additions & 26 deletions ggshield/verticals/secret/output/secret_json_output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ def process_result(
"total_occurrences": 0,
"total_incidents": 0,
}
sha_dict = group_secrets_by_ignore_sha(result.policy_breaks)
sha_dict = group_secrets_by_ignore_sha(result.secrets)
result_dict["total_incidents"] = len(sha_dict)

if not self.show_secrets:
result.censor()

for ignore_sha, policy_breaks in sha_dict.items():
flattened_dict = self.serialized_policy_break(
ignore_sha, policy_breaks, incident_details
for ignore_sha, secrets in sha_dict.items():
flattened_dict = self.serialized_secret(
ignore_sha, secrets, incident_details
)
result_dict["incidents"].append(flattened_dict)
result_dict["total_occurrences"] += flattened_dict["total_occurrences"]
Expand All @@ -108,54 +108,52 @@ def process_error(error: Error) -> Dict[str, Any]:
}
return error_dict

def serialized_policy_break(
def serialized_secret(
self,
ignore_sha: str,
policy_breaks: List[Secret],
secrets: List[Secret],
incident_details: Dict[str, SecretIncident],
) -> Dict[str, Any]:
flattened_dict: Dict[str, Any] = {
"occurrences": [],
"ignore_sha": ignore_sha,
"policy": policy_breaks[0].policy,
"break_type": policy_breaks[0].break_type,
"total_occurrences": len(policy_breaks),
"policy": secrets[0].policy,
"break_type": secrets[0].break_type,
"total_occurrences": len(secrets),
}

if policy_breaks[0].validity:
flattened_dict["validity"] = policy_breaks[0].validity
if secrets[0].validity:
flattened_dict["validity"] = secrets[0].validity

if policy_breaks[0].known_secret:
flattened_dict["known_secret"] = policy_breaks[0].known_secret
flattened_dict["incident_url"] = policy_breaks[0].incident_url
assert policy_breaks[0].incident_url
details = incident_details.get(policy_breaks[0].incident_url)
if secrets[0].known_secret:
flattened_dict["known_secret"] = secrets[0].known_secret
flattened_dict["incident_url"] = secrets[0].incident_url
assert secrets[0].incident_url
details = incident_details.get(secrets[0].incident_url)
if details is not None:
flattened_dict["incident_details"] = details

if policy_breaks[0].ignore_reason is not None:
if secrets[0].ignore_reason is not None:
flattened_dict["ignore_reason"] = dataclasses.asdict(
policy_breaks[0].ignore_reason
secrets[0].ignore_reason
)

for policy_break in policy_breaks:
flattened_dict["occurrences"].extend(
self.serialize_policy_break_matches(policy_break)
)
for secret in secrets:
flattened_dict["occurrences"].extend(self.serialize_secret_matches(secret))

return flattened_dict

def serialize_policy_break_matches(
def serialize_secret_matches(
self,
policy_break: Secret,
secret: Secret,
) -> List[Dict[str, Any]]:
"""
Serialize policy_break matches. The method uses MatchSpan to get the start and
Serialize secret matches. The method uses MatchSpan to get the start and
end index of the match.
Returns a list of matches.
"""
matches_list: List[Dict[str, Any]] = []
for match in policy_break.matches:
for match in secret.matches:
assert isinstance(match, ExtendedMatch)

match_dict: Dict[str, Any] = {
Expand Down
2 changes: 1 addition & 1 deletion ggshield/verticals/secret/output/secret_output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ def _process_scan_impl(self, scan: SecretScanCollection) -> str:
raise NotImplementedError()

def _get_exit_code(self, scan: SecretScanCollection) -> ExitCode:
if scan.total_policy_breaks_count > 0:
if scan.total_secrets_count > 0:
return ExitCode.SCAN_FOUND_PROBLEMS
return ExitCode.SUCCESS
28 changes: 13 additions & 15 deletions ggshield/verticals/secret/output/secret_sarif_output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,31 +59,29 @@ def _create_sarif_results(
per policy break.
"""
for result in results:
for policy_break in result.policy_breaks:
yield _create_sarif_result_dict(result.url, policy_break, incident_details)
for secret in result.secrets:
yield _create_sarif_result_dict(result.url, secret, incident_details)


def _create_sarif_result_dict(
url: str,
policy_break: Secret,
secret: Secret,
incident_details: Dict[str, SecretIncident],
) -> Dict[str, Any]:
# Prepare message with links to the related location for each match
matches_str = ", ".join(
f"[{m.match_type}]({id})" for id, m in enumerate(policy_break.matches)
f"[{m.match_type}]({id})" for id, m in enumerate(secret.matches)
)
matches_li = "\n".join(
f"- [{m.match_type}]({id})" for id, m in enumerate(policy_break.matches)
)
extended_matches = cast(List[ExtendedMatch], policy_break.matches)
message = f"Secret detected: {policy_break.break_type}.\nMatches: {matches_str}"
markdown_message = (
f"Secret detected: {policy_break.break_type}\nMatches:\n{matches_li}"
f"- [{m.match_type}]({id})" for id, m in enumerate(secret.matches)
)
extended_matches = cast(List[ExtendedMatch], secret.matches)
message = f"Secret detected: {secret.break_type}.\nMatches: {matches_str}"
markdown_message = f"Secret detected: {secret.break_type}\nMatches:\n{matches_li}"

# Create dict
dct = {
"ruleId": policy_break.break_type,
"ruleId": secret.break_type,
"level": "error",
"message": {
"text": message,
Expand All @@ -97,12 +95,12 @@ def _create_sarif_result_dict(
for id, m in enumerate(extended_matches)
],
"partialFingerprints": {
"secret/v1": policy_break.get_ignore_sha(),
"secret/v1": secret.get_ignore_sha(),
},
}
if policy_break.incident_url:
dct["hostedViewerUri"] = policy_break.incident_url
details = incident_details.get(policy_break.incident_url)
if secret.incident_url:
dct["hostedViewerUri"] = secret.incident_url
details = incident_details.get(secret.incident_url)
if details is not None:
dct["properties"] = {"incidentDetails": details.to_dict()}
return dct
Expand Down
50 changes: 22 additions & 28 deletions ggshield/verticals/secret/output/secret_text_output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _process_scan_impl(self, scan: SecretScanCollection) -> str:
)

known_secrets_count = sum(
result.ignored_policy_breaks_count_by_kind.get(IgnoreKind.KNOWN_SECRET, 0)
result.ignored_secrets_count_by_kind.get(IgnoreKind.KNOWN_SECRET, 0)
for result in scan.get_all_results()
)
if self.ignore_known_secrets and known_secrets_count > 0:
Expand Down Expand Up @@ -120,27 +120,23 @@ def process_result(self, result: Result) -> str:
"""
result_buf = StringIO()

sha_dict = group_secrets_by_ignore_sha(result.policy_breaks)
sha_dict = group_secrets_by_ignore_sha(result.secrets)

if not self.show_secrets:
result.censor()

number_of_displayed_secrets = 0
number_of_hidden_secrets = sum(
result.ignored_policy_breaks_count_by_kind.values()
)
for ignore_sha, policy_breaks in sha_dict.items():
number_of_hidden_secrets = sum(result.ignored_secrets_count_by_kind.values())
for ignore_sha, secrets in sha_dict.items():
number_of_displayed_secrets += 1

result_buf.write(
policy_break_header(
policy_breaks, ignore_sha, policy_breaks[0].known_secret
)
secret_header(secrets, ignore_sha, secrets[0].known_secret)
)

result_buf.write(
leak_message_located(
flatten_policy_breaks_by_line(policy_breaks),
flatten_secrets_by_line(secrets),
result.is_on_patch,
clip_long_lines=not self.verbose,
)
Expand Down Expand Up @@ -256,17 +252,17 @@ def leak_message_located(
return leak_msg.getvalue()


def flatten_policy_breaks_by_line(
policy_breaks: List[Secret],
def flatten_secrets_by_line(
secrets: List[Secret],
) -> List[Tuple[Line, List[ExtendedMatch]]]:
"""
flatten_policy_breaks_by_line turns a list of policy breaks into a list of
flatten_secrets_by_line turns a list of policy breaks into a list of
tuples of (line, list of matches) mapping a line to a list of matches starting
at that line. The list is sorted by line number.
"""
flat_match_dict: Dict[Line, List[ExtendedMatch]] = dict()
for policy_break in policy_breaks:
for match in policy_break.matches:
for secret in secrets:
for match in secret.matches:
assert isinstance(match, ExtendedMatch)
for line in match.lines_before_secret + match.lines_after_secret:
if line not in flat_match_dict:
Expand All @@ -284,38 +280,36 @@ def flatten_policy_breaks_by_line(
return ordered_flat_match


def policy_break_header(
policy_breaks: List[Secret],
def secret_header(
secrets: List[Secret],
ignore_sha: str,
known_secret: bool = False,
) -> str:
"""
Build a header for the policy break.
"""
policy_break = policy_breaks[0]
secret = secrets[0]
indent = " "
validity_msg = (
f"\n{indent}Validity: {format_text(translate_validity(policy_break.validity), STYLE['incident_validity'])}"
if policy_break.validity
f"\n{indent}Validity: {format_text(translate_validity(secret.validity), STYLE['incident_validity'])}"
if secret.validity
else ""
)

start_line = format_text(">>", STYLE["detector_line_start"])
policy_break_type = format_text(policy_break.break_type, STYLE["policy_break_type"])
number_occurrences = format_text(str(len(policy_breaks)), STYLE["occurrence_count"])
secret_type = format_text(secret.break_type, STYLE["secret_type"])
number_occurrences = format_text(str(len(secrets)), STYLE["occurrence_count"])
ignore_sha = format_text(ignore_sha, STYLE["ignore_sha"])

message = f"""
{start_line} Secret detected: {policy_break_type}{validity_msg}
{start_line} Secret detected: {secret_type}{validity_msg}
{indent}Occurrences: {number_occurrences}
{indent}Known by GitGuardian dashboard: {"YES" if known_secret else "NO"}
{indent}Incident URL: {policy_breaks[0].incident_url if known_secret and policy_break.incident_url else "N/A"}
{indent}Incident URL: {secrets[0].incident_url if known_secret and secret.incident_url else "N/A"}
{indent}Secret SHA: {ignore_sha}
"""
if policy_break.ignore_reason is not None:
message += (
f"{indent}Ignored: {policy_break.ignore_reason.to_human_readable()}\n"
)
if secret.ignore_reason is not None:
message += f"{indent}Ignored: {secret.ignore_reason.to_human_readable()}\n"

return message + "\n"

Expand Down
Loading

0 comments on commit 9ba60b0

Please sign in to comment.