Skip to content

Commit

Permalink
Merge pull request #152 from abhisharma404/feature-osint
Browse files Browse the repository at this point in the history
Feature: OSINT analysis about the IP address
  • Loading branch information
rejahrehim authored Jul 16, 2019
2 parents c03a7d8 + 8d24708 commit 8bad28b
Show file tree
Hide file tree
Showing 4 changed files with 515 additions and 0 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ clamd
beautifulsoup4
pyudev
lxml
ipwhois
254 changes: 254 additions & 0 deletions securetea/lib/osint/osint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
# -*- coding: utf-8 -*-
u"""OSINT module for SecureTea.
Project:
╔═╗┌─┐┌─┐┬ ┬┬─┐┌─┐╔╦╗┌─┐┌─┐
╚═╗├┤ │ │ │├┬┘├┤ ║ ├┤ ├─┤
╚═╝└─┘└─┘└─┘┴└─└─┘ ╩ └─┘┴ ┴
Author: Abhishek Sharma <abhishek_official@hotmail.com> , Jul 15 2019
Version: 1.4
Module: SecureTea
"""

import socket
import ipwhois
import geocoder
import csv
import os

from securetea.lib.osint.osint_logger import OSINTLogger


class OSINT(object):
"""OSINT class."""

def __init__(self, debug=False):
"""
Initialize OSINT class.
Args:
debug (bool): Log on terminal or not
Raises:
None
Returns:
None
"""
# Initialize logger
self.logger = OSINTLogger(
__name__,
debug=debug
)

# Initialize CSV file fieldnames
self.fieldnames = ['ip',
'host_name',
'arpa_domains',
'address',
'description',
'state',
'city',
'detailed_addr',
'postal_code']

# Path to save generated CSV report
self._REPORT_PATH = "/etc/securetea/report.csv"


def reverse_dns_lookup(self, ip):
"""
Peform reverse DNS lookup.
Args:
ip (str): IP address on which to perform operation
Raises:
None
Returns:
host_name (str): Name of the host
arpa_domains (str): ARPA domain list
"""
self.logger.log(
"Performing reverse DNS lookup on IP: " + str(ip),
logtype="info"
)
details = socket.gethostbyaddr(ip)
host_name = details[0]
arpa_domains = details[1]

if arpa_domains != []:
arpa_domains = ", ".join(arpa_domains)
else:
arpa_domains = "Not found"

return host_name, arpa_domains

def geo_lookup(self, ip_addr):
"""
Find geographic location of the IP address.
Args:
ip_addr (str): IP address on which to perform operation
Raises:
None
Returns:
address (str): Found address of the IP
"""
self.logger.log(
"Performing geographic lookup on IP: " + str(ip_addr),
logtype="info"
)
geocode_data = geocoder.ip(ip_addr)
dict_data = geocode_data.json
address = dict_data["address"]

if not address:
address = "Not found"

return address

def ip_whois(self, ip):
"""
Peform WHOIS lookup of the IP.
Args:
ip (str): IP address on which to perform operation
Raises:
None
Returns:
ip_dict (dict): Dictionary of the details collected
"""
self.logger.log(
"Performing IP WHOIS lookup on IP: " + str(ip),
logtype="info"
)
# Initialize a temporary dict to store data
temp_ip_whois_dict = dict()

ipwho = ipwhois.IPWhois(ip)
ip_dict = ipwho.lookup_whois()

description = ip_dict["asn_description"]
state = ip_dict["nets"][0]["state"]
city = ip_dict["nets"][0]["city"]
detailed_addr = ip_dict["nets"][0]["address"]
postal_code = ip_dict["nets"][0]["postal_code"]

if description:
temp_ip_whois_dict["description"] = description
else:
temp_ip_whois_dict["description"] = "Not found"

if state:
temp_ip_whois_dict["state"] = state
else:
temp_ip_whois_dict["state"] = "Not found"

if city:
temp_ip_whois_dict["city"] = city
else:
temp_ip_whois_dict["city"] = "Not found"

if detailed_addr:
temp_ip_whois_dict["detailed_addr"] = detailed_addr
else:
temp_ip_whois_dict["detailed_addr"] = "Not found"

if postal_code:
temp_ip_whois_dict["postal_code"] = postal_code
else:
temp_ip_whois_dict["postal_code"] = "Not found"

# Return the generated IP WHOIS dict
return temp_ip_whois_dict

def collect_details(self, ip):
"""
Collect details about the IP address.
Args:
ip (str): IP address on which to perform operation
Raises:
None
Returns:
ip_details_dict (dict): Dictionary containing the details about the IP
"""
self.logger.log(
"Collecting details for IP: " + str(ip),
logtype="info"
)
ip_details_dict = dict()

# Perform reverse DNS lookup
host_name, arpa_domains = self.reverse_dns_lookup(ip=ip)
# Peform geographic lookup
address = self.geo_lookup(ip_addr=ip)
# Perform IP WHOIS lookup
ip_whois_dict = self.ip_whois(ip=ip)

ip_details_dict["ip"] = ip
ip_details_dict["host_name"] = host_name
ip_details_dict["arpa_domains"] = arpa_domains
ip_details_dict["address"] = address
ip_details_dict.update(ip_whois_dict)

return ip_details_dict

def csv_writer(self, data):
"""
Write dictionary details to CSV file.
Args:
data (dict): Data to write into CSV file
Raises:
None
Returns:
None
"""
self.logger.log(
"Writing details to CSV file",
logtype="info"
)
if not os.path.isfile(self._REPORT_PATH):
with open(self._REPORT_PATH, "w") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=self.fieldnames)
# New file is being created, write the headers
writer.writeheader()
writer.writerow(data)
else:
with open(self._REPORT_PATH, "a") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=self.fieldnames)
writer.writerow(data)

def perform_osint_scan(self, ip):
"""
Perform OSINT scan on the given IP address.
Args:
ip (str): IP address on which to perform operation
Raises:
None
Returns:
None
"""
self.logger.log(
"Performing OSINT scan on IP: " + str(ip),
logtype="info"
)
# Collect details about the IP
ip_details_dict = self.collect_details(ip=ip)
# Write the details to the CSV file
self.csv_writer(data=ip_details_dict)
109 changes: 109 additions & 0 deletions securetea/lib/osint/osint_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
u"""OSINT Logger for SecureTea OSINT.
Project:
╔═╗┌─┐┌─┐┬ ┬┬─┐┌─┐╔╦╗┌─┐┌─┐
╚═╗├┤ │ │ │├┬┘├┤ ║ ├┤ ├─┤
╚═╝└─┘└─┘└─┘┴└─└─┘ ╩ └─┘┴ ┴
Author: Abhishek Sharma <abhishek_official@hotmail.com> , Jul 15 2019
Version: 1.4
Module: SecureTea
"""

from securetea import logger

import time


class OSINTLogger(logger.SecureTeaLogger):
"""OSINTLogger Class."""

def __init__(self, modulename, debug=False):
"""
Initialize OSINTLogger.
Args:
modulename (str): Name of the module
debug (bool): Log on terminal or not
Raises:
None
Returns:
None
"""
# OSINT Log Path
self._PATH = "/etc/securetea/osint.log"
# Call the parent class
logger.SecureTeaLogger.__init__(self, modulename, debug)

def write_data(self, data):
"""
Write data to the log file.
Args:
data (str): Data to write
Raises:
None
Returns:
None
"""
with open(self._PATH, "a") as f:
LEGEND = '[' + self.modulename + ']' + ' [' + \
str(time.strftime("%Y-%m-%d %H:%M")) + '] '
message = LEGEND + data + "\n"
f.write(message)

def printinfo(self, message):
"""
Over-ride the parent class printinfo method.
Args:
message (str): Message to log
Raises:
None
Returns:
None
"""
# Call the parent method
super().printinfo(message)
self.write_data(message)

def printerror(self, message):
"""
Over-ride the parent class printerror method.
Args:
message (str): Message to log
Raises:
None
Returns:
None
"""
# Call the parent method
super().printerror(message)
self.write_data(message)

def printwarning(self, message):
"""
Over-ride the parent class printwarning method.
Args:
message (str): Message to log
Raises:
None
Returns:
None
"""
# Call the parent method
super().printwarning(message)
self.write_data(message)
Loading

0 comments on commit 8bad28b

Please sign in to comment.