forked from Eve-PySpy/PySpy-backend
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dayfetch.py
executable file
·331 lines (274 loc) · 11.5 KB
/
dayfetch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
#!/usr/bin/env python3
import decimal
import sqlite3
import json
import logging.config
import logging
from io import BytesIO
import tarfile
import time
from datetime import datetime, timedelta
from botocore.exceptions import ClientError
import requests
import boto3
# import config
import db
# Module-level logger; output format/level are set by basicConfig below.
Logger = logging.getLogger(__name__)
logging.basicConfig(
    format='%(asctime)s [%(levelname)s]: %(message)s',
    encoding='utf-8',
    level=logging.INFO)
# DynamoDB table that receives the per-character intel rows uploaded by
# insert_item().  NOTE(review): region/table name are hard-coded — confirm
# they should not come from config (a config import is commented out above).
session = boto3.Session(region_name='us-west-2')
dynamodb = session.resource('dynamodb')
table = dynamodb.Table('pyspy-intel')
# Example call: Logger.info("Something badhappened", exc_info=True) ****
def generate_date_range(start_date: str, end_date: str = (
datetime.now() - timedelta(days=1)).strftime('%Y%m%d')) -> list:
"""
Generates a list of dates between a specified start date and an optional end date in YYYYMMDD format.
If end_date is not provided, it defaults to yesterday's date.
Args:
start_date (str): The start date in YYYYMMDD format.
end_date (str, optional): The end date in YYYYMMDD format. Defaults to yesterday.
Returns:
list: A list of dates between the start and end dates, inclusive.
"""
try:
start = datetime.strptime(start_date, '%Y%m%d')
end = datetime.strptime(end_date, '%Y%m%d')
step = timedelta(days=1)
date_list = []
while start <= end:
date_list.append(start.strftime('%Y%m%d'))
start += step
return date_list
except ValueError as e:
Logger.exception("Invalid date format", exc_info=True)
raise e
def get_latest_killmail_date(connection: sqlite3.Connection) -> str:
    """
    Return the most recent killmail_date stored in the km_summary table.

    Args:
        connection (sqlite3.Connection): Open connection whose row factory
            yields mapping-style rows (column access by name).

    Returns:
        str: The latest killmail_date, or None when the table is empty.

    Raises:
        sqlite3.Error: If the query cannot be executed.
    """
    try:
        row = connection.cursor().execute(
            """
    SELECT MAX(killmail_date) AS latest_killmail_date
    FROM km_summary
    """
        ).fetchone()
    except sqlite3.Error as e:
        Logger.exception("sqlite error occured", exc_info=True)
        raise e
    if not row:
        return None
    return row['latest_killmail_date']
def fetch_and_unpack_killmail(date: str):
    """
    Download and unpack the everef.net tar.bz2 killmail archive for one day.

    Args:
        date (str): The date in YYYYMMDD format for which to fetch killmails.

    Returns:
        list: One parsed JSON dict per killmail file in the archive.

    Raises:
        requests.HTTPError: If the download returns a non-success status
            (via raise_for_status).
        ValueError: If date is not valid YYYYMMDD.
    """
    date_parsed = datetime.strptime(date, '%Y%m%d')
    year = date_parsed.strftime('%Y')
    month = date_parsed.strftime('%m')
    day = date_parsed.strftime('%d')
    url = (f"https://data.everef.net/killmails/{year}"
           f"/killmails-{year}-{month}-{day}.tar.bz2")
    # Identify ourselves per everef.net usage guidelines.
    headers = {'User-Agent': 'PySpy-backend jhmartin@toger.us'}
    response = requests.get(url, headers=headers, timeout=10)
    # raise_for_status() raises requests.HTTPError on any 4xx/5xx, so the
    # original's manual status_code check was dead code — and it referenced
    # a misspelled `response.stastus_code` plus a non-f-string message,
    # which would have crashed had it ever been reached.
    response.raise_for_status()
    # Unpack the tar.bz2 archive entirely in memory.
    file_like_object = BytesIO(response.content)
    results = []
    with tarfile.open(mode="r:bz2", fileobj=file_like_object) as tar:
        for member in tar.getmembers():
            f = tar.extractfile(member)
            # extractfile returns None for non-file members (directories).
            if f:
                Logger.debug(f)
                results.append(json.load(f))
    return results
def is_abyssal_system(system_id: int) -> bool:
    """Return True when system_id falls inside the abyssal system id range."""
    return 32000001 <= system_id <= 32000200
def has_normal_cyno(victim_dict: dict) -> bool:
    """Return True if any fitted item on the victim matches the normal cyno type."""
    for fitted in victim_dict.get("items", []):
        if fitted["item_type_id"] == 21096 and 27 <= fitted["flag"] <= 34:
            return True
    return False
def has_covert_cyno(victim_dict: dict) -> bool:
    """Return True if any fitted item on the victim matches the covert cyno type."""
    return any(
        item["item_type_id"] == 28646 and 27 <= item["flag"] <= 34
        for item in victim_dict.get("items", [])
    )
def get_killmail_date(killmail_time: str) -> int:
    """Convert an EVE killmail timestamp (e.g. 2018-07-12T00:00:39Z) to int YYYYMMDD."""
    # The first 10 characters are the ISO date; drop the dashes and parse.
    return int(killmail_time[:10].replace('-', ''))
def get_attacker_info(killmail: dict) -> tuple[int, set[int]]:
    """Count the player attackers on a killmail and collect their character ids.

    Attackers without a character_id (e.g. NPCs) are ignored.

    Returns:
        tuple: (number of player attackers, set of their character ids).
    """
    character_ids: set[int] = set()
    player_count = 0
    for entry in killmail.get("attackers", []):
        if "character_id" not in entry:
            continue
        player_count += 1
        character_ids.add(entry["character_id"])
    return (player_count, character_ids)
def create_killmail_summaries(killmails: list[dict]):
    """Build killmail and per-attacker summary tuples from raw killmail dicts.

    Killmails without a player victim (no victim character_id) are skipped.

    Returns:
        tuple: (km_summaries, atk_summaries, all attacker ids seen,
            all victim ids seen).
    """
    Logger.info("Killmail total is %s", len(killmails))
    km_summaries = []
    atk_summaries = []
    result_attackers = set()
    result_victims = set()
    for km in killmails:
        Logger.debug(km)
        # Skip NPC/structure losses with no player victim.
        if 'victim' not in km or 'character_id' not in km['victim']:
            continue
        victim = km["victim"]
        # "killmail_time" : "2018-07-12T00:00:39Z"
        km_id = km['killmail_id']
        km_date = get_killmail_date(km['killmail_time'])
        system_id = km['solar_system_id']
        (n_attackers, attacker_ids) = get_attacker_info(km)
        victim_id = victim["character_id"]
        result_attackers.update(attacker_ids)
        result_victims.add(victim_id)
        km_summaries.append((
            km_id,
            km_date,
            system_id,
            is_abyssal_system(system_id),
            n_attackers,
            victim_id,
            victim["ship_type_id"],
            has_covert_cyno(victim),
            has_normal_cyno(victim),
        ))
        # One attacker row per player attacker on this killmail.
        atk_summaries.extend(
            (km_id, km_date, attacker_id, n_attackers)
            for attacker_id in attacker_ids
        )
    Logger.info("Candidate killmails are %s", len(km_summaries))
    Logger.info("Distinct attackers is %s", len(result_attackers))
    return (km_summaries, atk_summaries,
            result_attackers, result_victims)
def dict_factory(cursor, row):
    """sqlite3 row factory mapping each result row to a {column_name: value} dict."""
    columns = (description[0] for description in cursor.description)
    return dict(zip(columns, row))
def clean_dict(in_dict: dict) -> dict:
    """Return a copy of in_dict with every None-valued key removed."""
    cleaned = {}
    for key, value in in_dict.items():
        # Only None is dropped; falsy values like 0 and "" are kept.
        if value is not None:
            cleaned[key] = value
    return cleaned
def insert_item(item, dest_table):
    """Upload one intel item to DynamoDB, retrying on throughput throttling.

    Floats are round-tripped through JSON into Decimal because DynamoDB
    rejects Python floats.  Retries up to 5 times with exponential backoff
    on ProvisionedThroughputExceededException; any other ClientError is
    re-raised immediately.  (A batch upload would be faster but requires
    partial-success handling; per-item puts are simpler for daily volumes.)

    Args:
        item: JSON-serializable dict to store.
        dest_table: boto3 DynamoDB Table resource.

    Raises:
        botocore.exceptions.ClientError: On a non-throttling error, or if
            all retries are exhausted.  (The original fell off the end of
            the retry loop and silently dropped the item.)
    """
    item = json.loads(json.dumps(item), parse_float=decimal.Decimal)
    last_err = None
    for attempt in range(1, 6):  # Retry up to 5 times
        try:
            dest_table.put_item(Item=item)
            return
        except ClientError as err:
            if err.response['Error']['Code'] not in [
                    "ProvisionedThroughputExceededException"]:
                # Not a throttling problem — surface it unchanged.
                raise
            last_err = err
            Logger.warning('WHOA, too fast, slow it down retries=%s', attempt)
            time.sleep(2 ** attempt)
    # All retries throttled: fail loudly instead of losing the item.
    raise last_err
def persist_attacker_summaries(atk_summaries, sqlcon):
    """Bulk-write attacker summary tuples into the sqlite attackers table.

    A single executemany batch keeps round-trips to a minimum; the
    transaction is committed before returning.
    """
    Logger.info(
        "Persisting %s attacker summaries into sqlite",
        len(atk_summaries))
    upsert_sql = (
        '''REPLACE INTO attackers (killmail_id, killmail_date, attacker_id, attacker_count) VALUES (?, ?, ?, ?)'''
    )
    sqlcon.executemany(upsert_sql, atk_summaries)
    sqlcon.commit()
    Logger.info("Finished persisting attacker summaries into sqlite")
def persist_killmail_summaries(km_summaries, sqlcon):
    """Bulk-write killmail summary tuples into the sqlite km_summary table.

    A single executemany batch keeps round-trips to a minimum; the
    transaction is committed before returning.
    """
    Logger.info("Persisting %s killmails into sqlite", len(km_summaries))
    upsert_sql = '''REPLACE INTO km_summary (killmail_id, killmail_date,
    solar_system_id, abyssal, attacker_count, victim_id,
    ship_id, covert_cyno, normal_cyno)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)'''
    sqlcon.executemany(upsert_sql, km_summaries)
    sqlcon.commit()
    Logger.info("Finished persisting killmails into sqlite")
def fetch_victim_intel(victims, cursor) -> list:
    """Fetch intel rows for the given victims from the vic_sum sqlite view.

    Args:
        victims: Iterable of victim character ids.
        cursor: sqlite cursor to query with.

    Returns:
        list: One row per matching character_id; empty when victims is empty.
    """
    Logger.info("Identified %s victims to update", len(victims))
    Logger.debug(victims)
    if not victims:
        # "IN ()" is a syntax error in SQLite; nothing to fetch anyway.
        return []
    # Only placeholders are interpolated into the SQL; the values themselves
    # are bound as parameters, so this is not an injection risk.
    placeholders = ', '.join(['?'] * len(victims))
    vic_data = cursor.execute(
        f'''SELECT * FROM vic_sum WHERE character_id IN
    ({placeholders})''',
        list(victims)
    ).fetchall()
    return vic_data
def fetch_attacker_intel(attackers, cursor) -> list:
    """Fetch intel rows for the given attackers from the atk_sum sqlite view.

    Args:
        attackers: Iterable of attacker character ids.
        cursor: sqlite cursor to query with.

    Returns:
        list: One row per matching character_id; empty when attackers is empty.
    """
    Logger.info("Identified %s attackers to update", len(attackers))
    Logger.debug(attackers)
    if not attackers:
        # "IN ()" is a syntax error in SQLite; nothing to fetch anyway.
        return []
    # Only placeholders are interpolated into the SQL; the values themselves
    # are bound as parameters, so this is not an injection risk.
    placeholders = ', '.join(['?'] * len(attackers))
    atk_data = cursor.execute(
        f'''SELECT * FROM atk_sum WHERE character_id IN
    ({placeholders})''',
        list(attackers)
    ).fetchall()
    return atk_data
# Script entry point: incrementally sync killmail data from everef.net into
# sqlite, then push refreshed per-character intel rows to DynamoDB.
if __name__ == "__main__":
    sql = db.connect_db()
    # Rows come back as plain dicts so columns can be addressed by name.
    sql.row_factory = dict_factory
    cur = sql.cursor()
    # Resume from the most recent day already in the database.  The range is
    # inclusive, so that day is re-fetched and re-upserted (REPLACE makes
    # this idempotent).
    # NOTE(review): if km_summary is empty, latest_date is None and
    # str(None) will make generate_date_range raise ValueError — confirm
    # how a first-run bootstrap is supposed to happen.
    latest_date = get_latest_killmail_date(sql)
    date_range = generate_date_range(str(latest_date))
    for km_date in date_range:
        Logger.info("Processing data for %s", km_date)
        day_killmails = fetch_and_unpack_killmail(str(km_date))
        (killmail_summaries, attacker_summaries, touched_attackers,
         touched_victims) = create_killmail_summaries(day_killmails)
        persist_killmail_summaries(killmail_summaries, sql)
        persist_attacker_summaries(attacker_summaries, sql)
        # Re-read aggregated intel for every character touched on this day.
        victim_data = fetch_victim_intel(touched_victims, cur)
        attacker_data = fetch_attacker_intel(touched_attackers, cur)
        all_data = victim_data + attacker_data
        total_len = len(all_data)
        PROGRESS = 0
        # Upload one item at a time; insert_item handles throttling retries.
        for update in all_data:
            Logger.info("Uploading %s/%s for %s", PROGRESS, total_len, km_date)
            PROGRESS += 1
            insert_item(update, table)
    sql.close()