Skip to content

Commit

Permalink
WIP: splitting notebook into component parts.
Browse files Browse the repository at this point in the history
  • Loading branch information
dalejung committed Jul 3, 2023
1 parent 9363dad commit af7b446
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 2 deletions.
40 changes: 38 additions & 2 deletions nbx_deux/bundle_manager/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
check_and_sign,
ospath_is_writable,
)
from nbx_deux.normalized_notebook import NormalizedNotebookPy


@dc.dataclass(frozen=True, kw_only=True)
Expand Down Expand Up @@ -73,12 +74,23 @@ class BundleModel(BaseModel):

@dc.dataclass(kw_only=True)
class NotebookBundleModel(BundleModel):
    """Bundle model whose ``content`` is a notebook node.

    NOTE(review): ``name`` and ``path`` are read by ``__repr__`` and are
    presumably declared on ``BundleModel`` -- confirm against the base class.
    """

    # The notebook carried by this model.
    content: nbformat.NotebookNode
    # Always 'notebook'; init=False keeps it out of the constructor.
    type: str = dc.field(default='notebook', init=False)
    default_format: ClassVar = 'json'

    def __repr__(self):
        """Return a short repr that summarises the notebook instead of dumping it.

        Only the cell count and the notebook metadata are shown for the
        content. A model whose content is ``None`` is rendered as
        ``content=None`` rather than raising.
        """
        content = self.content
        if content is None:
            # NOTE(review): assumes models can be built without content
            # (e.g. a content=False fetch) -- a repr must never raise.
            notebook_node_repr = repr(None)
        else:
            # The local names below are part of the output: the ``=``
            # specifier prints the expression text itself.
            cell_count = len(content['cells'])
            metadata = content['metadata']
            notebook_node_repr = f"NotebookNode({cell_count=}, {metadata=})"
        return (
            f"NotebookBundleModel(name={self.name}, path={self.path}"
            f", content={notebook_node_repr})"
        )


class BundlePath:
bundle_model_class = BundleModel
bundle_model_class: ClassVar[type] = BundleModel

def __init__(self, bundle_path):
bundle_path = Path(bundle_path)
Expand All @@ -99,6 +111,8 @@ def __repr__(self):
def files(self):
"""
files keys will be name relative to bundle_path
NOTE: This is not recursive depth.
"""
try:
files = [
Expand Down Expand Up @@ -192,11 +206,14 @@ def write_files(self, model):
with open(filepath, 'w') as f:
f.write(fcontent)

def get_model(self, root_dir, content=True, file_content=None):
def get_model(self, root_dir=None, content=True, file_content=None):
# default getting file_content to content
if file_content is None:
file_content = content

if root_dir is None:
root_dir = self.bundle_path

os_path = self.bundle_file

bundle_file_content = None
Expand Down Expand Up @@ -264,10 +281,28 @@ def valid_path(cls, os_path):
return False
return cls.is_bundle(os_path)

def normalized_dir(self, nb: nbformat.NotebookNode):
    """Return the ``_normalized`` directory located under ``self.bundle_path``.

    ``nb`` is accepted but is not consulted when building the path.
    """
    return self.bundle_path.joinpath('_normalized')

def save_normalized(self, nb: nbformat.NotebookNode):
    """Write a ``.py`` rendering of ``nb`` into the normalized directory.

    The directory returned by ``self.normalized_dir(nb)`` is created if
    needed (including parents). The output file takes the bundle file's
    name with its extension replaced by ``.py`` and holds the text
    produced by ``NormalizedNotebookPy(nb).to_pyfile()``.
    """
    normalized_dir = self.normalized_dir(nb)
    normalized_dir.mkdir(exist_ok=True, parents=True)

    content = NormalizedNotebookPy(nb).to_pyfile()

    basename, _ = os.path.splitext(self.bundle_file.name)
    new_filepath = normalized_dir.joinpath(basename + '.py')

    # Pin the encoding: notebook text is frequently non-ASCII and the
    # platform default encoding is not guaranteed to be able to encode it.
    with open(new_filepath, 'w', encoding='utf-8') as f:
        f.write(content)

def save_bundle_file(self, model: NotebookModel):
    """Persist the notebook held in ``model['content']`` to the bundle file.

    The content is converted with ``nbformat.from_dict``, passed through
    ``check_and_sign``, written via ``_save_notebook`` and finally handed
    to ``save_normalized``.
    """
    raw_content = model['content']
    nb = cast(nbformat.NotebookNode, nbformat.from_dict(raw_content))

    check_and_sign(nb)
    _save_notebook(self.bundle_file, nb)

    # WIP: also emit the normalized rendering next to the notebook.
    self.save_normalized(nb)

def get_bundle_file_content(self):
nb = _read_notebook(self.bundle_file)
Expand All @@ -288,3 +323,4 @@ def get_bundle_file_content(self):

new_model = bundle.get_model(td)
assert new_model['content'] == nb
assert bundle.bundle_path.joinpath('_normalized/example.py').exists()
32 changes: 32 additions & 0 deletions nbx_deux/nb_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from enum import StrEnum, auto
from nbformat.validator import get_validator

class NBOutputType(StrEnum):
EXECUTE_RESULT = auto()
DISPLAY_DATA = auto()
STREAM = auto()
ERROR = auto()


class NBSection:
    """An ordered collection of lines tagged with a cell id and cell type."""

    def __init__(self, id, cell_type):
        # Starts empty; lines are added one at a time through append().
        self.lines = []
        self.cell_type = cell_type
        self.id = id

    def append(self, line):
        """Add ``line`` to the end of this section's lines."""
        self.lines.append(line)

    def __repr__(self):
        # The collected lines are deliberately left out of the repr.
        return f"NBSection(id={self.id}, cell_type={self.cell_type})"


class NBOutput:
    """Minimal holder recording the type of a single output."""

    def __init__(self, output_type):
        # Stored as given; no validation or conversion is applied here.
        self.output_type = output_type


if __name__ == '__main__':
    # Scratch exploration, only run when the module is executed directly:
    # fetch the validator and drill down to the schema's 'definitions' entry.
    validator = get_validator()
    # NOTE(review): `_schema` is a private attribute of the validator object.
    schema = validator._schema
    definitions = schema['definitions']
137 changes: 137 additions & 0 deletions nbx_deux/normalized_notebook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from functools import cached_property
from pathlib import Path
from typing import cast
import copy

from nbformat import NotebookNode


from nbx_deux.nb_model import (
NBSection,
NBOutput,
NBOutputType,
)


class NormalizedNotebookPy:
def __init__(self, notebooknode: NotebookNode):
self.notebooknode = notebooknode
self.cell_map = {cell['id']: cell for cell in notebooknode['cells']}

def get_pyheader(self, cell):
header = f"# id={cell['id']} cell_type={cell['cell_type']}"
return header

@cached_property
def components(self):
# gonna pare down skeleton by removing source / outputs
skeleton = copy.deepcopy(self.notebooknode)
source_cells = {}
all_outputs = {}

for cell in skeleton['cells']:
id = cell['id']
source_cells[id] = {
'id': id,
'cell_type': cell['cell_type'],
'source': cell.pop('source'),
}

if outputs := cell.pop('outputs', None):
sentinel_outputs = []
for output in outputs:
sentinel = {
'output_type': output['output_type'],
}
sentinel_outputs.append(sentinel)
cell['outputs'] = sentinel_outputs
all_outputs[id] = outputs

return {
'skeleton': skeleton,
'source_cells': source_cells,
'all_outputs': all_outputs
}

def munge_source(self, source):
source = source.strip()

if source.startswith('%%'):
source = f'"""\n{source}\n"""'
return source

lines = []
for line in source.split('\n'):
if line.startswith('!') or line.startswith('%'):
line = f"# |{line}|"
lines.append(line)

source = '\n'.join(lines)
return source


def to_pyfile(self):
outs = []
for id, cell in self.components['source_cells'].items():
cell_type = cell['cell_type']
source = cell['source']

source = self.munge_source(source)
if not source.strip():
continue

header = self.get_pyheader(cell)

out = ""
match cell_type:
case 'code':
out = f"{header}\n\n{source}"
case 'markdown':
out = f'{header}\n\n"""\n{source}\n"""'
case _:
raise Exception(f"Dunno how to handle this {cell_type=}")

outs.append(out)

return "\n\n".join(outs)


def notebooknode_to_nnpy(nb_node: NotebookNode):
    """Wrap ``nb_node`` in a ``NormalizedNotebookPy`` and return the wrapper."""
    nnpy = NormalizedNotebookPy(nb_node)
    return nnpy


def parse_nnpy_header(line):
    """Parse a ``# key=value ...`` header comment into a dict.

    Args:
        line: one line of text.

    Returns:
        A dict of the ``key=value`` pairs found after the leading ``#``,
        or ``None`` when the line does not start with ``#`` or lacks
        either of the required ``id`` / ``cell_type`` keys.
    """
    if not line.startswith('#'):
        return None

    info = {}
    # split() without an argument treats any run of whitespace (tabs,
    # repeated spaces) as a single separator and never yields empty bits.
    for bit in line[1:].split():
        # partition() splits on the first '=' only, so a value that itself
        # contains '=' is kept instead of being dropped.
        key, sep, value = bit.partition('=')
        if not sep or not key:
            # Ordinary comment word, not a key=value pair.
            continue
        info[key] = value

    # we require at least id/cell_type
    if not {'id', 'cell_type'} <= info.keys():
        return None

    return info


def nnpy_to_sections(content):
    """Split py-file text into ``NBSection`` objects keyed by cell id.

    Each header line (as recognised by ``parse_nnpy_header``) selects the
    section that following lines are appended to. A header whose id was
    already seen re-selects the existing section.

    Args:
        content: the full text to split.

    Returns:
        A dict mapping cell id to its ``NBSection``, in first-seen order.

    Raises:
        ValueError: when non-blank text appears before the first header.
    """
    sections = {}
    section = None
    for line in content.splitlines():
        header = parse_nnpy_header(line)
        if header:
            cell_id = header['id']
            if cell_id not in sections:
                # Pass only the two supported fields: a header may carry
                # extra key=value pairs that NBSection does not accept.
                sections[cell_id] = NBSection(
                    id=cell_id,
                    cell_type=header['cell_type'],
                )
            section = sections[cell_id]
            continue

        if section is None:
            if not line.strip():
                # Blank lines ahead of the first header carry no content.
                continue
            raise ValueError(f"File did not start with header {line}")
        section.append(line)
    return sections

0 comments on commit af7b446

Please sign in to comment.