Skip to content

Commit

Permalink
updated models
Browse files Browse the repository at this point in the history
  • Loading branch information
dalejung committed Jun 26, 2023
1 parent 1fb1c43 commit fe34d2f
Show file tree
Hide file tree
Showing 4 changed files with 357 additions and 61 deletions.
31 changes: 9 additions & 22 deletions nbx_deux/bundle_manager/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
`/root/frank/frank.txt` is the actual file.
"""
import os
import io
from pathlib import Path
import dataclasses as dc
from typing import cast
Expand All @@ -22,6 +21,10 @@
from IPython.utils import tz

from nbx_deux.models import BaseModel
from nbx_deux.fileio import (
_read_notebook,
_save_notebook,
)


@dc.dataclass(kw_only=True)
Expand Down Expand Up @@ -213,16 +216,6 @@ class NotebookBundlePath(BundlePath):
"""
bundle_model_class = NotebookBundleModel

@property
def notebook_content(self):
filepath = os.path.join(self.bundle_path, self.name)
with io.open(filepath, 'r', encoding='utf-8') as f:
try:
nb = nbformat.read(f, as_version=4)
except Exception:
nb = None
return nb

@classmethod
def valid_path(cls, os_path):
# basically a bundle with ipynb
Expand All @@ -234,20 +227,14 @@ def valid_path(cls, os_path):
def save_bundle_file(self, model):
nb = cast(nbformat.NotebookNode, nbformat.from_dict(model['content']))

# TODO: I don't remember why I did this...
if 'name' in nb.metadata:
nb.metadata['name'] = u''
try:
with io.open(self.bundle_file, 'w', encoding='utf-8') as f:
nbformat.write(nb, f, version=nbformat.NO_CONVERT)
except Exception as e:
raise Exception((
'Unexpected error while autosaving notebook: '
f'{self.bundle_file} {e}'
))

_save_notebook(self.bundle_file, nb)

def get_bundle_file_content(self):
with io.open(self.bundle_file, 'r', encoding='utf-8') as f:
nb = nbformat.read(f, as_version=4)
nb = _read_notebook(self.bundle_file)
return nb


Expand All @@ -273,7 +260,7 @@ def get_bundle_file_content(self):
files = nb_bundle.files
assert 'howdy.txt' in files

content = nb_bundle.notebook_content
content = nb_bundle.get_bundle_file_content()
assert content == nb

model = nb_bundle.get_model(td)
Expand Down
2 changes: 1 addition & 1 deletion nbx_deux/bundle_manager/tests/test_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def test_notebook_bundle_file():
files = nb_bundle.files
assert 'howdy.txt' in files

content = nb_bundle.notebook_content
content = nb_bundle.get_bundle_file_content()
assert content == nb

model = nb_bundle.get_model(td)
Expand Down
199 changes: 199 additions & 0 deletions nbx_deux/fileio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
"""
Mishmash of io logic stripped from jupyter code that isn't entangled with the
Configurable and ContentsManager.
"""
from contextlib import contextmanager
import os.path
from fnmatch import fnmatch
from base64 import decodebytes, encodebytes
from jupyter_server.services.contents.fileio import (
path_to_intermediate,
path_to_invalid,
replace_file,
atomic_writing,
_simple_writing,
)
import nbformat
from nbformat import sign

from tornado.web import HTTPError
from jupyter_server import _tz as tz


def mark_trusted_cells(nb):
"""Mark cells as trusted if the notebook signature matches.
Called as a part of loading notebooks.
Parameters
----------
nb : dict
The notebook object (in current nbformat)
path : str
The notebook's path (for logging)
"""

notary = sign.NotebookNotary()
trusted = notary.check_signature(nb)
notary.mark_cells(nb, trusted)


def should_list(name, hide_globs):
"""Should this file/directory name be displayed in a listing?"""
return not any(fnmatch(name, glob) for glob in hide_globs)


def _read_notebook(
os_path,
as_version=4,
capture_validation_error=None,
use_atomic_writing=True
):
"""Read a notebook from an os path."""
with open(os_path, "r", encoding="utf-8") as f:
try:
return nbformat.read(
f,
as_version=as_version,
capture_validation_error=capture_validation_error
)
except Exception as e:
e_orig = e

# If use_atomic_writing is enabled, we'll guess that it was also
# enabled when this notebook was written and look for a valid
# atomic intermediate.
tmp_path = path_to_intermediate(os_path)

if not use_atomic_writing or not os.path.exists(tmp_path):
raise HTTPError(
400,
f"Unreadable Notebook: {os_path} {e_orig!r}",
)

# Move the bad file aside, restore the intermediate, and try again.
invalid_file = path_to_invalid(os_path)
replace_file(os_path, invalid_file)
replace_file(tmp_path, os_path)
return _read_notebook(
os_path,
as_version,
capture_validation_error=capture_validation_error,
use_atomic_writing=use_atomic_writing
)


def _save_notebook(
os_path,
nb,
capture_validation_error=None,
use_atomic_writing=True
):
"""Save a notebook to an os_path."""
with writing_cm(os_path, encoding="utf-8", use_atomic_writing=use_atomic_writing) as f:
nbformat.write(
nb,
f,
version=nbformat.NO_CONVERT,
capture_validation_error=capture_validation_error,
)


@contextmanager
def writing_cm(os_path, *args, use_atomic_writing=True, **kwargs):
"""wrapper around atomic_writing that turns permission errors to 403.
Depending on flag 'use_atomic_writing', the wrapper perform an actual atomic writing or
simply writes the file (whatever an old exists or not)"""
if use_atomic_writing:
with atomic_writing(os_path, *args, **kwargs) as f:
yield f
else:
with _simple_writing(os_path, *args, **kwargs) as f:
yield f


def _save_file(os_path, content, format, use_atomic_writing=True):
"""Save content of a generic file."""
if format not in {"text", "base64"}:
raise HTTPError(
400,
"Must specify format of file contents as 'text' or 'base64'",
)

try:
if format == "text":
bcontent = content.encode("utf8")
else:
b64_bytes = content.encode("ascii")
bcontent = decodebytes(b64_bytes)

except Exception as e:
raise HTTPError(400, f"Encoding error saving {os_path}: {e}") from e

with writing_cm(os_path, text=False, use_atomic_writing=use_atomic_writing) as f:
f.write(bcontent)



def _read_file(os_path, format):
"""Read a non-notebook file.
os_path: The path to be read.
format:
If 'text', the contents will be decoded as UTF-8.
If 'base64', the raw bytes contents will be encoded as base64.
If not specified, try to decode as UTF-8, and fall back to base64
"""
if not os.path.isfile(os_path):
raise HTTPError(400, "Cannot read non-file %s" % os_path)

with open(os_path, "rb") as f:
bcontent = f.read()

if format is None or format == "text":
# Try to interpret as unicode if format is unknown or if unicode
# was explicitly requested.
try:
return bcontent.decode("utf8"), "text"
except UnicodeError as e:
if format == "text":
raise HTTPError(
400,
"%s is not UTF-8 encoded" % os_path,
reason="bad format",
) from e
return encodebytes(bcontent).decode("ascii"), "base64"


def ospath_is_writable(os_path):
try:
return os.access(os_path, os.W_OK)
except OSError:
return False


def get_ospath_metadata(os_path):
info = os.lstat(os_path)

size = None
try:
# size of file
size = info.st_size
except (ValueError, OSError):
pass

try:
last_modified = tz.utcfromtimestamp(info.st_mtime)
except (ValueError, OSError):
# Files can rarely have an invalid timestamp
# https://github.com/jupyter/notebook/issues/2539
# https://github.com/jupyter/notebook/issues/2757
# Use the Unix epoch as a fallback so we don't crash.
last_modified = datetime(1970, 1, 1, 0, 0, tzinfo=tz.UTC)

try:
created = tz.utcfromtimestamp(info.st_ctime)
except (ValueError, OSError): # See above
created = datetime(1970, 1, 1, 0, 0, tzinfo=tz.UTC)

return {'size': size, 'last_modified': last_modified, 'created': created}
Loading

0 comments on commit fe34d2f

Please sign in to comment.