forked from P1sec/QCSuper
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjson_geo_dump.py
73 lines (48 loc) · 2.42 KB
/
json_geo_dump.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#!/usr/bin/python3
#-*- encoding: Utf-8 -*-
from protocol.gsmtap import build_gsmtap_ip
from protocol.log_types import *
from struct import pack, unpack
from base64 import b64encode
from subprocess import run
from json import dumps
from time import time
from modules._enable_log_mixin import EnableLogMixin, TYPES_FOR_RAW_PACKET_LOGGING
"""
This module registers various diag LOG events, and generates a JSON file
with both GPS location from the Android ADB, and raw LOG diag frames.
It produces raw output intended for offline reuse by another module.
Format:
{"lat": 49.52531, "lng": 2.17493, "timestamp": 1521834122.2525692}
{"log_type": 0xb0e2, "log_frame": "[base64 encoded]", "timestamp": 1521834122.2525692}
This module is meant to be used with the "ADB" input.
"""
# Minimum delay, in seconds, between two GPS location checks. The check is
# only attempted from on_log(), so no location is polled while no log arrives.
DELAY_CHECK_GEOLOCATION = 10
class JsonGeoDumper(EnableLogMixin):
    """
    Dump raw diag LOG frames and GPS positions as JSON lines.

    Every record is written to `json_geo_file` as one JSON object per line
    (keys sorted), and is either:

    - a position record: "lat", "lng", "timestamp"
    - a frame record: "log_type", "log_frame" (base64 of header + payload),
      "timestamp"

    All timestamps are taken from time() when the record is written.
    """

    def __init__(self, diag_input, json_geo_file):
        """
        :param diag_input: Input object. It is polled for a position only
            when it exposes a get_gps_location() method.
        :param json_geo_file: Writable text file object receiving the JSON
            lines. It is closed when this object is finalized.
        """

        self.json_geo_file = json_geo_file
        self.diag_input = diag_input

        # NOTE(review): presumably read by EnableLogMixin to restrict which
        # log types get registered - confirm against the mixin.
        self.limit_registered_logs = TYPES_FOR_RAW_PACKET_LOGGING

        # 0 guarantees that the very first received log triggers a poll
        self.last_time_geolocation_was_checked = 0

        # Last position written to the file, used to skip duplicates
        self.lat, self.lng = None, None

    def on_log(self, log_type, log_payload, log_header, timestamp = 0):
        """
        Handle one received LOG: refresh the position if it is due, then
        dump the frame when its type is in TYPES_FOR_RAW_PACKET_LOGGING.

        :param log_type: Identifier of the received log.
        :param log_payload: Bytes of the log body.
        :param log_header: Bytes of the log header, written before the
            payload in the dumped frame.
        :param timestamp: Unused here; records are stamped with time().
        """

        if hasattr(self.diag_input, 'get_gps_location'):

            if self.last_time_geolocation_was_checked < time() - DELAY_CHECK_GEOLOCATION:

                lat, lng = self.diag_input.get_gps_location()

                # Compare against None rather than relying on truthiness,
                # so that a coordinate of exactly 0.0 is not discarded.
                has_fix = lat is not None and lng is not None

                if has_fix and (lat, lng) != (self.lat, self.lng):

                    self._write_record({
                        'lat': lat,
                        'lng': lng,
                        'timestamp': time()
                    })

                    # Remember what was written, otherwise the comparison
                    # above never filters out an unchanged position.
                    self.lat, self.lng = lat, lng

                # Reset the timer after every poll, with or without a fix
                self.last_time_geolocation_was_checked = time()

        if log_type in TYPES_FOR_RAW_PACKET_LOGGING:

            self._write_record({
                'log_type': log_type,
                'log_frame': b64encode(log_header + log_payload).decode('ascii'),
                'timestamp': time()
            })

    def _write_record(self, record):
        """Serialize `record` with sorted keys and append it as one line."""

        self.json_geo_file.write(dumps(record, sort_keys = True) + '\n')

    def __del__(self):
        """Close the output file, if the constructor got as far as setting it."""

        # The attribute is missing when __init__ raised before assigning it
        # (e.g. wrong arguments); finalization must not fail in that case.
        json_geo_file = getattr(self, 'json_geo_file', None)

        if json_geo_file is not None:
            json_geo_file.close()