Skip to content

Commit

Permalink
improve demotion/promotion functionality
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Shishkin <vladimir.shishkin@dsr-corporation.com>
  • Loading branch information
VladimirWork committed Jan 27, 2020
1 parent c99cc4c commit 959680a
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 18 deletions.
2 changes: 2 additions & 0 deletions scripts/performance/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ Note: batches are evenly distributed, but txs inside one batch are sent as fast

'--promotion_shift' : Delay between demotion and promotion when 'demote' and 'promote' are used.

'--nodes_untouched' : Number of nodes to keep them safe from demotions and promotions when 'demote' and 'promote' are used.

One or more parameters could be stored in config file. Format of the config is simple YAML. Long parameter name should be used. Config name should be passed to script as an additional unnamed argument.
```
python3 perf_processes.py -n 1000 -k nym config_file_name
Expand Down
16 changes: 6 additions & 10 deletions scripts/performance/perf_load/perf_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,16 @@
parser.add_argument('--promotion_shift', default=60, type=int, required=False,
help='Shift between demotions and promotions')

parser.add_argument('--nodes_untouched', default=0, type=int, required=False,
help='Number of nodes to keep them safe from demotions and promotions')


class LoadRunner:
def __init__(self, clients=0, genesis_path="~/.indy-cli/networks/sandbox/pool_transactions_genesis",
seed=["000000000000000000000000Trustee1"], req_kind="nym", batch_size=10, refresh_rate=10,
buff_req=30, out_dir=".", val_sep="|", wallet_key="key", mode="p", pool_config='',
sync_mode="freeflow", load_rate=10, out_file="", load_time=0, taa_text="", taa_version="", shift=60,
ext_set=None, client_runner=LoadClient.run, log_lvl=logging.INFO,
nodes_untouched=0, ext_set=None, client_runner=LoadClient.run, log_lvl=logging.INFO,
short_stat=False):
self._client_runner = client_runner
self._clients = dict() # key process future; value ClientRunner
Expand Down Expand Up @@ -141,6 +144,7 @@ def __init__(self, clients=0, genesis_path="~/.indy-cli/networks/sandbox/pool_tr
self._taa_text = taa_text
self._taa_version = taa_version
self._shift = shift
self._nodes_untouched = nodes_untouched
self._ext_set = ext_set
if pool_config:
try:
Expand All @@ -153,14 +157,6 @@ def __init__(self, clients=0, genesis_path="~/.indy-cli/networks/sandbox/pool_tr
self._out_file = self.prepare_fs(out_file)
self._short_stat = short_stat

@property
def genesis_path(self):
return self._genesis_path

@property
def promotion_shift(self):
return self._shift

def process_reqs(self, stat, name: str = ""):
assert self._failed_f
assert self._total_f
Expand Down Expand Up @@ -466,7 +462,7 @@ def check_genesis(gen_path):
args, extra = parser.parse_known_args()
dict_args = vars(args)

pool_registry = PoolRegistry(dict_args["genesis_path"], dict_args["promotion_shift"])
pool_registry = PoolRegistry(dict_args["genesis_path"], dict_args["promotion_shift"], dict_args["nodes_untouched"])

if len(extra) > 1:
raise argparse.ArgumentTypeError("Only path to config file expected, but found extra arguments: {}".format(extra))
Expand Down
6 changes: 6 additions & 0 deletions scripts/performance/perf_load/perf_req_gen_demote.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from indy import ledger
import json
import testinfra

from perf_load.perf_req_gen import RequestGenerator
from perf_load.perf_utils import PoolRegistry
Expand All @@ -23,3 +24,8 @@ async def _gen_req(self, submitter_did, req_data):
'services': []
}
return await ledger.build_node_request(submitter_did, req_data['node_dest'], json.dumps(data))

async def on_request_replied(self, req_data, gen_req, resp_or_exp):
# stop demoted node
host = testinfra.get_host('ssh://persistent_node' + req_data['node_alias'][4:])
host.run('sudo systemctl stop indy-node')
4 changes: 2 additions & 2 deletions scripts/performance/perf_load/perf_req_gen_promote.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ async def _gen_req(self, submitter_did, req_data):
return await ledger.build_node_request(submitter_did, req_data['node_dest'], json.dumps(data))

async def on_request_replied(self, req_data, gen_req, resp_or_exp):
# restart promoted node
# start promoted node
host = testinfra.get_host('ssh://persistent_node' + req_data['node_alias'][4:])
host.run('sudo systemctl restart indy-node')
host.run('sudo systemctl start indy-node')
21 changes: 15 additions & 6 deletions scripts/performance/perf_load/perf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import argparse
from collections import Sequence
from typing import Dict, List
from queue import Queue, Empty

import base58
import libnacl
Expand Down Expand Up @@ -145,11 +146,12 @@ def __call__(cls, *args, **kwargs):

class PoolRegistry(metaclass=Singleton): # instantiate it once

def __init__(self, genesis_path=None, promotion_shift=None):
def __init__(self, genesis_path=None, promotion_shift=None, nodes_untouched=0):
self._genesis_path = genesis_path
self._promotion_shift = promotion_shift
self._nodes_untouched = nodes_untouched
self._pool_data = None
self._current_node = None
self._nodes_queue = Queue()

# read genesis to get aliases and dests
with open(self._genesis_path, 'r') as f:
Expand All @@ -165,16 +167,23 @@ def __init__(self, genesis_path=None, promotion_shift=None):
for node_alias, node_dest in zip(aliases, dests)
]

# pick random node from pool and set it as current (for demotion)
# remove untouched nodes from the tail
if self._nodes_untouched > 0:
self._pool_data = self._pool_data[:-self._nodes_untouched]

# pick random node from pool and add to nodes queue (for demotion)
def select_new_random_node(self):
current_node = random.choice(self._pool_data)
self._current_node = current_node
self._nodes_queue.put(current_node)
return current_node

# return current node or random node if there is no current node yet (for promotion)
# return first node from nodes queue or random node if queue is empty (for promotion)
@property
def current_node(self):
return self._current_node if self._current_node else random.choice(self._pool_data)
try:
return self._nodes_queue.get()
except Empty:
return random.choice(self._pool_data)

@property
def promotion_shift(self):
Expand Down

0 comments on commit 959680a

Please sign in to comment.