This repository has been archived by the owner on Aug 10, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add AzWebAppClientCertEvent event plugin
The plugin `AzWebAppClientCertEvent` identifies an Azure web app with client certificates (mutual TLS) disabled. Client certificates (mTLS) allow for the app to request a certificate for incoming requests. Only clients that have a valid certificate will be able to reach the app.
- Loading branch information
1 parent
b548b7c
commit f093430
Showing
4 changed files
with
175 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
"""Microsoft web app client certificate event. | ||
This module defines the :class:`AzWebAppClientCertEvent` class that | ||
identifies a web app with client certificate (mutual TLS) disabled. | ||
This plugin works on the web apps config properties found in the | ||
``ext`` bucket of ``web_app_config`` records. | ||
""" | ||
|
||
|
||
import logging | ||
|
||
from cloudmarker import util | ||
|
||
_log = logging.getLogger(__name__) | ||
|
||
|
||
class AzWebAppClientCertEvent: | ||
"""Azure web app client certificate event plugin.""" | ||
|
||
def __init__(self): | ||
"""Create an instance of :class:`AzWebAppClientCertEvent`.""" | ||
|
||
def eval(self, record): | ||
"""Evaluate Azure web app to check if client cert is disabled. | ||
Arguments: | ||
record (dict): A web app record. | ||
Yields: | ||
dict: An event record representing a web app with client | ||
certificate (mTLS) disabled. | ||
""" | ||
com = record.get('com', {}) | ||
if com is None: | ||
return | ||
|
||
if com.get('cloud_type') != 'azure': | ||
return | ||
|
||
ext = record.get('ext', {}) | ||
if ext is None: | ||
return | ||
|
||
if ext.get('record_type') != 'web_app_config': | ||
return | ||
|
||
if ext.get('client_cert_enabled'): | ||
return | ||
|
||
yield from _get_azure_web_app_client_cert_event(com, ext) | ||
|
||
def done(self): | ||
"""Perform cleanup work. | ||
Currently, this method does nothing. This may change in future. | ||
""" | ||
|
||
|
||
def _get_azure_web_app_client_cert_event(com, ext): | ||
"""Generate Web App client certificate event. | ||
Arguments: | ||
com (dict): Azure web app record `com` bucket. | ||
ext (dict): Azure web app record `ext` bucket. | ||
Returns: | ||
dict: An event record representing web app with client | ||
certificate (mTLS) disabled | ||
""" | ||
friendly_cloud_type = util.friendly_string(com.get('cloud_type')) | ||
reference = com.get('reference') | ||
description = ( | ||
'{} web app {} has client certificate (mTLS) disabled.' | ||
.format(friendly_cloud_type, reference) | ||
) | ||
recommendation = ( | ||
'Check {} web app {} and enable client certificate (mTLS).' | ||
.format(friendly_cloud_type, reference) | ||
) | ||
|
||
event_record = { | ||
# Preserve the extended properties from the web app | ||
# record because they provide useful context to | ||
# locate the web app that led to the event. | ||
'ext': util.merge_dicts(ext, { | ||
'record_type': 'web_app_client_certificate_event' | ||
}), | ||
'com': { | ||
'cloud_type': com.get('cloud_type'), | ||
'record_type': 'web_app_client_certificate_event', | ||
'reference': reference, | ||
'description': description, | ||
'recommendation': recommendation, | ||
} | ||
} | ||
_log.info('Generating web_app_client_certificate_event; %r', event_record) | ||
yield event_record |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
"""Tests for AzWebAppClientCertEvent plugin.""" | ||
|
||
|
||
import copy | ||
import unittest | ||
|
||
from cloudmarker.events import azwebappclientcertevent | ||
|
||
base_record = { | ||
'ext': { | ||
'record_type': 'web_app_config', | ||
'cloud_type': 'azure', | ||
'client_cert_enabled': True | ||
}, | ||
'com': { | ||
'cloud_type': 'azure' | ||
} | ||
} | ||
|
||
|
||
class AzWebAppClientCertEventTest(unittest.TestCase): | ||
"""Tests for AzWebAppClientCertEvent plugin.""" | ||
|
||
def test_com_bucket_missing(self): | ||
record = copy.deepcopy(base_record) | ||
record['com'] = None | ||
plugin = azwebappclientcertevent.AzWebAppClientCertEvent() | ||
events = list(plugin.eval(record)) | ||
self.assertEqual(events, []) | ||
|
||
def test_cloud_type_non_azure(self): | ||
record = copy.deepcopy(base_record) | ||
record['com']['cloud_type'] = 'non_azure' | ||
plugin = azwebappclientcertevent.AzWebAppClientCertEvent() | ||
events = list(plugin.eval(record)) | ||
self.assertEqual(events, []) | ||
|
||
def test_ext_bucket_missing(self): | ||
record = copy.deepcopy(base_record) | ||
record['ext'] = None | ||
plugin = azwebappclientcertevent.AzWebAppClientCertEvent() | ||
events = list(plugin.eval(record)) | ||
self.assertEqual(events, []) | ||
|
||
def test_record_type_non_web_app_config(self): | ||
record = copy.deepcopy(base_record) | ||
record['ext']['record_type'] = 'non_web_app_config' | ||
plugin = azwebappclientcertevent.AzWebAppClientCertEvent() | ||
events = list(plugin.eval(record)) | ||
self.assertEqual(events, []) | ||
|
||
def test_client_cert_enabled(self): | ||
record = copy.deepcopy(base_record) | ||
record['ext']['client_cert_enabled'] = True | ||
plugin = azwebappclientcertevent.AzWebAppClientCertEvent() | ||
events = list(plugin.eval(record)) | ||
self.assertEqual(events, []) | ||
|
||
def test_client_cert_disabled(self): | ||
record = copy.deepcopy(base_record) | ||
record['ext']['client_cert_enabled'] = False | ||
plugin = azwebappclientcertevent.AzWebAppClientCertEvent() | ||
events = list(plugin.eval(record)) | ||
self.assertEqual(len(events), 1) | ||
self.assertEqual(events[0]['ext']['record_type'], | ||
'web_app_client_certificate_event') | ||
self.assertEqual(events[0]['com']['record_type'], | ||
'web_app_client_certificate_event') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters