Skip to content
This repository has been archived by the owner on Aug 10, 2024. It is now read-only.

Commit

Permalink
Add test cases for AzKVKeyNoExpiryEvent
Browse files Browse the repository at this point in the history
Added test cases for the `AzKVKeyNoExpiryEvent` event plugin. An issue
with the event plugin was also identified where events were generated
even for the keys which are not enabled. This issue has also been fixed
in this commit and test cases have been added to cover the same.
  • Loading branch information
mitprasoon committed Mar 8, 2020
1 parent cdb9456 commit e57281e
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 6 deletions.
11 changes: 5 additions & 6 deletions cloudmarker/events/azkvkeynoexpiryevent.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Microsoft Azure Key Vault key expiry event.
This module defines the :class:`AzKVKeyNoExpiryEvent` class that
identifies Key Vault keys without expiry set. This plugin works on the
Key Vault key properties found in the ``ext`` bucket of ``key_vault_key``
records.
identifies Key Vault active (enabled) keys without expiry set. This
plugin works on the Key Vault key properties found in the ``ext``
bucket of ``key_vault_key`` records.
"""


Expand Down Expand Up @@ -45,9 +45,8 @@ def eval(self, record):
if ext.get('record_type') != 'key_vault_key':
return

if ext.get('enabled') and ext.get('expiry_set'):
return
yield from _get_key_vault_key_no_expiry_event(com, ext)
if ext.get('enabled') and (ext.get('expiry_set') is False):
yield from _get_key_vault_key_no_expiry_event(com, ext)

def done(self):
"""Perform cleanup work.
Expand Down
93 changes: 93 additions & 0 deletions cloudmarker/test/test_azkvkeynoexpiryevent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Tests for AzKVKeyNoExpiryEvent plugin."""


import copy
import unittest

from cloudmarker.events import azkvkeynoexpiryevent

base_record = {
'com': {
'cloud_type': 'azure',
'record_type': 'key_vault_key'
},
'ext': {
'record_type': 'key_vault_key',
'enabled': True,
'expiry_set': False
}
}


class AzKVKeyNoExpiryEventTest(unittest.TestCase):
"""Tests for AzKVKeyNoExpiryEvent plugin."""

def test_com_bucket_missing(self):
record = copy.deepcopy(base_record)
record['com'] = None
plugin = azkvkeynoexpiryevent. \
AzKVKeyNoExpiryEvent()
events = list(plugin.eval(record))
self.assertEqual(events, [])

def test_com_bucket_cloud_type_non_azure(self):
record = copy.deepcopy(base_record)
record['com']['cloud_type'] = 'non_azure'
plugin = azkvkeynoexpiryevent. \
AzKVKeyNoExpiryEvent()
events = list(plugin.eval(record))
self.assertEqual(events, [])

def test_ext_bucket_missing(self):
record = copy.deepcopy(base_record)
record['ext'] = None
plugin = azkvkeynoexpiryevent. \
AzKVKeyNoExpiryEvent()
events = list(plugin.eval(record))
self.assertEqual(events, [])

def test_ext_bucket_record_type_non_key_vault_key(self):
record = copy.deepcopy(base_record)
record['ext']['record_type'] = 'non_key_vault_key'
plugin = azkvkeynoexpiryevent. \
AzKVKeyNoExpiryEvent()
events = list(plugin.eval(record))
self.assertEqual(events, [])

def test_key_not_enabled_expiry_not_set(self):
record = copy.deepcopy(base_record)
record['ext']['enabled'] = False
plugin = azkvkeynoexpiryevent. \
AzKVKeyNoExpiryEvent()
events = list(plugin.eval(record))
self.assertEqual(events, [])

def test_key_not_enabled_and_expiry_set(self):
record = copy.deepcopy(base_record)
record['ext']['enabled'] = False
record['ext']['expiry_set'] = True
plugin = azkvkeynoexpiryevent. \
AzKVKeyNoExpiryEvent()
events = list(plugin.eval(record))
self.assertEqual(events, [])

def test_key_enabled_and_expiry_set(self):
record = copy.deepcopy(base_record)
record['ext']['enabled'] = True
record['ext']['expiry_set'] = True
plugin = azkvkeynoexpiryevent. \
AzKVKeyNoExpiryEvent()
events = list(plugin.eval(record))
self.assertEqual(events, [])

def test_key_enabled_and_expiry_notset(self):
record = copy.deepcopy(base_record)
plugin = azkvkeynoexpiryevent. \
AzKVKeyNoExpiryEvent()
events = list(plugin.eval(record))
self.assertEqual(len(events), 1)
self.assertEqual(events[0]['ext']['record_type'],
'key_vault_key_no_expiry_event')
self.assertEqual(events[0]['com']['record_type'],
'key_vault_key_no_expiry_event')
self.assertEqual(events[0]['com']['cloud_type'], 'azure')

0 comments on commit e57281e

Please sign in to comment.