Completing project
san089 committed Feb 20, 2020
1 parent aca7f6e commit 5f60d4e
Showing 4 changed files with 24 additions and 17 deletions.
2 changes: 2 additions & 0 deletions Utility/bootstrap_script.txt
@@ -46,6 +46,8 @@ sudo pip install psycopg2
or try
or try
sudo pip-3.6 install psycopg2

+ssh hadoop@ec2-3-235-6-13.compute-1.amazonaws.com -i EMR_KEY_PAIR.pem "cd /home/hadoop/goodreads_etl_pipeline/src; export PYSPARK_DRIVER_PYTHON=python3; export PYSPARK_PYTHON=python3; spark-submit --master yarn goodreads_driver.py;"
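The submit line above packs the whole remote invocation into one ssh command. When scripting it, the same call can be driven from Python; a minimal sketch (host, key file, and remote path are the ones shown above, so adjust them for your own cluster):

# Sketch: run the same spark-submit over ssh from Python.
# Host, key file, and remote command are copied from the line above.
import subprocess

host = "hadoop@ec2-3-235-6-13.compute-1.amazonaws.com"
remote = ("cd /home/hadoop/goodreads_etl_pipeline/src; "
          "export PYSPARK_DRIVER_PYTHON=python3; "
          "export PYSPARK_PYTHON=python3; "
          "spark-submit --master yarn goodreads_driver.py;")
subprocess.run(["ssh", "-i", "EMR_KEY_PAIR.pem", host, remote], check=True)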
Binary file added docs/images/goodreads_dag.PNG
35 changes: 20 additions & 15 deletions goodreadsfaker/generate_fake_data.py
@@ -24,17 +24,18 @@ def __init__(self):
        self._book_data_list = list()
        self._base_directory = "D:\GoodReadsData\\fake"

-    def generate(self):
-        review_obj = self._generate_fake_review_obj()
-        self._review_data_list.append(self._parse_review_data(review_obj))
-        self._user_data_list.append(self._parse_user_data(review_obj))
-        self._author_data_list.append(self._parse_author_data(review_obj))
-        self._book_data_list.append(self._parse_book_data(review_obj))
+    def generate(self, num_records):
+        for i in range(num_records):
+            review_obj = self._generate_fake_review_obj()
+            self._review_data_list.append(self._parse_review_data(review_obj))
+            self._user_data_list.append(self._parse_user_data(review_obj))
+            self._author_data_list.append(self._parse_author_data(review_obj))
+            self._book_data_list.append(self._parse_book_data(review_obj))
        for module_name, module_data in zip(["reviews", "user", "author", "book"],
                                            [self._review_data_list, self._user_data_list, self._author_data_list,
                                             self._book_data_list]):
            self._write_to_disk(module_name, module_data)
+        self._clear_modules()

    def _write_to_disk(self, module_name, module_data):
        file = os.path.join(self._base_directory, f"{module_name}.csv")
@@ -44,10 +44,13 @@ def _write_to_disk(self, module_name, module_data):
        pd \
            .DataFrame(module_data) \
            .to_csv(path_or_buf=file, sep=',', index=False, mode=write_mode, header=header, quoting=csv.QUOTE_MINIMAL, encoding='utf-8')
-        self._user_data_list = list()
-        self._review_data_list = list()
-        self._author_data_list = list()
-        self._book_data_list = list()

+    def _clear_modules(self):
+        self._user_data_list = list()
+        self._review_data_list = list()
+        self._author_data_list = list()
+        self._book_data_list = list()

    def _clean_text(cls, text):
        return ' '.join((text.replace('\n','')).split())
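Two notes on this hunk. First, the removed list resets lived inside _write_to_disk, which the zip loop calls once per module, so the first call would wipe all four buffers before the other modules were written out; moving the reset into _clear_modules, called once after the loop, is presumably the point of the change. Second, write_mode and header are computed in the folded lines above; a common guard for this append-to-CSV pattern looks like the sketch below (an assumption for illustration, not necessarily the folded code):

# Hypothetical sketch of the folded write_mode/header logic.
import os

def _csv_write_args(file):
    exists = os.path.exists(file)
    write_mode = 'a' if exists else 'w'  # append after the first flush
    header = not exists                  # emit the header row only once
    return write_mode, header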
@@ -56,7 +60,7 @@ def _generate_fake_review_obj(self):
        return {

            # Fake review
-            "review_id" : self._faker.random_int(0, 100000),
+            "review_id" : self._faker.random_int(0, 10000000),
            "user_id" : self._faker.random_int(0, 100000),
            "book_id" : self._faker.random_int(0, 100000),
            "author_id" : self._faker.random_int(0, 100000),
@@ -199,5 +203,6 @@ def _parse_author_data(self, review_obj):
    required.add_argument("-n", "--num_records", type=int, metavar='', required=True, help="Number of records to generate.")
    args = parser.parse_args()
    fk = GoodreadsFake()
-    for i in range(args.num_records):
-        fk.generate()
+    for i in range(100):
+        print(f"Running iteration : {i}")
+        fk.generate(args.num_records)
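After this change, each generate(num_records) call buffers num_records fake records, flushes all four CSVs, and clears the buffers, so the __main__ loop above writes 100 batches of --num_records rows each. A minimal usage sketch (the batch count and size here are illustrative):

# Sketch: three batches of 500 records each; output lands in the
# hard-coded _base_directory as reviews.csv, user.csv, author.csv, book.csv.
fk = GoodreadsFake()
for batch in range(3):
    fk.generate(500)  # buffer 500 records, append to the CSVs, clear buffers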
4 changes: 2 additions & 2 deletions src/goodreads_transform.py
@@ -14,8 +14,8 @@ class GoodreadsTransform:

    def __init__(self, spark):
        self._spark = spark
-        self._load_path = 's3://' + config.get('BUCKET', 'WORKING_ZONE')
-        self._save_path = 's3://' + config.get('BUCKET', 'PROCESSED_ZONE')
+        self._load_path = 's3a://' + config.get('BUCKET', 'WORKING_ZONE')
+        self._save_path = 's3a://' + config.get('BUCKET', 'PROCESSED_ZONE')

    def transform_author_dataset(self):
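The scheme change matters because the bare s3:// scheme is only mapped transparently by EMRFS on EMR; elsewhere, Spark resolves S3 paths through Hadoop's open-source s3a:// connector, which is what this change targets. A minimal read sketch against the working zone (the file name is illustrative; the bucket comes from config as above):

# Sketch: reading a working-zone file through the s3a connector.
load_path = 's3a://' + config.get('BUCKET', 'WORKING_ZONE')
df = spark.read.csv(load_path + '/user.csv', header=True)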
