Removing Pipenv and adding Yapf.
igorborgest committed Jul 5, 2019
1 parent dd6deda commit 136828a
Showing 45 changed files with 457 additions and 1,002 deletions.
2 changes: 1 addition & 1 deletion .flake8
@@ -1,2 +1,2 @@
[flake8]
ignore = E501,E203,W503
ignore = E501,E126,W503
7 changes: 5 additions & 2 deletions .gitignore
@@ -130,8 +130,11 @@ output/
# Development
dev/
metrics/
python/

# SAM
.aws-sam
*parameters.json
*requirements*.txt
testing/*parameters.json
testing/*requirements*.txt
building/*parameters.json
building/*requirements*.txt
25 changes: 0 additions & 25 deletions Pipfile

This file was deleted.

510 changes: 0 additions & 510 deletions Pipfile.lock

This file was deleted.

4 changes: 3 additions & 1 deletion awswrangler/__init__.py
@@ -1,4 +1,5 @@
import logging
import importlib

from awswrangler.__version__ import __title__, __description__, __version__ # noqa
from awswrangler.session import Session # noqa
@@ -7,8 +8,9 @@
from awswrangler.athena import Athena # noqa
from awswrangler.glue import Glue # noqa
from awswrangler.redshift import Redshift # noqa
from awswrangler.spark import Spark # noqa
import awswrangler.utils # noqa

if importlib.util.find_spec("pyspark"):
from awswrangler.spark import Spark # noqa

logging.getLogger("awswrangler").addHandler(logging.NullHandler())
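Note: with this change, Spark is only re-exported when pyspark is importable, so the core package no longer hard-depends on it. Below is a minimal sketch of the same optional-import guard on its own (the fallback handling is illustrative, not taken from the commit); the sketch imports importlib.util explicitly, since the util submodule is not guaranteed to be exposed just by importing the top-level importlib package.

```python
import importlib.util  # find_spec lives in importlib.util; import it explicitly
import logging

LOGGER = logging.getLogger(__name__)

# Probe for the optional dependency without importing it.
if importlib.util.find_spec("pyspark") is not None:
    from pyspark.sql import SparkSession  # safe: pyspark is installed
else:
    SparkSession = None  # callers must check for None before using Spark features
    LOGGER.debug("pyspark not installed; Spark features are disabled")
```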
2 changes: 1 addition & 1 deletion awswrangler/__version__.py
@@ -1,4 +1,4 @@
__title__ = "awswrangler"
__description__ = "Utilities for Pandas and Apache Spark on AWS."
__version__ = "0.0b14"
__version__ = "0.0b16"
__license__ = "Apache License 2.0"
19 changes: 8 additions & 11 deletions awswrangler/athena.py
@@ -1,7 +1,6 @@
from time import sleep
import logging


LOGGER = logging.getLogger(__name__)

QUERY_WAIT_POLLING_DELAY = 0.2 # MILLISECONDS
@@ -13,8 +12,7 @@ def __init__(self, session):

def run_query(self, query, database, s3_output):
client = self._session.boto3_session.client(
service_name="athena", config=self._session.botocore_config
)
service_name="athena", config=self._session.botocore_config)
response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={"Database": database},
@@ -24,14 +22,13 @@ def run_query(self, query, database, s3_output):

def wait_query(self, query_execution_id):
client = self._session.boto3_session.client(
service_name="athena", config=self._session.botocore_config
)
service_name="athena", config=self._session.botocore_config)
final_states = ["FAILED", "SUCCEEDED", "CANCELLED"]
response = client.get_query_execution(QueryExecutionId=query_execution_id)
while (
response.get("QueryExecution").get("Status").get("State")
not in final_states
):
response = client.get_query_execution(
QueryExecutionId=query_execution_id)
while (response.get("QueryExecution").get("Status").get("State") not in
final_states):
sleep(QUERY_WAIT_POLLING_DELAY)
response = client.get_query_execution(QueryExecutionId=query_execution_id)
response = client.get_query_execution(
QueryExecutionId=query_execution_id)
return response
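For context, run_query and wait_query implement a plain submit-then-poll flow: start_query_execution returns an execution id, and get_query_execution is polled until the state reaches FAILED, SUCCEEDED, or CANCELLED. A condensed, self-contained sketch of that flow is below; the database name and S3 output location are placeholders, and since time.sleep takes seconds, a 0.2 delay amounts to a 200 ms polling interval.

```python
import time

import boto3

FINAL_STATES = {"FAILED", "SUCCEEDED", "CANCELLED"}
POLL_DELAY_SECONDS = 0.2  # 200 ms between get_query_execution calls

client = boto3.client("athena")

# Submit the query; Athena writes result files under the given S3 prefix.
query_execution_id = client.start_query_execution(
    QueryString="SELECT 1",
    QueryExecutionContext={"Database": "my_database"},  # placeholder database
    ResultConfiguration={"OutputLocation": "s3://my-bucket/athena-results/"},  # placeholder bucket
)["QueryExecutionId"]

# Poll until the query reaches a terminal state.
state = None
while state not in FINAL_STATES:
    time.sleep(POLL_DELAY_SECONDS)
    response = client.get_query_execution(QueryExecutionId=query_execution_id)
    state = response["QueryExecution"]["Status"]["State"]

print(query_execution_id, state)
```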
153 changes: 92 additions & 61 deletions awswrangler/glue.py
@@ -4,7 +4,6 @@

from awswrangler.exceptions import UnsupportedType, UnsupportedFileFormat


LOGGER = logging.getLogger(__name__)


@@ -13,16 +12,16 @@ def __init__(self, session):
self._session = session

def metadata_to_glue(
self,
dataframe,
path,
objects_paths,
file_format,
database=None,
table=None,
partition_cols=None,
preserve_index=True,
mode="append",
self,
dataframe,
path,
objects_paths,
file_format,
database=None,
table=None,
partition_cols=None,
preserve_index=True,
mode="append",
):
schema = Glue._build_schema(
dataframe=dataframe,
@@ -44,8 +43,7 @@ def metadata_to_glue(
)
if partition_cols:
partitions_tuples = Glue._parse_partitions_tuples(
objects_paths=objects_paths, partition_cols=partition_cols
)
objects_paths=objects_paths, partition_cols=partition_cols)
self.add_partitions(
database=database,
table=table,
@@ -55,43 +53,43 @@ def metadata_to_glue(

def delete_table_if_exists(self, database, table):
client = self._session.boto3_session.client(
service_name="glue", config=self._session.botocore_config
)
service_name="glue", config=self._session.botocore_config)
try:
client.delete_table(DatabaseName=database, Name=table)
except client.exceptions.EntityNotFoundException:
pass

def does_table_exists(self, database, table):
client = self._session.boto3_session.client(
service_name="glue", config=self._session.botocore_config
)
service_name="glue", config=self._session.botocore_config)
try:
client.get_table(DatabaseName=database, Name=table)
return True
except client.exceptions.EntityNotFoundException:
return False

def create_table(
self, database, table, schema, path, file_format, partition_cols=None
):
def create_table(self,
database,
table,
schema,
path,
file_format,
partition_cols=None):
client = self._session.boto3_session.client(
service_name="glue", config=self._session.botocore_config
)
service_name="glue", config=self._session.botocore_config)
if file_format == "parquet":
table_input = Glue.parquet_table_definition(
table, partition_cols, schema, path
)
table, partition_cols, schema, path)
elif file_format == "csv":
table_input = Glue.csv_table_definition(table, partition_cols, schema, path)
table_input = Glue.csv_table_definition(table, partition_cols,
schema, path)
else:
raise UnsupportedFileFormat(file_format)
client.create_table(DatabaseName=database, TableInput=table_input)

def add_partitions(self, database, table, partition_paths, file_format):
client = self._session.boto3_session.client(
service_name="glue", config=self._session.botocore_config
)
service_name="glue", config=self._session.botocore_config)
if not partition_paths:
return None
partitions = list()
@@ -107,23 +105,24 @@ def add_partitions(self, database, table, partition_paths, file_format):
for _ in range(pages_num):
page = partitions[:100]
del partitions[:100]
client.batch_create_partition(
DatabaseName=database, TableName=table, PartitionInputList=page
)
client.batch_create_partition(DatabaseName=database,
TableName=table,
PartitionInputList=page)

def get_connection_details(self, name):
client = self._session.boto3_session.client(
service_name="glue", config=self._session.botocore_config
)
return client.get_connection(Name=name, HidePassword=False)["Connection"]
service_name="glue", config=self._session.botocore_config)
return client.get_connection(Name=name,
HidePassword=False)["Connection"]

@staticmethod
def _build_schema(dataframe, partition_cols, preserve_index):
if not partition_cols:
partition_cols = []
schema_built = []
if preserve_index:
name = str(dataframe.index.name) if dataframe.index.name else "index"
name = str(
dataframe.index.name) if dataframe.index.name else "index"
dataframe.index.name = "index"
dtype = str(dataframe.index.dtype)
if name not in partition_cols:
@@ -168,9 +167,14 @@ def csv_table_definition(table, partition_cols, schema, path):
if not partition_cols:
partition_cols = []
return {
"Name": table,
"PartitionKeys": [{"Name": x, "Type": "string"} for x in partition_cols],
"TableType": "EXTERNAL_TABLE",
"Name":
table,
"PartitionKeys": [{
"Name": x,
"Type": "string"
} for x in partition_cols],
"TableType":
"EXTERNAL_TABLE",
"Parameters": {
"classification": "csv",
"compressionType": "none",
@@ -180,15 +184,22 @@ def csv_table_definition(table, partition_cols, schema, path):
"areColumnsQuoted": "false",
},
"StorageDescriptor": {
"Columns": [{"Name": x[0], "Type": x[1]} for x in schema],
"Columns": [{
"Name": x[0],
"Type": x[1]
} for x in schema],
"Location": path,
"InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"OutputFormat":
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"Compressed": False,
"NumberOfBuckets": -1,
"SerdeInfo": {
"Parameters": {"field.delim": ","},
"SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"Parameters": {
"field.delim": ","
},
"SerializationLibrary":
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
},
"StoredAsSubDirectories": False,
"SortColumns": [],
@@ -210,8 +221,11 @@ def csv_partition_definition(partition):
"InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
"Location": partition[0],
"SerdeInfo": {
"Parameters": {"field.delim": ","},
"SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"Parameters": {
"field.delim": ","
},
"SerializationLibrary":
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
},
"StoredAsSubDirectories": False,
},
@@ -223,24 +237,37 @@ def parquet_table_definition(table, partition_cols, schema, path):
if not partition_cols:
partition_cols = []
return {
"Name": table,
"PartitionKeys": [{"Name": x, "Type": "string"} for x in partition_cols],
"TableType": "EXTERNAL_TABLE",
"Name":
table,
"PartitionKeys": [{
"Name": x,
"Type": "string"
} for x in partition_cols],
"TableType":
"EXTERNAL_TABLE",
"Parameters": {
"classification": "parquet",
"compressionType": "none",
"typeOfData": "file",
},
"StorageDescriptor": {
"Columns": [{"Name": x[0], "Type": x[1]} for x in schema],
"Columns": [{
"Name": x[0],
"Type": x[1]
} for x in schema],
"Location": path,
"InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
"OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
"InputFormat":
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
"OutputFormat":
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
"Compressed": False,
"NumberOfBuckets": -1,
"SerdeInfo": {
"SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
"Parameters": {"serialization.format": "1"},
"SerializationLibrary":
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
"Parameters": {
"serialization.format": "1"
},
},
"StoredAsSubDirectories": False,
"SortColumns": [],
@@ -260,8 +287,11 @@ def parquet_partition_definition(partition):
"InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
"Location": partition[0],
"SerdeInfo": {
"Parameters": {"serialization.format": "1"},
"SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
"Parameters": {
"serialization.format": "1"
},
"SerializationLibrary":
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
},
"StoredAsSubDirectories": False,
},
@@ -271,14 +301,15 @@ def parquet_partition_definition(partition):
@staticmethod
def _parse_partitions_tuples(objects_paths, partition_cols):
paths = {f"{path.rpartition('/')[0]}/" for path in objects_paths}
return [
(
path,
Glue._parse_partition_values(path=path, partition_cols=partition_cols),
)
for path in paths
]
return [(
path,
Glue._parse_partition_values(path=path,
partition_cols=partition_cols),
) for path in paths]

@staticmethod
def _parse_partition_values(path, partition_cols):
return [re.search(f"/{col}=(.*?)/", path).group(1) for col in partition_cols]
return [
re.search(f"/{col}=(.*?)/", path).group(1)
for col in partition_cols
]
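One detail worth noting in add_partitions above: batch_create_partition is called in pages of 100 partitions, keeping each request within the Glue API's per-call batch limit. A stripped-down sketch of that paging pattern is below; the database, table, and partition inputs are placeholders, and each PartitionInput dict would be built by csv_partition_definition or parquet_partition_definition, with Values parsed from the S3 path by _parse_partition_values.

```python
import boto3


def batch_create_partitions(database, table, partition_inputs, page_size=100):
    """Create Glue partitions in pages, mirroring the chunking in add_partitions."""
    client = boto3.client("glue")
    for start in range(0, len(partition_inputs), page_size):
        page = partition_inputs[start:start + page_size]  # at most page_size inputs per request
        client.batch_create_partition(
            DatabaseName=database,
            TableName=table,
            PartitionInputList=page,
        )
```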