chore: cleanup codebase, support for python >=3.7 (#165)
* Update setup.cfg

* Lint, Refactor and Code Reuse

Basic cleanup, error and code climate changes

* CircleCI 2 Images

* Update config.yml

* Update config.yml

* Update config.yml

* Update config.yml

* Update config.yml

* Update config.yml

* Update config.yml

* Update config.yml

* Update config.yml

* Update config.yml

* Update config.yml

* ci test updates

* Update mysql.py

* Update config.yml

* Runtime to 3.8 and patch test issue with dataclasses

* Tuple/Union where required

* Tuple

* Cleanup test errors

* Tuple

* Tuple/Union

* Tuple/Union

* Multi-threaded testing

* Create requirements.txt

* Update setup.py

* removed fork process, causing issue on CircleCI

* Python 3.9

* Codeclimate issues

* fix: tuple is unsubscriptable

* fix: remove unused imports

Co-authored-by: Shubhank Vijayvergiya <shubhank.vijayvergiya@gmail.com>
Vincent Koc and shubhank-v authored May 6, 2022
1 parent dbf4e0f commit 593847b
Showing 19 changed files with 174 additions and 127 deletions.
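The Python >=3.7 support advertised in the title comes from the "Update setup.cfg" and "Update setup.py" commits, whose bodies are not shown in the diffs below. A minimal sketch of how such a floor is usually declared with setuptools; the exact metadata fields this repo uses are an assumption, not part of this commit:

```python
# Hypothetical setup.py sketch; the repo's real metadata lives in setup.cfg / setup.py (not shown in this diff).
from pathlib import Path

from setuptools import find_packages, setup

setup(
    name="hip-data-tools",                                  # PyPI name, per the README badge
    packages=find_packages(exclude=["tests", "tests.*"]),
    python_requires=">=3.7",                                # pip refuses to install on older interpreters
    install_requires=Path("requirements.txt").read_text().splitlines(),  # assumes the new requirements.txt drives dependencies
)
```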
31 changes: 17 additions & 14 deletions .circleci/config.yml
@@ -109,7 +109,7 @@ jobs:
test:
working_directory: ~/project
docker:
- image: circleci/python:3.6
- image: cimg/python:3.9
steps:
- checkout
- run:
@@ -120,36 +120,39 @@
- run:
name: install python dependencies
command: |
sudo pip3 install --upgrade setuptools coverage wheel
python3 -m venv venv
. venv/bin/activate
pip3 install --upgrade setuptools coverage wheel
- run:
name: install snappy as its used in tests
##/snappy is not available in the docker image
command: |
sudo apt-get install libsnappy-dev
sudo apt-get update -y
sudo apt-get install -y libsnappy-dev
- run:
name: run tests using setuptools
command: |
. venv/bin/activate
./cc-test-reporter before-build
coverage run --source hip_data_tools setup.py test
coverage xml
python3 -m coverage run --source hip_data_tools setup.py test
python3 -m coverage xml
./cc-test-reporter after-build -t coverage.py
coverage report
python3 -m coverage report
deploy:
docker:
- image: circleci/python:3.6
- image: cimg/python:3.9
steps:
- checkout
- run:
name: setup virtual env
command: |
python3 -m venv env
source ./env/bin/activate
- run:
name: install python dependencies
command: |
sudo pip3 install --upgrade setuptools wheel twine pbr
python3 -m venv venv
. venv/bin/activate
pip3 install --upgrade setuptools wheel twine pbr
- run:
name: Build the distribution
command: |
. venv/bin/activate
export GIT_TAG=$(git describe --tags)
echo "reading the latest version as $GIT_TAG"
python3 setup.py sdist bdist_wheel
@@ -159,7 +162,7 @@ jobs:
twine upload dist/*
semantic_release:
docker:
- image: circleci/node:10
- image: cimg/node:10
steps:
- *restore_repo
- checkout
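The added apt-get step exists because the test suite exercises snappy compression and the Python binding compiles against the system library. A minimal round-trip sketch, assuming the python-snappy package is the binding the tests use:

```python
# Round-trip sketch with python-snappy, which links against libsnappy at build time.
# Assumes `pip install python-snappy` after `apt-get install libsnappy-dev` (or `brew install snappy`).
import snappy

payload = b"hip-data-tools" * 1000            # something compressible
compressed = snappy.compress(payload)         # raw snappy block format
assert snappy.decompress(compressed) == payload
assert len(compressed) < len(payload)
```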
10 changes: 8 additions & 2 deletions README.md
@@ -1,5 +1,5 @@
# hip-data-tools
© Hipages Group Pty Ltd 2019
© Hipages Group Pty Ltd 2019-2022

[![PyPI version](https://badge.fury.io/py/hip-data-tools.svg)](https://pypi.org/project/hip-data-tools/#history)
[![CircleCI](https://circleci.com/gh/hipagesgroup/data-tools/tree/master.svg?style=svg)](https://circleci.com/gh/hipagesgroup/data-tools/tree/master)
@@ -13,7 +13,7 @@ The package is uploaded to PyPi for easy drop and use in various environmnets, s
2. ML Training in Jupyter like notebooks
3. Local machine for dev and exploration


## Installation
Install from PyPi repo:
```bash
@@ -25,6 +25,12 @@ Install from source
pip3 install .
```

## MacOS Dependencies
```
brew install libev
brew install librdkafka
```

## Connect to aws

You will need to instantiate an AWS Connection:
15 changes: 7 additions & 8 deletions hip_data_tools/aws/athena.py
@@ -196,10 +196,8 @@ def _show_result(self, execution_id, max_result_size=1000):

def _get_query_result(self, execution_id, max_result_size=1000):
athena = self.get_client()
results = athena.get_query_results(QueryExecutionId=execution_id,
MaxResults=max_result_size)
# TODO: Add ability to parse pages larger than 1000 rows
return results
return athena.get_query_results(QueryExecutionId=execution_id, MaxResults=max_result_size)

def repair_table_partitions(self, table):
"""
@@ -210,7 +208,7 @@ def repair_table_partitions(self, table):
Returns: None
"""
self.run_query("MSCK REPAIR TABLE {}".format(table))
self.run_query(f"MSCK REPAIR TABLE {table}")

def add_partitions(self, table, partition_keys, partition_values):
"""
@@ -223,8 +221,8 @@ def add_partitions(self, table, partition_keys, partition_values):
Returns: None
"""
partition_kv = ["{}='{}'".format(key, value) for key, value in
zip(partition_keys, partition_values)]
partition_kv = [f"{key}='{value}'" for key, value in zip(partition_keys, partition_values)]

partition_query = """
ALTER TABLE {table_name} ADD IF NOT EXISTS PARTITION ({partitions});
""".format(table_name=table,
@@ -291,7 +289,8 @@ def get_table_ddl(self, table):
"""
# Read the ddl of temporary table
ddl_result = self.run_query("""SHOW CREATE TABLE {}""".format(table), return_result=True)
ddl_result = self.run_query(f"""SHOW CREATE TABLE {table}""", return_result=True)

ddl = ""
for row in ddl_result["ResultSet"]["Rows"]:
for column in row["Data"]:
@@ -343,7 +342,7 @@ def drop_table(self, table_name):
Returns: None
"""
self.run_query("""DROP TABLE IF EXISTS {}""".format(table_name))
self.run_query(f"""DROP TABLE IF EXISTS {table_name}""")


@dataclass
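The deleted TODO in `_get_query_result` noted that a single `get_query_results` call returns at most 1,000 rows. A minimal sketch of how the remaining pages could be drained with boto3's built-in paginator; the client construction and the bare-rows return shape here are assumptions, not part of this commit:

```python
# Sketch: collect every row of an Athena query, not just the first page.
# Assumes standard boto3 credential/region resolution; the library builds its
# client through its own connection settings instead.
import boto3


def fetch_all_rows(execution_id: str, page_size: int = 1000) -> list:
    athena = boto3.client("athena")
    paginator = athena.get_paginator("get_query_results")
    rows = []
    for page in paginator.paginate(
        QueryExecutionId=execution_id,
        PaginationConfig={"PageSize": page_size},  # per-call cap, same 1,000 ceiling
    ):
        rows.extend(page["ResultSet"]["Rows"])
    return rows
```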
19 changes: 6 additions & 13 deletions hip_data_tools/aws/s3.py
@@ -64,7 +64,7 @@ def download_object_and_deserialise(self, key: str, local_file_path: str = None)
Returns: object
"""
if local_file_path is None:
local_file_path = "/tmp/tmp_file{}".format(str(uuid.uuid4()))
local_file_path = f"/tmp/tmp_file{str(uuid.uuid4())}"

self.download_file(key=key, local_file_path=local_file_path)
return load(local_file_path)
@@ -290,10 +290,7 @@ def download_json(self, key: str, encoding: str = UTF8) -> dict:
Returns: dict
"""
s3 = self.get_resource()
json_content = json.loads(
s3.Object(self.bucket, key).get()['Body'].read().decode(encoding)
)
return json_content
return json.loads(s3.Object(self.bucket, key).get()['Body'].read().decode(encoding))

def download_strings(self, key: str, encoding: str = UTF8) -> List[str]:
"""
@@ -306,8 +303,7 @@ def download_strings(self, key: str, encoding: str = UTF8) -> List[str]:
s3 = self.get_resource()
obj = s3.Object(self.bucket, key)
data = obj.get()['Body'].read().decode(encoding)
lines = data.splitlines()
return lines
return data.splitlines()

def get_keys_modified_in_range(self,
key_prefix: str,
@@ -325,10 +321,8 @@ def get_keys_modified_in_range(self,
LOG.info("sensing files from s3://%s/%s \n between %s to %s", self.bucket, key_prefix,
start_date, end_date)
metadata = self.get_object_metadata(key_prefix)
lines = []
for file in metadata:
if start_date < arrow.get(file.last_modified) <= end_date:
lines += [file.key]
lines = [file.key for file in metadata if start_date < arrow.get(file.last_modified) <= end_date]

LOG.info("found %s s3 files changed", len(lines))
return lines

@@ -354,8 +348,7 @@ def get_object_metadata(self, key_prefix: str) -> List:
"""
s3 = self.get_resource()
bucket = s3.Bucket(name=self.bucket)
metadata = bucket.objects.filter(Prefix=key_prefix)
return metadata
return bucket.objects.filter(Prefix=key_prefix)

def upload_binary_stream(self, stream: bytes, key: str) -> None:
"""
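The rewrite of `get_keys_modified_in_range` folds the accumulate-in-a-loop pattern into one comprehension. The same filter in isolation, with a hypothetical stand-in for the boto3 object summaries:

```python
# Same filtering idea on its own: keep keys whose last_modified falls inside a window.
# `FakeObject` is a made-up stand-in for the items returned by bucket.objects.filter().
from dataclasses import dataclass
from datetime import datetime, timezone

import arrow


@dataclass
class FakeObject:
    key: str
    last_modified: datetime


objects = [
    FakeObject("a.csv", datetime(2022, 5, 1, tzinfo=timezone.utc)),
    FakeObject("b.csv", datetime(2022, 5, 4, tzinfo=timezone.utc)),
]
start, end = arrow.get("2022-05-02"), arrow.get("2022-05-06")

keys = [obj.key for obj in objects if start < arrow.get(obj.last_modified) <= end]
print(keys)  # ['b.csv']
```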
24 changes: 8 additions & 16 deletions hip_data_tools/common.py
@@ -24,8 +24,7 @@


def _generate_random_file_name():
random_tmp_file_nm = "/tmp/tmp_file{}".format(str(uuid.uuid4()))
return random_tmp_file_nm
return f"/tmp/tmp_file{str(uuid.uuid4())}"


def get_from_env_or_default_with_warning(env_var, default_val):
@@ -41,9 +40,8 @@ def get_from_env_or_default_with_warning(env_var, default_val):
"""
value = os.environ.get(env_var)
if value is None:
warning_string = "Environmental variable {} not found, " \
"defaulting to {}".format(env_var,
str(default_val))
warning_string = f"Environmental variable {env_var} not found, defaulting to {str(default_val)}"

LOG.warning(warning_string)
value = default_val
return value
@@ -87,9 +85,7 @@ def exists(self, key):
key (str): the key to be verified for existance
Returns: bool
"""
if os.getenv(key) is None:
return False
return True
return os.getenv(key) is not None

def get(self, key):
"""
@@ -118,9 +114,7 @@ def exists(self, key):
key (str): the key to be verified for existance
Returns: bool
"""
if key in self.data:
return True
return False
return key in self.data

def get(self, key):
"""
@@ -155,7 +149,7 @@ def __init__(self, required_keys: list, source: KeyValueSource):
self._source = source
for key in self.keys:
if not self._source.exists(key):
raise Exception("Required Environment Variable {} does not exist!".format(key))
raise Exception(f"Required Environment Variable {key} does not exist!")

def get_secret(self, key):
"""
@@ -180,7 +174,7 @@ def flatten_nested_dict(data: dict, delimiter: str = "_", snake_cased_keys: bool
"""

def expand(key, value):
if isinstance(value, dict) or isinstance(value, OrderedDict):
if isinstance(value, (dict, OrderedDict)):
return [(key + delimiter + k, v) for k, v in flatten_nested_dict(value).items()]
else:
return [(key, value)]
@@ -208,9 +202,7 @@ def to_snake_case(column_name: str) -> str:
"""
# Detect and replace special_chars
str_replaced_special_chars = special_characters_detect.sub('_', column_name)
# Convert to Snake Case
camel_column_name = stringcase.snakecase(str_replaced_special_chars)
return camel_column_name
return stringcase.snakecase(str_replaced_special_chars)


def nested_list_of_dict_to_dataframe(data: List[dict]) -> DataFrame:
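Only the `expand` helper of `flatten_nested_dict` is visible in these hunks; a compressed, self-contained sketch of the same flattening idea (not the library's exact signature), for illustration:

```python
# Compressed illustration of the flattening idea shown in `expand` above:
# nested dict keys are joined with a delimiter, one level at a time.
from collections import OrderedDict


def flatten(data: dict, delimiter: str = "_") -> dict:
    items = []
    for key, value in data.items():
        if isinstance(value, (dict, OrderedDict)):  # the tuple form replaces chained isinstance checks
            items += [(f"{key}{delimiter}{k}", v) for k, v in flatten(value, delimiter).items()]
        else:
            items.append((key, value))
    return dict(items)


print(flatten({"ad": {"clicks": 3, "cost": {"aud": 1.5}}}))
# {'ad_clicks': 3, 'ad_cost_aud': 1.5}
```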
3 changes: 1 addition & 2 deletions hip_data_tools/etl/adwords_to_s3.py
@@ -118,8 +118,7 @@ def _get_next_page(self) -> DataFrame:
if not self.query:
raise ValueError(
"query is not set properly. please use the build_query() method to set it up.")
data = self._get_adwords_util().download_next_page_as_dataframe()
return data
return self._get_adwords_util().download_next_page_as_dataframe()

def transfer_all(self) -> None:
"""
10 changes: 5 additions & 5 deletions hip_data_tools/etl/athena_to_adwords.py
@@ -1,7 +1,7 @@
"""
handle ETL of data from Athena to Cassandra
"""
from typing import List
from typing import List, Union

from attr import dataclass
from cassandra.cqlengine import ValidationError
@@ -49,7 +49,7 @@ def __init__(self, settings: AthenaToAdWordsOfflineConversionSettings):
super().__init__(settings)
self._adwords = None

def upload_next(self) ->(List[dict], List[dict], List[dict]):
def upload_next(self) -> tuple(List[dict], List[dict], List[dict]):
"""
Upload the next file in line from the athena table onto AdWords offline conversion
Returns:
@@ -62,7 +62,7 @@ def upload_next(self) ->(List[dict], List[dict], List[dict]):
"""
return self._process_data_frame(self.next())

def upload_all(self) -> (List[dict], List[dict], List[dict]):
def upload_all(self) -> tuple(List[dict], List[dict], List[dict]):
"""
Upload all files from the Athena table onto AdWords offline conversion
Returns:
@@ -162,7 +162,7 @@ def _state_manager_connect(self):

sync_etl_state_table()

def _mark_processing(self, data: List[dict]) -> (List[dict], List[dict]):
def _mark_processing(self, data: List[dict]) -> Union[List[dict], List[dict]]:
data_for_processing = []
issues = []
for dat in data:
@@ -179,7 +179,7 @@ def _mark_upload_results(self, fail: List[dict], success: List[dict]) -> None:
for dat in fail:
self._get_sink_manager(dat["data"]).failed()

def _verify_data_before_upsert(self, data: List[dict]) -> (List[dict], List[dict]):
def _verify_data_before_upsert(self, data: List[dict]) -> Union[List[dict], List[dict]]:
data, issues = map(list, zip(*[self._sanitise_data(dat) for dat in data]))

if len(issues) > 0:
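The annotation churn above ties back to the "Tuple/Union where required" and "fix: tuple is unsubscriptable" commits: a bare `(A, B)` return annotation is just a tuple object, and subscripting the builtin `tuple` only became legal in Python 3.9, so `typing.Tuple` and `typing.Union` are the portable spellings on 3.7+. Note that this module's hunks spell the return type with the builtin `tuple(...)`, while the googleads module below uses `typing.Tuple[...]`. A minimal sketch of the distinction, independent of this module:

```python
# On Python 3.7/3.8, `tuple[int, str]` raises TypeError ("'type' object is not subscriptable"),
# which is what the "fix: tuple is unsubscriptable" commit refers to; typing.Tuple works on every
# version this package now targets.
from typing import List, Tuple, Union


def split_records(records: List[dict]) -> Tuple[List[dict], List[dict]]:
    """Return (valid, invalid): a fixed-size pair, so Tuple[...] is the right annotation."""
    valid = [r for r in records if r.get("ok")]
    invalid = [r for r in records if not r.get("ok")]
    return valid, invalid


def first_error(records: List[dict]) -> Union[dict, None]:
    """Union expresses "one of several types", not a pair of values."""
    bad = [r for r in records if not r.get("ok")]
    return bad[0] if bad else None
```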
10 changes: 5 additions & 5 deletions hip_data_tools/etl/athena_to_googleads.py
@@ -1,7 +1,7 @@
"""
handle ETL of offline conversion data from Athena to Google Ads API
"""
from typing import List, Tuple
from typing import List, Tuple, Dict, Any

from attr import dataclass
from cassandra.cqlengine import ValidationError
@@ -67,7 +67,7 @@ def __init__(self, settings: AthenaToGoogleAdsOfflineConversionSettings):
self._click_conversion = None
self._upload_click_conversion_request = None

def upload_next(self) -> (List[dict], List[dict], List[dict]):
def upload_next(self) -> Tuple[List[dict], List[dict], List[dict]]:
"""
Upload the next file in line from the athena table onto AdWords offline conversion
Returns:
@@ -80,7 +80,7 @@ def upload_next(self) -> (List[dict], List[dict], List[dict]):
"""
return self._process_data_frame(self.next())

def upload_all(self) -> (List[dict], List[dict], List[dict]):
def upload_all(self) -> Tuple[List[dict], List[dict], List[dict]]:
"""
Upload all files from the Athena table onto AdWords offline conversion
Returns:
@@ -220,7 +220,7 @@ def _state_manager_connect(self):

sync_etl_state_table()

def _mark_processing(self, data: List[dict]) -> (List[dict], List[dict]):
def _mark_processing(self, data: List[dict]) -> Tuple[List[dict], List[Dict[str, Any]]]:
data_for_processing = []
issues = []
for dat in data:
@@ -240,7 +240,7 @@ def _mark_upload_results(self, fail: List[dict], success: List[dict]) -> None:
LOG.debug(f'Uploading fail data to Cassandra: {dat}')
self._get_sink_manager(dat).failed()

def _verify_data_before_upsert(self, data: List[dict]) -> (List[dict], List[dict]):
def _verify_data_before_upsert(self, data: List[dict]) -> Tuple[list, list]:
data, issues = map(list, zip(*[self._sanitise_data(dat) for dat in data]))

# Remove None from the List
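`_verify_data_before_upsert` relies on the `map(list, zip(*pairs))` idiom to split a list of `(record, issue)` pairs into two parallel lists. A standalone sketch of that idiom with a made-up sanitiser:

```python
# Unzipping a list of (record, issue) pairs into two lists, the way
# _verify_data_before_upsert does; `sanitise` here is a hypothetical stand-in.
from typing import Tuple


def sanitise(record: dict) -> Tuple[dict, dict]:
    issue = {} if record.get("email") else {"error": "missing email", "data": record}
    return record, issue


records = [{"email": "a@example.com"}, {"clicks": 3}]
data, issues = map(list, zip(*[sanitise(r) for r in records]))

print(data)    # [{'email': 'a@example.com'}, {'clicks': 3}]
print(issues)  # [{}, {'error': 'missing email', 'data': {'clicks': 3}}]
```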