#!/usr/bin/env python
"""
Docker Swarm mode management
"""
import argparse
import errno
import logging
import os
import subprocess
import sys
import time

try:
    import daemon
    import daemon.pidfile
    import lockfile
except ImportError:
    print('ERROR: The daemon module is required to use the swarm manager, '
          'install it with `pip install python-daemon`', file=sys.stderr)
    sys.exit(1)

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, 'lib')))

from galaxy.containers import (
    build_container_interfaces,
    ContainerInterfaceConfig,
    parse_containers_config,
)
from galaxy.containers.docker_model import (
    CPUS_CONSTRAINT,
    DockerServiceConstraints,
    IMAGE_CONSTRAINT,
)

DESCRIPTION = "Daemon to manage a Docker Swarm (running in Docker Swarm mode)."

SWARM_MANAGER_CONF_DEFAULTS = {
    'pid_file': '{xdg_data_home}/galaxy_swarm_manager.pid',
    'log_file': '{xdg_data_home}/galaxy_swarm_manager.log',
    'service_wait_count_limit': 0,
    'service_wait_time_limit': 5,
    'slots_min_limit': 0,
    'slots_max_limit': sys.maxsize,
    'slots_min_spare': 0,
    'node_idle_limit': 120,
    'limits': [],
    'spawn_wait_time': 30,
    'spawn_command': '/bin/true',
    'destroy_command': '/bin/true',
    'command_failure_command': '/bin/true',
    'command_retries': 0,
    'command_retry_wait': 10,
    'terminate_when_idle': True,
    'log_environment_variables': [],
}
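
# A hypothetical 'manager_conf' mapping for the managed swarm's entry in
# containers_conf.yml, overriding some of the defaults above. The key names
# come from SWARM_MANAGER_CONF_DEFAULTS and the nesting from
# _swarm_manager_conf() below; the values and the constraint string format
# are illustrative assumptions, not documented behavior:
#
#   manager_conf:
#     slots_min_spare: 2
#     node_idle_limit: 300
#     spawn_command: /usr/local/bin/spawn_nodes --slots {slots}
#     limits:
#       - constraints:
#           - 'node.labels.image==busybox'   # string format is an assumption
#         slots_max_limit: 8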

log = logging.getLogger(__name__)


class SwarmManager:

    def __init__(self, conf, docker_interface):
        self._conf = conf
        self._cpus = docker_interface._conf.cpus
        self._docker_interface = docker_interface
        self._state = SwarmState(conf, docker_interface._conf)
        self._log_interval = 60
        self._last_log = time.time() - self._log_interval

    def run(self):
        while True:
            self._maintain_pool()
            self._check_for_new_nodes()
            self._clean_services()
            self._log_state()
            self._terminate_if_idle()
            time.sleep(1)

    def _run_command(self, command, command_retries=None, returncodes=None, **kwargs):
        stdout = None
        attempt = 0
        # compare against None so callers can explicitly pass 0 to disable retries
        if command_retries is None:
            command_retries = self._conf.command_retries
        allowed_returncodes = (0,)
        if returncodes:
            allowed_returncodes = returncodes
        raw_cmd = command.format(**kwargs)
        log.debug('running command: %s', raw_cmd)
        success = False
        while not success and attempt < command_retries + 1:
            attempt += 1
            p = subprocess.Popen(raw_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, shell=True)
            stdout, stderr = p.communicate()
            if p.returncode not in allowed_returncodes:
                msg = f"error running '{raw_cmd}': returned {p.returncode}"
                if attempt < command_retries + 1:
                    msg += ', waiting %s seconds' % self._conf.command_retry_wait
                    time.sleep(self._conf.command_retry_wait)
                    log.warning(msg + "\nstdout: %s\nstderr: %s\n", stdout, stderr)
                else:
                    msg += ' (final attempt)'
                    log.error(msg + "\nstdout: %s\nstderr: %s\n", stdout, stderr)
                    self._run_command(self._conf.command_failure_command.format(failed_command=raw_cmd), command_retries=0)
                stdout = None
            else:
                stdout = stdout.strip()
                success = True
        if returncodes:
            return p.returncode, stdout
        else:
            return stdout
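
    # Note on the helper above: command templates are expanded with
    # str.format() and run through the shell; failures are retried up to
    # command_retries times with command_retry_wait seconds between attempts,
    # and after the final failed attempt command_failure_command is run with
    # only {failed_command} substituted.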

    # main loop methods

    def _maintain_pool(self):
        waiting = self._docker_interface.services_waiting_by_constraints()
        active = self._docker_interface.nodes_active_by_constraints()
        for constraints, needed_dict in self._state.slots_needed(waiting, active).items():
            services = needed_dict['services']
            nodes = needed_dict['nodes']
            slots_needed = needed_dict['slots_needed']
            if slots_needed > 0:
                self._spawn_nodes(constraints, services, slots_needed)
            elif slots_needed < 0:
                self._destroy_nodes(constraints, nodes, slots_needed)

    def _check_for_new_nodes(self):
        nodes = None
        for node_state in self._state.spawning_nodes():
            name = node_state['name']
            elapsed = node_state['elapsed']
            constraints = node_state['constraints']
            state = node_state['state']
            if not nodes:
                nodes = self._docker_interface.nodes()
            node = ([x for x in nodes if x.name == name] + [None])[0]
            if not node:
                if elapsed > self._conf.spawn_wait_time:
                    log.warning("spawning node '%s' not found in `docker node ls` and spawn_wait_time exceeded! %d seconds have elapsed", name, elapsed)
                    self._run_command(self._conf.command_failure_command.format(failed_command='wait_for_spawning_node %s' % name), command_retries=0)
                    self._state.mark_spawning_node_timeout(name)
            elif node.is_ok():
                node.set_labels_for_constraints(constraints)
                self._state.mark_spawning_node_ready(name)
                log.info("spawning node '%s' is ready!", name)
            elif node.state != state:
                log.info("spawning node '%s' state changed from '%s' to '%s'", name, state, node.state)
                node.set_labels_for_constraints(constraints)
                self._state.mark_spawning_node_state(name, node.state)
            elif elapsed > self._conf.spawn_wait_time:
                log.warning("spawning node '%s' state is '%s' after %s seconds", name, node.state, elapsed)

    def _clean_services(self):
        cleaned_services = self._docker_interface.services_clean()
        if cleaned_services:
            self._state.clean_services(cleaned_services)
            log.info("cleaned services: %s", ', '.join(x.id for x in cleaned_services))

    @staticmethod
    def _env_str(envs, service):
        if envs.get(service.id):
            return ' [' + ', '.join(envs.get(service.id, [])) + ']'
        return ''

    def _log_state(self, now=False):
        if not now and not (self._last_log < (time.time() - self._log_interval)):
            return
        services = list(self._docker_interface.services())
        nodes = list(self._docker_interface.nodes())
        terminal = [s for s in services if s.terminal]
        node_task_ids = [t.id for nt in [n.tasks for n in nodes] for t in nt]
        envs = {}
        for service in services:
            envs[service.id] = ['{}={}'.format(k, service.env.get(k, 'unset')) for k in self._conf.log_environment_variables]
        log.info('%s nodes, %s services (%s terminal)', len(nodes), len(services), len(terminal))
        if terminal:
            service_strs = [f'{s.name} (state: {s.state})' for s in terminal]
            log.info('terminal services: %s', ', '.join(service_strs) or 'none')
        for service in services:
            unassigned_tasks = [t for t in service.tasks if t.id not in node_task_ids]
            if service not in terminal and unassigned_tasks:
                task = unassigned_tasks[0]
                log.info('service %s (%s)%s is not assigned to a node; state: %s %s', service.name, service.id,
                         self._env_str(envs, service), service.state, task.current_state_time)
        for node in nodes:
            log.info('node %s (%s) state: %s, %s tasks (%s terminal)', node.name, node.id, node.state,
                     len(node.tasks), len([t for t in node.tasks if t.terminal]))
            for task in node.tasks:
                if not task.service:
                    log.warning('node %s (%s) task %s (%s) has no service! state: %s %s', node.name, node.id,
                                task.slot, task.id, task.state, task.current_state_time)
                else:
                    log.info('node %s (%s) service %s (%s) task %s (%s)%s state: %s %s', node.name, node.id,
                             task.service.name, task.service.id, task.slot, task.id,
                             self._env_str(envs, task.service), task.state, task.current_state_time)
        self._last_log = time.time()

    def _terminate_if_idle(self):
        if not self._conf.terminate_when_idle:
            return
        for node in self._docker_interface.nodes():
            if node.task_count > 0:
                return  # nodes are running a galaxy service
        # no tasks running; check that all nodes that are going to be destroyed have been destroyed
        waiting = self._docker_interface.services_waiting_by_constraints()
        active = self._docker_interface.nodes_active_by_constraints()
        for constraints, nodes in active.items():
            services = waiting.get(constraints, [])
            needed, total = self._state.slots_delta(constraints, services, nodes)
            if needed > 0:
                return  # services are waiting
            if needed < 0:
                extra_slots = max(
                    self._state.get_limit(constraints, 'slots_min_limit'),
                    self._state.get_limit(constraints, 'slots_min_spare')
                )
                if total + needed != extra_slots:
                    return  # otherwise, nodes remaining are for configured minimums
        # FIXME: there's a race condition here
        log.info('nothing to manage, shutting down')
        sys.exit(0)

    # other methods

    def _get_spawn_property(self, constraints, constraint_name, services):
        if services:
            # this isn't very nice
            if constraint_name == IMAGE_CONSTRAINT:
                return services[0].image
            elif constraint_name == CPUS_CONSTRAINT:
                return services[0].cpus
        for constraint in constraints:
            if constraint.name == constraint_name:
                return constraint.value
        return None

    def _spawn_nodes(self, constraints, services, slots_needed):
        service_ids = [x.id for x in services]
        if service_ids:
            log.info("requesting node(s) for services needing %s slots with constraints [%s]: %s",
                     slots_needed, constraints, ', '.join(service_ids))
        else:
            log.info("requesting node(s) for %s slots (due to minimum limits) with constraints [%s]",
                     slots_needed, constraints)
        command = self._conf.spawn_command.format(
            service_ids=','.join(service_ids),
            service_count=len(services),
            image=self._get_spawn_property(constraints, IMAGE_CONSTRAINT, services) or '',
            cpus=self._get_spawn_property(constraints, CPUS_CONSTRAINT, services) or '',
            slots=slots_needed,
        )
        rc, output = self._run_command(command, returncodes=(0, 2))
        if rc == 2:
            log.info('spawn_command indicated that spawning should be retried: %s', output)
        elif not output:
            log.warning('spawn_command returned no new nodes, cannot manage nodes')
            self._state.mark_services_handled(services)
        else:
            log.info("node allocator will spawn: %s", output)
            self._state.nodes_requested(constraints, output.split())
            self._state.mark_services_handled(services)
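
    # Contract inferred from _spawn_nodes() and SwarmState.nodes_requested()
    # (a reading of this code, not documented behavior): spawn_command receives
    # the {service_ids}, {service_count}, {image}, {cpus}, and {slots} format
    # parameters, exits 2 to request a retry, and on success prints the new
    # nodes on stdout as whitespace-separated 'name' or 'name:slots' specs,
    # e.g. (hypothetical names): swarm-node-01:4 swarm-node-02:4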

    def _destroy_nodes(self, constraints, nodes, slots_needed):
        # nodes here is active nodes only, should be fine
        destroy_nodes = []
        destroyed_slots = 0
        for node in nodes:
            node_slots = node.cpus / self._cpus
            if self._node_ready_for_destruction(node) and (destroyed_slots + node_slots) <= abs(slots_needed):
                node.drain()
                destroy_nodes.append(node)
                destroyed_slots += node_slots
        if destroy_nodes:
            command = self._conf.destroy_command.format(
                nodes=' '.join(x.name for x in destroy_nodes))
            destroyed_nodes = self._run_command(command)
            if not destroyed_nodes:
                log.warning('destroy_command returned no destroyed nodes')
            else:
                log.info("destroyed nodes: %s", destroyed_nodes)

    def _node_ready_for_destruction(self, node):
        ready = False
        if node.destroyable():
            if self._state.mark_node_idle(node.name):
                log.debug("node '%s' is now idle", node.name)
            ready = self._state.is_destruction_time(node)
        elif self._state.clear_node_idle(node.name):
            log.debug("node '%s' is no longer idle", node.name)
        return ready
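
# Node teardown lifecycle, as implemented above: when a node first becomes
# destroyable (per node.destroyable()), it is marked idle with the current
# timestamp; once it has stayed idle longer than the applicable
# 'node_idle_limit' it is drained and its name is passed to destroy_command.
# If work arrives in the meantime, clear_node_idle() resets the clock.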


class SwarmState:

    def __init__(self, conf, interface_conf):
        self._conf = conf
        self._cpus = interface_conf.cpus  # this is effectively the slot size
        self._service_create_image_constraint = interface_conf.service_create_image_constraint
        self._service_create_cpus_constraint = interface_conf.service_create_cpus_constraint
        self._handled_services = set()
        self._waiting_since = {}
        self._spawning_nodes = {}
        self._surplus_nodes = {}
        self._limits = {}
        for limit in conf.limits:
            constraints = DockerServiceConstraints.from_constraint_string_list(limit.get('constraints', []))
            self._limits[constraints] = self._make_limit_dict(limit)

    def _make_limit_dict(self, limit):
        return {
            'slots_min_limit': limit.get('slots_min_limit', self._conf.slots_min_limit),
            'slots_max_limit': limit.get('slots_max_limit', self._conf.slots_max_limit),
            'slots_min_spare': limit.get('slots_min_spare', self._conf.slots_min_spare),
            'node_idle_limit': limit.get('node_idle_limit', self._conf.node_idle_limit),
        }

    def slots_needed(self, waiting_services, active_nodes):
        """Given the services waiting under each constraint set and the active nodes of each constraint set, return
        the number of slots needed for each constraint set if the maximum wait thresholds have been reached,
        constrained by the configured limits.
        """
        rval = {}
        services_constraints = set(waiting_services.keys())
        nodes_constraints = set(active_nodes.keys())
        limits_constraints = set(self._limits.keys())
        all_constraints = services_constraints.union(nodes_constraints).union(limits_constraints)
        if not all_constraints and (self._conf.slots_min_spare or self._conf.slots_min_limit):
            if self._service_create_image_constraint or self._service_create_cpus_constraint:
                raise Exception("Global 'slots_min_limit' and/or 'slots_min_spare' are set and "
                                "'service_create_image_constraint' and/or 'service_create_cpus_constraint' are set, but "
                                "constraint-specific limits are unset; minimum nodes cannot be started since the constraints are "
                                "not known until service creation time. Either disable 'service_create_*_constraint' or create "
                                "constraint-specific limits in the 'limits' section of 'manager_conf' in containers_conf.yml")
            all_constraints.add(DockerServiceConstraints.from_constraint_string_list([]))
        for constraints in all_constraints:
            services = waiting_services.get(constraints, [])
            nodes = active_nodes.get(constraints, [])
            # filter out any services that have already been handled
            services = [s for s in services if s not in self._handled_services]
            # calculate slots needed
            slots_needed, total_slots = self.slots_delta(constraints, services, nodes)
            # set or clear wait time
            if services and constraints not in self._waiting_since:
                self._waiting_since[constraints] = time.time()
            elif not services and constraints in self._waiting_since:
                del self._waiting_since[constraints]
            rval[constraints] = {
                'services': services,
                'nodes': nodes,
                'slots_needed': slots_needed,
            }
        return rval

    def slots_delta(self, constraints, services, nodes):
        total = 0
        used = 0
        if not self._cpus:
            # there are no cpu constraints, so no calculation can be done
            return 0, 0
        for node in nodes:
            used += sum([t.cpus for t in node.non_terminal_tasks]) / self._cpus
            total += node.cpus / self._cpus
        # need at least this many slots
        needed = used + self.get_limit(constraints, 'slots_min_spare')
        if (len(services) > self._conf.service_wait_count_limit
                and time.time() - self._waiting_since.get(constraints, time.time()) > self._conf.service_wait_time_limit):
            # add slots for waiting services that have exceeded limits
            needed += sum([s.cpus for s in services]) / self._cpus
        # subtract slots for spawning nodes (iterate the values, which are the per-node state dicts)
        needed -= sum([n.get('slots', 0) for n in self._spawning_nodes.get(constraints, {}).values()])
        # ensure no less than slots_min_limit slots will exist (free or used)
        needed = max(needed, self.get_limit(constraints, 'slots_min_limit'))
        # ensure no more than slots_max_limit slots will exist
        needed = min(needed, self.get_limit(constraints, 'slots_max_limit'))
        # need to add/remove this many slots
        return int(needed - total), total
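
    # Worked example (hypothetical numbers) of slots_delta() with slot size
    # _cpus=1: two 4-CPU nodes give total=8; 5 CPUs of non-terminal tasks give
    # used=5. With slots_min_spare=2, needed=7; a waiting 2-CPU service past
    # the wait thresholds raises it to 9; one spawning node of 4 slots lowers
    # it to 5; after clamping to [slots_min_limit, slots_max_limit] the return
    # value is (5 - 8, 8) = (-3, 8), i.e. up to 3 slots' worth of nodes may be
    # destroyed.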

    def get_limit(self, constraints, limit):
        limits = self._limits.get(constraints, {})
        return limits.get(limit, self._conf[limit])

    def spawning_nodes(self):
        now = time.time()
        for constraints in self._spawning_nodes.keys():
            for name, node in self._spawning_nodes[constraints].items():
                yval = {
                    'name': name,
                    'elapsed': now - node['time_requested'],
                    'constraints': constraints,
                }
                yval.update(node)
                yield yval

    def nodes_requested(self, constraints, nodes):
        if constraints not in self._spawning_nodes:
            self._spawning_nodes[constraints] = {}
        for node in nodes:
            name = node.split(':')[0]
            try:
                slots = int(node.split(':')[1])
            except IndexError:
                slots = int(1 / self._cpus)
            self._spawning_nodes[constraints][name] = {
                'state': 'requested',
                'time_requested': time.time(),
                'slots': slots,
            }

    def mark_services_handled(self, services):
        self._handled_services.update(services)

    def mark_spawning_node_ready(self, node_name):
        self._delete_spawning_node(node_name)

    def mark_spawning_node_timeout(self, node_name):
        self._delete_spawning_node(node_name)

    def _delete_spawning_node(self, node_name):
        for constraints in self._spawning_nodes.keys():
            if node_name in self._spawning_nodes[constraints]:
                del self._spawning_nodes[constraints][node_name]

    def mark_spawning_node_state(self, node_name, state):
        for constraints in self._spawning_nodes.keys():
            if node_name in self._spawning_nodes[constraints]:
                self._spawning_nodes[constraints][node_name]['state'] = state

    def is_destruction_time(self, node):
        now = time.time()
        limit = self.get_limit(node.labels_as_constraints, 'node_idle_limit')
        return now - self._surplus_nodes.get(node.name, now) > limit

    def mark_node_idle(self, node_name):
        if node_name not in self._surplus_nodes:
            self._surplus_nodes[node_name] = time.time()
            return True
        return False

    def clear_node_idle(self, node_name):
        if node_name in self._surplus_nodes:
            del self._surplus_nodes[node_name]
            return True
        return False

    def clean_services(self, services):
        self._handled_services.difference_update(services)


def main():
    args = _arg_parser().parse_args()
    _run_swarm_manager(args)


def _arg_parser():
    parser = argparse.ArgumentParser(description=DESCRIPTION)
    parser.add_argument("-c", "--containers-config-file", default=None)
    parser.add_argument("-f", "--foreground", action="store_true", default=False)
    parser.add_argument("-d", "--debug", action="store_true", default=False)
    parser.add_argument("-s", "--swarm", default="_default_",
                        help='Swarm name in containers config to manage')
    return parser
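
# Illustrative invocations using the options defined above (paths are
# assumptions about your deployment, not part of this script):
#
#   python docker_swarm_manager.py -c config/containers_conf.yml   # daemonize
#   python docker_swarm_manager.py -f -d -s _default_              # foreground, debug logging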


def _run_swarm_manager(args):
    containers_config_file = _containers_config_file(args)
    containers_conf = parse_containers_config(containers_config_file)
    container_conf = _container_conf(containers_conf, args.swarm)
    swarm_manager_conf = _swarm_manager_conf(container_conf)
    _configure_logging(args, swarm_manager_conf)
    docker_interface = build_container_interfaces(containers_config_file, containers_conf=containers_conf)[args.swarm]
    pidfile = _swarm_manager_pidfile(swarm_manager_conf)
    if not args.foreground:
        _swarm_manager_daemon(pidfile, swarm_manager_conf['log_file'], swarm_manager_conf, docker_interface)
    else:
        if swarm_manager_conf['terminate_when_idle']:
            log.info('running in the foreground, disabling automatic swarm manager termination')
            swarm_manager_conf['terminate_when_idle'] = False
        else:
            log.info("running in the foreground")
        try:
            pidfile.acquire()
        except lockfile.AlreadyLocked:
            pid = pidfile.read_pid()
            try:
                os.kill(pid, 0)
                log.warning("swarm manager is already running in pid %s", pid)
                return
            except OSError:
                log.warning("removing stale lockfile: %s", pidfile.path)
                pidfile.break_lock()
                pidfile.acquire()
        try:
            _swarm_manager(swarm_manager_conf, docker_interface)
        finally:
            pidfile.release()


def _containers_config_file(args):
    containers_config_file = args.containers_config_file
    if not containers_config_file:
        for path in ('./config', '.'):
            testf = os.path.join(path, 'containers_conf.yml')
            if os.path.exists(testf):
                containers_config_file = testf
    assert containers_config_file, \
        "containers_conf.yml cannot be found, please set with '-c' or '--containers-config-file'"
    return containers_config_file


def _container_conf(containers_conf, swarm):
    assert swarm in containers_conf, \
        "invalid container configuration name: %s" % swarm
    assert containers_conf[swarm]['type'] == 'docker_swarm', \
        "'%s' container configuration is not 'docker_swarm' type" % swarm
    assert containers_conf[swarm].get('managed', True), \
        "'%s' swarm is not managed" % swarm
    return containers_conf[swarm]


def _swarm_manager_conf(new_conf):
    conf = ContainerInterfaceConfig()
    conf.update(SWARM_MANAGER_CONF_DEFAULTS)
    conf.update(new_conf.get('manager_conf', {}))
    xdg_env = _load_xdg_environment()
    for opt in ('pid_file', 'log_file'):
        conf[opt] = conf[opt].format(xdg_data_home=xdg_env['data_home'])
    return conf


def _configure_logging(args, conf):
    global log
    if args and args.debug:
        log_level = logging.DEBUG
    else:
        log_level = logging.getLevelName(conf.get('log_level', 'INFO').upper())
        assert int(log_level), 'invalid log level: %s' % conf['log_level']
    log = logging.getLogger(__name__)
    gxlog = logging.getLogger('galaxy')
    log.setLevel(log_level)
    gxlog.setLevel(log_level)
    log_format = conf.get('log_format', '%(name)s %(levelname)s %(asctime)s %(message)s')
    formatter = logging.Formatter(log_format)
    # file logging is handled by daemon
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    log.addHandler(handler)
    gxlog.addHandler(handler)


def _load_xdg_environment():
    return dict(
        data_home=os.path.expanduser(os.environ.get('XDG_DATA_HOME', '~/.local/share')),
    )


def _swarm_manager_pidfile(conf):
    try:
        os.makedirs(os.path.dirname(conf['pid_file']))
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise
    return daemon.pidfile.PIDLockFile(conf['pid_file'])


def _swarm_manager_daemon(pidfile, logfile, swarm_manager_conf, docker_interface):
    log.info("daemonizing, logs will be written to '%s'", logfile)
    with open(logfile, 'a') as logfh:
        try:
            with daemon.DaemonContext(
                    pidfile=pidfile,
                    stdout=logfh,
                    stderr=logfh,
            ):
                _swarm_manager(swarm_manager_conf, docker_interface)
        except lockfile.AlreadyLocked:
            log.debug("attempt to daemonize with swarm manager already running ignored")


def _swarm_manager(conf, docker_interface):
    swarm_manager = SwarmManager(conf, docker_interface)
    while True:
        try:
            log.info("swarm manager loaded, running...")
            swarm_manager.run()
        except Exception:
            log.exception("exception raised to run loop:")
            log.error("restarting due to fatal error")


if __name__ == '__main__':
    __name__ = 'swarm_manager'
    main()