Skip to content

Commit

Permalink
added timeouts and started db
Browse files Browse the repository at this point in the history
  • Loading branch information
mhorst00 committed Oct 3, 2022
1 parent dff10f5 commit de2bd12
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 43 deletions.
95 changes: 60 additions & 35 deletions mining/db.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,60 @@
import sqlite3


# Names of the SQLite tables the helper functions in this module write to.
TRIP_TABLE_NAME = "trip_table"
STATION_TABLE_NAME = "station_table"

# Connection and cursor on the local "trip_delays.db" file; both are opened
# as a side effect of importing this module.
con = sqlite3.connect("trip_delays.db")
cur = con.cursor()


def trips_to_db(trips: list, cursor: sqlite3.Cursor):
    """Store every entry of ``trips`` in the trip table.

    The table is created on first use. Each trip goes into the ``data``
    column as JSON text; values that already are strings are stored
    unchanged. Nothing is committed here, that stays with the owner of
    the connection.

    Args:
        trips: Trips to store. Presumably the raw, JSON-serialisable
            dicts returned by the API -- TODO confirm against the caller.
        cursor: Cursor of the target SQLite database.

    Raises:
        TypeError: If a trip cannot be serialised to JSON.
        sqlite3.Error: If the database rejects a statement.
    """
    import json  # local import keeps this block self-contained

    cursor.execute(
        f"CREATE TABLE IF NOT EXISTS {TRIP_TABLE_NAME}"
        " (id integer, data json)"
    )
    # The previous statement ("values ?") was not valid SQL and never named
    # an id, so only ``data`` is filled and ``id`` stays NULL.
    # NOTE(review): confirm whether an explicit id is wanted here.
    rows = [
        (trip if isinstance(trip, str) else json.dumps(trip),)
        for trip in trips
    ]
    cursor.executemany(
        f"INSERT INTO {TRIP_TABLE_NAME} (data) VALUES (?)", rows
    )


def station_delays_to_db(station_delays: list, cursor: sqlite3.Cursor):
    """Write each station delay into the station table.

    The table is created first when ``sqlite_master`` does not list it.
    Every row is stored with the fixed id ``1``, and each value is echoed
    to stdout right before it is inserted.
    """
    lookup_sql = (
        "SELECT name FROM sqlite_master WHERE type='table' AND"
        f" name='{STATION_TABLE_NAME}';"
    )
    table_missing = not cursor.execute(lookup_sql).fetchall()
    if table_missing:
        cursor.execute(
            f"CREATE TABLE {STATION_TABLE_NAME} (id integer, data varchar(100))"
        )

    insert_sql = f"insert into {STATION_TABLE_NAME} values(?,?)"
    for delay in station_delays:
        print(delay)
        cursor.execute(insert_sql, (1, delay))
from datetime import datetime
import functools
from sqlalchemy import create_engine, Integer, JSON, Column, Sequence
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# Declarative base class that the ORM models in this module inherit from.
EntityBase = declarative_base()

# Date captured once at import time; it is embedded in the name of the
# SQLite file the engine below points at.
CURRENT_DATE = datetime.now().date()
# echo=True asks SQLAlchemy to log the SQL statements it emits.
ENGINE = create_engine(f"sqlite:///db{str(CURRENT_DATE)}.db", echo=True)
# One session instance, bound to ENGINE and shared by the whole module.
SESSION = sessionmaker(bind=ENGINE)()


class Trip(EntityBase):
    """ORM model for one row of the ``trips`` table."""

    __tablename__ = "trips"

    # Integer primary key, declared with the "trip_id_seq" sequence.
    id = Column(
        Integer,
        Sequence("trip_id_seq"),
        nullable=False,
        primary_key=True,
    )
    # Optional JSON payload attached to the row.
    information = Column(JSON, nullable=True)


# Create every table registered on EntityBase's metadata in the database
# behind ENGINE. As a top-level statement this runs when the module loads.
EntityBase.metadata.create_all(ENGINE)


def daily_db(func):
    """Decorate ``func`` so the module switches to a new database each day.

    Before every call the current date is compared with ``CURRENT_DATE``.
    Once the date has advanced, an engine and session are created for a
    SQLite file named after the new date, the tables are created in it, and
    the module-level ``ENGINE``, ``SESSION`` and ``CURRENT_DATE`` are
    rebound so that later calls use the new database.

    Args:
        func: The callable to wrap.

    Returns:
        The wrapper; it forwards all arguments to ``func`` and returns
        its result.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Without this declaration the assignments below only create locals:
        # the rollover would never reach the module state and a throw-away
        # engine would be built on every call after midnight.
        global ENGINE, SESSION, CURRENT_DATE

        new_date = datetime.now().date()
        if new_date > CURRENT_DATE:
            # Release the previous day's session before replacing it.
            SESSION.close()
            ENGINE = create_engine(
                f"sqlite:///db{str(new_date)}.db", echo=True
            )
            SESSION = sessionmaker(bind=ENGINE)()
            # Remember the new date so the switch happens once per day.
            CURRENT_DATE = new_date
            EntityBase.metadata.create_all(ENGINE)
        return func(*args, **kwargs)

    return wrapper


@daily_db
def new_entry(trip):
    """Persist ``trip`` through the shared module-level session.

    Args:
        trip: The mapped object to add and commit.

    Returns:
        ``True`` once the commit has succeeded.

    Raises:
        Exception: Whatever adding or committing raised; it is re-raised
            unchanged after the session has been rolled back.
    """
    try:
        SESSION.add(trip)
        SESSION.commit()
    except Exception:
        # The session is shared, so roll it back before propagating;
        # otherwise later calls would fail on the broken transaction.
        SESSION.rollback()
        raise
    return True


# Build a sample row and store it
first_item = Trip(
    information={"a": 1, "b": "foo", "c": [1, 1, 2, 3, 5, 8, 13]}
)

new_entry(first_item)


# Read back every stored trip and print it
for item in SESSION.query(Trip).all():
    print(type(item.information))  # <class 'dict'>
    print(item.id, item.information)
34 changes: 26 additions & 8 deletions mining/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,36 @@
import concurrent.futures
import requests
import utils
import db
from datetime import datetime


def get_all_trips_from_station(
    start: str, stations: list[str], time: datetime
):
    """Collect the raw trip data from ``start`` to every other station.

    One HTTP session is reused for all requests and closed afterwards.
    A failure for one destination is printed and does not stop the
    remaining lookups.

    Args:
        start: Id of the departure station.
        stations: Ids of all stations; an entry equal to ``start`` is
            skipped.
        time: Point in time passed on to ``vvspy.get_trips``.

    Returns:
        A list with the ``raw`` attribute of every ``vvspy.obj.Trip`` that
        was returned.
    """
    results = []
    with requests.Session() as session:
        for destination in stations:
            # Compare by value: identity (``is``) is not a reliable test
            # for string equality.
            if start == destination:
                continue
            # Bound before the request so the error report below can never
            # hit an unbound name or show a value from an earlier pair.
            trip = None
            try:
                trips = vvspy.get_trips(
                    start,
                    destination,
                    time,
                    session=session,
                    limit=5,
                    timeout=(3.05, 6.1),
                )
                for candidate in trips:
                    if isinstance(candidate, vvspy.obj.Trip):
                        trip = candidate.raw
                        results.append(trip)
            except Exception as err:
                # Best effort: report the failing pair and carry on.
                print(
                    err,
                    utils.station_id_to_name(start),
                    utils.station_id_to_name(destination),
                    type(trip),
                )
    return results


Expand All @@ -28,7 +42,9 @@ def get_station_departures(station: str, time: datetime):

def get_all_trips(stations: list[str], curr_time: datetime):
trips = []
with concurrent.futures.ThreadPoolExecutor(max_workers=len(stations)) as executor:
with concurrent.futures.ThreadPoolExecutor(
max_workers=len(stations)
) as executor:
future_to_trip = {
executor.submit(
get_all_trips_from_station, start, stations, curr_time
Expand All @@ -45,7 +61,9 @@ def get_all_trips(stations: list[str], curr_time: datetime):

def get_all_station_departures(stations: list[str], curr_time: datetime):
station_delays = []
with concurrent.futures.ThreadPoolExecutor(max_workers=len(stations)) as executor:
with concurrent.futures.ThreadPoolExecutor(
max_workers=len(stations)
) as executor:
future_to_trip = {
executor.submit(get_station_departures, start, curr_time): start
for start in stations
Expand Down

0 comments on commit de2bd12

Please sign in to comment.