Commit

Add mypy to pre-commit, and correct and export inlined type hints (#28)
graingert authored Jun 23, 2023
1 parent 5dc8d11 commit 8ccc092
Showing 11 changed files with 82 additions and 94 deletions.
3 changes: 3 additions & 0 deletions .flake8
@@ -0,0 +1,3 @@
+# flake8 doesn't support pyproject.toml yet https://github.com/PyCQA/flake8/issues/234
+[flake8]
+max-line-length = 104
21 changes: 18 additions & 3 deletions .pre-commit-config.yaml
@@ -8,9 +8,6 @@ repos:
     rev: 5.12.0
     hooks:
       - id: isort
-        args:
-          - "--profile"
-          - "black"
   - repo: https://github.com/pre-commit/pre-commit-hooks
     rev: v4.4.0
     hooks:
@@ -24,3 +21,21 @@ repos:
       - id: pyupgrade
         args:
           - --py38-plus
+  - repo: https://github.com/pre-commit/mirrors-mypy
+    rev: v1.3.0
+    hooks:
+      - id: mypy
+        # Override default --ignore-missing-imports
+        # Use pyproject.toml if possible instead of adding command line parameters here
+        args: [--warn-unused-configs]
+        additional_dependencies:
+          # Type stubs
+          - types-setuptools
+          - boto3-stubs
+          - pytest
+          - dask
+          - deltalake
+  - repo: https://github.com/pycqa/flake8
+    rev: 6.0.0
+    hooks:
+      - id: flake8
2 changes: 2 additions & 0 deletions MANIFEST.in
@@ -0,0 +1,2 @@
+include requirements.txt
+include dask_deltatable/py.typed
8 changes: 7 additions & 1 deletion dask_deltatable/__init__.py
@@ -1 +1,7 @@
-from .core import read_delta_history, read_delta_table, vacuum
+from __future__ import annotations
+
+__all__ = ["read_delta_history", "read_delta_table", "vacuum"]
+
+from .core import read_delta_history as read_delta_history
+from .core import read_delta_table as read_delta_table
+from .core import vacuum as vacuum
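
The redundant-looking `import X as X` aliases are deliberate: under mypy's `no_implicit_reexport` (part of `--strict`), a plain `from .core import read_delta_table` is treated as private to `__init__`, while the `X as X` form (or an `__all__` entry) marks an explicit re-export. The pyproject.toml below relaxes that flag for this repo's own code, but downstream users running strict mypy still need the explicit form. A minimal sketch with a hypothetical `pkg` package:

# pkg/_impl.py
def helper() -> int:
    return 1

# pkg/__init__.py
from ._impl import helper as helper  # explicit: `from pkg import helper` type-checks downstream

# Spelled as plain `from ._impl import helper`, strict mypy would reject a
# downstream `from pkg import helper` as a name that is not explicitly exported.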
3 changes: 0 additions & 3 deletions dask_deltatable/__init__.pyi

This file was deleted.

70 changes: 36 additions & 34 deletions dask_deltatable/core.py
@@ -1,34 +1,36 @@
+from __future__ import annotations
+
 import json
 import os
-from typing import Dict, List, Optional
+from typing import Any
 from urllib.parse import urlparse
 
 import dask
 import dask.dataframe as dd
-import pyarrow.parquet as pq
+import pyarrow.parquet as pq  # type: ignore[import]
 from dask.base import tokenize
 from dask.dataframe.io import from_delayed
 from dask.delayed import delayed
 from deltalake import DataCatalog, DeltaTable
-from fsspec.core import get_fs_token_paths
+from fsspec.core import get_fs_token_paths  # type: ignore[import]
 from pyarrow import dataset as pa_ds
 
 
 class DeltaTableWrapper:
     path: str
-    version: int
-    columns: List[str]
-    datetime: str
-    storage_options: Dict[str, any]
+    version: int | None
+    columns: list[str] | None
+    datetime: str | None
+    storage_options: dict[str, Any] | None
 
     def __init__(
         self,
         path: str,
-        version: int,
-        columns: List[str],
-        datetime: Optional[str] = None,
-        storage_options: Dict[str, str] = None,
-        delta_storage_options: Dict[str, str] = None,
+        version: int | None,
+        columns: list[str] | None,
+        datetime: str | None = None,
+        storage_options: dict[str, str] | None = None,
+        delta_storage_options: dict[str, str] | None = None,
     ) -> None:
         self.path: str = path
         self.version: int = version
@@ -45,7 +47,7 @@ def __init__(
         )
         self.schema = self.dt.schema().to_pyarrow()
 
-    def read_delta_dataset(self, f: str, **kwargs: Dict[any, any]):
+    def read_delta_dataset(self, f: str, **kwargs: dict[Any, Any]):
         schema = kwargs.pop("schema", None) or self.schema
         filter = kwargs.pop("filter", None)
         if filter:
@@ -68,7 +70,7 @@ def read_delta_dataset(self, f: str, **kwargs: Dict[any, any]):
             .to_pandas()
         )
 
-    def _make_meta_from_schema(self) -> Dict[str, str]:
+    def _make_meta_from_schema(self) -> dict[str, str]:
         meta = self.schema.empty_table().to_pandas()
         if self.columns:
             meta = meta[self.columns]
@@ -82,7 +84,7 @@ def _history_helper(self, log_file_name: str):
         if "commitInfo" in meta_data:
             return meta_data["commitInfo"]
 
-    def history(self, limit: Optional[int] = None, **kwargs) -> dd.core.DataFrame:
+    def history(self, limit: int | None = None, **kwargs) -> dd.core.DataFrame:
         delta_log_path = str(self.path).rstrip("/") + "/_delta_log"
         log_files = self.fs.glob(f"{delta_log_path}/*.json")
         if len(log_files) == 0:  # pragma no cover
@@ -112,7 +114,7 @@ def _vacuum_helper(self, filename_to_delete: str) -> None:
         )
         self.fs.rm_file(self.path + "/" + filename_to_delete)
 
-    def vacuum(self, retention_hours: int = 168, dry_run: bool = True) -> None:
+    def vacuum(self, retention_hours: int = 168, dry_run: bool = True) -> list[str]:
         """
         Run the Vacuum command on the Delta Table: list and delete files no
         longer referenced by the Delta table and are older than the
@@ -143,7 +145,7 @@ def vacuum(self, retention_hours: int = 168, dry_run: bool = True) -> None:
         ]
         return dask.compute(parts)[0]
 
-    def get_pq_files(self) -> List[str]:
+    def get_pq_files(self) -> list[str]:
         """
         get the list of parquet files after loading the
         current datetime version
@@ -170,7 +172,7 @@ def read_delta_table(self, **kwargs) -> dd.core.DataFrame:
         ]
         meta = self._make_meta_from_schema()
         verify_meta = kwargs.get("verify_meta", False)
-        return from_delayed(parts, meta=meta, verify_meta=verify_meta)
+        return from_delayed(parts, meta=meta, verify_meta=verify_meta)  # type: ignore[return-value]
 
 
 def _read_from_catalog(
@@ -197,15 +199,15 @@ def _read_from_catalog(
 
 
 def read_delta_table(
-    path: Optional[str] = None,
-    catalog: Optional[str] = None,
-    database_name: str = None,
-    table_name: str = None,
-    version: int = None,
-    columns: List[str] = None,
-    storage_options: Dict[str, str] = None,
-    datetime: str = None,
-    delta_storage_options: Dict[str, str] = None,
+    path: str | None = None,
+    catalog: str | None = None,
+    database_name: str | None = None,
+    table_name: str | None = None,
+    version: int | None = None,
+    columns: list[str] | None = None,
+    storage_options: dict[str, str] | None = None,
+    datetime: str | None = None,
+    delta_storage_options: dict[str, str] | None = None,
     **kwargs,
 ):
     """
@@ -300,9 +302,9 @@ def read_delta_table(
 
 def read_delta_history(
     path: str,
-    limit: Optional[int] = None,
-    storage_options: Dict[str, str] = None,
-    delta_storage_options: Dict[str, str] = None,
+    limit: int | None = None,
+    storage_options: dict[str, str] | None = None,
+    delta_storage_options: dict[str, str] | None = None,
 ) -> dd.core.DataFrame:
     """
     Run the history command on the DeltaTable.
@@ -337,9 +339,9 @@ def vacuum(
     path: str,
     retention_hours: int = 168,
     dry_run: bool = True,
-    storage_options: Dict[str, str] = None,
-    delta_storage_options: Dict[str, str] = None,
-) -> None:
+    storage_options: dict[str, str] | None = None,
+    delta_storage_options: dict[str, str] | None = None,
+) -> list[str]:
     """
     Run the Vacuum command on the Delta Table: list and delete
     files no longer referenced by the Delta table and are
@@ -354,7 +356,7 @@
     Returns
     -------
-    None or List of tombstones
+    List of tombstones
     i.e the list of files no longer referenced by the Delta Table
     and are older than the retention threshold.
     """
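
Throughout core.py, `typing.Dict` / `List` / `Optional` give way to the builtin generics and `X | None` unions (PEP 585/604). That spelling is only valid at runtime on newer Pythons, but `from __future__ import annotations` (PEP 563) leaves annotations unevaluated, so the module still imports on the Python 3.8 floor that pyupgrade's `--py38-plus` targets. A minimal sketch of the pattern, not code from this repo:

from __future__ import annotations  # annotations stay as strings at runtime

from typing import Any


def read(columns: list[str] | None = None, options: dict[str, Any] | None = None) -> list[str]:
    # Without the __future__ import, this def raises TypeError on Python 3.8,
    # since list[str] and the | union would be evaluated at definition time.
    return columns if columns is not None else []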
47 changes: 0 additions & 47 deletions dask_deltatable/core.pyi

This file was deleted.

Empty file added dask_deltatable/py.typed
Empty file.
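
The empty `py.typed` file is the PEP 561 marker telling type checkers that the package's inline annotations are usable, which is what allows the two `.pyi` stub files to be deleted; the MANIFEST.in entry and `include_package_data=True` in setup.py make sure the marker ships. A rough downstream illustration (the call is hypothetical; the signature is the one exported from core.py above):

import dask_deltatable as ddt

# With py.typed installed, mypy checks this against the inline hints; without
# the marker it would skip the package and suggest installing library stubs.
ddt.read_delta_table("s3://bucket/table", version="3")  # flagged: version expects int | None, not str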
11 changes: 11 additions & 0 deletions pyproject.toml
@@ -0,0 +1,11 @@
+[tool.mypy]
+strict = true
+no_implicit_reexport = false
+allow_incomplete_defs = true
+allow_untyped_defs = true
+warn_return_any = false
+disallow_untyped_calls = false
+
+[tool.isort]
+profile = "black"
+add_imports = ["from __future__ import annotations"]
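
In effect this is `--strict` with targeted opt-outs, so existing untyped helpers keep passing while the stricter checks apply everywhere else; the pre-commit hook then runs mypy with `--warn-unused-configs` on top of it. A sketch of what the combination accepts and rejects, assuming the config above:

def legacy(x):  # accepted: allow_untyped_defs = true tolerates old-style helpers
    return x


def history(limit: int | None = None) -> str:
    # Still an error if written as `limit: int = None`: implicit Optional
    # defaults are rejected by current mypy regardless of these opt-outs.
    return "ok" if limit is None else str(limit)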
3 changes: 2 additions & 1 deletion setup.py
@@ -1,5 +1,7 @@
 #!/usr/bin/env python
 
+from __future__ import annotations
+
 from setuptools import setup
 
 with open("README.md", encoding="utf-8") as f:
@@ -21,7 +23,6 @@
         "dev": ["pytest", "requests", "pytest-cov>=2.10.1"],
         "s3": ["s3fs", "boto3"],
     },
-    package_data={"dask_deltatable": ["*.pyi" "__init__.pyi", "core.pyi"]},
     include_package_data=True,
     zip_safe=False,
 )
8 changes: 3 additions & 5 deletions tests/test_core.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import glob
 import os
 import zipfile
@@ -245,15 +247,11 @@ def test_catalog(simple_table):
     dt = MagicMock()
 
     def delta_mock(**kwargs):
-        from deltalake import DeltaTable
-
         files = glob.glob(simple_table + "/*parquet")
         dt.file_uris = MagicMock(return_value=files)
         return dt
 
-    with patch(
-        "deltalake.DeltaTable.from_data_catalog", side_effect=delta_mock
-    ) as mock:
+    with patch("deltalake.DeltaTable.from_data_catalog", side_effect=delta_mock):
         os.environ["AWS_ACCESS_KEY_ID"] = "apple"
         os.environ["AWS_SECRET_ACCESS_KEY"] = "greatsecret"
         df = ddt.read_delta_table(
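
The tightened `with patch(...)` line drops the unused `as mock` binding: with `side_effect=delta_mock`, every call to the patched `DeltaTable.from_data_catalog` is answered by the local stub, and the binding is only needed when a test asserts on the mock itself. A self-contained sketch of that mechanism against a standard-library target:

import os
from unittest.mock import patch


def fake_cwd():
    return "/tmp/fake"


# side_effect routes each call through the replacement; keep `as mock` only
# when the test inspects call counts or arguments afterwards.
with patch("os.getcwd", side_effect=fake_cwd):
    assert os.getcwd() == "/tmp/fake"
assert os.getcwd() != "/tmp/fake"  # original function restored on exit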