Fixing loader imports #580

Merged
merged 4 commits into from
Nov 30, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@
- jinja2 3.0.1 to 3.0.3 - [#562](https://github.com/jertel/elastalert2/pull/562) - @nsano-rururu
- Fix `get_rule_file_hash` TypeError - [#566](https://github.com/jertel/elastalert2/pull/566) - @JeffAshton
- Ensure `schema.yaml` stream closed - [#567](https://github.com/jertel/elastalert2/pull/567) - @JeffAshton
- Fixing `import` bugs & memory leak in `RulesLoader`/`FileRulesLoader` - [#580](https://github.com/jertel/elastalert2/pull/580) - @JeffAshton

# 2.2.3

50 changes: 34 additions & 16 deletions elastalert/loaders.py
@@ -212,7 +212,7 @@ def get_import_rule(self, rule):
Retrieve the name of the rule to import.
:param dict rule: Rule dict
:return: rule name that will all `get_yaml` to retrieve the yaml of the rule
:rtype: str
:rtype: str or List[str]
Contributor Author:

As defined by schema:

"""
return rule['import']

@@ -240,28 +240,46 @@ def load_yaml(self, filename):
'rule_file': filename,
}

self.import_rules.pop(filename, None) # clear `filename` dependency
files_to_import = []
current_path = filename

# Imports are applied using a Depth First Search (DFS) traversal.
# If a rule defines multiple imports, both of which define the same value,
# the value from the later import will take precedent.
import_paths_stack = []

while True:
loaded = self.get_yaml(filename)
loaded = self.get_yaml(current_path)

# Special case for merging filters - if both files specify a filter merge (AND) them
if 'filter' in rule and 'filter' in loaded:
rule['filter'] = loaded['filter'] + rule['filter']

loaded.update(rule)
rule = loaded
if 'import' in rule:
# add all of the files to load into the load queue
files_to_import += self.get_import_rule(rule)
del (rule['import']) # or we could go on forever!
if len(files_to_import) > 0:
# set the next file to load
next_file_to_import = files_to_import.pop()
rules = self.import_rules.get(filename, [])
rules.append(next_file_to_import)
self.import_rules[filename] = rules
filename = next_file_to_import

if 'import' not in rule:
# clear import_rules cache for current path
self.import_rules.pop(current_path, None)
Owner:

In what scenario will this self.import_rules.pop statement actually pop something?

Contributor Author:

When the import is removed from a file. Added test case to cover this:

ef06201#diff-68599694211dc95012d74a98b54fd5f439eddb0d4ce257ba1a35a1772f243069R617
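
To illustrate the scenario in the reply above, here is a minimal sketch that uses a plain dict in place of `self.import_rules`. The file names are hypothetical and the snippet is not taken from the project.

```python
# Plain dict standing in for the loader's import_rules cache (hypothetical paths).
import_rules = {"rule.yaml": ["first.yaml"]}

# rule.yaml is reloaded and no longer contains an `import` key:
# the stale entry is dropped, so first.yaml stops being tracked as a dependency.
removed = import_rules.pop("rule.yaml", None)

# For a file that never had imports, the default argument makes pop() a no-op
# instead of raising KeyError.
missing = import_rules.pop("other.yaml", None)
```

Without the `pop`, the entry for `rule.yaml` would stay in the cache after its import was deleted.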

Owner:

I see now. It's great to have all this new test coverage, thanks for adding it.

Contributor Author:

I'm hoping this resolves the issue I'm seeing:

[image]

After rule updates, the yaml scanner explodes with warnings:

{
    "name": "py.warnings",
    "levelno": 30,
    "levelname": "WARNING",
    "pathname": "/usr/local/lib/python3.10/warnings.py",
    "filename": "warnings.py",
    "module": "warnings",
    "lineno": 109,
    "funcName": "_showwarnmsg",
    "created": 1638273031.6421432,
    "asctime": "2021-11-30 11:50:31,642",
    "msecs": 642.1432495117188,
    "relativeCreated": 492821989.7737503,
    "thread": 139802450061056,
    "threadName": "ThreadPoolExecutor-0_6",
    "process": 47,
    "message": "/usr/local/lib/python3.10/site-packages/yaml/scanner.py:286: ResourceWarning: unclosed <ssl.SSLSocket fd=386, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.31.38.167', 49134), raddr=('54.87.161.2', 443)>\n  for level in list(self.possible_simple_keys):\n"
}

With enough rule updates the CPU ends up pegged on computing sha1 hashes of the imports.

Owner:

Have you automated your rule updates?

Contributor Author:

Yes. We commit our rules to a git repository and I run a cron job inside the Docker container to update them.


else:
# read the set of import paths from the rule
import_paths = self.get_import_rule(rule)
if type(import_paths) is str:
import_paths = [import_paths]
Contributor Author:

This was a by-product of how the `+=` operator was used on the `files_to_import` list before:

files_to_import += self.get_import_rule(rule)

Example:

list = []

list += [ 'a', 'b' ]
print( list )

list += 'c'
print( list )

Outputs:

['a', 'b']
['a', 'b', 'c']
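
The single-character example above happens to produce the expected list. With a longer string, `+=` on a list iterates the string and appends one entry per character, which is why normalizing a `str` into a one-element list matters. A small sketch, with hypothetical file names:

```python
files_to_import = []

# A list on the right-hand side extends the list element by element.
files_to_import += ["a.yaml", "b.yaml"]

# A str is iterable too, so += extends the list with one entry per character.
files_to_import += "c.yaml"
print(files_to_import)
# ['a.yaml', 'b.yaml', 'c', '.', 'y', 'a', 'm', 'l']

# Wrapping a str in a list first keeps the path intact.
import_paths = "c.yaml"
if isinstance(import_paths, str):
    import_paths = [import_paths]
normalized = ["a.yaml", "b.yaml"] + import_paths
print(normalized)
# ['a.yaml', 'b.yaml', 'c.yaml']
```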


# remove the processed import property to prevent infinite loop
del (rule['import'])

# update import_rules cache for current path
self.import_rules[current_path] = import_paths

# push the imports paths onto the top of the stack
for import_path in import_paths:
import_paths_stack.append(import_path)

# pop the next import path from the top of the stack
if len(import_paths_stack) > 0:
current_path = import_paths_stack.pop()
else:
break
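
The traversal in this hunk can be tried in isolation. The following is a simplified sketch, not the project's code: `resolve_imports`, the `files` dict (a stand-in for `get_yaml`), and the file names are all assumptions made for illustration, and the filter-merging special case is left out.

```python
def resolve_imports(path, files, import_rules):
    """Merge `path` with everything it imports, using a stack-based DFS.

    `files` maps a path to its parsed YAML dict (stand-in for get_yaml);
    `import_rules` is the dependency cache. Both are illustrative.
    """
    rule = {"rule_file": path}
    current_path = path
    import_paths_stack = []

    while True:
        loaded = dict(files[current_path])  # copy so `files` is not mutated
        loaded.update(rule)                 # values already in `rule` win
        rule = loaded

        if "import" not in rule:
            # no imports (any more): drop a possibly stale cache entry
            import_rules.pop(current_path, None)
        else:
            import_paths = rule.pop("import")
            if isinstance(import_paths, str):
                import_paths = [import_paths]
            # assignment replaces the old entry, so reloads do not grow it
            import_rules[current_path] = list(import_paths)
            import_paths_stack.extend(import_paths)

        if not import_paths_stack:
            return rule
        current_path = import_paths_stack.pop()


files = {
    "water.yaml": {"name": "water", "import": ["hydrogen.yaml", "oxygen.yaml"]},
    "hydrogen.yaml": {"symbol": "H"},
    "oxygen.yaml": {"symbol": "O"},
}
import_rules = {}
for _ in range(3):  # reloading must leave the cache unchanged
    water = resolve_imports("water.yaml", files, import_rules)
```

Because the last pushed path is popped first and already-merged values win, `oxygen.yaml` (the later import) supplies `symbol`, and the cache entry for `water.yaml` is overwritten on every reload rather than appended to.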

@@ -596,7 +614,7 @@ def get_import_rule(self, rule):
Allow for relative paths to the import rule.
:param dict rule:
:return: Path the import rule
:rtype: str
:rtype: List[str]
Contributor Author:

The returned value, expanded_imports, is always a List[str].

"""
rule_imports = rule['import']
if type(rule_imports) is str:
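
The rest of this method is collapsed in the diff. As a rough illustration of what "allow for relative paths" with a `List[str]` return could look like, here is a sketch under stated assumptions: `expand_imports` is a hypothetical helper, and resolving relative paths against the directory of `rule_file` is an assumption, not something shown in the hunk.

```python
import os


def expand_imports(rule):
    """Return the rule's imports as a list of paths (hypothetical helper)."""
    rule_imports = rule["import"]
    if isinstance(rule_imports, str):
        rule_imports = [rule_imports]
    # Assumption: relative imports are resolved next to the importing file.
    base_dir = os.path.dirname(rule["rule_file"])
    return [
        p if os.path.isabs(p) else os.path.join(base_dir, p)
        for p in rule_imports
    ]


leaf = {"rule_file": os.path.join("rules", "leaf.yaml"), "import": "branch.yaml"}
single = expand_imports(leaf)

absolute = os.path.abspath("trunk.yaml")
mixed = expand_imports({"rule_file": leaf["rule_file"], "import": ["branch.yaml", absolute]})
```

Either input shape, a single string or a list, yields a list, which matches the updated `:rtype:`.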
173 changes: 104 additions & 69 deletions tests/loaders_test.py
@@ -512,41 +512,25 @@ def test_load_yaml_recursive_import():
branch_path = os.path.join(loaders_test_cases_path, 'recursive_import/branch.yaml')
leaf_path = os.path.join(loaders_test_cases_path, 'recursive_import/leaf.yaml')

leaf_yaml = rules_loader.load_yaml(leaf_path)
assert leaf_yaml == {
'name': 'leaf',
'rule_file': leaf_path,
'diameter': '5cm',
}
assert sorted(rules_loader.import_rules.keys()) == [
branch_path,
leaf_path,
]
assert rules_loader.import_rules[branch_path] == [
trunk_path,
]
assert rules_loader.import_rules[leaf_path] == [
branch_path,
]

# reload the rule
leaf_yaml = rules_loader.load_yaml(leaf_path)
assert leaf_yaml == {
'name': 'leaf',
'rule_file': leaf_path,
'diameter': '5cm',
}
assert sorted(rules_loader.import_rules.keys()) == [
branch_path,
leaf_path,
]
assert rules_loader.import_rules[branch_path] == [
trunk_path,
trunk_path, # memory leak
]
assert rules_loader.import_rules[leaf_path] == [
branch_path,
]
# re-load the rule a couple times to ensure import_rules cache is updated correctly
Contributor Author:

Best visualized side by side:

[image]

for i in range(3):

leaf_yaml = rules_loader.load_yaml(leaf_path)
assert leaf_yaml == {
'name': 'leaf',
'rule_file': leaf_path,
'diameter': '5cm',
}
assert sorted(rules_loader.import_rules.keys()) == [
branch_path,
leaf_path,
]
assert rules_loader.import_rules[branch_path] == [
trunk_path,
]
assert rules_loader.import_rules[leaf_path] == [
branch_path,
]


def test_load_yaml_multiple_imports():
@@ -557,38 +541,89 @@ def test_load_yaml_multiple_imports():
oxygen_path = os.path.join(loaders_test_cases_path, 'multiple_imports/oxygen.yaml')
water_path = os.path.join(loaders_test_cases_path, 'multiple_imports/water.yaml')

water_yaml = rules_loader.load_yaml(water_path)
assert water_yaml == {
'name': 'water',
'rule_file': water_path,
'symbol': 'O',
}
assert sorted(rules_loader.import_rules.keys()) == [
oxygen_path,
water_path,
]
assert rules_loader.import_rules[oxygen_path] == [
hydrogen_path,
]
assert rules_loader.import_rules[water_path] == [
oxygen_path,
]
# re-load the rule a couple times to ensure import_rules cache is updated correctly
Contributor Author:

Best visualized side by side:

[image]

for i in range(3):

# reload the rule
water_yaml = rules_loader.load_yaml(water_path)
assert water_yaml == {
'name': 'water',
'rule_file': water_path,
'symbol': 'O',
}
assert sorted(rules_loader.import_rules.keys()) == [
oxygen_path,
water_path,
]
assert rules_loader.import_rules[oxygen_path] == [
hydrogen_path,
hydrogen_path, # memory leak
]
assert rules_loader.import_rules[water_path] == [
oxygen_path,
]
water_yaml = rules_loader.load_yaml(water_path)
assert water_yaml == {
'name': 'water',
'rule_file': water_path,
'symbol': 'O',
}
assert sorted(rules_loader.import_rules.keys()) == [
water_path,
]
assert rules_loader.import_rules[water_path] == [
Contributor Author:

The cache now represents the correct set of import paths:

hydrogen_path,
oxygen_path,
]


def test_load_yaml_imports_modified():
config = {}
rules_loader = FileRulesLoader(config)

rule_path = os.path.join(empty_folder_test_path, 'rule.yaml')
first_import_path = os.path.join(empty_folder_test_path, 'first.yaml')
second_import_path = os.path.join(empty_folder_test_path, 'second.yaml')

with mock.patch.object(rules_loader, 'get_yaml') as get_yaml:
get_yaml.side_effect = [
{
'name': 'rule',
'import': first_import_path,
},
{
'imported': 'first',
}
]
rule_yaml = rules_loader.load_yaml(rule_path)
assert rule_yaml == {
'name': 'rule',
'rule_file': rule_path,
'imported': 'first',
}
assert sorted(rules_loader.import_rules.keys()) == [
rule_path,
]
assert rules_loader.import_rules[rule_path] == [
first_import_path
]

# simulate the import changing
with mock.patch.object(rules_loader, 'get_yaml') as get_yaml:
get_yaml.side_effect = [
{
'name': 'rule',
'import': second_import_path,
},
{
'imported': 'second',
}
]
rule_yaml = rules_loader.load_yaml(rule_path)
assert rule_yaml == {
'name': 'rule',
'rule_file': rule_path,
'imported': 'second',
}
assert sorted(rules_loader.import_rules.keys()) == [
rule_path,
]
assert rules_loader.import_rules[rule_path] == [
second_import_path
]

# simulate the import being removed
with mock.patch.object(rules_loader, 'get_yaml') as get_yaml:
get_yaml.side_effect = [
{
'name': 'rule',
},
]
rule_yaml = rules_loader.load_yaml(rule_path)
assert rule_yaml == {
'name': 'rule',
'rule_file': rule_path,
}
assert len(rules_loader.import_rules) == 0
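
The new test relies on `mock.patch.object` with a list-valued `side_effect`, which returns one item per call in order. A standalone sketch of that mechanism follows; `FakeLoader` and the file names are hypothetical stand-ins, not the project's classes.

```python
from unittest import mock


class FakeLoader:
    """Hypothetical stand-in for a loader exposing get_yaml()."""

    def get_yaml(self, path):
        raise NotImplementedError


loader = FakeLoader()
with mock.patch.object(loader, "get_yaml") as get_yaml:
    # Each call consumes the next item: first the rule, then its import.
    get_yaml.side_effect = [
        {"name": "rule", "import": "first.yaml"},
        {"imported": "first"},
    ]
    first = loader.get_yaml("rule.yaml")
    second = loader.get_yaml("first.yaml")
    calls = list(get_yaml.call_args_list)  # snapshot before the extra call

    # A call beyond the prepared values raises StopIteration, so a loader
    # that reads more files than expected fails loudly in the test.
    try:
        loader.get_yaml("extra.yaml")
        exhausted = False
    except StopIteration:
        exhausted = True
```

This is why each phase of the test supplies exactly as many dicts as files the loader is expected to read.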