forked from hyperledger/indy-plenum
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmonitor_strategies.py
111 lines (87 loc) · 3.64 KB
/
monitor_strategies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from abc import ABC, abstractmethod
from collections import defaultdict
from plenum.common.moving_average import EMAEventFrequencyEstimator
from stp_core.common.log import getlogger
logger = getlogger()
class MonitorStrategy(ABC):
@abstractmethod
def add_instance(self, inst_id):
pass
@abstractmethod
def remove_instance(self, inst_id):
pass
@abstractmethod
def reset(self):
pass
@abstractmethod
def update_time(self, timestamp: float):
pass
@abstractmethod
def request_received(self, id: str):
pass
@abstractmethod
def request_ordered(self, id: str, inst_id: int):
pass
@abstractmethod
def is_master_degraded(self) -> bool:
return False
@abstractmethod
def is_instance_degraded(self, inst_id: int) -> bool:
return False
class AccumulatingMonitorStrategy(MonitorStrategy):
def __init__(self, start_time: float, instances: set, txn_delta_k: int, timeout: float,
input_rate_reaction_half_time: float):
self._instances = instances
self._txn_delta_k = txn_delta_k
self._timeout = timeout
self._ordered = defaultdict(int)
self._timestamp = start_time
self._alert_timestamp = defaultdict(lambda: None)
self._input_txn_rate = EMAEventFrequencyEstimator(start_time, input_rate_reaction_half_time)
def add_instance(self, inst_id):
self._instances.add(inst_id)
def remove_instance(self, inst_id):
self._instances.remove(inst_id)
def reset(self):
logger.info("Resetting accumulating monitor")
self._input_txn_rate.reset(self._timestamp)
self._alert_timestamp = {inst: None for inst in self._instances}
self._ordered.clear()
def update_time(self, timestamp: float):
self._timestamp = timestamp
self._input_txn_rate.update_time(timestamp)
for inst_id in self._instances:
is_alerted = self._alert_timestamp[inst_id] is not None
is_degraded = self._is_degraded(inst_id)
if is_alerted and not is_degraded:
logger.info("Accumulating monitor is no longer alerted on instance {}, {}".
format(inst_id, self._statistics()))
self._alert_timestamp[inst_id] = None
elif not is_alerted and is_degraded:
logger.info("Accumulating monitor became alerted on instance {}, {}".
format(inst_id, self._statistics()))
self._alert_timestamp[inst_id] = self._timestamp
def request_received(self, id: str):
self._input_txn_rate.add_events(1)
def request_ordered(self, id: str, inst_id: int):
self._ordered[inst_id] += 1
def is_master_degraded(self) -> bool:
return self.is_instance_degraded(0)
def is_instance_degraded(self, inst_id: int) -> bool:
if self._alert_timestamp[inst_id] is None:
return False
return self._timestamp - self._alert_timestamp[inst_id] > self._timeout
@property
def _threshold(self):
return self._txn_delta_k * self._input_txn_rate.value
def _is_degraded(self, inst_id):
if len(self._instances) < 2:
return False
instance_ordered = self._ordered[inst_id]
max_ordered = max(self._ordered[i] for i in self._instances)
return (max_ordered - instance_ordered) > self._threshold
def _statistics(self):
return "txn rate {}, threshold {}, ordered: {}".\
format(self._input_txn_rate.value,
self._threshold,
", ".join([str(n) for n in self._ordered.values()]))