forked from OWASP/SecureTea-Project
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathiot_mode.py
225 lines (188 loc) · 6.4 KB
/
iot_mode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# -*- coding: utf-8 -*-
u"""IoT Mode for SecureTea.
Project:
╔═╗┌─┐┌─┐┬ ┬┬─┐┌─┐╔╦╗┌─┐┌─┐
╚═╗├┤ │ │ │├┬┘├┤ ║ ├┤ ├─┤
╚═╝└─┘└─┘└─┘┴└─└─┘ ╩ └─┘┴ ┴
Author: Abhishek Sharma <abhishek_official@hotmail.com> , Jul 31 2019
Version: 1.5.1
Module: SecureTea
"""
# Import all the modules necessary for IoT mode
from securetea.lib.ids import secureTeaIDS
from securetea.lib.firewall import secureTeaFirewall
from securetea.lib.iot import iot_checker
from securetea import logger
import multiprocessing
import sys
class IoTMode(object):
    """Run the SecureTea Firewall, IDS and IoT checker side by side.

    Typical use is ``IoTMode(cred=...).start_iot_mode()``: the module
    objects that can be built from the configuration are created, each one
    is wrapped in a ``multiprocessing.Process``, and the processes are run
    until they finish or the user interrupts with Ctrl-C.
    """

    def __init__(self, debug=False, cred=None):
        """
        Initialize IoTMode.

        Args:
            debug (bool): Log on terminal or not
            cred (dict): Configuration credentials

        Raises:
            SystemExit: If ``cred`` is None (no configuration supplied)

        Returns:
            None
        """
        self.debug = debug

        # Initialize logger
        self.logger = logger.SecureTeaLogger(
            __name__,
            debug=self.debug
        )

        # Without configuration there is nothing to start
        if cred is None:
            self.logger.log(
                "No configuraton parameters found, exiting",
                logtype="error"
            )
            # NOTE(review): exit status is 0 although this is an error path;
            # kept unchanged because callers/scripts may rely on it.
            sys.exit(0)
        self.cred = cred

        # Flags recording which module objects were successfully created;
        # create_process() only spawns a process for flags that are True.
        self.firewall = False
        self.ids = False
        self.iot_checker = False

        # Processes created by create_process(), run by start_process()
        self.process_pool = []

    def create_objects(self):
        """
        Create module (Firewall, IDS, IoT Checker) objects
        if configuraton parameters are available for these.

        Each object is built independently: a failure in one is logged and
        does not prevent the others from being created. On success the
        matching presence flag (``self.firewall``, ``self.ids``,
        ``self.iot_checker``) is set to True.

        Args:
            None

        Raises:
            None

        Returns:
            None
        """
        if self.cred.get("firewall"):
            try:
                self.logger.log(
                    "Initializing Firewall object",
                    logtype="info"
                )
                # The Firewall is handed the complete credentials dict
                self.firewallObj = secureTeaFirewall.SecureTeaFirewall(
                    cred=self.cred,
                    debug=self.debug
                )
                self.firewall = True
                self.logger.log(
                    "Initialized Firewall object",
                    logtype="info"
                )
            except KeyError:
                self.logger.log(
                    "Firewall configuration parameter not configured.",
                    logtype="error"
                )
            except Exception as e:
                self.logger.log(
                    "Error occured: " + str(e),
                    logtype="error"
                )

        if self.cred.get("ids"):
            try:
                self.logger.log(
                    "Initializing IDS object",
                    logtype="info"
                )
                # The IDS is handed only its own section of the credentials
                self.ids_obj = secureTeaIDS.SecureTeaIDS(
                    cred=self.cred['ids'],
                    debug=self.debug
                )
                self.ids = True
                self.logger.log(
                    "Initialized IDS object",
                    logtype="info"
                )
            except KeyError:
                self.logger.log(
                    "Intrusion Detection System (IDS) parameter not configured.",
                    logtype="error"
                )
            except Exception as e:
                self.logger.log(
                    "Error occured: " + str(e),
                    logtype="error"
                )

        # The IoT checker is always attempted; a missing 'iot-check' section
        # (or missing key inside it) surfaces as KeyError and is logged.
        try:
            self.logger.log(
                "Initializing IoT checker object",
                logtype="info"
            )
            self.iot_checker_obj = iot_checker.IoTChecker(
                debug=self.debug,
                api_key=self.cred['iot-check']['shodan-api-key'],
                ip=self.cred['iot-check']['ip']
            )
            # Mark the checker as available; without this flag
            # create_process() would never schedule the IoT checker.
            self.iot_checker = True
            self.logger.log(
                "Initialized IoT checker object",
                logtype="info"
            )
        except KeyError:
            self.logger.log(
                "IoT checker parameters not configured.",
                logtype="error"
            )
        except Exception as e:
            self.logger.log(
                "Error occured: " + str(e),
                logtype="error"
            )

    def create_process(self):
        """
        Create process for the initialized objects.

        One ``multiprocessing.Process`` is appended to ``self.process_pool``
        for every module whose presence flag is True. The processes are
        created but not started here.

        Args:
            None

        Raises:
            None

        Returns:
            None
        """
        if self.firewall:  # if Firewall object is initialized
            firewall_process = multiprocessing.Process(
                target=self.firewallObj.start_firewall
            )
            self.process_pool.append(firewall_process)

        if self.ids:  # if IDS object is initialized
            ids_process = multiprocessing.Process(
                target=self.ids_obj.start_ids
            )
            self.process_pool.append(ids_process)

        if self.iot_checker:  # if IoT object is initialized
            iot_checker_process = multiprocessing.Process(
                target=self.iot_checker_obj.check_shodan_range
            )
            self.process_pool.append(iot_checker_process)

    def start_process(self):
        """
        Start all the process in the process pool
        and terminate gracefully in Keyboard Interrupt.

        All processes are started first and then joined, so they run
        concurrently. On Ctrl-C every process that is still alive is
        terminated and then joined.

        Args:
            None

        Raises:
            None

        Returns:
            None
        """
        try:
            for process in self.process_pool:
                process.start()

            for process in self.process_pool:
                process.join()

        except KeyboardInterrupt:
            # Only touch processes that are actually running: the interrupt
            # may arrive before every process in the pool has been started,
            # and terminate()/join() are not valid on an unstarted process.
            running = [
                process for process in self.process_pool
                if process.is_alive()
            ]
            for process in running:
                process.terminate()
            # Reap the terminated children so none is left as a zombie
            for process in running:
                process.join()

        except Exception as e:
            self.logger.log(
                "Error occured: " + str(e),
                logtype="error"
            )

    def start_iot_mode(self):
        """
        Start SecureTea in IoT mode.

        Runs the three stages in order: create the module objects, create
        a process for each created object, then start the processes and
        wait for them.

        Args:
            None

        Raises:
            None

        Returns:
            None
        """
        # Create / initialize required objects
        self.create_objects()
        # Create process for the objects
        self.create_process()
        # Start the process
        self.start_process()