Skip to content

Commit

Permalink
[pipelined]Fix Restart Setup Instabilities (magma#3533)
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Yurchenko <koolzz@fb.com>
  • Loading branch information
koolzz authored and xjtian committed Nov 9, 2020
1 parent 51d0b93 commit bf293cc
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 47 deletions.
2 changes: 1 addition & 1 deletion lte/gateway/c/session_manager/LocalEnforcer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ LocalEnforcer::LocalEnforcer(
session_force_termination_timeout_ms),
quota_exhaustion_termination_on_init_ms_(
quota_exhaustion_termination_on_init_ms),
retry_timeout_(2),
retry_timeout_(2000),
mconfig_(mconfig),
access_timezone_(compute_access_timezone()) {}

Expand Down
2 changes: 1 addition & 1 deletion lte/gateway/c/session_manager/LocalEnforcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ class LocalEnforcer {
// [CWF-ONLY] This configures how long we should wait before terminating a
// session after it is created without any monitoring quota
long quota_exhaustion_termination_on_init_ms_;
std::chrono::seconds retry_timeout_;
std::chrono::milliseconds retry_timeout_;
magma::mconfig::SessionD mconfig_;
std::unique_ptr<Timezone> access_timezone_;

Expand Down
4 changes: 2 additions & 2 deletions lte/gateway/c/session_manager/LocalSessionManagerHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ LocalSessionManagerHandlerImpl::LocalSessionManagerHandlerImpl(
events_reporter_(events_reporter),
current_epoch_(0),
reported_epoch_(0),
retry_timeout_(1) {}
retry_timeout_(5000) {}

void LocalSessionManagerHandlerImpl::ReportRuleStats(
ServerContext* context, const RuleRecordTable* request,
Expand Down Expand Up @@ -158,7 +158,7 @@ void LocalSessionManagerHandlerImpl::handle_setup_callback(
return;
} else if (resp.result() == resp.FAILURE) {
MLOG(MWARNING) << "Pipelined setup failed, retrying pipelined setup "
"for epoch "
"after delay, for epoch "
<< epoch;
}

Expand Down
2 changes: 1 addition & 1 deletion lte/gateway/c/session_manager/LocalSessionManagerHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class LocalSessionManagerHandlerImpl : public LocalSessionManagerHandler {
SessionIDGenerator id_gen_;
uint64_t current_epoch_;
uint64_t reported_epoch_;
std::chrono::seconds retry_timeout_;
std::chrono::milliseconds retry_timeout_;
static const std::string hex_digit_;

private:
Expand Down
8 changes: 5 additions & 3 deletions lte/gateway/python/magma/pipelined/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,11 @@ def datapath_event_handler(self, ev):
self.logger.error(
'Error %s %s flow rules: %s', act, self.APP_NAME, e)

def is_ready_for_restart_recovery(self, epoch):
def check_setup_request_epoch(self, epoch):
"""
Check if the controller is ready to be intialized after restart
Check if the controller is ready to be initialized after restart,
returns: status code if epoch is invalid/controller is initialized
None if controller can be initialized
"""
if epoch != global_epoch:
self.logger.warning(
Expand All @@ -131,7 +133,7 @@ def is_ready_for_restart_recovery(self, epoch):
self.logger.warning('Controller already initialized, ignoring')
return SetupFlowsResult.SUCCESS

return SetupFlowsResult.SUCCESS
return None

def initialize_on_connect(self, datapath):
"""
Expand Down
58 changes: 33 additions & 25 deletions lte/gateway/python/magma/pipelined/app/enforcement_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ def _handle_flow_stats(self, stats_msgs):
current_usage, stat)

# Calculate the delta values from last stat update
delta_usage = _delta_usage_maps(current_usage,
self.last_usage_for_delta)
delta_usage = self._delta_usage_maps(current_usage,
self.last_usage_for_delta)
self.total_usage = current_usage

# Append any records which we couldn't send to session manager earlier
Expand Down Expand Up @@ -438,8 +438,8 @@ def _delete_old_flows(self, stats_msgs):
'(version: %s): %s', stat_rule_id,
stat_sid, rule_version, e)

self.last_usage_for_delta = _delta_usage_maps(self.total_usage,
deleted_flow_usage)
self.last_usage_for_delta = self._delta_usage_maps(self.total_usage,
deleted_flow_usage)

def _old_flow_stats(self, stats_msgs):
"""
Expand Down Expand Up @@ -497,6 +497,35 @@ def _get_rule_id(self, flow):
rule_num, e)
return ""

def _delta_usage_maps(self, current_usage, last_usage):
"""
Calculate the delta between the 2 usage maps and returns a new
usage map.
"""
if len(last_usage) == 0:
return current_usage
new_usage = {}
for key, current in current_usage.items():
last = last_usage.get(key, None)
if last is not None:
rec = RuleRecord()
rec.MergeFrom(current) # copy metadata
if current.bytes_rx < last.bytes_rx or \
current.bytes_tx < last.bytes_tx:
self.logger.error(
'Resetting usage for rule %s, for subscriber %s, '
'current usage(rx/tx) %d/%d, last usage %d/%d',
rec.sid, rec.rule_id, current.bytes_rx,
current.bytes_tx, last.bytes_rx, last.bytes_tx)
rec.bytes_rx = last.bytes_rx
rec.bytes_tx = last.bytes_tx
else:
rec.bytes_rx = current.bytes_rx - last.bytes_rx
rec.bytes_tx = current.bytes_tx - last.bytes_tx
new_usage[key] = rec
else:
new_usage[key] = current
return new_usage

def _generate_rule_match(imsi, ip_addr, rule_num, version, direction):
"""
Expand All @@ -509,27 +538,6 @@ def _generate_rule_match(imsi, ip_addr, rule_num, version, direction):
**ip_match)


def _delta_usage_maps(current_usage, last_usage):
"""
Calculate the delta between the 2 usage maps and returns a new
usage map.
"""
if len(last_usage) == 0:
return current_usage
new_usage = {}
for key, current in current_usage.items():
last = last_usage.get(key, None)
if last is not None:
rec = RuleRecord()
rec.MergeFrom(current) # copy metadata
rec.bytes_rx = current.bytes_rx - last.bytes_rx
rec.bytes_tx = current.bytes_tx - last.bytes_tx
new_usage[key] = rec
else:
new_usage[key] = current
return new_usage


def _merge_usage_maps(current_usage, last_usage):
"""
Merge the usage records from 2 map into a single map
Expand Down
10 changes: 7 additions & 3 deletions lte/gateway/python/magma/pipelined/app/ue_mac.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self, *args, **kwargs):
self._service_manager.get_table_num(INGRESS)
self.arpd_controller_fut = kwargs['app_futures']['arpd']
self.arp_contoller = None
self._loop = kwargs['loop']
self._datapath = None
tbls = self._service_manager.allocate_scratch_tables(self.APP_NAME, 2)
self._passthrough_set_tbl = tbls[0]
Expand Down Expand Up @@ -88,14 +89,17 @@ def handle_restart(self, ue_requests: List[UEMacFlowRequest]
for ue_req in ue_requests:
self.add_ue_mac_flow(ue_req.sid.id, ue_req.mac_addr)

self._loop.call_soon_threadsafe(self._setup_arp, ue_requests)

self.init_finished = True
return SetupFlowsResult(result=SetupFlowsResult.SUCCESS)

def _setup_arp(self, ue_requests: List[UEMacFlowRequest]):
if self.arp_contoller or self.arpd_controller_fut.done():
if not self.arp_contoller:
self.arp_contoller = self.arpd_controller_fut.result()
self.arp_contoller.handle_restart(ue_requests)

self.init_finished = True
return SetupFlowsResult(result=SetupFlowsResult.SUCCESS)

def delete_all_flows(self, datapath):
flows.delete_all_flows_from_table(datapath, self.tbl_num)
flows.delete_all_flows_from_table(datapath, self._passthrough_set_tbl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self, config):
def next_ip(self):
with self._lock:
ip = next(self.internal_ip_iterator, None)
while str(ip) in self._invalid_ips:
while ip is None or str(ip) in self._invalid_ips:
ip = next(self.internal_ip_iterator, None)
if ip is None:
self.internal_ip_iterator = self.internal_ip_network.hosts()
Expand Down
15 changes: 7 additions & 8 deletions lte/gateway/python/magma/pipelined/rpc_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,12 @@ def SetupPolicyFlows(self, request, context) -> SetupFlowsResult:

for controller in [self._gy_app, self._enforcer_app,
self._enforcement_stats]:
ret = controller.is_ready_for_restart_recovery(request.epoch)
if ret != SetupFlowsResult.SUCCESS:
ret = controller.check_setup_request_epoch(request.epoch)
if ret:
return SetupFlowsResult(result=ret)

fut = Future()
self._loop.call_soon_threadsafe(self._setup_flows,
request, fut)
self._loop.call_soon_threadsafe(self._setup_flows, request, fut)
return fut.result()

def _setup_flows(self, request: SetupPolicyRequest,
Expand Down Expand Up @@ -388,8 +387,8 @@ def SetupUEMacFlows(self, request, context) -> SetupFlowsResult:
context.set_details('Service not enabled!')
return None

ret = self._ue_mac_app.is_ready_for_restart_recovery(request.epoch)
if ret != SetupFlowsResult.SUCCESS:
ret = self._ue_mac_app.check_setup_request_epoch(request.epoch)
if ret:
return SetupFlowsResult(result=ret)

fut = Future()
Expand Down Expand Up @@ -491,8 +490,8 @@ def SetupQuotaFlows(self, request, context) -> SetupFlowsResult:
context.set_details('Service not enabled!')
return None

ret = self._check_quota_app.is_ready_for_restart_recovery(request.epoch)
if ret != SetupFlowsResult.SUCCESS:
ret = self._check_quota_app.check_setup_request_epoch(request.epoch)
if ret:
return SetupFlowsResult(result=ret)

fut = Future()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,10 @@ def wait_after_send(test_controller, wait_time=1, max_sleep_time=20):
def setup_controller(controller, setup_req, sleep_time: float = 1,
retries: int = 5):
for _ in range(0, retries):
if controller.is_ready_for_restart_recovery(
setup_req.epoch) == SetupFlowsResult.SUCCESS:
ret = controller.check_setup_request_epoch( setup_req.epoch)
if ret == SetupFlowsResult.SUCCESS:
return ret
else:
res = controller.handle_restart(setup_req.requests)
if res.result == SetupFlowsResult.SUCCESS:
return SetupFlowsResult.SUCCESS
Expand Down Expand Up @@ -263,6 +265,7 @@ def fake_controller_setup(enf_controller=None,
TestCase().assertEqual(enf_controller._clean_restart, True)
if enf_stats_controller:
TestCase().assertEqual(enf_stats_controller._clean_restart, True)
enf_controller.init_finished = False
TestCase().assertEqual(setup_controller(
enf_controller, setup_flows_request),
SetupFlowsResult.SUCCESS)
Expand Down

0 comments on commit bf293cc

Please sign in to comment.