forked from bunkerity/bunkerweb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
DockerController.py
122 lines (102 loc) · 4.89 KB
/
DockerController.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python3
from time import sleep
from typing import Any, Dict, List
from docker import DockerClient
from re import compile as re_compile
from traceback import format_exc
from docker.models.containers import Container
from Controller import Controller
class DockerController(Controller):
    """Controller that derives instances, services and custom configs from Docker.

    Containers are discovered through their labels: ``bunkerweb.INSTANCE``
    marks an instance and ``bunkerweb.SERVER_NAME`` marks a service. Every
    attribute or method used here but not defined in this class (``apply``,
    ``update_needed``, ``_logger``, ...) is expected to come from
    ``Controller``, which is not visible from this file.
    """

    # Prefix shared by every label this controller cares about.
    _LABEL_PREFIX = "bunkerweb."

    def __init__(self, docker_host: str):
        """Create the Docker client used for every subsequent query.

        Args:
            docker_host: value forwarded as ``base_url`` to ``DockerClient``.
        """
        super().__init__("docker")
        self.__client = DockerClient(base_url=docker_host)
        # The dot is escaped so that it only matches a literal ".".
        # Group 1 is the config type, group 2 the config name.
        self.__custom_confs_rx = re_compile(r"^bunkerweb\.CUSTOM_CONF_(SERVER_HTTP|MODSEC_CRS|MODSEC)_(.+)$")

    def _get_controller_instances(self) -> List[Container]:
        """Return the containers carrying the ``bunkerweb.INSTANCE`` label."""
        return self.__client.containers.list(filters={"label": "bunkerweb.INSTANCE"})

    def _get_controller_services(self) -> List[Container]:
        """Return the containers carrying the ``bunkerweb.SERVER_NAME`` label."""
        return self.__client.containers.list(filters={"label": "bunkerweb.SERVER_NAME"})

    def _to_instances(self, controller_instance) -> List[dict]:
        """Convert one container into a single-element list of instance dicts.

        The dict holds ``name``, ``hostname`` (both the container name),
        ``health`` (bool) and ``env`` (variable name -> value).
        """
        # A container without a HEALTHCHECK has no "Health" entry in its
        # state: report it as not healthy instead of raising KeyError.
        healthy = False
        if controller_instance.status == "running":
            state = controller_instance.attrs.get("State") or {}
            health = state.get("Health") or {}
            healthy = health.get("Status") == "healthy"

        environment = {}
        # "Env" may be null when the container defines no variable at all.
        for entry in controller_instance.attrs["Config"]["Env"] or []:
            # Split on the first "=" only, values may contain "=" themselves.
            # An entry without "=" gets an empty value.
            variable, _, value = entry.partition("=")
            environment[variable] = value

        return [
            {
                "name": controller_instance.name,
                "hostname": controller_instance.name,
                "health": healthy,
                "env": environment,
            }
        ]

    def _to_services(self, controller_service) -> List[dict]:
        """Convert one container into a single-element list of service dicts.

        Only labels starting with ``bunkerweb.`` are kept, with that prefix
        stripped from the key.
        """
        prefix = self._LABEL_PREFIX
        service = {
            label[len(prefix) :]: value  # noqa: E203
            for label, value in controller_service.labels.items()
            if label.startswith(prefix)
        }
        return [service]

    def get_configs(self) -> Dict[str, Dict[str, Any]]:
        """Collect custom configurations declared through container labels.

        Returns:
            A mapping ``config type -> {"<server_name>/<config name>": value}``
            with one entry per type in ``self._supported_config_types``.
        """
        configs = {config_type: {} for config_type in self._supported_config_types}
        # Site configs come from the labels of the service containers.
        for container in self._get_controller_services():
            labels = container.labels  # type: ignore (labels is inside a container)
            if isinstance(labels, list):
                labels = {label: "" for label in labels}
            # The first name of SERVER_NAME identifies the service.
            server_name = labels.get("bunkerweb.SERVER_NAME", "").split(" ")[0]
            if not server_name:
                continue
            # Skip configs attached to a service that is not known.
            if not self._is_service_present(server_name):
                self._logger.warning(f"Ignoring config because {server_name} doesn't exist")
                continue
            for variable, value in labels.items():
                if not variable.startswith(self._LABEL_PREFIX):
                    continue
                result = self.__custom_confs_rx.search(variable)
                if result is None:
                    continue
                # e.g. "SERVER_HTTP" becomes the "server-http" config type.
                config_type = result.group(1).lower().replace("_", "-")
                configs[config_type][f"{server_name}/{result.group(2)}"] = value
        return configs

    def apply_config(self) -> bool:
        """Apply the currently stored instances, services and configs.

        Returns:
            The result of ``self.apply`` (presumably True on success; confirm
            against ``Controller``).
        """
        return self.apply(
            self._instances,
            self._services,
            configs=self._configs,
            first=not self._loaded,
        )

    def __process_event(self, event) -> bool:
        """Tell whether a Docker event concerns an instance or a service.

        An event is relevant when its actor attributes contain the
        ``bunkerweb.INSTANCE`` or the ``bunkerweb.SERVER_NAME`` label.
        """
        # Missing or null "Actor" / "Attributes" simply means "not relevant".
        actor = event.get("Actor") or {}
        attributes = actor.get("Attributes") or {}
        return "bunkerweb.INSTANCE" in attributes or "bunkerweb.SERVER_NAME" in attributes

    def process_events(self):
        """Watch Docker container events and redeploy when the state changed.

        For every relevant event the instances, services and configs are
        refreshed. If an update is needed the configuration is deployed,
        retrying every second for as long as ``self.have_to_wait()`` is true.
        Errors raised while handling one event are logged and the loop moves
        on to the next event.
        """
        self._set_autoconf_load_db()
        for event in self.__client.events(decode=True, filters={"type": "container"}):
            applied = False
            try:
                if not self.__process_event(event):
                    continue
                # Latched once a change is detected, so that the deployment
                # still happens after waiting even if a later check reports
                # no difference.
                to_apply = False
                while not applied:
                    waiting = self.have_to_wait()
                    self._update_settings()
                    self._instances = self.get_instances()
                    self._services = self.get_services()
                    self._configs = self.get_configs()
                    if not to_apply and not self.update_needed(self._instances, self._services, configs=self._configs):
                        applied = True
                        continue
                    to_apply = True
                    if waiting:
                        sleep(1)
                        continue
                    self._logger.info("Caught Docker event, deploying new configuration ...")
                    if not self.apply_config():
                        self._logger.error("Error while deploying new configuration")
                    else:
                        self._logger.info("Successfully deployed new configuration 🚀")
                        self._set_autoconf_load_db()
                    # A failed deployment is not retried for this event.
                    applied = True
            except Exception:
                # Keep the watcher alive on errors, but let KeyboardInterrupt
                # and SystemExit stop the process.
                self._logger.error(f"Exception while processing events :\n{format_exc()}")