# -*- coding: utf-8 -*- import argparse import copy import datetime import json import logging import os import random import re import signal import sys import threading import time import timeit import traceback from email.mime.text import MIMEText from smtplib import SMTP from smtplib import SMTPException from socket import error import statsd import pytz from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ThreadPoolExecutor from croniter import croniter from elasticsearch.exceptions import ConnectionError from elasticsearch.exceptions import ElasticsearchException from elasticsearch.exceptions import NotFoundError from elasticsearch.exceptions import TransportError from elastalert.alerters.debug import DebugAlerter from elastalert.config import load_conf from elastalert.enhancements import DropMatchException from elastalert.kibana_discover import generate_kibana_discover_url from elastalert.kibana_external_url_formatter import create_kibana_external_url_formatter from elastalert.opensearch_discover import generate_opensearch_discover_url from elastalert.opensearch_external_url_formatter import create_opensearch_external_url_formatter from elastalert.prometheus_wrapper import PrometheusWrapper from elastalert.ruletypes import FlatlineRule from elastalert.util import (add_keyword_postfix, cronite_datetime_to_timestamp, dt_to_ts, dt_to_unix, EAException, elastalert_logger, elasticsearch_client, format_index, lookup_es_key, parse_deadline, parse_duration, pretty_ts, replace_dots_in_field_names, seconds, set_es_key, should_scrolling_continue, total_seconds, ts_add, ts_now, ts_to_dt, unix_to_dt, ts_utc_to_tz) class ElastAlerter(object): """ The main ElastAlert runner. This class holds all state about active rules, controls when queries are run, and passes information between rules and alerts. :param args: An argparse arguments instance. Should contain debug and start""" thread_data = threading.local() def parse_args(self, args): parser = argparse.ArgumentParser() parser.add_argument( '--config', action='store', dest='config', default="config.yaml", help='Global config file (default: config.yaml)') parser.add_argument('--debug', action='store_true', dest='debug', help='Suppresses alerts and prints information instead. ' 'Not compatible with `--verbose`') parser.add_argument('--rule', dest='rule', help='Run only a specific rule (by filename, must still be in rules folder)') parser.add_argument('--silence', dest='silence', help='Silence rule for a time period. Must be used with --rule. Usage: ' '--silence =, eg. --silence hours=2') parser.add_argument( "--silence_qk_value", dest="silence_qk_value", help="Silence the rule only for this specific query key value.", ) parser.add_argument('--start', dest='start', help='YYYY-MM-DDTHH:MM:SS Start querying from this timestamp. ' 'Use "NOW" to start from current time. (Default: present)') parser.add_argument('--end', dest='end', help='YYYY-MM-DDTHH:MM:SS Query to this timestamp. (Default: present)') parser.add_argument('--verbose', action='store_true', dest='verbose', help='Increase verbosity without suppressing alerts. ' 'Not compatible with `--debug`') parser.add_argument('--patience', action='store', dest='timeout', type=parse_duration, default=datetime.timedelta(), help='Maximum time to wait for ElasticSearch to become responsive. Usage: ' '--patience =. e.g. 
--patience minutes=5') parser.add_argument( '--pin_rules', action='store_true', dest='pin_rules', help='Stop ElastAlert from monitoring config file changes') parser.add_argument('--es_debug', action='store_true', dest='es_debug', help='Enable verbose logging from Elasticsearch queries') parser.add_argument( '--es_debug_trace', action='store', dest='es_debug_trace', help='Enable logging from Elasticsearch queries as curl command. Queries will be logged to file. Note that ' 'this will incorrectly display localhost:9200 as the host/port') parser.add_argument('--prometheus_port', type=int, dest='prometheus_port', help='Enables Prometheus metrics on specified port.') self.args = parser.parse_args(args) def __init__(self, args): self.es_clients = {} self.parse_args(args) self.debug = self.args.debug self.verbose = self.args.verbose if self.verbose and self.debug: elastalert_logger.info( "Note: --debug and --verbose flags are set. --debug takes precedent." ) if self.verbose or self.debug: elastalert_logger.setLevel(logging.INFO) if self.debug: elastalert_logger.info( """Note: In debug mode, alerts will be logged to console but NOT actually sent. To send them but remain verbose, use --verbose instead.""" ) if not self.args.es_debug: logging.getLogger('elasticsearch').setLevel(logging.WARNING) if self.args.es_debug_trace: tracer = logging.getLogger('elasticsearch.trace') tracer.setLevel(logging.INFO) tracer.addHandler(logging.FileHandler(self.args.es_debug_trace)) self.conf = load_conf(self.args) self.rules_loader = self.conf['rules_loader'] self.rules = self.rules_loader.load(self.conf, self.args) elastalert_logger.info(f'{len(self.rules)} rules loaded') self.max_query_size = self.conf['max_query_size'] self.scroll_keepalive = self.conf['scroll_keepalive'] self.writeback_index = self.conf['writeback_index'] self.run_every = self.conf['run_every'] self.alert_time_limit = self.conf['alert_time_limit'] self.old_query_limit = self.conf['old_query_limit'] self.disable_rules_on_error = self.conf['disable_rules_on_error'] self.notify_email = self.conf.get('notify_email', []) self.from_addr = self.conf.get('from_addr', 'ElastAlert') self.smtp_host = self.conf.get('smtp_host', 'localhost') self.max_aggregation = self.conf.get('max_aggregation', 10000) self.buffer_time = self.conf['buffer_time'] self.silence_cache = {} self.rule_hashes = self.rules_loader.get_hashes(self.conf, self.args.rule) self.starttime = self.args.start self.disabled_rules = [] self.replace_dots_in_field_names = self.conf.get('replace_dots_in_field_names', False) self.thread_data.alerts_sent = 0 self.thread_data.num_hits = 0 self.thread_data.num_dupes = 0 executors = { 'default': ThreadPoolExecutor(max_workers=self.conf.get('max_threads', 10)), } job_defaults = { 'misfire_grace_time': self.conf.get('misfire_grace_time', 5), 'coalesce': True, 'max_instances': 1 } self.scheduler = BackgroundScheduler(executors=executors, job_defaults=job_defaults) self.string_multi_field_name = self.conf.get('string_multi_field_name', False) self.statsd_instance_tag = self.conf.get('statsd_instance_tag', '') self.statsd_host = self.conf.get('statsd_host', '') if self.statsd_host and len(self.statsd_host) > 0: self.statsd = statsd.StatsClient(host=self.statsd_host, port=8125) else: self.statsd = None self.add_metadata_alert = self.conf.get('add_metadata_alert', False) self.prometheus_port = self.args.prometheus_port self.show_disabled_rules = self.conf.get('show_disabled_rules', True) self.pretty_ts_format = self.conf.get('custom_pretty_ts_format') 
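
        # --- Illustrative sketch (not part of the original module) -------------------------------
        # The duration-style flags parsed above (--patience, --silence) use "unit=number" strings
        # that elastalert.util.parse_duration turns into datetime.timedelta objects. A minimal
        # approximation of that idea, assuming a single well-formed "unit=number" pair, looks like
        # this (the helper name is hypothetical):
        def _duration_flag_sketch(value):
            """Turn a string such as 'minutes=5' or 'hours=2' into a timedelta.

            >>> _duration_flag_sketch('hours=2') == datetime.timedelta(hours=2)
            True
            """
            unit, _, amount = value.partition('=')
            return datetime.timedelta(**{unit: int(amount)})
        # ------------------------------------------------------------------------------------------
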
self.writeback_es = elasticsearch_client(self.conf) remove = [] for rule in self.rules: if 'is_enabled' in rule and not rule['is_enabled']: self.disabled_rules.append(rule) remove.append(rule) elif not self.init_rule(rule): remove.append(rule) list(map(self.rules.remove, remove)) if self.args.silence: self.silence() self.alert_lock = threading.Lock() @staticmethod def get_index(rule, starttime=None, endtime=None): """ Gets the index for a rule. If strftime is set and starttime and endtime are provided, it will return a comma seperated list of indices. If strftime is set but starttime and endtime are not provided, it will replace all format tokens with a wildcard. """ index = rule['index'] add_extra = rule.get('search_extra_index', False) if rule.get('use_strftime_index'): if starttime and endtime: return format_index(index, starttime, endtime, add_extra) else: # Replace the substring containing format characters with a * format_start = index.find('%') format_end = index.rfind('%') + 2 return index[:format_start] + '*' + index[format_end:] else: return index @staticmethod def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field='@timestamp', to_ts_func=dt_to_ts, desc=False): """ Returns a query dict that will apply a list of filters, filter by start and end time, and sort results by timestamp. :param filters: A list of Elasticsearch filters to use. :param starttime: A timestamp to use as the start time of the query. :param endtime: A timestamp to use as the end time of the query. :param sort: If true, sort results by timestamp. (Default True) :return: A query dictionary to pass to Elasticsearch. """ starttime = to_ts_func(starttime) endtime = to_ts_func(endtime) filters = copy.copy(filters) # ElastAlert documentation still specifies an old way of writing filters # This snippet of code converts it into the new standard new_filters = [] for es_filter in filters: if es_filter.get('query'): new_filters.append(es_filter['query']) else: new_filters.append(es_filter) es_filters = {'filter': {'bool': {'must': new_filters}}} if starttime and endtime: es_filters['filter']['bool']['must'].insert(0, {'range': {timestamp_field: {'gt': starttime, 'lte': endtime}}}) query = {'query': {'bool': es_filters}} if sort: query['sort'] = [{timestamp_field: {'order': 'desc' if desc else 'asc'}}] return query def get_terms_query(self, query, rule, size, field): """ Takes a query generated by get_query and outputs a aggregation query """ query_element = query['query'] if 'sort' in query_element: query_element.pop('sort') aggs_query = query aggs_query['aggs'] = {'counts': {'terms': {'field': field, 'size': size, 'min_doc_count': rule.get('min_doc_count', 1)}}} return aggs_query def get_aggregation_query(self, query, rule, query_key, terms_size, timestamp_field='@timestamp'): """ Takes a query generated by get_query and outputs a aggregation query """ query_element = query['query'] if 'sort' in query_element: query_element.pop('sort') metric_agg_element = rule['aggregation_query_element'] bucket_interval_period = rule.get('bucket_interval_period') if bucket_interval_period is not None: aggs_element = { 'interval_aggs': { 'date_histogram': { 'field': timestamp_field, 'fixed_interval': bucket_interval_period}, 'aggs': metric_agg_element } } if rule.get('bucket_offset_delta'): aggs_element['interval_aggs']['date_histogram']['offset'] = '+%ss' % (rule['bucket_offset_delta']) else: aggs_element = metric_agg_element if query_key is not None: for idx, key in 
reversed(list(enumerate(query_key.split(',')))): aggs_element = {'bucket_aggs': {'terms': {'field': key, 'size': terms_size, 'min_doc_count': rule.get('min_doc_count', 1)}, 'aggs': aggs_element}} aggs_query = query aggs_query['aggs'] = aggs_element return aggs_query def get_index_start(self, index, timestamp_field='@timestamp'): """ Query for one result sorted by timestamp to find the beginning of the index. :param index: The index of which to find the earliest event. :return: Timestamp of the earliest event. """ query = {'sort': {timestamp_field: {'order': 'asc'}}} try: res = self.thread_data.current_es.search(index=index, size=1, body=query, _source_includes=[timestamp_field], ignore_unavailable=True) except ElasticsearchException as e: self.handle_error("Elasticsearch query error: %s" % (e), {'index': index, 'query': query}) return '1969-12-30T00:00:00Z' if len(res['hits']['hits']) == 0: # Index is completely empty, return a date before the epoch return '1969-12-30T00:00:00Z' return res['hits']['hits'][0][timestamp_field] @staticmethod def process_hits(rule, hits): """ Update the _source field for each hit received from ES based on the rule configuration. This replaces timestamps with datetime objects, folds important fields into _source and creates compound query_keys. :return: A list of processed _source dictionaries. """ processed_hits = [] for hit in hits: # Merge fields and _source hit.setdefault('_source', {}) for key, value in list(hit.get('fields', {}).items()): # Fields are returned as lists, assume any with length 1 are not arrays in _source # Except sometimes they aren't lists. This is dependent on ES version hit['_source'].setdefault(key, value[0] if type(value) is list and len(value) == 1 else value) # Convert the timestamp to a datetime ts = lookup_es_key(hit['_source'], rule['timestamp_field']) if not ts and not rule["_source_enabled"]: raise EAException( "Error: No timestamp was found for hit. '_source_enabled' is set to false, check your mappings for stored fields" ) set_es_key(hit['_source'], rule['timestamp_field'], rule['ts_to_dt'](ts)) set_es_key(hit, rule['timestamp_field'], lookup_es_key(hit['_source'], rule['timestamp_field'])) # Tack metadata fields into _source for field in ['_id', '_index', '_type']: if field in hit: hit['_source'][field] = hit[field] if rule.get('compound_query_key'): values = [lookup_es_key(hit['_source'], key) for key in rule['compound_query_key']] hit['_source'][rule['query_key']] = ', '.join([str(value) for value in values]) if rule.get('compound_aggregation_key'): values = [lookup_es_key(hit['_source'], key) for key in rule['compound_aggregation_key']] hit['_source'][rule['aggregation_key']] = ', '.join([str(value) for value in values]) processed_hits.append(hit['_source']) return processed_hits def get_hits(self, rule, starttime, endtime, index, scroll=False): """ Query Elasticsearch for the given rule and return the results. :param rule: The rule configuration. :param starttime: The earliest time to query. :param endtime: The latest time to query. :return: A list of hits, bounded by rule['max_query_size'] (or self.max_query_size). 
""" query = self.get_query( rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], to_ts_func=rule['dt_to_ts'], ) extra_args = {'_source_includes': rule['include']} scroll_keepalive = rule.get('scroll_keepalive', self.scroll_keepalive) if not rule.get('_source_enabled'): query['stored_fields'] = rule['include'] extra_args = {} if rule.get('include_fields', None) is not None: query['fields'] = rule['include_fields'] try: if scroll: res = self.thread_data.current_es.scroll(scroll_id=rule['scroll_id'], scroll=scroll_keepalive) else: res = self.thread_data.current_es.search( scroll=scroll_keepalive, index=index, size=rule.get('max_query_size', self.max_query_size), body=query, ignore_unavailable=True, **extra_args ) if '_scroll_id' in res: rule['scroll_id'] = res['_scroll_id'] self.thread_data.total_hits = int(res['hits']['total']['value']) if len(res.get('_shards', {}).get('failures', [])) > 0: try: errs = [e['reason']['reason'] for e in res['_shards']['failures'] if 'Failed to parse' in e['reason']['reason']] if len(errs): raise ElasticsearchException(errs) except (TypeError, KeyError): # Different versions of ES have this formatted in different ways. Fallback to str-ing the whole thing raise ElasticsearchException(str(res['_shards']['failures'])) elastalert_logger.debug(str(res)) except ElasticsearchException as e: # Elasticsearch sometimes gives us GIGANTIC error messages # (so big that they will fill the entire terminal buffer) if len(str(e)) > 1024: e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024) self.handle_error('Error running query: %s' % (e), {'rule': rule['name'], 'query': query}) return None hits = res['hits']['hits'] self.thread_data.num_hits += len(hits) lt = rule.get('use_local_time') status_log = "Queried rule %s from %s to %s: %s / %s hits" % ( rule['name'], pretty_ts(starttime, lt, self.pretty_ts_format), pretty_ts(endtime, lt, self.pretty_ts_format), self.thread_data.num_hits, len(hits) ) if self.thread_data.total_hits > rule.get('max_query_size', self.max_query_size): elastalert_logger.info("%s (scrolling..)" % status_log) else: elastalert_logger.info(status_log) hits = self.process_hits(rule, hits) return hits def get_hits_count(self, rule, starttime, endtime, index): """ Query Elasticsearch for the count of results and returns a list of timestamps equal to the endtime. This allows the results to be passed to rules which expect an object for each hit. :param rule: The rule configuration dictionary. :param starttime: The earliest time to query. :param endtime: The latest time to query. :return: A dictionary mapping timestamps to number of hits for that time period. """ query = self.get_query( rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'], ) es_client = self.thread_data.current_es try: res = es_client.count( index=index, body=query, ignore_unavailable=True ) except ElasticsearchException as e: # Elasticsearch sometimes gives us GIGANTIC error messages # (so big that they will fill the entire terminal buffer) if len(str(e)) > 1024: e = str(e)[:1024] + '... 
(%d characters removed)' % (len(str(e)) - 1024) self.handle_error('Error running count query: %s' % (e), {'rule': rule['name'], 'query': query}) return None self.thread_data.num_hits += res['count'] lt = rule.get('use_local_time') elastalert_logger.info( "Queried rule %s from %s to %s: %s hits" % (rule['name'], pretty_ts(starttime, lt, self.pretty_ts_format), pretty_ts(endtime, lt, self.pretty_ts_format), res['count']) ) return {endtime: res['count']} @staticmethod def query_key_filters(rule: dict, qk_value_csv: str) -> dict: if qk_value_csv is None: return # Split on comma followed by zero or more whitespace characters. It's # expected to be commaspace separated. However 76ab593 suggests there # are cases when it is only comma and not commaspace qk_values = re.split(r',\s*',qk_value_csv) end = '.keyword' query_keys = [] try: query_keys = rule['compound_query_key'] except KeyError: query_key = rule.get('query_key') if query_key is not None: query_keys.append(query_key) if len(qk_values) != len(query_keys): msg = ( f"Received {len(qk_values)} value(s) for {len(query_keys)} key(s)." f" Did '{qk_value_csv}' have a value with a comma?" " See https://github.com/jertel/elastalert2/pull/1330#issuecomment-1849962106" ) elastalert_logger.warning( msg ) for key, value in zip(query_keys, qk_values): if rule.get('raw_count_keys', True): if not key.endswith(end): key += end yield {'term': {key: value}} def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=None): rule_filter = copy.copy(rule['filter']) for filter in self.query_key_filters(rule=rule, qk_value_csv=qk): rule_filter.append(filter) base_query = self.get_query( rule_filter, starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'], ) if size is None: size = rule.get('terms_size', 50) query = self.get_terms_query(base_query, rule, size, key) try: res = self.thread_data.current_es.search(index=index, body=query, size=0, ignore_unavailable=True) except ElasticsearchException as e: # Elasticsearch sometimes gives us GIGANTIC error messages # (so big that they will fill the entire terminal buffer) if len(str(e)) > 1024: e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024) self.handle_error('Error running terms query: %s' % (e), {'rule': rule['name'], 'query': query}) return None if 'aggregations' not in res: return {} buckets = res['aggregations']['counts']['buckets'] self.thread_data.num_hits += len(buckets) lt = rule.get('use_local_time') elastalert_logger.info( 'Queried rule %s from %s to %s: %s buckets' % ( rule['name'], pretty_ts(starttime, lt, self.pretty_ts_format), pretty_ts(endtime, lt, self.pretty_ts_format), len(buckets)) ) return {endtime: buckets} def get_hits_aggregation(self, rule, starttime, endtime, index, query_key, term_size=None): rule_filter = copy.copy(rule['filter']) base_query = self.get_query( rule_filter, starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'], ) if term_size is None: term_size = rule.get('terms_size', 50) query = self.get_aggregation_query(base_query, rule, query_key, term_size, rule['timestamp_field']) try: res = self.thread_data.current_es.search(index=index, body=query, size=0, ignore_unavailable=True) except ElasticsearchException as e: if len(str(e)) > 1024: e = str(e)[:1024] + '... 
(%d characters removed)' % (len(str(e)) - 1024)
            self.handle_error('Error running query: %s' % (e), {'rule': rule['name']})
            return None
        if 'aggregations' not in res:
            return {}
        payload = res['aggregations']
        self.thread_data.num_hits += res['hits']['total']['value']
        return {endtime: payload}

    def remove_duplicate_events(self, data, rule):
        new_events = []
        for event in data:
            if event['_id'] in rule['processed_hits']:
                continue

            # Remember the new data's IDs
            rule['processed_hits'][event['_id']] = lookup_es_key(event, rule['timestamp_field'])
            new_events.append(event)

        return new_events

    def remove_old_events(self, rule):
        # Anything older than the buffer time we can forget
        now = ts_now()
        remove = []
        buffer_time = rule.get('buffer_time', self.buffer_time)
        if rule.get('query_delay'):
            try:
                buffer_time += rule['query_delay']
            except Exception as e:
                self.handle_error("[remove_old_events]Error parsing query_delay send time format %s" % e)
        for _id, timestamp in rule['processed_hits'].items():
            if now - timestamp > buffer_time:
                remove.append(_id)
        list(map(rule['processed_hits'].pop, remove))

    def run_query(self, rule, start=None, end=None, scroll=False):
        """ Query for the rule and pass all of the results to the RuleType instance.

        :param rule: The rule configuration.
        :param start: The earliest time to query.
        :param end: The latest time to query.
        Returns True on success and False on failure.
        """
        if start is None:
            start = self.get_index_start(rule['index'])
        if end is None:
            end = ts_now()
        if rule.get('query_timezone'):
            elastalert_logger.info("Query start and end time converting UTC to query_timezone : {}".format(rule.get('query_timezone')))
            start = ts_utc_to_tz(start, rule.get('query_timezone'))
            end = ts_utc_to_tz(end, rule.get('query_timezone'))

        # Reset hit counter and query
        rule_inst = rule['type']
        rule['scrolling_cycle'] = rule.get('scrolling_cycle', 0) + 1
        index = self.get_index(rule, start, end)
        if rule.get('use_count_query'):
            data = self.get_hits_count(rule, start, end, index)
        elif rule.get('use_terms_query'):
            data = self.get_hits_terms(rule, start, end, index, rule['query_key'])
        elif rule.get('aggregation_query_element'):
            data = self.get_hits_aggregation(rule, start, end, index, rule.get('query_key', None))
        else:
            data = self.get_hits(rule, start, end, index, scroll)
            if data:
                old_len = len(data)
                data = self.remove_duplicate_events(data, rule)
                self.thread_data.num_dupes += old_len - len(data)

        # There was an exception while querying
        if data is None:
            return False
        elif data:
            if rule.get('use_count_query'):
                rule_inst.add_count_data(data)
            elif rule.get('use_terms_query'):
                rule_inst.add_terms_data(data)
            elif rule.get('aggregation_query_element'):
                rule_inst.add_aggregation_data(data)
            else:
                rule_inst.add_data(data)

        try:
            if rule.get('scroll_id') and self.thread_data.num_hits < self.thread_data.total_hits and should_scrolling_continue(rule):
                if not self.run_query(rule, start, end, scroll=True):
                    return False
        except RuntimeError:
            # It's possible to scroll far enough to hit max recursive depth
            pass

        if 'scroll_id' in rule:
            scroll_id = rule.pop('scroll_id')
            try:
                self.thread_data.current_es.clear_scroll(scroll_id=scroll_id)
            except NotFoundError:
                pass

        return True

    def get_starttime(self, rule):
        """ Query ES for the last time we ran this rule.

        :param rule: The rule configuration.
        :return: A timestamp or None.
""" sort = {'sort': {'@timestamp': {'order': 'desc'}}} query = {'query': {'bool': {'filter': {'term': {'rule_name': '%s' % (rule['name'])}}}}} query.update(sort) try: doc_type = 'elastalert_status' index = self.writeback_es.resolve_writeback_index(self.writeback_index, doc_type) res = self.writeback_es.search(index=index, size=1, body=query, _source_includes=['endtime', 'rule_name']) if res['hits']['hits']: endtime = ts_to_dt(res['hits']['hits'][0]['_source']['endtime']) if ts_now() - endtime < self.old_query_limit: return endtime else: elastalert_logger.info("Found expired previous run for %s at %s" % (rule['name'], endtime)) return None except (ElasticsearchException, KeyError) as e: self.handle_error('Error querying for last run: %s' % (e), {'rule': rule['name']}) def set_starttime(self, rule, endtime): """ Given a rule and an endtime, sets the appropriate starttime for it. """ # This means we are starting fresh if 'starttime' not in rule: if not rule.get('scan_entire_timeframe'): # Try to get the last run from Elasticsearch last_run_end = self.get_starttime(rule) if last_run_end: rule['starttime'] = last_run_end self.adjust_start_time_for_overlapping_agg_query(rule) self.adjust_start_time_for_interval_sync(rule, endtime) rule['minimum_starttime'] = rule['starttime'] return None # Use buffer for normal queries, or run_every increments otherwise # or, if scan_entire_timeframe, use timeframe if not rule.get('use_count_query') and not rule.get('use_terms_query'): if not rule.get('scan_entire_timeframe'): buffer_time = rule.get('buffer_time', self.buffer_time) buffer_delta = endtime - buffer_time else: buffer_delta = endtime - rule['timeframe'] # If we started using a previous run, don't go past that if 'minimum_starttime' in rule and rule['minimum_starttime'] > buffer_delta: rule['starttime'] = rule['minimum_starttime'] # If buffer_time doesn't bring us past the previous endtime, use that instead elif 'previous_endtime' in rule and rule['previous_endtime'] < buffer_delta: rule['starttime'] = rule['previous_endtime'] self.adjust_start_time_for_overlapping_agg_query(rule) else: rule['starttime'] = buffer_delta self.adjust_start_time_for_interval_sync(rule, endtime) else: if not rule.get('scan_entire_timeframe'): # Query from the end of the last run, if it exists, otherwise a run_every sized window rule['starttime'] = rule.get('previous_endtime', endtime - self.run_every) else: #Based on PR 3141 old Yelp/elastalert - rschirin rule['starttime'] = endtime - rule['timeframe'] def adjust_start_time_for_overlapping_agg_query(self, rule): if rule.get('aggregation_query_element'): if rule.get('allow_buffer_time_overlap') and not rule.get('use_run_every_query_size') and ( rule['buffer_time'] > rule['run_every']): rule['starttime'] = rule['starttime'] - (rule['buffer_time'] - rule['run_every']) rule['original_starttime'] = rule['starttime'] def adjust_start_time_for_interval_sync(self, rule, endtime): # If aggregation query adjust bucket offset if rule.get('aggregation_query_element'): if rule.get('bucket_interval'): es_interval_delta = rule.get('bucket_interval_timedelta') unix_starttime = dt_to_unix(rule['starttime']) es_interval_delta_in_sec = total_seconds(es_interval_delta) offset = int(unix_starttime % es_interval_delta_in_sec) if rule.get('sync_bucket_interval'): rule['starttime'] = unix_to_dt(unix_starttime - offset) endtime = unix_to_dt(dt_to_unix(endtime) - offset) else: rule['bucket_offset_delta'] = offset def get_segment_size(self, rule): """ The segment size is either buffer_size for 
        queries which can overlap or run_every for queries which must be strictly separate.
        This mimics the query size for when ElastAlert is running continuously. """
        if not rule.get('use_count_query') and not rule.get('use_terms_query') and not rule.get('aggregation_query_element'):
            return rule.get('buffer_time', self.buffer_time)
        elif rule.get('aggregation_query_element'):
            if rule.get('use_run_every_query_size'):
                return self.run_every
            else:
                return rule.get('buffer_time', self.buffer_time)
        else:
            return self.run_every

    def get_query_key_value(self, rule, match):
        # Get the value for the match's query_key (or None) to form the key used for the silence_cache.
        # The Flatline ruletype sets "key" instead of the actual query_key.
        if isinstance(rule['type'], FlatlineRule) and 'key' in match:
            return str(match['key'])
        return self.get_named_key_value(rule, match, 'query_key')

    def get_aggregation_key_value(self, rule, match):
        # Get the value for the match's aggregation_key (or None) to form the key used for grouped aggregates.
        return self.get_named_key_value(rule, match, 'aggregation_key')

    def get_named_key_value(self, rule, match, key_name):
        # Search the match for the key specified in the rule to get the value
        if key_name in rule:
            try:
                key_value = lookup_es_key(match, rule[key_name])
                if key_value is not None:
                    # Only do the unicode conversion if we actually found something;
                    # otherwise we might transform None --> 'None'
                    key_value = str(key_value)
            except KeyError:
                # Some matches may not have the specified key;
                # use a special token for these
                key_value = '_missing'
        else:
            key_value = None

        return key_value

    def enhance_filter(self, rule):
        """ If there is a blacklist or whitelist in the rule, add it to the filter.
        The blacklist or whitelist terms are combined into a single query_string clause
        and appended to the rule's existing filters.

        :param rule: The rule configuration.
        :return: None
        """
        if not rule.get('filter_by_list', True):
            return
        if 'blacklist' in rule:
            listname = 'blacklist'
        elif 'whitelist' in rule:
            listname = 'whitelist'
        else:
            return

        filters = rule['filter']
        additional_terms = []
        for term in rule[listname]:
            if not term.startswith('/') or not term.endswith('/'):
                additional_terms.append(rule['compare_key'] + ':"' + term + '"')
            else:
                # These are regular expressions and won't work if they are quoted
                additional_terms.append(rule['compare_key'] + ':' + term)
        if listname == 'whitelist':
            query = "NOT " + " AND NOT ".join(additional_terms)
        else:
            query = " OR ".join(additional_terms)
        query_str_filter = {'query_string': {'query': query}}
        filters.append(query_str_filter)
        elastalert_logger.debug("Enhanced filter with {} terms: {}".format(listname, str(query_str_filter)))

    def get_elasticsearch_client(self, rule):
        key = rule['name']
        es_client = self.es_clients.get(key)
        if es_client is None:
            es_client = elasticsearch_client(rule)
            self.es_clients[key] = es_client
        return es_client

    def run_rule(self, rule, endtime, starttime=None):
        """ Run a rule for a given time period, including querying and alerting on results.

        :param rule: The rule configuration.
        :param starttime: The earliest timestamp to query.
        :param endtime: The latest timestamp to query.
        :return: The number of matches that the rule produced.
""" run_start = time.time() self.thread_data.current_es = self.get_elasticsearch_client(rule) # If there are pending aggregate matches, try processing them for x in range(len(rule['agg_matches'])): match = rule['agg_matches'].pop() self.add_aggregated_alert(match, rule) # Start from provided time if it's given if starttime: rule['starttime'] = starttime else: self.set_starttime(rule, endtime) rule['original_starttime'] = rule['starttime'] rule['scrolling_cycle'] = 0 self.thread_data.num_hits = 0 self.thread_data.num_dupes = 0 self.thread_data.cumulative_hits = 0 # Don't run if starttime was set to the future if ts_now() <= rule['starttime']: elastalert_logger.warning("Attempted to use query start time in the future (%s), sleeping instead" % (starttime)) return 0 # Run the rule. If querying over a large time period, split it up into segments segment_size = self.get_segment_size(rule) tmp_endtime = rule['starttime'] while endtime - rule['starttime'] > segment_size: tmp_endtime = tmp_endtime + segment_size if not self.run_query(rule, rule['starttime'], tmp_endtime): return 0 self.thread_data.cumulative_hits += self.thread_data.num_hits self.thread_data.num_hits = 0 rule['starttime'] = tmp_endtime rule['type'].garbage_collect(tmp_endtime) if rule.get('aggregation_query_element'): if endtime - tmp_endtime == segment_size: if not self.run_query(rule, tmp_endtime, endtime): return 0 self.thread_data.cumulative_hits += self.thread_data.num_hits elif total_seconds(rule['original_starttime'] - tmp_endtime) == 0: rule['starttime'] = rule['original_starttime'] return 0 else: endtime = tmp_endtime else: if not self.run_query(rule, rule['starttime'], endtime): return 0 self.thread_data.cumulative_hits += self.thread_data.num_hits rule['type'].garbage_collect(endtime) # Process any new matches num_matches = len(rule['type'].matches) while rule['type'].matches: match = rule['type'].matches.pop(0) match['num_hits'] = self.thread_data.cumulative_hits match['num_matches'] = num_matches # If realert is set, silence the rule for that duration # Silence is cached by query_key, if it exists # Default realert time is 0 seconds silence_cache_key = rule['realert_key'] query_key_value = self.get_query_key_value(rule, match) if query_key_value is not None: silence_cache_key += '.' 
+ query_key_value if self.is_silenced(rule['name'] + "._silence") or self.is_silenced(silence_cache_key): elastalert_logger.info('Ignoring match for silenced rule %s' % (silence_cache_key,)) continue if rule['realert']: next_alert, exponent = self.next_alert_time(rule, silence_cache_key, ts_now()) self.set_realert(silence_cache_key, next_alert, exponent) if rule.get('run_enhancements_first'): try: for enhancement in rule['match_enhancements']: try: enhancement.process(match) except EAException as e: self.handle_error("Error running match enhancement: %s" % (e), {'rule': rule['name']}) except DropMatchException: continue # If no aggregation, alert immediately if not rule['aggregation']: self.alert([match], rule) continue # Add it as an aggregated match self.add_aggregated_alert(match, rule) # Mark this endtime for next run's start rule['previous_endtime'] = endtime time_taken = time.time() - run_start # Write to ES that we've run this rule against this time period body = {'rule_name': rule['name'], 'endtime': endtime, 'starttime': rule['original_starttime'], 'matches': num_matches, 'hits': max(self.thread_data.num_hits, self.thread_data.cumulative_hits), '@timestamp': ts_now(), 'time_taken': time_taken} self.writeback('elastalert_status', body) # Write metrics about the run to statsd if self.statsd: try: self.statsd.gauge( 'rule.time_taken', time_taken, tags={"elastalert_instance": self.statsd_instance_tag, "rule_name": rule['name']}) self.statsd.gauge( 'query.hits', self.thread_data.num_hits, tags={"elastalert_instance": self.statsd_instance_tag, "rule_name": rule['name']}) self.statsd.gauge( 'already_seen.hits', self.thread_data.num_dupes, tags={"elastalert_instance": self.statsd_instance_tag, "rule_name": rule['name']}) self.statsd.gauge( 'query.matches', num_matches, tags={"elastalert_instance": self.statsd_instance_tag, "rule_name": rule['name']}) self.statsd.gauge( 'query.alerts_sent', self.thread_data.alerts_sent, tags={"elastalert_instance": self.statsd_instance_tag, "rule_name": rule['name']}) except BaseException as e: elastalert_logger.error("unable to send metrics:\n%s" % str(e)) return num_matches def init_rule(self, new_rule, new=True): ''' Copies some necessary non-config state from an exiting rule to a new rule. 
''' if not new and self.scheduler.get_job(job_id=new_rule['name']): self.scheduler.remove_job(job_id=new_rule['name']) self.enhance_filter(new_rule) # Change top_count_keys to .raw if 'top_count_keys' in new_rule and new_rule.get('raw_count_keys', True): if self.string_multi_field_name: string_multi_field_name = self.string_multi_field_name else: string_multi_field_name = '.keyword' for i, key in enumerate(new_rule['top_count_keys']): if not key.endswith(string_multi_field_name): new_rule['top_count_keys'][i] += string_multi_field_name blank_rule = {'agg_matches': [], 'aggregate_alert_time': {}, 'current_aggregate_id': {}, 'processed_hits': {}, 'run_every': self.run_every, 'has_run_once': False} rule = blank_rule # Set rule to either a blank template or existing rule with same name if not new: for rule in self.rules: if rule['name'] == new_rule['name']: break else: rule = blank_rule copy_properties = ['agg_matches', 'current_aggregate_id', 'aggregate_alert_time', 'processed_hits', 'starttime', 'minimum_starttime', 'has_run_once'] for prop in copy_properties: if prop not in rule: continue new_rule[prop] = rule[prop] job = self.scheduler.add_job(self.handle_rule_execution, 'interval', args=[new_rule], seconds=new_rule['run_every'].total_seconds(), id=new_rule['name'], name="Rule: %s" % (new_rule['name']), max_instances=1, jitter=5) job.modify(next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=random.randint(0, 15))) return new_rule def load_rule_changes(self): """ Using the modification times of rule config files, syncs the running rules to match the files in rules_folder by removing, adding or reloading rules. """ new_rule_hashes = self.rules_loader.get_hashes(self.conf, self.args.rule) # Check each current rule for changes for rule_file, hash_value in self.rule_hashes.items(): if rule_file not in new_rule_hashes: # Rule file was deleted elastalert_logger.info('Rule file %s not found, stopping rule execution' % (rule_file)) for rule in self.rules: if rule['rule_file'] == rule_file: break else: continue self.scheduler.remove_job(job_id=rule['name']) self.rules.remove(rule) continue if hash_value != new_rule_hashes[rule_file]: # Rule file was changed, reload rule try: new_rule = self.rules_loader.load_configuration(rule_file, self.conf) if not new_rule: elastalert_logger.error('Invalid rule file skipped: %s' % rule_file) continue if 'is_enabled' in new_rule and not new_rule['is_enabled']: elastalert_logger.info('Rule file %s is now disabled.' % (rule_file)) # Remove this rule if it's been disabled self.rules = [rule for rule in self.rules if rule['rule_file'] != rule_file] # Stop job if is running if self.scheduler.get_job(job_id=new_rule['name']): self.scheduler.remove_job(job_id=new_rule['name']) # Append to disabled_rule for disabled_rule in self.disabled_rules: if disabled_rule['name'] == new_rule['name']: break else: self.disabled_rules.append(new_rule) continue except EAException as e: message = 'Could not load rule %s: %s' % (rule_file, e) self.handle_error(message) # Want to send email to address specified in the rule. Try and load the YAML to find it. 
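
        # --- Illustrative sketch (hypothetical helper, not ElastAlert API) ------------------------
        # The reload logic in this method boils down to a diff of two {rule_file: hash} mappings:
        # files missing from the new mapping are removed, files whose hash changed are reloaded,
        # and files only present in the new mapping are loaded as new rules. In isolation:
        def _diff_rule_hashes_sketch(old_hashes, new_hashes):
            """Return (removed, changed, added) sets of rule file names."""
            removed = set(old_hashes) - set(new_hashes)
            added = set(new_hashes) - set(old_hashes)
            changed = {f for f in set(old_hashes) & set(new_hashes) if old_hashes[f] != new_hashes[f]}
            return removed, changed, added
        # ------------------------------------------------------------------------------------------
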
try: rule_yaml = self.rules_loader.load_yaml(rule_file) except EAException: self.send_notification_email(exception=e) continue self.send_notification_email(exception=e, rule=rule_yaml) continue elastalert_logger.info("Reloading configuration for rule %s" % (rule_file)) # Re-enable if rule had been disabled for disabled_rule in self.disabled_rules: if disabled_rule['name'] == new_rule['name']: self.disabled_rules.remove(disabled_rule) break # Initialize the rule that matches rule_file new_rule = self.init_rule(new_rule, False) self.rules = [rule for rule in self.rules if rule['rule_file'] != rule_file] if new_rule: self.rules.append(new_rule) # Load new rules if not self.args.rule: for rule_file in set(new_rule_hashes.keys()) - set(self.rule_hashes.keys()): try: new_rule = self.rules_loader.load_configuration(rule_file, self.conf) if not new_rule: elastalert_logger.error('Invalid rule file skipped: %s' % rule_file) continue if 'is_enabled' in new_rule and not new_rule['is_enabled']: continue if new_rule['name'] in [rule['name'] for rule in self.rules]: raise EAException("A rule with the name %s already exists" % (new_rule['name'])) except EAException as e: self.handle_error('Could not load rule %s: %s' % (rule_file, e)) self.send_notification_email(exception=e, rule_file=rule_file) continue if self.init_rule(new_rule): elastalert_logger.info('Loaded new rule %s' % (rule_file)) if new_rule['name'] in self.es_clients: self.es_clients.pop(new_rule['name']) self.rules.append(new_rule) self.rule_hashes = new_rule_hashes def start(self): """ Periodically go through each rule and run it """ if self.starttime: if self.starttime == 'NOW': self.starttime = ts_now() else: try: self.starttime = ts_to_dt(self.starttime) except (TypeError, ValueError): self.handle_error("%s is not a valid ISO8601 timestamp (YYYY-MM-DDTHH:MM:SS+XX:00)" % (self.starttime)) exit(1) for rule in self.rules: rule['initial_starttime'] = self.starttime self.wait_until_responsive(timeout=self.args.timeout) self.running = True elastalert_logger.info("Starting up") self.scheduler.add_job(self.handle_pending_alerts, 'interval', seconds=self.run_every.total_seconds(), id='_internal_handle_pending_alerts', name='Internal: Handle Pending Alerts') self.scheduler.add_job(self.handle_config_change, 'interval', seconds=self.run_every.total_seconds(), id='_internal_handle_config_change', name='Internal: Handle Config Change') self.scheduler.start() while self.running: next_run = datetime.datetime.now(tz=datetime.UTC) + self.run_every # Quit after end_time has been reached if self.args.end: endtime = ts_to_dt(self.args.end) next_run_dt = next_run.replace(tzinfo=datetime.timezone.utc) if next_run_dt > endtime: elastalert_logger.info("End time '%s' falls before the next run time '%s', exiting." % (endtime, next_run_dt)) exit(0) if next_run < datetime.datetime.now(tz=datetime.UTC): continue # Show disabled rules if self.show_disabled_rules: elastalert_logger.info("Disabled rules are: %s" % (str(self.get_disabled_rules()))) # Wait before querying again sleep_duration = total_seconds(next_run - datetime.datetime.now(tz=datetime.UTC)) self.sleep_for(sleep_duration) def wait_until_responsive(self, timeout, clock=timeit.default_timer): """Wait until ElasticSearch becomes responsive (or too much time passes).""" # Elapsed time is a floating point number of seconds. timeout = timeout.total_seconds() # Don't poll unless we're asked to. if timeout <= 0.0: return # Periodically poll ElasticSearch. 
Keep going until ElasticSearch is # responsive *and* the writeback index exists. ref = clock() while (clock() - ref) < timeout: try: if self.writeback_es.indices.exists(self.writeback_index): return except ConnectionError: pass time.sleep(1.0) if self.writeback_es.ping(): elastalert_logger.error( 'Writeback index "%s" does not exist, did you run `elastalert-create-index`?', self.writeback_index, ) else: elastalert_logger.error( 'Could not reach ElasticSearch at "%s:%d".', self.conf['es_host'], self.conf['es_port'], ) exit(1) def run_all_rules(self): """ Run each rule one time """ self.handle_pending_alerts() for rule in self.rules: self.handle_rule_execution(rule) self.handle_config_change() def handle_pending_alerts(self): self.thread_data.alerts_sent = 0 self.send_pending_alerts() elastalert_logger.info("Background alerts thread %s pending alerts sent at %s" % ( self.thread_data.alerts_sent, pretty_ts(ts_now(), ts_format=self.pretty_ts_format))) def handle_config_change(self): if not self.args.pin_rules: self.load_rule_changes() elastalert_logger.info( "Background configuration change check run at %s" % (pretty_ts(ts_now(), ts_format=self.pretty_ts_format))) def handle_rule_execution(self, rule): self.thread_data.alerts_sent = 0 next_run = datetime.datetime.now(tz=datetime.UTC) + rule['run_every'] # Set endtime based on the rule's delay delay = rule.get('query_delay') if hasattr(self.args, 'end') and self.args.end: endtime = ts_to_dt(self.args.end) elif delay: try: endtime = ts_now() - delay except Exception as e: self.handle_error("[handle_rule_execution]Error parsing query_delay send time format %s" % e) else: endtime = ts_now() # Apply rules based on execution time limits if rule.get('limit_execution'): rule['next_starttime'] = None rule['next_min_starttime'] = None exec_next = next(croniter(rule['limit_execution'])) endtime_epoch = dt_to_unix(endtime) # If the estimated next endtime (end + run_every) isn't at least a minute past the next exec time # That means that we need to pause execution after this run if endtime_epoch + rule['run_every'].total_seconds() < exec_next - 59: # apscheduler requires pytz tzinfos, so don't use unix_to_dt here! 
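
            # --- Illustrative sketch (hypothetical helper; mirrors the check above) ---------------
            # The limit_execution gate can be read in isolation: given the cron expression, the
            # current endtime as a unix timestamp, and run_every in seconds, the rule pauses when
            # the estimated next endtime is not at least a minute past the next scheduled slot.
            def _should_pause_after_run_sketch(cron_expr, endtime_epoch, run_every_seconds):
                """Return True when the rule should pause until the next cron slot."""
                next_exec = next(croniter(cron_expr))
                return endtime_epoch + run_every_seconds < next_exec - 59
            # ---------------------------------------------------------------------------------------
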
rule['next_starttime'] = datetime.datetime.fromtimestamp(exec_next, tz=datetime.UTC).replace(tzinfo=pytz.utc) if rule.get('limit_execution_coverage'): rule['next_min_starttime'] = rule['next_starttime'] if not rule['has_run_once']: self.reset_rule_schedule(rule) return rule['has_run_once'] = True try: num_matches = self.run_rule(rule, endtime, rule.get('initial_starttime')) except EAException as e: self.handle_error("Error running rule %s: %s" % (rule['name'], e), {'rule': rule['name']}) except Exception as e: self.handle_uncaught_exception(e, rule) else: old_starttime = pretty_ts(rule.get('original_starttime'), rule.get('use_local_time'), self.pretty_ts_format) elastalert_logger.info("Ran %s from %s to %s: %s query hits (%s already seen), %s matches," " %s alerts sent" % (rule['name'], old_starttime, pretty_ts(endtime, rule.get('use_local_time'), self.pretty_ts_format), self.thread_data.num_hits, self.thread_data.num_dupes, num_matches, self.thread_data.alerts_sent)) rule_duration = seconds(endtime - rule.get('original_starttime')) elastalert_logger.info("%s range %s" % (rule['name'], rule_duration)) self.thread_data.alerts_sent = 0 if next_run < datetime.datetime.now(tz=datetime.UTC): # We were processing for longer than our refresh interval # This can happen if --start was specified with a large time period # or if we are running too slow to process events in real time. elastalert_logger.warning( "Querying from %s to %s took longer than %s!" % ( old_starttime, pretty_ts(endtime, rule.get('use_local_time'), self.pretty_ts_format), self.run_every ) ) rule['initial_starttime'] = None self.remove_old_events(rule) self.reset_rule_schedule(rule) def reset_rule_schedule(self, rule): # We hit the end of a execution schedule, pause ourselves until next run if rule.get('limit_execution') and rule['next_starttime']: self.scheduler.modify_job(job_id=rule['name'], next_run_time=rule['next_starttime']) # If we are preventing covering non-scheduled time periods, reset min_starttime and previous_endtime if rule['next_min_starttime']: rule['minimum_starttime'] = rule['next_min_starttime'] rule['previous_endtime'] = rule['next_min_starttime'] elastalert_logger.info('Pausing %s until next run at %s' % ( rule['name'], pretty_ts(rule['next_starttime'], ts_format=self.pretty_ts_format))) def stop(self): """ Stop an ElastAlert runner that's been started """ self.running = False def get_disabled_rules(self): """ Return disabled rules """ return [rule['name'] for rule in self.disabled_rules] def sleep_for(self, duration): """ Sleep for a set duration """ elastalert_logger.info("Sleeping for %s seconds" % (duration)) time.sleep(duration) def alert(self, matches, rule, alert_time=None, retried=False): """ Wraps alerting, Kibana linking and enhancements in an exception handler """ try: return self.send_alert(matches, rule, alert_time=alert_time, retried=retried) except Exception as e: self.handle_uncaught_exception(e, rule) def send_alert(self, matches, rule, alert_time=None, retried=False): """ Send out an alert. :param matches: A list of matches. :param rule: A rule configuration. """ if not matches: return if alert_time is None: alert_time = ts_now() # Compute top count keys if rule.get('top_count_keys'): for match in matches: if 'query_key' in rule: qk = lookup_es_key(match, rule['query_key']) else: qk = None if isinstance(rule['type'], FlatlineRule): # flatline rule triggers when there have been no events from now()-timeframe to now(), # so using now()-timeframe will return no results. 
for now we can just mutliple the timeframe # by 2, but this could probably be timeframe+run_every to prevent too large of a lookup? timeframe = datetime.timedelta(seconds=2 * rule.get('timeframe').total_seconds()) else: timeframe = rule.get('timeframe', datetime.timedelta(minutes=10)) start = ts_to_dt(lookup_es_key(match, rule['timestamp_field'])) - timeframe end = ts_to_dt(lookup_es_key(match, rule['timestamp_field'])) + datetime.timedelta(minutes=10) keys = rule.get('top_count_keys') counts = self.get_top_counts(rule, start, end, keys, qk=qk) match.update(counts) if rule.get('generate_kibana_discover_url'): kb_link = generate_kibana_discover_url(rule, matches[0]) if kb_link: kb_link_formatter = self.get_kibana_discover_external_url_formatter(rule) matches[0]['kibana_discover_url'] = kb_link_formatter.format(kb_link) if rule.get('generate_opensearch_discover_url'): opsh_link = generate_opensearch_discover_url(rule, matches[0]) if opsh_link: opsh_link_formatter = self.get_opensearch_discover_external_url_formatter(rule) matches[0]['opensearch_discover_url'] = opsh_link_formatter.format(opsh_link) # Enhancements were already run at match time if # run_enhancements_first is set or # retried==True, which means this is a retry of a failed alert if not rule.get('run_enhancements_first') and not retried: for enhancement in rule['match_enhancements']: valid_matches = [] for match in matches: try: enhancement.process(match) valid_matches.append(match) except DropMatchException: pass except EAException as e: self.handle_error("Error running match enhancement: %s" % (e), {'rule': rule['name']}) matches = valid_matches if not matches: return None # Don't send real alerts in debug mode if self.debug: alerter = DebugAlerter(rule) alerter.alert(matches) return None # Run the alerts alert_sent = False alert_exception = None # Alert.pipeline is a single object shared between every alerter # This allows alerters to pass objects and data between themselves alert_pipeline = {"alert_time": alert_time} for alert in rule['alert']: alert.pipeline = alert_pipeline try: alert.alert(matches) except EAException as e: self.handle_error('Error while running alert %s: %s' % (alert.get_info()['type'], e), {'rule': rule['name']}) alert_exception = str(e) else: self.thread_data.alerts_sent += 1 alert_sent = True # Write the alert(s) to ES agg_id = None for match in matches: alert_body = self.get_alert_body(match, rule, alert_sent, alert_time, alert_exception) # Set all matches to aggregate together if agg_id: alert_body['aggregate_id'] = agg_id res = self.writeback('elastalert', alert_body, rule) if res and not agg_id: agg_id = res['_id'] def get_alert_body(self, match, rule, alert_sent, alert_time, alert_exception=None): body = { 'match_body': match, 'rule_name': rule['name'], 'alert_info': rule['alert'][0].get_info() if not self.debug else {}, 'alert_sent': alert_sent, 'alert_time': alert_time } if rule.get('include_match_in_root'): body.update({k: v for k, v in match.items() if not k.startswith('_')}) if self.add_metadata_alert: body['category'] = rule['category'] body['description'] = rule['description'] body['owner'] = rule['owner'] body['priority'] = rule['priority'] match_time = lookup_es_key(match, rule['timestamp_field']) if match_time is not None: body['match_time'] = match_time # TODO record info about multiple alerts # If the alert failed to send, record the exception if not alert_sent: body['alert_exception'] = alert_exception return body def get_kibana_discover_external_url_formatter(self, rule): """ Gets or 
create the external url formatter for kibana discover links """ key = '__kibana_discover_external_url_formatter__' formatter = rule.get(key) if formatter is None: shorten = rule.get('shorten_kibana_discover_url') security_tenant = rule.get('kibana_discover_security_tenant') formatter = create_kibana_external_url_formatter(rule, shorten, security_tenant) rule[key] = formatter return formatter def get_opensearch_discover_external_url_formatter(self, rule): """ Gets or create the external url formatter for Opensearch discover links """ key = '__opensearch_discover_external_url_formatter__' formatter = rule.get(key) if formatter is None: formatter = create_opensearch_external_url_formatter(rule) rule[key] = formatter return formatter def writeback(self, doc_type, body, rule=None, match_body=None): # ES 2.0 - 2.3 does not support dots in field names. if self.replace_dots_in_field_names: writeback_body = replace_dots_in_field_names(body) else: writeback_body = body for key in list(writeback_body.keys()): # Convert any datetime objects to timestamps if isinstance(writeback_body[key], datetime.datetime): writeback_body[key] = dt_to_ts(writeback_body[key]) if self.debug: elastalert_logger.info("Skipping writing to ES: %s" % (writeback_body)) return None if '@timestamp' not in writeback_body: writeback_body['@timestamp'] = dt_to_ts(ts_now()) try: index = self.writeback_es.resolve_writeback_index(self.writeback_index, doc_type) res = self.writeback_es.index(index=index, body=body) return res except ElasticsearchException as e: elastalert_logger.exception("Error writing alert info to Elasticsearch: %s" % (e)) def find_recent_pending_alerts(self, time_limit): """ Queries writeback_es to find alerts that did not send and are newer than time_limit """ # XXX only fetches 1000 results. If limit is reached, next loop will catch them # unless there is constantly more than 1000 alerts to send. # Fetch recent, unsent alerts that aren't part of an aggregate, earlier alerts first. 
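
        # --- Illustrative sketch (hypothetical helper; the real logic lives in writeback()) -------
        # Before a document is indexed into the writeback index, datetime values are serialized and
        # an '@timestamp' is guaranteed, roughly as follows (dt_to_ts and ts_now come from
        # elastalert.util, imported at the top of this module):
        def _normalize_writeback_body_sketch(body):
            """Return a copy of body with datetimes converted and '@timestamp' filled in."""
            out = dict(body)
            for key, value in out.items():
                if isinstance(value, datetime.datetime):
                    out[key] = dt_to_ts(value)
            out.setdefault('@timestamp', dt_to_ts(ts_now()))
            return out
        # -------------------------------------------------------------------------------------------
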
inner_query = {'query_string': {'query': '!_exists_:aggregate_id AND alert_sent:false'}} time_filter = {'range': {'alert_time': {'from': dt_to_ts(ts_now() - time_limit), 'to': dt_to_ts(ts_now())}}} sort = {'sort': {'alert_time': {'order': 'asc'}}} query = {'query': {'bool': {'must': inner_query, 'filter': time_filter}}} query.update(sort) try: res = self.writeback_es.search(index=self.writeback_index, body=query, size=1000) if res['hits']['hits']: return res['hits']['hits'] except ElasticsearchException as e: elastalert_logger.exception("Error finding recent pending alerts: %s %s" % (e, query)) return [] def send_pending_alerts(self): with self.alert_lock: pending_alerts = self.find_recent_pending_alerts(self.alert_time_limit) for alert in pending_alerts: _id = alert['_id'] alert = alert['_source'] try: rule_name = alert.pop('rule_name') alert_time = alert.pop('alert_time') match_body = alert.pop('match_body') except KeyError: # Malformed alert, drop it continue # Find original rule for rule in self.rules: if rule['name'] == rule_name: break else: # Original rule is missing, keep alert for later if rule reappears continue # Set current_es for top_count_keys query self.thread_data.current_es = elasticsearch_client(rule) # Send the alert unless it's a future alert if ts_now() > ts_to_dt(alert_time): aggregated_matches = self.get_aggregated_matches(_id) if aggregated_matches: matches = [match_body] + [agg_match['match_body'] for agg_match in aggregated_matches] self.alert(matches, rule, alert_time=alert_time) else: # If this rule isn't using aggregation, this must be a retry of a failed alert retried = False if not rule.get('aggregation'): retried = True self.alert([match_body], rule, alert_time=alert_time, retried=retried) if rule['current_aggregate_id']: for qk, agg_id in rule['current_aggregate_id'].items(): if agg_id == _id: rule['current_aggregate_id'].pop(qk) break # Delete it from the index try: self.writeback_es.delete(index=self.writeback_index, id=_id) except ElasticsearchException: # TODO: Give this a more relevant exception, try:except: is evil. self.handle_error("Failed to delete alert %s at %s" % (_id, alert_time)) # Send in memory aggregated alerts for rule in self.rules: if rule['agg_matches']: for aggregation_key_value, aggregate_alert_time in rule['aggregate_alert_time'].items(): if ts_now() > aggregate_alert_time: alertable_matches = [ agg_match for agg_match in rule['agg_matches'] if self.get_aggregation_key_value(rule, agg_match) == aggregation_key_value ] self.alert(alertable_matches, rule) rule['agg_matches'] = [ agg_match for agg_match in rule['agg_matches'] if self.get_aggregation_key_value(rule, agg_match) != aggregation_key_value ] def get_aggregated_matches(self, _id): """ Removes and returns all matches from writeback_es that have aggregate_id == _id """ # XXX if there are more than self.max_aggregation matches, you have big alerts and we will leave entries in ES. 
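
        # --- Illustrative sketch (simplified; assumes in-memory documents instead of a live query) --
        # Aggregated alerts are chained in the writeback index: the first pending alert's _id acts as
        # the group id, and each later match is written with that value in 'aggregate_id'. Collecting
        # a group, which the query below performs against Elasticsearch (deleting documents as it
        # goes), reduces to a filter on that field:
        def _collect_aggregate_group_sketch(docs, group_id):
            """Return the stored match bodies of every writeback doc in the group."""
            return [d['match_body'] for d in docs if d.get('aggregate_id') == group_id]
        # ---------------------------------------------------------------------------------------------
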
        query = {'query': {'query_string': {'query': 'aggregate_id:"%s"' % (_id)}}, 'sort': {'@timestamp': 'asc'}}
        matches = []
        try:
            res = self.writeback_es.search(index=self.writeback_index, body=query, size=self.max_aggregation)
            for match in res['hits']['hits']:
                matches.append(match['_source'])
                self.writeback_es.delete(index=self.writeback_index, id=match['_id'])
        except (KeyError, ElasticsearchException) as e:
            self.handle_error("Error fetching aggregated matches: %s" % (e), {'id': _id})
        return matches

    def find_pending_aggregate_alert(self, rule, aggregation_key_value=None):
        query = {'filter': {'bool': {'must': [{'term': {'rule_name': rule['name']}},
                                              {'range': {'alert_time': {'gt': ts_now()}}},
                                              {'term': {'alert_sent': 'false'}}],
                                     'must_not': [{'exists': {'field': 'aggregate_id'}}]}}}
        if aggregation_key_value:
            query['filter']['bool']['must'].append({'term': {'aggregation_key': aggregation_key_value}})
        query = {'query': {'bool': query}}
        query['sort'] = {'alert_time': {'order': 'desc'}}
        try:
            res = self.writeback_es.search(index=self.writeback_index, body=query, size=1)
            if len(res['hits']['hits']) == 0:
                return None
        except (KeyError, ElasticsearchException) as e:
            self.handle_error("Error searching for pending aggregated matches: %s" % (e), {'rule_name': rule['name']})
            return None
        return res['hits']['hits'][0]

    def add_aggregated_alert(self, match, rule):
        """ Save a match as a pending aggregate alert to Elasticsearch. """
        with self.alert_lock:
            # Optionally include the 'aggregation_key' as a dimension for aggregations
            aggregation_key_value = self.get_aggregation_key_value(rule, match)

            # This is a fallback option in case this change to using ts_now() interferes with the behavior current
            # users are accustomed to. It is not documented because it likely won't be needed. If no one reports
            # a problem we can remove this fallback option in a future release.
    def add_aggregated_alert(self, match, rule):
        """ Save a match as a pending aggregate alert to Elasticsearch. """
        with self.alert_lock:
            # Optionally include the 'aggregation_key' as a dimension for aggregations
            aggregation_key_value = self.get_aggregation_key_value(rule, match)

            # This is a fallback option in case this change to using ts_now() interferes with the behavior current
            # users are accustomed to. It is not documented because it likely won't be needed. If no one reports
            # a problem we can remove this fallback option in a future release.
            if rule.get('aggregation_alert_time_compared_with_timestamp_field', False):
                compare_dt = lookup_es_key(match, rule['timestamp_field'])
            else:
                compare_dt = ts_now()

            if (not rule['current_aggregate_id'].get(aggregation_key_value) or
                    ('aggregate_alert_time' in rule and aggregation_key_value in rule['aggregate_alert_time'] and
                     rule['aggregate_alert_time'].get(aggregation_key_value) < ts_to_dt(compare_dt))):

                # ElastAlert may have restarted while pending alerts exist
                pending_alert = self.find_pending_aggregate_alert(rule, aggregation_key_value)
                if pending_alert:
                    alert_time = ts_to_dt(pending_alert['_source']['alert_time'])
                    rule['aggregate_alert_time'][aggregation_key_value] = alert_time
                    agg_id = pending_alert['_id']
                    rule['current_aggregate_id'] = {aggregation_key_value: agg_id}
                    elastalert_logger.info(
                        'Adding alert for %s to aggregation(id: %s, aggregation_key: %s), next alert at %s' % (
                            rule['name'],
                            agg_id,
                            aggregation_key_value,
                            alert_time
                        )
                    )
                else:
                    # First match, set alert_time
                    alert_time = ''
                    if isinstance(rule['aggregation'], dict) and rule['aggregation'].get('schedule'):
                        croniter._datetime_to_timestamp = cronite_datetime_to_timestamp  # For Python 2.6 compatibility
                        try:
                            iter = croniter(rule['aggregation']['schedule'], ts_now())
                            alert_time = unix_to_dt(iter.get_next())
                        except Exception as e:
                            self.handle_error("Error parsing aggregate send time Cron format %s" % (e),
                                              rule['aggregation']['schedule'])
                    else:
                        try:
                            if rule.get('aggregate_by_match_time', False):
                                match_time = ts_to_dt(lookup_es_key(match, rule['timestamp_field']))
                                alert_time = match_time + rule['aggregation']
                            else:
                                alert_time = ts_now() + rule['aggregation']
                        except Exception as e:
                            self.handle_error("[add_aggregated_alert]Error parsing aggregate send time format %s" % (e),
                                              rule['aggregation'])

                    rule['aggregate_alert_time'][aggregation_key_value] = alert_time
                    agg_id = None
                    elastalert_logger.info(
                        'New aggregation for %s, aggregation_key: %s. next alert at %s.' % (
                            rule['name'], aggregation_key_value, alert_time
                        )
                    )
            else:
                # Already pending aggregation, use existing alert_time
                alert_time = rule['aggregate_alert_time'].get(aggregation_key_value)
                agg_id = rule['current_aggregate_id'].get(aggregation_key_value)
                elastalert_logger.info(
                    'Adding alert for %s to aggregation(id: %s, aggregation_key: %s), next alert at %s' % (
                        rule['name'],
                        agg_id,
                        aggregation_key_value,
                        alert_time
                    )
                )

            alert_body = self.get_alert_body(match, rule, False, alert_time)
            if agg_id:
                alert_body['aggregate_id'] = agg_id
            if aggregation_key_value:
                alert_body['aggregation_key'] = aggregation_key_value
            res = self.writeback('elastalert', alert_body, rule)

            # If new aggregation, save _id
            if res and not agg_id:
                rule['current_aggregate_id'][aggregation_key_value] = res['_id']

            # Couldn't write the match to ES, save it in memory for now
            if not res:
                rule['agg_matches'].append(match)

            return res
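
    # Hedged usage note for silence() below: the exact entry point depends on how ElastAlert is
    # installed, but with the flags defined in parse_args() a one-off silence would look roughly
    # like this (rule file and query key value are hypothetical):
    #
    #     python -m elastalert.elastalert --config config.yaml --rule example_rule.yaml \
    #         --silence hours=4 --silence_qk_value "web-01"
    #
    # --silence requires --rule, and --silence_qk_value narrows the silence to a single query
    # key value.
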
    def silence(self, silence_cache_key=None):
        """ Silence an alert for a period of time. --silence and --rule must be passed as args. """
        if self.debug:
            elastalert_logger.error('--silence not compatible with --debug')
            exit(1)

        if not self.args.rule:
            elastalert_logger.error('--silence must be used with --rule')
            exit(1)

        # With --rule, self.rules will only contain that specific rule
        if not silence_cache_key:
            if self.args.silence_qk_value:
                silence_cache_key = self.rules[0]['realert_key'] + "." + self.args.silence_qk_value
            else:
                silence_cache_key = self.rules[0]['name'] + "._silence"

        try:
            silence_ts = parse_deadline(self.args.silence)
        except (ValueError, TypeError):
            elastalert_logger.error('%s is not a valid time period' % (self.args.silence))
            exit(1)

        if not self.set_realert(silence_cache_key, silence_ts, 0):
            elastalert_logger.error('Failed to save silence command to Elasticsearch')
            exit(1)

        elastalert_logger.info('Success. %s will be silenced until %s' % (silence_cache_key, silence_ts))

    def set_realert(self, silence_cache_key, timestamp, exponent):
        """ Write a silence to Elasticsearch for silence_cache_key until timestamp. """
        body = {'exponent': exponent,
                'rule_name': silence_cache_key,
                '@timestamp': ts_now(),
                'until': timestamp}
        self.silence_cache[silence_cache_key] = (timestamp, exponent)
        return self.writeback('silence', body)

    def is_silenced(self, rule_name):
        """ Checks if rule_name is currently silenced. Returns false on exception. """
        if rule_name in self.silence_cache:
            if ts_now() < self.silence_cache[rule_name][0]:
                return True

        if self.debug:
            return False

        query = {'term': {'rule_name': rule_name}}
        sort = {'sort': {'until': {'order': 'desc'}}}
        query = {'query': query}
        query.update(sort)

        try:
            doc_type = 'silence'
            index = self.writeback_es.resolve_writeback_index(self.writeback_index, doc_type)
            res = self.writeback_es.search(index=index, size=1, body=query,
                                           _source_includes=['until', 'exponent'])
        except ElasticsearchException as e:
            self.handle_error("Error while querying for alert silence status: %s" % (e), {'rule': rule_name})
            return False

        if res['hits']['hits']:
            until_ts = res['hits']['hits'][0]['_source']['until']
            exponent = res['hits']['hits'][0]['_source'].get('exponent', 0)
            if rule_name not in list(self.silence_cache.keys()):
                self.silence_cache[rule_name] = (ts_to_dt(until_ts), exponent)
            else:
                self.silence_cache[rule_name] = (ts_to_dt(until_ts), self.silence_cache[rule_name][1])
            if ts_now() < ts_to_dt(until_ts):
                return True
        return False

    def handle_error(self, message, data=None):
        ''' Logs message at error level and writes message, data and traceback to Elasticsearch. '''
        elastalert_logger.error(message)
        body = {'message': message}
        tb = traceback.format_exc()
        body['traceback'] = tb.strip().split('\n')
        if data:
            body['data'] = data
        self.writeback('elastalert_error', body)

    def handle_uncaught_exception(self, exception, rule):
        """ Disables a rule and sends a notification. """
        elastalert_logger.error(traceback.format_exc())
        self.handle_error('Uncaught exception running rule %s: %s' % (rule['name'], exception), {'rule': rule['name']})

        if self.disable_rules_on_error:
            self.rules = [running_rule for running_rule in self.rules if running_rule['name'] != rule['name']]
            self.disabled_rules.append(rule)
            self.scheduler.pause_job(job_id=rule['name'])
            elastalert_logger.info('Rule %s disabled', rule['name'])
        if self.notify_email:
            self.send_notification_email(exception=exception, rule=rule)

    def send_notification_email(self, text='', exception=None, rule=None, subject=None, rule_file=None):
        email_body = text
        rule_name = None
        if rule:
            rule_name = rule['name']
        elif rule_file:
            rule_name = rule_file
        if exception and rule_name:
            if not subject:
                subject = 'Uncaught exception in ElastAlert - %s' % (rule_name)
            email_body += '\n\n'
            email_body += 'The rule %s has raised an uncaught exception.\n\n' % (rule_name)
            if self.disable_rules_on_error:
                modified = ' or if the rule config file has been modified' if not self.args.pin_rules else ''
                email_body += 'It has been disabled and will be re-enabled when ElastAlert restarts%s.\n\n' % (modified)
            tb = traceback.format_exc()
            email_body += tb

        if isinstance(self.notify_email, str):
            self.notify_email = [self.notify_email]
        email = MIMEText(email_body)
        email['Subject'] = subject if subject else 'ElastAlert notification'
        recipients = self.notify_email
        if rule and rule.get('notify_email'):
            if isinstance(rule['notify_email'], str):
                rule['notify_email'] = [rule['notify_email']]
            recipients = recipients + rule['notify_email']
        recipients = list(set(recipients))
        email['To'] = ', '.join(recipients)
        email['From'] = self.from_addr
        email['Reply-To'] = self.conf.get('email_reply_to', email['To'])

        try:
            smtp = SMTP(self.smtp_host)
            smtp.sendmail(self.from_addr, recipients, email.as_string())
        except (SMTPException, error) as e:
            self.handle_error('Error connecting to SMTP host: %s' % (e), {'email_body': email_body})

    def get_top_counts(self, rule, starttime, endtime, keys, number=None, qk=None):
        """ Counts the number of events for each unique value for each key field.
        Returns a dictionary with top_events_<key> mapped to the top 5 counts for each key. """
        all_counts = {}
        if not number:
            number = rule.get('top_count_number', 5)
        for key in keys:
            index = self.get_index(rule, starttime, endtime)

            hits_terms = self.get_hits_terms(rule, starttime, endtime, index, key, qk, number)
            if hits_terms is None:
                top_events_count = {}
            else:
                buckets = list(hits_terms.values())[0]

                # get_hits_terms adds to num_hits, but we don't want to count these
                self.thread_data.num_hits -= len(buckets)
                terms = {}
                for bucket in buckets:
                    terms[bucket['key']] = bucket['doc_count']
                counts = list(terms.items())
                counts.sort(key=lambda x: x[1], reverse=True)
                top_events_count = dict(counts[:number])

            # Save a dict with the top 5 events by key
            all_counts['top_events_%s' % (key)] = top_events_count

        return all_counts

    # Worked example with assumed settings: if realert is 10 minutes and exponential_realert is
    # 1 hour, every alert that fires before the previous silence expires bumps the exponent,
    # doubling the wait (10m, 20m, 40m, 80m -> capped at 60m); once alerts slow down, the
    # exponent decays and the wait shrinks back toward the base realert period.
    def next_alert_time(self, rule, name, timestamp):
        """ Calculate an 'until' time and exponent based on how much past the last 'until' we are. """
        if name in self.silence_cache:
            last_until, exponent = self.silence_cache[name]
        else:
            # If this isn't cached, this is the first alert or writeback_es is down, normal realert
            return timestamp + rule['realert'], 0

        if not rule.get('exponential_realert'):
            return timestamp + rule['realert'], 0

        diff = seconds(timestamp - last_until)
        # Increase exponent if we've alerted recently
        if diff < seconds(rule['realert']) * 2 ** exponent:
            exponent += 1
        else:
            # Continue decreasing exponent the longer it's been since the last alert
            while diff > seconds(rule['realert']) * 2 ** exponent and exponent > 0:
                diff -= seconds(rule['realert']) * 2 ** exponent
                exponent -= 1

        wait = datetime.timedelta(seconds=seconds(rule['realert']) * 2 ** exponent)
        if wait >= rule['exponential_realert']:
            return timestamp + rule['exponential_realert'], exponent - 1
        return timestamp + wait, exponent


def handle_signal(signal, frame):
    elastalert_logger.info('SIGINT received, stopping ElastAlert...')
    # use os._exit to exit immediately and avoid someone catching SystemExit
    os._exit(0)


def main(args=None):
    signal.signal(signal.SIGINT, handle_signal)
    if not args:
        args = sys.argv[1:]
    client = ElastAlerter(args)

    if client.prometheus_port and not client.debug:
        p = PrometheusWrapper(client)
        p.start()

    if not client.args.silence:
        client.start()


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))