Add s3.delete_objects_prefix() and more
igorborgest committed Feb 20, 2020
1 parent 2062841 commit 8960c6c
Showing 18 changed files with 365 additions and 104 deletions.
2 changes: 1 addition & 1 deletion .flake8
@@ -1,3 +1,3 @@
[flake8]
max-line-length = 120
max-line-length = 80
ignore = E501,E126,W503
2 changes: 1 addition & 1 deletion .style.yapf
@@ -1,4 +1,4 @@
[style]
BASED_ON_STYLE = pep8
SPACES_BEFORE_COMMENT = 2
COLUMN_LIMIT = 120
COLUMN_LIMIT = 80
27 changes: 2 additions & 25 deletions README.md
@@ -2,24 +2,10 @@

> Pandas on AWS
## IMPORTANT NOTE: Version 1.0.0 coming soon with several breaking changes.

Please, pin the version you are using on your environment.

AWS Data Wrangler is completing 1 year, and the team is working to collect feedback and feature requests to put into our 1.0.0 version. So far we have 3 major changes listed:

- API redesign
- Nested data types support
- Deprecation of PySpark support
  - PySpark support takes a considerable part of the development time, and this has not been reflected in user adoption. Only 2 of our 66 issues on GitHub are related to Spark.
  - In addition, the integration between PySpark and PyArrow/Pandas remains at an experimental stage, and we have had a hard time keeping it stable.

---

[![Release](https://img.shields.io/badge/release-0.3.2-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Release](https://img.shields.io/badge/release-1.0.0-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-brightgreen.svg)](https://anaconda.org/conda-forge/awswrangler)
[![Documentation Status](https://readthedocs.org/projects/aws-data-wrangler/badge/?version=latest)](https://aws-data-wrangler.readthedocs.io/?badge=latest)
[![Coverage](https://img.shields.io/badge/coverage-89%25-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Coverage](https://img.shields.io/badge/coverage-100%25-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/awslabs/aws-data-wrangler.svg)](http://isitmaintained.com/project/awslabs/aws-data-wrangler "Average time to resolve an issue")
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

@@ -42,15 +28,6 @@ AWS Data Wrangler is completing 1 year, and the team is working to collect feedb

## Use Cases

### PySpark

| FROM | TO | Features |
|-----------------------------|---------------------------|------------------------------------------------------------------------------------------|
| PySpark DataFrame           | Amazon Redshift           | Blazing fast using parallel parquet on S3 behind the scenes<br>Append/Overwrite/Upsert modes |
| PySpark DataFrame | Glue Catalog | Register Parquet or CSV DataFrame on Glue Catalog |
| Nested PySpark<br>DataFrame | Flat PySpark<br>DataFrames| Flatten structs and break up arrays in child tables |


### Pandas

| FROM | TO | Features |
28 changes: 3 additions & 25 deletions awswrangler/__init__.py
@@ -6,37 +6,15 @@
"""

from logging import NullHandler, getLogger
from typing import Optional, Type

from awswrangler import exceptions # noqa
from awswrangler import utils # noqa
from awswrangler.__metadata__ import __description__, __license__, __title__, __version__ # noqa
from awswrangler.__metadata__ import __version__ # noqa
from awswrangler.s3 import S3 as _S3
from awswrangler.session import Session

default_session: Session = Session()

class _LazyDefaultSession:
"""Class to instantiate classes with the default Session dynamically and lazily."""

__default_session: Optional[Session] = None

@staticmethod
def get_class(class_ref: Type[_S3]) -> _S3:
"""Return an instance of the received class.
Also instantiate the default Session if necessary.
Parameters
----------
class_ref
Reference to the class that will be instantiated.
"""
if _LazyDefaultSession.__default_session is None:
_LazyDefaultSession.__default_session = Session()
return class_ref(_LazyDefaultSession.__default_session)


s3: _S3 = _LazyDefaultSession().get_class(_S3)
s3: _S3 = default_session.s3

getLogger("awswrangler").addHandler(NullHandler())
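
The lines above replace the lazy session wrapper with an eagerly created `default_session`. A minimal usage sketch, assuming only the names visible in this diff (`Session`, its `s3` attribute, and the module-level `wr.s3`):

import awswrangler as wr
from awswrangler.session import Session

# Module-level shortcut, backed by the eagerly created default_session.
print(wr.s3.does_object_exists(path="s3://bucket/key"))

# Explicit session: the same accessor hangs off the session itself,
# mirroring `s3 = default_session.s3` above.
session = Session()
print(session.s3.does_object_exists(path="s3://bucket/key"))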
6 changes: 6 additions & 0 deletions awswrangler/exceptions.py
Expand Up @@ -5,3 +5,9 @@ class S3WaitObjectTimeout(Exception):
"""Raise after wr.s3.wait_object_exists() reaches the timeout."""

pass


class AWSCredentialsNotFound(Exception):
"""Boto3 didn't find any AWS credential."""

pass
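
A hedged sketch of how calling code might guard against the new exception; where exactly awswrangler raises `AWSCredentialsNotFound` is not shown in this diff, so the trigger below (a session-backed call made without configured credentials) is only an assumption:

import awswrangler as wr
from awswrangler.exceptions import AWSCredentialsNotFound

try:
    wr.s3.list_objects(path="s3://bucket/prefix")
except AWSCredentialsNotFound:
    # Assumed failure mode: boto3 found no credentials when the session was built.
    raise SystemExit("Configure AWS credentials (env vars, ~/.aws, or an IAM role) and retry.")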
143 changes: 131 additions & 12 deletions awswrangler/s3.py
@@ -1,15 +1,18 @@
"""Amazon S3 Module."""

import multiprocessing as mp
from logging import Logger, getLogger
from time import sleep
from typing import TYPE_CHECKING, List, Optional, Tuple
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

from botocore.exceptions import ClientError # type: ignore

from awswrangler import utils
from awswrangler.exceptions import S3WaitObjectTimeout

if TYPE_CHECKING:
from awswrangler.session import Session # pragma: no cover
if TYPE_CHECKING: # pragma: no cover
from awswrangler.session import Session, _SessionPrimitives
import boto3 # type: ignore

logger: Logger = getLogger(__name__)

@@ -65,7 +68,10 @@ def does_object_exists(self, path: str) -> bool:
return False
raise ex # pragma: no cover

def wait_object_exists(self, path: str, polling_sleep: float = 0.1, timeout: Optional[float] = 10.0) -> None:
def wait_object_exists(self,
path: str,
polling_sleep: float = 0.1,
timeout: Optional[float] = 10.0) -> None:
"""Wait object exists on S3.
Parameters
@@ -99,7 +105,8 @@ def wait_object_exists(self, path: str, polling_sleep: float = 0.1, timeout: Opt
if timeout is not None:
time_acc += polling_sleep
if time_acc >= timeout:
raise S3WaitObjectTimeout(f"Waited for {path} for {time_acc} seconds")
raise S3WaitObjectTimeout(
f"Waited for {path} for {time_acc} seconds")

@staticmethod
def parse_path(path: str) -> Tuple[str, str]:
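
`wait_object_exists()` pairs with the `S3WaitObjectTimeout` exception from exceptions.py; a small usage sketch built only from the signature and timeout logic shown in this hunk:

import awswrangler as wr
from awswrangler.exceptions import S3WaitObjectTimeout

try:
    # Poll every 0.1 s and give up after 10 s (the defaults above).
    # Passing timeout=None skips the timeout check and polls indefinitely.
    wr.s3.wait_object_exists(path="s3://bucket/key", polling_sleep=0.1, timeout=10.0)
except S3WaitObjectTimeout:
    print("Object did not appear within 10 seconds.")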
@@ -150,7 +157,8 @@ def get_bucket_region(self, bucket: str) -> str:
"""
logger.debug(f"bucket: {bucket}")
region: str = self._session.s3_client.get_bucket_location(Bucket=bucket)["LocationConstraint"]
region: str = self._session.s3_client.get_bucket_location(
Bucket=bucket)["LocationConstraint"]
region = "us-east-1" if region is None else region
logger.debug(f"region: {region}")
return region
@@ -166,7 +174,7 @@ def list_objects(self, path: str) -> List[str]:
Returns
-------
List[str]
List of objects paths
List of objects paths.
Examples
--------
@@ -179,11 +187,122 @@ def list_objects(self, path: str) -> List[str]:
bucket: str
prefix: str
bucket, prefix = self.parse_path(path=path)
response_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix, PaginationConfig={"PageSize": 1000})
response_iterator = paginator.paginate(
Bucket=bucket, Prefix=prefix, PaginationConfig={"PageSize": 1000})
paths: List[str] = []
for page in response_iterator:
for content in page.get("Contents"):
if (content is not None) and ("Key" in content):
key: str = content["Key"]
paths.append(f"s3://{bucket}/{key}")
if page.get("Contents") is not None:
for content in page.get("Contents"):
if (content is not None) and ("Key" in content):
key: str = content["Key"]
paths.append(f"s3://{bucket}/{key}")
return paths
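
The `Contents` guard added above matters for empty prefixes: an S3 list page omits the `Contents` key when nothing matches, so the previous loop would fail with a TypeError. With the guard, the call degrades gracefully (bucket and prefix below are placeholders):

import awswrangler as wr

paths = wr.s3.list_objects(path="s3://bucket/prefix-with-no-objects/")
# An empty prefix now simply yields an empty list instead of raising.
assert paths == []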

def delete_objects_list(self,
paths: List[str],
parallel: bool = True) -> None:
"""Delete all listed Amazon S3 objects.
Note
----
In case of `parallel=True` the number of processes that will be spawned will be obtained from os.cpu_count().
Parameters
----------
paths : List[str]
List of S3 object paths (e.g. ["s3://bucket/key0", "s3://bucket/key1"]).
parallel : bool
True to enable parallel requests, False to disable.
Returns
-------
None
None.
Examples
--------
>>> import awswrangler as wr
>>> wr.s3.delete_objects_list(["s3://bucket/key0", "s3://bucket/key1"])
"""
cpus: int = utils.get_cpu_count(parallel=parallel)
buckets: Dict[str, List[str]] = self._split_paths_by_bucket(paths=paths)
for bucket, keys in buckets.items():
if cpus == 1:
self._delete_objects(s3_client=self._session.s3_client,
bucket=bucket,
keys=keys)
else:
chunks: List[List[str]] = utils.chunkify(lst=keys,
num_chunks=cpus)
procs: List[mp.Process] = []
for chunk in chunks:
proc: mp.Process = mp.Process(
target=self._delete_objects_remote,
args=(
self._session.primitives,
bucket,
chunk,
),
)
proc.daemon = False
proc.start()
procs.append(proc)
for proc in procs:
proc.join()
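
A usage sketch for the new bulk delete; the paths are illustrative. With `parallel=False` everything runs in the calling process, otherwise keys are grouped by bucket, chunked, and handed to one worker process per CPU as implemented above:

import awswrangler as wr

paths = [f"s3://bucket/tmp/part_{i}.csv" for i in range(5_000)]

# Single process: simplest path, easier to debug.
wr.s3.delete_objects_list(paths=paths, parallel=False)

# Multiprocess: os.cpu_count() workers, each deleting its own chunk of keys.
wr.s3.delete_objects_list(paths=paths, parallel=True)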

@staticmethod
def _delete_objects_remote(session_primitives: "_SessionPrimitives",
bucket: str, keys: List[str]) -> None:
session: "Session" = session_primitives.build_session()
s3_client: boto3.client = session.s3_client
S3._delete_objects(s3_client=s3_client, bucket=bucket, keys=keys)

@staticmethod
def _delete_objects(s3_client: "boto3.client", bucket: str,
keys: List[str]) -> None:
chunks: List[List[str]] = utils.chunkify(lst=keys, max_length=1_000)
logger.debug(f"len(chunks): {len(chunks)}")
for chunk in chunks:
batch: List[Dict[str, str]] = [{"Key": key} for key in chunk]
s3_client.delete_objects(Bucket=bucket, Delete={"Objects": batch})
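
`_delete_objects()` chunks keys into groups of 1,000 because the S3 `DeleteObjects` API accepts at most 1,000 keys per request. The `awswrangler.utils.chunkify` helper itself is not part of this diff; a rough sketch of the `max_length` behaviour assumed by the call above:

from typing import Any, List

def chunkify(lst: List[Any], max_length: int) -> List[List[Any]]:
    """Split lst into consecutive chunks of at most max_length items (assumed behaviour)."""
    return [lst[i:i + max_length] for i in range(0, len(lst), max_length)]

# 2_500 keys -> chunks of 1_000, 1_000 and 500, i.e. three DeleteObjects calls.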

@staticmethod
def _split_paths_by_bucket(paths: List[str]) -> Dict[str, List[str]]:
buckets: Dict[str, List[str]] = {}
bucket: str
key: str
for path in paths:
bucket, key = S3.parse_path(path=path)
if bucket not in buckets:
buckets[bucket] = []
buckets[bucket].append(key)
return buckets
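
For illustration, the grouping above turns a mixed list of paths into one key list per bucket (assuming `parse_path` splits `s3://bucket/key` into `(bucket, key)`):

paths = [
    "s3://logs/2020/02/20/a.gz",
    "s3://logs/2020/02/20/b.gz",
    "s3://backup/db.dump",
]
# _split_paths_by_bucket(paths) ->
# {"logs": ["2020/02/20/a.gz", "2020/02/20/b.gz"], "backup": ["db.dump"]}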

def delete_objects_prefix(self, path: str, parallel: bool = True) -> None:
"""Delete all Amazon S3 objects under the received prefix.
Note
----
In case of `parallel=True` the number of processes that will be spawned will be obtained from os.cpu_count().
Parameters
----------
path : str
S3 prefix path (e.g. s3://bucket/prefix).
parallel : bool
True to enable parallel requests, False to disable.
Returns
-------
None
None.
Examples
--------
>>> import awswrangler as wr
>>> wr.s3.delete_objects_prefix(path="s3://bucket/prefix")
"""
paths: List[str] = self.list_objects(path=path)
self.delete_objects_list(paths=paths, parallel=parallel)
