Skip to content

Commit

Permalink
v1.0.1 Fixes (oldweb-today#2)
Browse files Browse the repository at this point in the history
* misc fixes:
- cleanup: ensure user params 'up:*' is cleaned up when container is removed, delete FlockRequest key
- deferred overrides: flock request includes a 'deferred' dict, which can set the deferred per container, overriding default flock settings
- tests: update for deferred overrides, clean up testing
- setup: bump version to 1.0.1, ensure README set for long_description
  • Loading branch information
ikreymer authored Dec 20, 2018
1 parent 3ee2295 commit 22a00af
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 33 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def run_tests(self):
author_email='ikreymer@gmail.com',
license='Apache 2.0',
packages=find_packages(exclude=['test']),
#long_description=open('README.rst').read(),
long_description=open('README.md').read(),
provides=[
'shepherd',
],
Expand Down
2 changes: 1 addition & 1 deletion shepherd/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__version__ = '1.0.0'
__version__ = '1.0.1'

1 change: 1 addition & 0 deletions shepherd/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class FlockRequestOptsSchema(Schema):
overrides = string_dict()
user_params = string_dict()
environ = string_dict()
deferred = fields.Dict(keys=fields.String(), values=fields.Boolean(), default=None)

class GenericResponseSchema(Schema):
reqid = fields.String()
Expand Down
81 changes: 54 additions & 27 deletions shepherd/shepherd.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Shepherd(object):
DEFAULT_FLOCKS = 'flocks.yaml'

USER_PARAMS_KEY = 'up:{0}'
C_TO_U_KEY = 'cu:{0}'

SHEP_REQID_LABEL = 'owt.shepherd.reqid'

Expand Down Expand Up @@ -107,8 +108,12 @@ def is_valid_flock(self, reqid, ensure_state=None):

return True

def start_flock(self, reqid, labels=None, environ=None, pausable=False,
def start_flock(self, reqid,
labels=None,
environ=None,
pausable=False,
network_pool=None):

flock_req = FlockRequest(reqid)
if not flock_req.load(self.redis):
return {'error': 'invalid_reqid'}
Expand All @@ -127,6 +132,8 @@ def start_flock(self, reqid, labels=None, environ=None, pausable=False,
return {'error': 'invalid_flock',
'flock': flock_name}

req_deferred = flock_req.data.get('deferred', {})

network = None
containers = {}

Expand All @@ -146,7 +153,14 @@ def start_flock(self, reqid, labels=None, environ=None, pausable=False,
volume_binds, volumes = self.get_volumes(flock_req, flock_spec, labels, create=True)

for image, spec in zip(image_list, flock_spec['containers']):
if spec.get('deferred'):
name = spec['name']

if name in req_deferred:
deferred = req_deferred[name]
else:
deferred = spec.get('deferred', False)

if deferred:
info = {'deferred': True, 'image': image}

else:
Expand Down Expand Up @@ -304,7 +318,8 @@ def run_container(self, image, spec, flock_req, network, labels=None,
name = spec['name'] + '-' + flock_req.reqid

environ = spec.get('environment') or {}
environ.update(flock_req.data['environ'])
if 'environ' in flock_req.data:
environ.update(flock_req.data['environ'])

cdata = api.create_container(
image,
Expand Down Expand Up @@ -341,10 +356,12 @@ def run_container(self, image, spec, flock_req, network, labels=None,

info['ports'] = self.get_ports(container, ports)

if info['ip'] and flock_req.data['user_params'] and spec.get('set_user_params'):
if info['ip'] and flock_req.data.get('user_params') and spec.get('set_user_params'):
# add reqid to userparams
flock_req.data['user_params']['reqid'] = flock_req.reqid
self.redis.hmset(self.USER_PARAMS_KEY.format(info['ip']), flock_req.data['user_params'])
up_key = self.USER_PARAMS_KEY.format(info['ip'])
self.redis.hmset(up_key, flock_req.data['user_params'])
self.redis.set(self.C_TO_U_KEY.format(info['id']), up_key)

return container, info

Expand Down Expand Up @@ -419,17 +436,13 @@ def stop_flock(self, reqid, keep_reqid=False, grace_time=None, network_pool=None
except docker.errors.APIError as e:
pass

try:
container.remove(v=True, link=False, force=True)

except docker.errors.APIError as e:
pass
self._remove_container(container)

if network:
try:
network_pool = network_pool or self.network_pool
network_pool.remove_network(network)
except:
except Exception as e:
pass

try:
Expand All @@ -439,6 +452,21 @@ def stop_flock(self, reqid, keep_reqid=False, grace_time=None, network_pool=None

return {'success': True}

def _remove_container(self, container, v=False):
try:
short_id = self.short_id(container)
c_to_uparams = self.C_TO_U_KEY.format(short_id)
res = self.redis.get(c_to_uparams)
if res:
self.redis.delete(res)
self.redis.delete(c_to_uparams)

container.remove(force=True, v=v)
return short_id

except docker.errors.APIError:
return None

def _do_graceful_stop(self, container, grace_time):
def do_stop():
try:
Expand Down Expand Up @@ -558,20 +586,10 @@ def untracked_check_loop(self):

reqids.add(reqid)

try:
network_names.update(container.attrs['NetworkSettings']['Networks'].keys())
except:
pass

short_id = self._remove_container(container)
print('Removed untracked container: ' + str(short_id))

try:
short_id = self.short_id(container)
container.remove(force=True)
print('Removed untracked container: ' + short_id)
except:
pass

# remove volumes
# remove volumes + reqid
for reqid in reqids:
try:
res = self.docker.volumes.prune(filters={'label': self.reqid_label + '=' + reqid})
Expand All @@ -581,6 +599,8 @@ def untracked_check_loop(self):
except:
pass

FlockRequest(reqid).delete(self.redis)

# remove any networks these containers were connected to
for network_name in network_names:
try:
Expand Down Expand Up @@ -617,13 +637,20 @@ def _make_reqid(self):
def init_new(self, flock_name, req_opts):
self.data = {'id': self.reqid,
'flock': flock_name,
'overrides': req_opts.get('overrides', {}),
'user_params': req_opts.get('user_params', {}),
'environ': req_opts.get('environ', {}),
'state': 'new',
}

self._copy_if_set('overrides', req_opts)
self._copy_if_set('environ', req_opts, default={})
self._copy_if_set('user_params', req_opts)
self._copy_if_set('deferred', req_opts)
return self

def _copy_if_set(self, param, src, default=None):
res = src.get(param, default)
if res is not None:
self.data[param] = res

def update_env(self, environ):
if not environ:
return
Expand Down
1 change: 1 addition & 0 deletions test/data/test_flocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ containers:

- name: box-2
image: test-shepherd/alpine
set_user_params: true


---
Expand Down
20 changes: 20 additions & 0 deletions test/test_00_shepherd.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def test_verify_launch(self, docker_client, redis, shepherd):
else:
assert not redis.exists(user_params_key)


# verify network
assert docker_client.networks.get(flock['network'])

Expand Down Expand Up @@ -313,4 +314,23 @@ def test_start_deferred_container(self, shepherd, docker_client):

assert res['success'] == True

def test_start_not_deferred_container(self, shepherd, docker_client):
res = shepherd.request_flock('test_deferred', {'deferred': {'box-p': False}})

reqid = res['reqid']

res = shepherd.start_flock(reqid)

assert res['containers']['box-1']['id']
assert res['containers']['box-p']['id']

box_1 = docker_client.containers.get(res['containers']['box-1']['id'])
assert box_1.status == 'running'

box_p = docker_client.containers.get(res['containers']['box-p']['id'])
assert box_p.status == 'running'

res = shepherd.stop_flock(reqid)

assert res['success'] == True

16 changes: 16 additions & 0 deletions test/test_01_deferred_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,21 @@ def test_deferred_only(self):
res = self.client.post('/api/pool/stop_flock/{0}'.format(self.reqid))
assert res.json == {'success': True}

def test_deferred_override(self):
# switch which container is deferred
json = {'deferred': {'box-p': False, 'box-1': True}}

res = self.client.post('/api/pool/request_flock/test_deferred', json=json)

print(res.json)
reqid = res.json['reqid']

res = self.client.post('/api/pool/start_flock/{0}'.format(reqid))

assert res.json['containers']
assert 'deferred' in res.json['containers']['box-1']
assert 'deferred' not in res.json['containers']['box-p']

res = self.client.post('/api/pool/stop_flock/{0}'.format(reqid))
assert res.json == {'success': True}

4 changes: 2 additions & 2 deletions test/test_04_fixed_pool_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def test_expire_queue_next_in_order(self, redis, docker_client):
def assert_done():
assert redis.scard('p:fixed-pool:f') == 2

self.sleep_try(0.2, 3.0, assert_done)
self.sleep_try(0.2, 6.0, assert_done)

res = self.client.post('/api/start_flock/' + self.pending[1])
assert res.json['queued'] == 1
Expand All @@ -102,7 +102,7 @@ def test_expire_queue_next_out_of_order(self, redis, docker_client):
def assert_done():
assert redis.scard('p:fixed-pool:f') == 1

self.sleep_try(0.2, 3.0, assert_done)
self.sleep_try(0.2, 6.0, assert_done)

res = self.start(self.pending[4])
assert res['queued'] == 3
Expand Down
37 changes: 35 additions & 2 deletions test/test_06_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

from shepherd.wsgi import create_app



@pytest.mark.usefixtures('client_class', 'docker_client')
class TestCleanup(object):
@classmethod
Expand Down Expand Up @@ -74,6 +76,37 @@ def test_check_untracked_cleanup(self, docker_client, redis, shepherd):
def assert_removed():
assert num_containers == self._count_containers(docker_client, shepherd)
assert num_volumes == self._count_volumes(docker_client, shepherd)
assert num_networks == self._count_networks(docker_client, shepherd)
#assert num_networks == self._count_networks(docker_client, shepherd)

self.sleep_try(2.0, 20.0, assert_removed)

def test_redis_pool_and_reqid_cleanup(self, docker_client, redis):
reqids = []

# start and kill containers
res = self.client.post('/api/request_flock/test_vol', json={'user_params': {'foo': 'bar'}})

reqid = res.json['reqid']

res = self.client.post('/api/start_flock/{0}'.format(reqid))

reqids.append(reqid)

try:
docker_client.containers.get(res.json['containers']['box-1']['id']).remove(force=True)
except:
pass

try:
docker_client.containers.get(res.json['containers']['box-2']['id']).remove(force=True)
except:
pass

def assert_removed():
for reqid in reqids:
assert not redis.exists('req:' + reqid)

assert len(redis.smembers('p:test-pool:f')) == 0
assert redis.keys('*') == []

self.sleep_try(0.5, 10.0, assert_removed)
self.sleep_try(1.0, 10.0, assert_removed)

0 comments on commit 22a00af

Please sign in to comment.