Skip to content

Commit

Permalink
reorg a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
dalejung committed Jun 26, 2023
1 parent 626d787 commit b1b1273
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 62 deletions.
39 changes: 38 additions & 1 deletion nbx_deux/bundle_manager/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import os
from pathlib import Path
import dataclasses as dc
from typing import cast
from typing import Literal, cast

import nbformat
from IPython.utils import tz
Expand All @@ -27,6 +27,41 @@
)


@dc.dataclass(frozen=True, kw_only=True)
class PathItem:
path: Path
type: Literal['file', 'directory']
is_bundle: bool = False

@property
def is_regular_file(self):
return self.type == 'file' and not self.is_bundle


def bundle_get_path_item(os_path):
os_path = Path(os_path)
type = 'file'
is_bundle = False

if os_path.is_dir():
if BundlePath.valid_path(os_path):
type = 'file'
is_bundle = True
else:
type = 'directory'
item = PathItem(path=os_path, type=type, is_bundle=is_bundle)
return item


def bundle_list_dir(os_path):
os_path = Path(os_path)
content = []
for p in os_path.iterdir():
item = bundle_get_path_item(p)
content.append(item)
return content


@dc.dataclass(kw_only=True)
class BundleModel(BaseModel):
type: str = dc.field(default='file', init=False)
Expand Down Expand Up @@ -270,3 +305,5 @@ def get_bundle_file_content(self):
assert model.bundle_files['howdy.txt'] == 'howdy'
assert model.name == 'example.ipynb'
assert model.path == 'subdir/example.ipynb'

items = bundle_list_dir(subdir)
100 changes: 39 additions & 61 deletions nbx_deux/bundle_manager/bundle_nbmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from nbx_deux.models import DirectoryModel

from ..nbx_manager import NBXContentsManager, ApiPath
from .bundle import NotebookBundlePath, BundlePath
from .bundle import NotebookBundlePath, BundlePath, bundle_get_path_item


class BundleContentsManager(FileManagerMixin, NBXContentsManager):
Expand All @@ -22,23 +22,6 @@ class BundleContentsManager(FileManagerMixin, NBXContentsManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fm = FileContentsManager(root_dir=self.root_dir)
self.bundle_class = NotebookBundlePath

def _fcm_file_type(self, path, type):
"""
Just replicating FCM logic and validation for path / type
"""
os_path = self._get_os_path(path=path)
if os.path.isdir(os_path):
if type not in (None, "directory"):
raise Exception(f"{path} is a directory not a {type}")
return 'directory'
elif type == "notebook" or (type is None and path.endswith(".ipynb")):
return 'notebook'
else:
if type == "directory":
raise Exception(f"{path} is not a directory")
return 'file'

def is_bundle(self, path: ApiPath | Path):
if isinstance(path, Path) and path.is_absolute():
Expand All @@ -47,30 +30,54 @@ def is_bundle(self, path: ApiPath | Path):
os_path = self._get_os_path(path=path)
return BundlePath.valid_path(os_path)

def get_bundle(self, path: ApiPath):
def get_bundle(self, path: ApiPath, type=None):
os_path = self._get_os_path(path=path)
if self.bundle_class.valid_path(os_path):
bundle = self.bundle_class(os_path)
if type == "notebook" or (type is None and path.endswith(".ipynb")):
bundle = NotebookBundlePath(os_path)
else:
bundle = BundlePath(os_path)
return bundle

def get(self, path, content=True, type=None, format=None):
"""
TODO: I imagine I could create a multi step process where:
1. List all directory content where files and bundle are same just with an is_bundle
2. Run the same fcm_type logic on paths
3. Dispatch to fcm / bundle depending on is_bundle flag.
The idea being that on some level bundle files should act transparently as single files.
"""
os_path = self._get_os_path(path=path)
path_item = bundle_get_path_item(os_path)
# TODO: Someday we might allow accessing other files in bundle. But that's later.
# This will also handle any non bundle notebooks
if os.path.isfile(os_path):
return self.fm.get(path, content=content, type=type, format=format)
if path_item.type == 'file':
if type == "directory":
raise Exception(f"{path} is not a directory")
if not path_item.is_bundle:
# regular files use fm
return self.fm.get(path, content=content, type=type, format=format)
else:
return self.bundle_get(path, content=content, type=type, format=format)

# non content directories can just use fcm.
if path_item.type == 'directory':
if type not in (None, "directory"):
raise Exception(f"{path} is a directory not a {type}")

if content is not True:
return self.fm.get(path, content=False)
else:
return self.get_dir(path, content)

return self.bundle_get(path, content=content, type=type, format=format)
raise Exception(f"Unable to handle {path}")

def get_dir(self, path, content=True):
os_path = self._get_os_path(path=path)
model = DirectoryModel.from_filepath(
os_path,
self.root_dir,
content=content,
model_get=self.get, # use out CM.get logic
)
return model

def bundle_get(self, path, content=True, type=None, format=None):
bundle = self.get_bundle(path, type=type)
model = bundle.get_model(self.root_dir, content=content)
return model

def save(self, model, path):
if self.is_bundle(path):
Expand Down Expand Up @@ -114,35 +121,6 @@ def save_file(self, model, path=''):
def is_hidden(self, path):
return self.fm.is_hidden(path)

def _dir_model(self, path):
model = {}
model['name'] = path.rsplit('/', 1)[-1]
model['path'] = path
model['type'] = 'directory'
return model

def bundle_get(self, path, content=True, type=None, format=None):
fcm_type = self._fcm_file_type(path, type)

if self.is_bundle(path):
bundle = self.get_bundle(path)
model = bundle.get_model(self.root_dir, content=content)
return model

# non content directories can just use fcm.
# NOTE: We first have to check that the directory is not a bundle.
if fcm_type == 'directory' and content is not True:
return self.fm.get(path, content=False)

os_path = self._get_os_path(path=path)
model = DirectoryModel.from_filepath(
os_path,
self.root_dir,
content=content,
model_get=self.get, # use out CM.get logic
)
return model

def delete_bundle(self, path):
if not self.is_bundle(path):
return self.fm.delete_file(path)
Expand Down

0 comments on commit b1b1273

Please sign in to comment.