Skip to content

Commit

Permalink
persistent pool improvements:
Browse files Browse the repository at this point in the history
- track all flocks (active or waiting) in persist set
- don't add to wait queue until flock fully stopped
- ensure non-persisted flocks are not queued for reuse
- resume() loops through all waiting containers until a valid one is found
- tests: update tests, add sleep remove to minimize race conditions
  • Loading branch information
ikreymer committed Dec 10, 2018
1 parent 9669498 commit 6d81b8c
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 37 deletions.
1 change: 1 addition & 0 deletions shepherd/network_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def create_network(self):
try:
name = self.redis.spop(self.networks_key)
network = self.docker.networks.get(name)
assert len(network.containers) == 0
return network
except:
return super(CachedNetworkPool, self).create_network()
Expand Down
88 changes: 61 additions & 27 deletions shepherd/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def start(self, reqid, **kwargs):
**kwargs)

if 'error' not in res:
self.redis.sadd(self.flocks_key, reqid)
self.add_running(reqid)

self._mark_wait_duration(reqid)

Expand All @@ -77,7 +77,7 @@ def stop(self, reqid, **kwargs):
**kwargs)

if 'error' not in res:
self.redis.srem(self.flocks_key, reqid)
self.remove_running(reqid)

self._mark_stopped(reqid)

Expand All @@ -86,9 +86,15 @@ def stop(self, reqid, **kwargs):
def pause(self, reqid):
return self.stop(reqid)

def add_running(self, reqid):
return self.redis.sadd(self.flocks_key, reqid)

def is_running(self, reqid):
return self.redis.sismember(self.flocks_key, reqid)

def remove_running(self, reqid):
return self.redis.srem(self.flocks_key, reqid)

def curr_size(self):
return self.redis.scard(self.flocks_key)

Expand Down Expand Up @@ -141,7 +147,6 @@ def shutdown(self):
for reqid in self.redis.smembers(self.flocks_key):
self.stop(reqid)


if self.network_pool:
self.network_pool.shutdown()

Expand Down Expand Up @@ -268,6 +273,8 @@ class PersistentPool(LaunchAllPool):

POOL_WAIT_SET = 'p:{id}:s'

POOL_ALL_SET = 'p:{id}:a'

def __init__(self, *args, **kwargs):
super(PersistentPool, self).__init__(*args, **kwargs)

Expand All @@ -282,6 +289,8 @@ def __init__(self, *args, **kwargs):

self.pool_wait_set = self.POOL_WAIT_SET.format(id=self.name)

self.pool_all_set = self.POOL_ALL_SET.format(id=self.name)

self.grace_time = int(kwargs.get('grace_time', 0))

self.stop_on_pause = kwargs.get('stop_on_pause', False)
Expand All @@ -298,13 +307,24 @@ def start(self, reqid, environ=None):
elif self.redis.sismember(self.pool_wait_set, reqid):
return {'queued': self._find_wait_pos(reqid)}

self._add_persist(reqid)

if self.num_avail() == 0:
pos = self._push_wait(reqid)
return {'queued': pos - 1}

return super(PersistentPool, self).start(reqid, environ=environ,
pausable=self.stop_on_pause)

def _is_persist(self, reqid):
return self.redis.sismember(self.pool_all_set, reqid)

def _add_persist(self, reqid):
return self.redis.sadd(self.pool_all_set, reqid)

def _remove_persist(self, reqid):
return self.redis.srem(self.pool_all_set, reqid)

def _push_wait(self, reqid):
self.redis.sadd(self.pool_wait_set, reqid)
return self.redis.rpush(self.pool_wait_q, reqid)
Expand Down Expand Up @@ -332,15 +352,16 @@ def pause(self, reqid):

#if no next key, extend this for same duration
if next_reqid is None:
if self.is_running(reqid) and self.shepherd.is_valid_flock(reqid, 'running'):
if not self._is_persist(reqid):
self.remove_running(reqid)

elif self.is_running(reqid):
self._mark_wait_duration(reqid)

return {'success': True}

else:
self._push_wait(reqid)

self.redis.srem(self.flocks_key, reqid)
self.remove_running(reqid)

self._mark_stopped(reqid)

Expand All @@ -353,39 +374,52 @@ def pause(self, reqid):
pause_res = self.shepherd.pause_flock(reqid,
grace_time=self.grace_time)

if 'error' not in pause_res and self._is_persist(reqid):
self._push_wait(reqid)

self.resume(next_reqid)

return pause_res

def resume(self, reqid):
if not reqid:
return
res = None
while reqid:
try:
assert self._is_persist(reqid)

if not self.stop_on_pause:
res = self.shepherd.start_flock(reqid,
labels=self.labels,
network_pool=self.network_pool)
if not self.stop_on_pause:
res = self.shepherd.start_flock(reqid,
labels=self.labels,
network_pool=self.network_pool)

else:
res = self.shepherd.resume_flock(reqid)
else:
res = self.shepherd.resume_flock(reqid)

if res.get('error') == 'not_paused' and res.get('state') == 'new':
res = self.shepherd.start_flock(reqid,
labels=self.labels,
network_pool=self.network_pool,
pausable=True)
if res.get('error') == 'not_paused' and res.get('state') == 'new':
res = self.shepherd.start_flock(reqid,
labels=self.labels,
network_pool=self.network_pool,
pausable=True)

if 'error' not in res:
self.redis.sadd(self.flocks_key, reqid)
assert 'error' not in res

self._mark_wait_duration(reqid)
else:
self.redis.srem(self.flocks_key, reqid)
self.add_running(reqid)

self._mark_wait_duration(reqid)
break

except Exception as e:
import traceback
traceback.print_exc()
self.remove_running(reqid)
reqid = self._pop_wait()

return res

def stop(self, reqid, **kwargs):
removed_running = self.redis.srem(self.flocks_key, reqid)
self._remove_persist(reqid)

removed_res = self.remove_running(reqid)

# remove from wait list always just in case
self._remove_wait(reqid)
Expand All @@ -395,7 +429,7 @@ def stop(self, reqid, **kwargs):

# only attempt to resume next if was currently running
# and stopping succeeded
if removed_running and 'error' not in stop_res:
if removed_res and 'error' not in stop_res:
self.resume(self._pop_wait())

return stop_res
15 changes: 7 additions & 8 deletions shepherd/shepherd.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,14 +319,15 @@ def stop_flock(self, reqid, keep_reqid=False, grace_time=None, network_pool=None
try:
container.remove(v=True, link=False, force=True)

except docker.errors.APIError:
except docker.errors.APIError as e:
pass

try:
network_pool = network_pool or self.network_pool
network_pool.remove_network(network)
except:
pass
if network:
try:
network_pool = network_pool or self.network_pool
network_pool.remove_network(network)
except:
pass

return {'success': True}

Expand Down Expand Up @@ -380,8 +381,6 @@ def resume_flock(self, reqid):
try:
containers = self.get_flock_containers(flock_req)

network = self.get_network(flock_req)

for container in containers:
container.start()

Expand Down
8 changes: 6 additions & 2 deletions test/test_05_persist_pool_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ def test_full_queue_additional(self, redis, persist_pool):
for x in range(1, 10):
time.sleep(2.1)

assert redis.llen('p:persist-pool:q') == 3
assert redis.scard('p:persist-pool:s') == 3
llen = redis.llen('p:persist-pool:q')
scard = redis.scard('p:persist-pool:s')
assert llen in (2, 3)
assert scard in (2, 3)

def assert_done():
assert len(persist_pool.reqid_starts) >= 6
Expand Down Expand Up @@ -127,12 +129,14 @@ def test_stop_all(self, redis, persist_pool):
while len(self.reqids) > 0:
remove = self.reqids.pop()
self.stop(remove)
time.sleep(0.2)

def assert_done():
assert redis.scard('p:persist-pool:f') == 0

assert redis.llen('p:persist-pool:q') == 0
assert redis.scard('p:persist-pool:s') == 0
assert redis.scard('p:persist-pool:a') == 0

assert persist_pool.reqid_starts == persist_pool.reqid_stops

Expand Down

0 comments on commit 6d81b8c

Please sign in to comment.