Skip to content

Commit

Permalink
network pool: move network creation/removal to separate network pool
Browse files Browse the repository at this point in the history
- add support for optional caching of networks up to a fixed pool size
- shepherd uses default non-reusable network pool, flock pools accept optional 'network_pool_size'
- pooled network names stored in redis key for reuse
- add tests for network reuse
  • Loading branch information
ikreymer committed Dec 10, 2018
1 parent 758dbc9 commit 9669498
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 44 deletions.
83 changes: 83 additions & 0 deletions shepherd/network_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import base64
import os
import traceback


# ============================================================================
class NetworkPool(object):
    """Create and remove uniquely named, labeled docker networks.

    Every network created through the pool is tagged with the
    ``NETWORK_LABEL`` label whose value is the pool name, and
    ``remove_network`` refuses to remove a network that does not carry
    that exact label value.  This base pool never reuses networks.
    """

    # Default name template; ``{0}`` is replaced with a random suffix.
    NETWORK_NAME = 'shepherd.net-{0}'

    # Label key whose value records which pool created the network.
    NETWORK_LABEL = 'owt.network.managed'

    def __init__(self, docker, network_templ=None, name='owt.netpool.default'):
        """Set up the pool.

        :param docker: client object exposing ``networks.create(...)``
        :param network_templ: ``str.format`` template for network names,
            defaults to ``NETWORK_NAME`` when empty or ``None``
        :param name: pool name, stored as the value of ``NETWORK_LABEL``
        """
        self.docker = docker
        self.network_templ = network_templ or self.NETWORK_NAME
        self.labels = {self.NETWORK_LABEL: name}
        self.pool_name = name

    def new_name(self):
        """Return a fresh network name built from 10 random bytes."""
        # 10 bytes encode to exactly 16 base32 characters, so the suffix
        # never carries '=' padding.
        suffix = base64.b32encode(os.urandom(10)).decode('utf-8')
        return self.network_templ.format(suffix)

    def create_network(self):
        """Create and return a new network labeled as owned by this pool."""
        return self.docker.networks.create(self.new_name(), labels=self.labels)

    def remove_network(self, network):
        """Remove ``network`` if it carries this pool's label.

        :param network: object with an ``attrs`` mapping and ``remove()``
        :return: ``True`` if the network was removed, ``False`` if it is
            not labeled as belonging to this pool or removal failed
        """
        try:
            # Explicit check instead of ``assert``: assertions are stripped
            # under ``python -O``, which would disable the ownership guard.
            labels = network.attrs['Labels'] or {}
            if self.NETWORK_LABEL not in labels:
                return False

            if labels[self.NETWORK_LABEL] != self.pool_name:
                return False

            network.remove()
            return True
        except Exception:
            # Best-effort: callers only get a success flag.
            return False

    def shutdown(self):
        """Nothing to release: this pool does not keep any networks."""
        pass


# ============================================================================
class CachedNetworkPool(NetworkPool):
    """Network pool that keeps idle networks in a redis set for reuse.

    Instead of removing a network that is no longer needed, its name is
    added to the redis set ``networks_key`` (as long as the set holds
    fewer than ``max_size`` names).  ``create_network`` hands out a
    cached network when one is available and only creates a new one
    otherwise.
    """

    # Redis key template for the set of cached names; ``{0}`` is the pool name.
    NETWORKS_LIST_KEY = 'n:{0}'

    def __init__(self, docker, redis, max_size=10, **kwargs):
        """Set up the pool.

        :param docker: client object exposing ``networks.get``/``networks.create``
        :param redis: client object exposing ``spop``, ``scard`` and ``sadd``
        :param max_size: number of cached names at which networks are
            removed instead of cached
        :param kwargs: forwarded to ``NetworkPool.__init__``
        """
        super(CachedNetworkPool, self).__init__(docker, **kwargs)
        self.max_size = max_size
        self.redis = redis
        self.networks_key = self.NETWORKS_LIST_KEY.format(self.pool_name)

    def _pop_cached_name(self):
        """Pop one cached network name as text, or ``None`` if the set is empty."""
        name = self.redis.spop(self.networks_key)

        # NOTE(review): a redis client created without decode_responses
        # returns bytes; decode so the docker lookup always gets text.
        if isinstance(name, bytes):
            name = name.decode('utf-8')

        return name or None

    def shutdown(self):
        """Remove every network whose name is still cached in redis."""
        while True:
            network_name = self._pop_cached_name()
            if not network_name:
                break

            try:
                self.docker.networks.get(network_name).remove()
            except Exception:
                # Best-effort cleanup: report the failure, keep draining the set.
                traceback.print_exc()

    def create_network(self):
        """Return a cached network if one is available, else create a new one."""
        try:
            name = self._pop_cached_name()
            if name:
                return self.docker.networks.get(name)
        except Exception:
            # Cache lookup failed (e.g. the cached network can no longer be
            # fetched); report it and fall back to a fresh network.
            traceback.print_exc()

        return super(CachedNetworkPool, self).create_network()

    def remove_network(self, network):
        """Cache ``network`` for reuse, or remove it when the cache is full.

        :param network: object with ``reload()``, ``containers`` and ``name``
        :return: ``True`` if the network was cached or removed, ``False``
            if it still has containers attached or an error occurred
        """
        try:
            if self.redis.scard(self.networks_key) >= self.max_size:
                return super(CachedNetworkPool, self).remove_network(network)

            # Refresh before inspecting the attached containers: a network
            # that is still in use must not be handed out again.
            network.reload()
            if network.containers:
                return False

            self.redis.sadd(self.networks_key, network.name)
            return True

        except Exception:
            traceback.print_exc()
            return False


29 changes: 26 additions & 3 deletions shepherd/pool.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import gevent
from shepherd.network_pool import CachedNetworkPool


# ============================================================================
Expand All @@ -10,11 +11,14 @@ class LaunchAllPool(object):

POOL_REQ = 'p:{id}:rq:'

POOL_NETWORK_TEMPL = 'shepherd-net:%s:{0}'

DEFAULT_DURATION = 3600

EXPIRE_CHECK = 30

def __init__(self, name, shepherd, redis, duration=None, expire_check=None, **kwargs):
def __init__(self, name, shepherd, redis, duration=None, expire_check=None,
network_pool_size=0, **kwargs):
self.name = name
self.shepherd = shepherd
self.redis = redis
Expand All @@ -31,6 +35,14 @@ def __init__(self, name, shepherd, redis, duration=None, expire_check=None, **kw

self.api = shepherd.docker.api

self.network_pool = None

if network_pool_size > 0:
self.network_pool = CachedNetworkPool(shepherd.docker,
redis=self.redis,
network_templ=self.POOL_NETWORK_TEMPL % self.name,
max_size=network_pool_size)

self.running = True

gevent.spawn(self.event_loop)
Expand All @@ -49,6 +61,7 @@ def _mark_stopped(self, reqid):
def start(self, reqid, **kwargs):
res = self.shepherd.start_flock(reqid,
labels=self.labels,
network_pool=self.network_pool,
**kwargs)

if 'error' not in res:
Expand All @@ -59,7 +72,9 @@ def start(self, reqid, **kwargs):
return res

def stop(self, reqid, **kwargs):
res = self.shepherd.stop_flock(reqid, **kwargs)
res = self.shepherd.stop_flock(reqid,
network_pool=self.network_pool,
**kwargs)

if 'error' not in res:
self.redis.srem(self.flocks_key, reqid)
Expand Down Expand Up @@ -127,6 +142,10 @@ def shutdown(self):
self.stop(reqid)


if self.network_pool:
self.network_pool.shutdown()


# ============================================================================
class FixedSizePool(LaunchAllPool):
NOW_SERVING = 'now_serving'
Expand Down Expand Up @@ -327,6 +346,7 @@ def pause(self, reqid):

if not self.stop_on_pause:
pause_res = self.shepherd.stop_flock(reqid,
network_pool=self.network_pool,
keep_reqid=True,
grace_time=self.grace_time)
else:
Expand All @@ -342,14 +362,17 @@ def resume(self, reqid):
return

if not self.stop_on_pause:
res = self.shepherd.start_flock(reqid, labels=self.labels)
res = self.shepherd.start_flock(reqid,
labels=self.labels,
network_pool=self.network_pool)

else:
res = self.shepherd.resume_flock(reqid)

if res.get('error') == 'not_paused' and res.get('state') == 'new':
res = self.shepherd.start_flock(reqid,
labels=self.labels,
network_pool=self.network_pool,
pausable=True)

if 'error' not in res:
Expand Down
40 changes: 26 additions & 14 deletions shepherd/shepherd.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from redis import StrictRedis

from shepherd.schema import AllFlockSchema, InvalidParam
from shepherd.network_pool import NetworkPool

import gevent

Expand All @@ -17,8 +18,6 @@
class Shepherd(object):
DEFAULT_FLOCKS = 'flocks.yaml'

NETWORK_NAME = 'shepherd.net-{0}'

USER_PARAMS_KEY = 'up:{0}'

SHEP_REQID_LABEL = 'owt.shepherd.reqid'
Expand All @@ -27,11 +26,12 @@ class Shepherd(object):

DEFAULT_SHM_SIZE = '1g'

def __init__(self, redis, networks_templ):
def __init__(self, redis, network_templ=None):
self.flocks = {}
self.docker = docker.from_env()
self.redis = redis
self.networks_templ = networks_templ

self.network_pool = NetworkPool(self.docker, network_templ=network_templ)

def load_flocks(self, flocks_file):
with open(flocks_file) as fh:
Expand Down Expand Up @@ -73,7 +73,8 @@ def is_valid_flock(self, reqid, ensure_state=None):

return True

def start_flock(self, reqid, labels=None, environ=None, pausable=False):
def start_flock(self, reqid, labels=None, environ=None, pausable=False,
network_pool=None):
flock_req = FlockRequest(reqid)
if not flock_req.load(self.redis):
return {'error': 'invalid_reqid'}
Expand All @@ -96,7 +97,10 @@ def start_flock(self, reqid, labels=None, environ=None, pausable=False):
containers = {}

try:
network = self.create_flock_network(flock_req)
network_pool = network_pool or self.network_pool
network = network_pool.create_network()

flock_req.set_network(network.name)

links = flock.get('links', [])
for link in links:
Expand Down Expand Up @@ -236,11 +240,12 @@ def run_container(self, image, spec, flock_req, network, labels=None,

return container, info

def create_flock_network(self, flock_req):
return self.docker.networks.create(self.NETWORK_NAME.format(flock_req.reqid))
def get_network(self, flock_req):
name = flock_req.get_network()
if not name:
return None

def get_flock_network(self, flock_req):
return self.docker.networks.get(self.NETWORK_NAME.format(flock_req.reqid))
return self.docker.networks.get(name)

def resolve_image_list(self, specs, overrides):
image_list = []
Expand Down Expand Up @@ -271,7 +276,7 @@ def is_ancestor_of(self, name, ancestor):

return False

def stop_flock(self, reqid, keep_reqid=False, grace_time=None):
def stop_flock(self, reqid, keep_reqid=False, grace_time=None, network_pool=None):
flock_req = FlockRequest(reqid)
if not flock_req.load(self.redis):
return {'error': 'invalid_reqid'}
Expand All @@ -282,7 +287,7 @@ def stop_flock(self, reqid, keep_reqid=False, grace_time=None):
flock_req.stop(self.redis)

try:
network = self.get_flock_network(flock_req)
network = self.get_network(flock_req)
containers = network.containers
except:
network = None
Expand Down Expand Up @@ -318,7 +323,8 @@ def stop_flock(self, reqid, keep_reqid=False, grace_time=None):
pass

try:
network.remove()
network_pool = network_pool or self.network_pool
network_pool.remove_network(network)
except:
pass

Expand Down Expand Up @@ -374,7 +380,7 @@ def resume_flock(self, reqid):
try:
containers = self.get_flock_containers(flock_req)

network = self.get_flock_network(flock_req)
network = self.get_network(flock_req)

for container in containers:
container.start()
Expand Down Expand Up @@ -434,6 +440,12 @@ def set_state(self, state, redis):
self.data['state'] = state
self.save(redis)

def set_network(self, network_name):
    """Record ``network_name`` under the ``'net'`` key of ``self.data``."""
    self.data.update({'net': network_name})

def get_network(self):
    """Return the value stored under ``'net'`` in ``self.data``, or ``None``."""
    stored = self.data
    return stored.get('net')

def load(self, redis):
data = redis.get(self.key)
self.data = json.loads(data) if data else {}
Expand Down
20 changes: 19 additions & 1 deletion test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ def persist_pool(request, redis, shepherd):
max_size=3,
expire_check=0.3,
grace_time=1,
stop_on_pause=stop_on_pause)
stop_on_pause=stop_on_pause,
network_pool_size=2)

yield pool

Expand Down Expand Up @@ -132,5 +133,22 @@ def docker_client():
docker_cli.images.remove(image.tags[0], force=True)


@pytest.fixture(scope='module')
def external_net(docker_client):
    """Module-scoped fixture yielding a freshly created external network.

    Verifies that the network does not exist beforehand, creates it,
    yields it, then removes it and verifies that it is gone again.
    """
    net_name = 'test-shepherd-external-net'
    networks = docker_client.networks

    # If this check fails the network already exists (e.g. left over from
    # an earlier run) and has to be cleaned up manually.
    with pytest.raises(docker.errors.NotFound):
        assert networks.get(net_name)

    external = networks.create(net_name)
    try:
        yield external
    finally:
        external.remove()

    # NOTE(review): the final check is assumed to follow the try/finally
    # rather than sit inside the finally clause -- confirm.
    with pytest.raises(docker.errors.NotFound):
        assert networks.get(net_name)




36 changes: 12 additions & 24 deletions test/test_00_shepherd.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,7 @@ def test_start_with_external_link(self, docker_client, shepherd):
with pytest.raises(docker.errors.NotFound):
assert docker_client.networks.get(res['network'])


def test_start_with_external_net(self, docker_client, shepherd):
# if this fails, network already exists!
# need to clean up manually
with pytest.raises(docker.errors.NotFound):
assert docker_client.networks.get('test-shepherd-external-net')


def test_no_external_net_error(self,docker_client, shepherd):
res = shepherd.request_flock('test_external_net')

reqid = res['reqid']
Expand All @@ -191,11 +184,11 @@ def test_start_with_external_net(self, docker_client, shepherd):

assert res['error'] == 'start_error'

net = None
TestShepherd.net_reqid = reqid

def test_start_with_external_net(self, docker_client, shepherd, external_net):
reqid = self.net_reqid
try:
net = docker_client.networks.create('test-shepherd-external-net')

res = shepherd.start_flock(reqid)

assert res['error'] == 'invalid_reqid'
Expand All @@ -213,28 +206,23 @@ def test_start_with_external_net(self, docker_client, shepherd):

# ensure external network only in container-2
assert 'test-shepherd-external-net' not in container_1.attrs['NetworkSettings']['Networks']
assert len(container_1.attrs['NetworkSettings']['Networks']) == 1

assert 'test-shepherd-external-net' in container_2.attrs['NetworkSettings']['Networks']
assert len(container_2.attrs['NetworkSettings']['Networks']) == 2

net.reload()
assert len(net.containers) == 1
assert net.containers[0] == container_2
external_net.reload()
assert len(external_net.containers) == 1
assert external_net.containers[0] == container_2

finally:
try:
shepherd.stop_flock(reqid)
except:
pass


try:
net.reload()
assert net.containers == []

finally:
net.remove()

with pytest.raises(docker.errors.NotFound):
assert docker_client.networks.get('test-shepherd-external-net')
external_net.reload()
assert external_net.containers == []

def test_pause_resume(self, shepherd, docker_client):
res = shepherd.request_flock('test_b')
Expand Down
Loading

0 comments on commit 9669498

Please sign in to comment.