-
Notifications
You must be signed in to change notification settings - Fork 361
/
Copy pathselinux_server.py
155 lines (139 loc) · 6.36 KB
/
selinux_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/python3 -EsI
import dbus
import dbus.service
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GObject
from gi.repository import GLib
import os
import selinux
from subprocess import Popen, PIPE, STDOUT
class selinux_server(dbus.service.Object):
    """D-Bus service object exposing SELinux management on "org.selinux".

    Every exported method first checks the caller against a polkit action
    via is_authorized() and raises a DBusException when the check fails.
    """

    default_polkit_auth_required = "org.selinux.semanage"

    def __init__(self, *p, **k):
        super().__init__(*p, **k)

    def is_authorized(self, sender, action_id):
        """Ask polkit whether the D-Bus caller may perform action_id.

        sender is the caller's bus name as supplied by dbus-python through
        sender_keyword; action_id is the polkit action identifier.
        Returns the first element of the CheckAuthorization reply, which
        the polkit D-Bus API documents as the "is authorized" flag.
        """
        bus = dbus.SystemBus()
        proxy = bus.get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority')
        authority = dbus.Interface(proxy, dbus_interface='org.freedesktop.PolicyKit1.Authority')
        subject = ('system-bus-name', {'name': sender})
        # Arguments: subject, action id, details, flags, cancellation id.
        # NOTE(review): flags=1 is AllowUserInteraction per the polkit
        # reference -- confirm against the deployed polkit version.
        result = authority.CheckAuthorization(subject, action_id, {}, 1, '')
        return result[0]

    #
    # The semanage method runs a transaction on a series of semanage commands;
    # these commands can be taken from the output of the customized method.
    #
    @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender")
    def semanage(self, buf, sender):
        """Feed buf to "semanage import" on stdin.

        Raises DBusException when the caller is not authorized or when
        semanage exits with a non-zero status (message is its stderr).
        """
        if not self.is_authorized(sender, "org.selinux.semanage"):
            raise dbus.exceptions.DBusException("Not authorized")
        p = Popen(["/usr/sbin/semanage", "import"], stdout=PIPE, stderr=PIPE, stdin=PIPE, universal_newlines=True)
        # Let communicate() deliver the input while draining stdout/stderr;
        # a bare stdin.write() can block forever once a pipe buffer fills.
        _, err = p.communicate(buf)
        if p.returncode:
            raise dbus.exceptions.DBusException(err)

    #
    # The customized method will return all of the customizations for policy
    # on the server.  This output can be used with the semanage method on
    # another server to make the two systems have duplicate policy.
    #
    @dbus.service.method("org.selinux", in_signature='', out_signature='s', sender_keyword="sender")
    def customized(self, sender):
        """Return the standard output of "semanage export".

        Raises DBusException when the caller is not authorized and OSError
        when the command exits with a non-zero status.
        """
        if not self.is_authorized(sender, "org.selinux.customized"):
            raise dbus.exceptions.DBusException("Not authorized")
        p = Popen(["/usr/sbin/semanage", "export"], stdout=PIPE, stderr=PIPE, universal_newlines=True)
        # Read both pipes together so a chatty stderr cannot stall the child.
        out, err = p.communicate()
        if p.returncode:
            raise OSError("Failed to read SELinux configuration: %s" % err)
        return out

    #
    # The semodule_list method will return the output of semodule --list=full.
    # This is a read-only operation.
    #
    @dbus.service.method("org.selinux", in_signature='', out_signature='s', sender_keyword="sender")
    def semodule_list(self, sender):
        """Return the standard output of "semodule --list=full".

        Raises DBusException when the caller is not authorized and OSError
        when the command exits with a non-zero status.
        """
        if not self.is_authorized(sender, "org.selinux.semodule_list"):
            raise dbus.exceptions.DBusException("Not authorized")
        p = Popen(["/usr/sbin/semodule", "--list=full"], stdout=PIPE, stderr=PIPE, universal_newlines=True)
        # Read both pipes together so a chatty stderr cannot stall the child.
        out, err = p.communicate()
        if p.returncode:
            raise OSError("Failed to list SELinux modules: %s" % err)
        return out

    #
    # The restorecon method modifies any file path to the default system label
    #
    @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender")
    def restorecon(self, path, sender):
        """Call selinux.restorecon() recursively on path."""
        if not self.is_authorized(sender, "org.selinux.restorecon"):
            raise dbus.exceptions.DBusException("Not authorized")
        selinux.restorecon(str(path), recursive=1)

    #
    # The setenforce method changes the current enforcement state of SELinux
    #
    @dbus.service.method("org.selinux", in_signature='i', sender_keyword="sender")
    def setenforce(self, value, sender):
        """Pass value straight to selinux.security_setenforce()."""
        if not self.is_authorized(sender, "org.selinux.setenforce"):
            raise dbus.exceptions.DBusException("Not authorized")
        selinux.security_setenforce(value)

    #
    # The relabel_on_boot method creates (value == 1) or removes (any other
    # value) the /.autorelabel flag file
    #
    @dbus.service.method("org.selinux", in_signature='i', sender_keyword="sender")
    def relabel_on_boot(self, value, sender):
        """Create or delete "/.autorelabel" depending on value."""
        if not self.is_authorized(sender, "org.selinux.relabel_on_boot"):
            raise dbus.exceptions.DBusException("Not authorized")
        if value == 1:
            # Only the file's existence matters; it is left empty.
            with open("/.autorelabel", "w"):
                pass
        else:
            try:
                os.unlink("/.autorelabel")
            except FileNotFoundError:
                # Already absent, nothing to do.
                pass

    def write_selinux_config(self, enforcing=None, policy=None):
        """Rewrite the SELinux config file with new SELINUX/SELINUXTYPE values.

        Lines starting with "SELINUX=" are replaced when enforcing is truthy,
        lines starting with "SELINUXTYPE=" when policy is truthy; every other
        line is copied unchanged.  The new content is written to
        "<config>.bck" and then renamed over the config file.
        """
        path = selinux.selinux_path() + "config"
        backup_path = path + ".bck"
        with open(path) as fd:
            lines = fd.readlines()
        with open(backup_path, "w") as fd:
            for line in lines:
                if enforcing and line.startswith("SELINUX="):
                    fd.write("SELINUX=%s\n" % enforcing)
                    continue
                if policy and line.startswith("SELINUXTYPE="):
                    fd.write("SELINUXTYPE=%s\n" % policy)
                    continue
                fd.write(line)
        os.rename(backup_path, path)

    #
    # The change_default_mode method modifies the enforcement mode stored in
    # the SELinux config file
    #
    @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender")
    def change_default_mode(self, value, sender):
        """Set SELINUX= in the config file to value.

        Raises ValueError unless value is "enforcing", "permissive" or
        "disabled".
        """
        if not self.is_authorized(sender, "org.selinux.change_default_mode"):
            raise dbus.exceptions.DBusException("Not authorized")
        values = ["enforcing", "permissive", "disabled"]
        if value not in values:
            raise ValueError("Enforcement mode must be %s" % ", ".join(values))
        self.write_selinux_config(enforcing=value)

    #
    # The change_default_policy method modifies the policy type
    #
    @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender")
    def change_default_policy(self, value, sender):
        """Set SELINUXTYPE= in the config file to value.

        value must name a directory directly under selinux.selinux_path();
        otherwise ValueError is raised.
        """
        if not self.is_authorized(sender, "org.selinux.change_default_policy"):
            raise dbus.exceptions.DBusException("Not authorized")
        path = selinux.selinux_path() + value
        # value comes from the bus caller: accept only a single directory
        # name so "..", "a/b" and similar cannot pass the isdir() check and
        # end up written into the config file.
        is_plain_name = value not in ("", ".", "..") and os.path.basename(value) == value
        if is_plain_name and os.path.isdir(path):
            return self.write_selinux_config(policy=value)
        raise ValueError("%s does not exist" % path)
if __name__ == "__main__":
    # Install the GLib integration as the default D-Bus main loop.
    DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    # Claim the "org.selinux" name and export the service object; both are
    # bound to names so they stay referenced while the loop runs.
    bus_name = dbus.service.BusName("org.selinux", bus)
    service = selinux_server(bus, "/org/selinux/object")
    GLib.MainLoop().run()