Support for cleaning up untracked containers/volumes/networks (oldweb-today#1)

* untracked container cleanup:
- add a greenlet loop that detects and cleans up 'untracked' containers, i.e. containers not tied to a valid reqid; also remove any volumes and networks attached to untracked containers
- config: support custom reqid and network labels for testing (see the wiring sketch below)
- tests: add cleanup tests to ensure untracked flocks are removed
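A minimal sketch of how the new options are wired up, mirroring the test fixture added to conftest.py in this commit. The import paths, Redis connection settings, network template, and flock file path are illustrative assumptions; the label values and the 2-second check interval come from the fixture.

    from gevent.monkey import patch_all; patch_all()     # the cleanup loop runs as a gevent greenlet

    from redis import StrictRedis
    from shepherd.shepherd import Shepherd               # assumed import path

    redis = StrictRedis(decode_responses=True)           # assumed local Redis instance

    shep = Shepherd(redis,
                    network_templ='test-shepherd.net-{0}',   # hypothetical template; conftest passes NETWORKS_NAME
                    reqid_label='owt.test.shepherd',         # custom reqid label applied to containers/volumes
                    network_label='owt.test.network',        # custom managed-network label
                    untracked_check_time=2.0)                # seconds between untracked-cleanup sweeps

    shep.load_flocks('test/test_flocks.yaml')                # assumed path to the flock definitions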
ikreymer authored Dec 15, 2018
1 parent 7ec9999 commit c654a33
Showing 7 changed files with 181 additions and 20 deletions.
shepherd/network_pool.py (10 changes: 7 additions & 3 deletions)
@@ -8,10 +8,14 @@ class NetworkPool(object):
NETWORK_NAME = 'shepherd.net-{0}'
NETWORK_LABEL = 'owt.network.managed'

def __init__(self, docker, network_templ=None, name='owt.netpool.default'):
def __init__(self, docker, network_templ=None,
name='owt.netpool.default',
network_label=None):

self.docker = docker
self.network_templ = network_templ or self.NETWORK_NAME
self.labels = {self.NETWORK_LABEL: name}
self.network_label = network_label or self.NETWORK_LABEL
self.labels = {self.network_label: name}
self.pool_name = name

def new_name(self):
@@ -32,7 +36,7 @@ def disconnect_all(self, network):

def remove_network(self, network):
try:
assert(network.attrs['Labels'][self.NETWORK_LABEL] == self.pool_name)
assert(network.attrs['Labels'][self.network_label] == self.pool_name)
self.disconnect_all(network)
network.remove()
return True
shepherd/pool.py (5 changes: 2 additions & 3 deletions)
@@ -118,7 +118,7 @@ def event_loop(self):
if not self.running:
break

reqid = event['Actor']['Attributes'][self.shepherd.SHEP_REQID_LABEL]
reqid = event['Actor']['Attributes'][self.shepherd.reqid_label]
if event['status'] == 'die':
self.handle_die_event(reqid, event)

@@ -129,8 +129,7 @@ def event_loop(self):
print(e)

def handle_die_event(self, reqid, event):
key = self.req_key + reqid
self.redis.delete(key)
self._mark_stopped(reqid)

def handle_start_event(self, reqid, event):
pass
shepherd/shepherd.py (84 changes: 77 additions & 7 deletions)
@@ -24,19 +24,31 @@ class Shepherd(object):

DEFAULT_REQ_TTL = 120

UNTRACKED_CHECK_TIME = 30

DEFAULT_SHM_SIZE = '1g'

VOLUME_TEMPL = 'vol-{name}-{reqid}'

def __init__(self, redis, network_templ=None, volume_templ=None):
def __init__(self, redis, network_templ=None, volume_templ=None,
reqid_label=None, untracked_check_time=None, network_label=None):
self.flocks = {}
self.docker = docker.from_env()
self.redis = redis

self.network_pool = NetworkPool(self.docker, network_templ=network_templ)
self.network_pool = NetworkPool(self.docker,
network_templ=network_templ,
network_label=network_label)

self.volume_templ = volume_templ or self.VOLUME_TEMPL

self.reqid_label = reqid_label or self.SHEP_REQID_LABEL

self.untracked_check_time = untracked_check_time or self.UNTRACKED_CHECK_TIME

if self.untracked_check_time > 0:
gevent.spawn(self.untracked_check_loop)

def load_flocks(self, flocks_file_or_dir):
num_loaded = 0
if os.path.isfile(flocks_file_or_dir):
@@ -119,7 +131,7 @@ def start_flock(self, reqid, labels=None, environ=None, pausable=False,
containers = {}

labels = labels or {}
labels[self.SHEP_REQID_LABEL] = flock_req.reqid
labels[self.reqid_label] = flock_req.reqid

try:
network_pool = network_pool or self.network_pool
@@ -220,7 +232,7 @@ def start_deferred_container(self, reqid, image_name, labels=None):

try:
labels = labels or {}
labels[self.SHEP_REQID_LABEL] = flock_req.reqid
labels[self.reqid_label] = flock_req.reqid

spec = self.find_spec_for_flock_req(flock_req, image_name)

@@ -390,7 +402,7 @@ def stop_flock(self, reqid, keep_reqid=False, grace_time=None, network_pool=None
containers = self.get_flock_containers(flock_req)

for container in containers:
if container.labels.get(self.SHEP_REQID_LABEL) != reqid:
if container.labels.get(self.reqid_label) != reqid:
continue

try:
@@ -457,13 +469,13 @@ def get_volumes(self, flock_req, flock_spec, labels=None, create=False):
return volume_binds, volumes_list

def get_flock_containers(self, flock_req):
return self.docker.containers.list(all=True, filters={'label': self.SHEP_REQID_LABEL + '=' + flock_req.reqid})
return self.docker.containers.list(all=True, filters={'label': self.reqid_label + '=' + flock_req.reqid})

def remove_flock_volumes(self, flock_req):
num_volumes = flock_req.data.get('num_volumes', 0)

for x in range(0, 3):
res = self.docker.volumes.prune(filters={'label': self.SHEP_REQID_LABEL + '=' + flock_req.reqid})
res = self.docker.volumes.prune(filters={'label': self.reqid_label + '=' + flock_req.reqid})
num_volumes -= len(res.get('VolumesDeleted', []))

if num_volumes == 0:
@@ -526,6 +538,64 @@ def resume_flock(self, reqid):

return {'success': True}

def untracked_check_loop(self):
print('Untracked Container Check Loop Started')

filters = {'label': self.reqid_label}

while True:
try:
all_containers = self.docker.containers.list(all=False, filters=filters)

reqids = set()
network_names = set()

for container in all_containers:
reqid = container.labels.get(self.reqid_label)

if self.is_valid_flock(reqid):
continue

reqids.add(reqid)

try:
network_names.update(container.attrs['NetworkSettings']['Networks'].keys())
except:
pass


try:
short_id = self.short_id(container)
container.remove(force=True)
print('Removed untracked container: ' + short_id)
except:
pass

# remove volumes
for reqid in reqids:
try:
res = self.docker.volumes.prune(filters={'label': self.reqid_label + '=' + reqid})
pruned_volumes = len(res.get('VolumesDeleted', []))
if pruned_volumes:
print('Removed Untracked Volumes: {0}'.format(pruned_volumes))
except:
pass

# remove any networks these containers were connected to
for network_name in network_names:
try:
network = self.docker.networks.get(network_name)
if len(network.containers) == 0:
self.network_pool.remove_network(network)
print('Removed untracked network: ' + network_name)
except:
pass

except:
traceback.print_exc()

time.sleep(self.untracked_check_time)

@classmethod
def full_tag(cls, tag):
return tag + ':latest' if ':' not in tag else tag
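For readers skimming the hunks above, a condensed, self-contained sketch of what one pass of untracked_check_loop does, restricted to docker-py calls that appear in this diff. It is a simplification, not the shipped code: the real loop resolves validity through is_valid_flock() against the Redis-backed flock requests, wraps each step in try/except, and removes networks via NetworkPool.remove_network(), which additionally checks the managed-network label.

    import docker

    def cleanup_untracked_pass(client, reqid_label, valid_reqids):
        # Running containers that carry the reqid label but are not tied to a valid reqid
        stale_reqids = set()
        network_names = set()

        for container in client.containers.list(filters={'label': reqid_label}):
            reqid = container.labels.get(reqid_label)
            if reqid in valid_reqids:
                continue

            stale_reqids.add(reqid)
            network_names.update(container.attrs['NetworkSettings']['Networks'].keys())
            container.remove(force=True)

        # Prune volumes labeled with a stale reqid
        for reqid in stale_reqids:
            client.volumes.prune(filters={'label': reqid_label + '=' + reqid})

        # Remove networks those containers were attached to, once empty
        # (the real code delegates to NetworkPool.remove_network, which also
        # verifies the pool's network label before removing)
        for name in network_names:
            network = client.networks.get(name)
            if not network.containers:
                network.remove()

    # Example call; 'owt.shepherd.reqid' is the pre-existing default label seen in conftest.py
    # cleanup_untracked_pass(docker.from_env(), 'owt.shepherd.reqid', valid_reqids=set())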
test/conftest.py (15 changes: 12 additions & 3 deletions)
@@ -15,6 +15,10 @@

TEST_FLOCKS = os.path.join(TEST_DIR, 'test_flocks.yaml')

TEST_REQID_LABEL = 'owt.test.shepherd'

TEST_NETWORK_LABEL = 'owt.test.network'


# ============================================================================
class DebugMixin(object):
@@ -31,7 +35,7 @@ def handle_die_event(self, reqid, event):
self.stop_events.append(event)

try:
reqid = event['Actor']['Attributes']['owt.shepherd.reqid']
reqid = event['Actor']['Attributes'][TEST_REQID_LABEL]
self.reqid_stops[reqid] = self.reqid_stops.get(reqid, 0) + 1
except:
pass
@@ -41,7 +45,7 @@ def handle_start_event(self, reqid, event):
self.start_events.append(event)

try:
reqid = event['Actor']['Attributes']['owt.shepherd.reqid']
reqid = event['Actor']['Attributes'][TEST_REQID_LABEL]
self.reqid_starts[reqid] = self.reqid_starts.get(reqid, 0) + 1
except:
pass
@@ -63,7 +67,12 @@ def redis():

@pytest.fixture(scope='module')
def shepherd(redis):
shep = Shepherd(redis, NETWORKS_NAME)
shep = Shepherd(redis,
reqid_label=TEST_REQID_LABEL,
network_templ=NETWORKS_NAME,
network_label=TEST_NETWORK_LABEL,
untracked_check_time=2.0)

shep.load_flocks(TEST_FLOCKS)
return shep

test/test_00_shepherd.py (4 changes: 2 additions & 2 deletions)
@@ -77,7 +77,7 @@ def test_already_launched(self, shepherd):
flock2 = shepherd.start_flock(self.reqid)
assert flock2 == TestShepherd.flock

def test_verify_launch(self, docker_client, redis):
def test_verify_launch(self, docker_client, redis, shepherd):
flock = TestShepherd.flock
containers = flock['containers']

@@ -107,7 +107,7 @@ def test_verify_launch(self, docker_client, redis):
assert 'FOO=BAR2' in container.attrs['Config']['Env']

# assert labels
assert container.labels[Shepherd.SHEP_REQID_LABEL] == self.reqid
assert container.labels[shepherd.reqid_label] == self.reqid

# assert ip is set
assert info['ip'] != ''
test/test_01_basic_api.py (4 changes: 2 additions & 2 deletions)
@@ -57,7 +57,7 @@ def test_start_flock(self, pool, redis):
assert len(pool.start_events) == 2
for event in pool.start_events:
assert event['Action'] == 'start'
assert event['Actor']['Attributes'][pool.shepherd.SHEP_REQID_LABEL] == self.reqid
assert event['Actor']['Attributes'][pool.shepherd.reqid_label] == self.reqid

assert redis.exists('p:test-pool:rq:' + self.reqid)
assert redis.scard('p:test-pool:f') == 1
@@ -70,7 +70,7 @@ def test_stop_flock(self, pool, redis):
assert len(pool.stop_events) == 2
for event in pool.stop_events:
assert event['Action'] == 'die'
assert event['Actor']['Attributes'][pool.shepherd.SHEP_REQID_LABEL] == self.reqid
assert event['Actor']['Attributes'][pool.shepherd.reqid_label] == self.reqid

assert not redis.exists('p:test-pool:rq:' + self.reqid)
assert redis.scard('p:test-pool:f') == 0
test/test_06_cleanup.py (79 changes: 79 additions & 0 deletions)
@@ -0,0 +1,79 @@
from gevent.monkey import patch_all; patch_all()

import pytest
import time
import itertools
import docker

from shepherd.wsgi import create_app

@pytest.mark.usefixtures('client_class', 'docker_client')
class TestCleanup(object):
@classmethod
def sleep_try(cls, sleep_interval, max_time, test_func):
max_count = float(max_time) / sleep_interval
for counter in itertools.count():
try:
time.sleep(sleep_interval)
test_func()
return
except:
if counter >= max_count:
raise

def _count_containers(self, docker_client, shepherd):
return len(docker_client.containers.list(filters={'label': shepherd.reqid_label}))

def _count_volumes(self, docker_client, shepherd):
return len(docker_client.volumes.list(filters={'label': shepherd.reqid_label}))

def _count_networks(self, docker_client, shepherd):
return len(docker_client.networks.list(filters={'label': shepherd.network_pool.network_label}))

def test_ensure_flock_stop(self, docker_client):
res = self.client.post('/api/request_flock/test_b')

reqid = res.json['reqid']

res = self.client.post('/api/start_flock/{0}'.format(reqid))

assert res.json['containers']

box = docker_client.containers.get(res.json['containers']['box']['id'])
box_2 = docker_client.containers.get(res.json['containers']['box-2']['id'])

box.remove(force=True)

def assert_removed():
with pytest.raises(docker.errors.NotFound):
box = docker_client.containers.get(res.json['containers']['box-2']['id'])

self.sleep_try(0.3, 10.0, assert_removed)

def test_check_untracked_cleanup(self, docker_client, redis, shepherd):
num_containers = self._count_containers(docker_client, shepherd)
num_volumes = self._count_volumes(docker_client, shepherd)
num_networks = self._count_networks(docker_client, shepherd)

for x in range(0, 3):
res = self.client.post('/api/request_flock/test_vol')

reqid = res.json['reqid']

res = self.client.post('/api/start_flock/{0}'.format(reqid))

assert res.json['containers']

assert num_containers < self._count_containers(docker_client, shepherd)
assert num_volumes < self._count_volumes(docker_client, shepherd)
assert num_networks < self._count_networks(docker_client, shepherd)

# wipe all redis data
redis.flushdb()

def assert_removed():
assert num_containers == self._count_containers(docker_client, shepherd)
assert num_volumes == self._count_volumes(docker_client, shepherd)
assert num_networks == self._count_networks(docker_client, shepherd)

self.sleep_try(0.5, 10.0, assert_removed)
