forked from frostming/marko
-
Notifications
You must be signed in to change notification settings - Fork 0
/
source.py
157 lines (129 loc) · 4.91 KB
/
source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from __future__ import annotations
import functools
import re
import types
from contextlib import contextmanager
from typing import TYPE_CHECKING, Generator, Match, Pattern, cast, overload
from marko.block import BlockElement, Document
if TYPE_CHECKING:
from typing import Literal
from marko.parser import Parser
def _preprocess_text(text: str) -> str:
    """Return *text* with every ``\\r\\n`` sequence turned into ``\\n``.

    Only the two-character CRLF sequence is rewritten; any other character,
    including a lone ``\\r``, is passed through unchanged.
    """
    crlf, lf = "\r\n", "\n"
    # Splitting on CRLF and re-joining with LF is equivalent to a replace.
    return lf.join(text.split(crlf))
class Source:
    """Wrapper class on content to be parsed.

    It keeps the text buffer, the current read position, an anchor position
    that :meth:`reset` can rewind to, and a stack of block elements
    ("states").  The ``_prefix`` patterns of the stacked states are joined to
    form the prefix that each line is matched against while parsing.
    """

    parser: Parser

    def __init__(self, text: str) -> None:
        """Create a source over *text* (after ``_preprocess_text``)."""
        self._buffer = _preprocess_text(text)
        # Index into ``_buffer`` of the next character to be parsed.
        self.pos = 0
        # Position remembered by ``anchor()`` and restored by ``reset()``.
        self._anchor = 0
        # Stack of states; index 0 is the root, index -1 the current one.
        self._states: list[BlockElement] = []
        # Last match produced by ``expect_re``/``next_line``; used by
        # ``consume()`` to advance ``pos``.
        self.match: Match[str] | None = None
        #: Store temporary data during parsing.
        self.context = types.SimpleNamespace()

    @property
    def state(self) -> BlockElement:
        """Returns the current element state.

        :raises RuntimeError: if no state has been pushed yet.
        """
        if not self._states:
            raise RuntimeError("Need to push a state first.")
        return self._states[-1]

    @property
    def root(self) -> Document:
        """Returns the root element, which is at the bottom of self._states.

        :raises RuntimeError: if no state has been pushed yet.
        """
        if not self._states:
            raise RuntimeError("Need to push a state first.")
        return cast(Document, self._states[0])

    def push_state(self, element: BlockElement) -> None:
        """Push a new state to the state stack."""
        self._states.append(element)

    def pop_state(self) -> BlockElement:
        """Pop the top most state and return it.

        :raises IndexError: if the state stack is empty.
        """
        return self._states.pop()

    @contextmanager
    def under_state(self, element: BlockElement) -> Generator[Source, None, None]:
        """A context manager to enable a new state temporarily.

        *element* is pushed on entry and popped on exit.  The pop happens in
        a ``finally`` clause so that the state stack is restored even when
        the body of the ``with`` block raises.
        """
        self.push_state(element)
        try:
            yield self
        finally:
            self.pop_state()

    @property
    def exhausted(self) -> bool:
        """Indicates whether the source reaches the end."""
        return self.pos >= len(self._buffer)

    @property
    def prefix(self) -> str:
        """The prefix of each line when parsing.

        It is the concatenation of ``_prefix`` of every state on the stack,
        from the root to the current one, and is used as a regular
        expression by :meth:`match_prefix`.
        """
        return "".join(s._prefix for s in self._states)

    def _expect_re(self, regexp: Pattern[str] | str, pos: int) -> Match[str] | None:
        """Match *regexp* against the buffer starting exactly at *pos*.

        A string pattern is compiled first.  Neither ``self.pos`` nor
        ``self.match`` is touched.
        """
        if isinstance(regexp, str):
            regexp = re.compile(regexp)
        return regexp.match(self._buffer, pos)

    @staticmethod
    @functools.lru_cache()
    def match_prefix(prefix: str, line: str) -> int:
        """Check if the line starts with given prefix and
        return the position of the end of prefix.
        If the prefix is not matched, return -1.

        :param prefix: a regular expression, matched at the start of *line*
            after tabs have been expanded to 4-column tab stops.
        :param line: the raw line, tabs not expanded.
        :returns: an index into the raw *line* just past the prefix, or -1.
        """
        expanded = line.expandtabs(4)
        m = re.match(prefix, expanded)
        if not m:
            # Retry with spaces padded in front of every newline: if the
            # prefix matches then, the line only fell short of the prefix by
            # trailing whitespace, and the position of its last character is
            # returned instead of reporting a failure.
            if re.match(prefix, expanded.replace("\n", " " * 99 + "\n")):
                return len(line) - 1
            return -1
        pos = m.end()
        if pos == 0:
            return 0
        # ``pos`` is a column in the tab-expanded text; map it back to the
        # shortest slice of the raw line whose expansion reaches that column.
        for i in range(1, len(line) + 1):
            if len(line[:i].expandtabs(4)) >= pos:
                return i
        return -1  # pragma: no cover

    def expect_re(self, regexp: Pattern[str] | str) -> Match[str] | None:
        """Test against the given regular expression and returns the match object.

        The current prefix must match the next line first; the expression is
        then tested right after the prefix.  On success or failure of the
        expression the result is stored in ``self.match``.

        :param regexp: the expression to be tested.
        :returns: the match object, or None if either the prefix or the
            expression is not matched.
        """
        prefix_len = self.match_prefix(
            self.prefix, self.next_line(require_prefix=False)  # type: ignore
        )
        if prefix_len >= 0:
            match = self._expect_re(regexp, self.pos + prefix_len)
            self.match = match
            return match
        else:
            return None

    @overload
    def next_line(self, require_prefix: Literal[False] = ...) -> str:
        ...

    @overload
    def next_line(self, require_prefix: Literal[True] = ...) -> str | None:
        ...

    def next_line(self, require_prefix: bool = True) -> str | None:
        """Return the next line in the source.

        The match is stored in ``self.match`` but ``pos`` is not advanced;
        call :meth:`consume` for that.

        :param require_prefix: if False, the whole line will be returned.
            otherwise, return the line with prefix stripped or None if the prefix
            is not matched.
        """
        if require_prefix:
            m = self.expect_re(r"(?m)[^\n]*?$\n?")
        else:
            m = self._expect_re(r"(?m)[^\n]*$\n?", self.pos)
        self.match = m
        if m:
            return m.group()
        return None

    def consume(self) -> None:
        """Consume the body of source. ``pos`` will move forward.

        ``pos`` jumps to the end of the last match, if there is one.  When
        the consumed text ends with a newline, the states' prefixes are
        switched over via ``_update_prefix``.  ``self.match`` is cleared.
        """
        if self.match:
            self.pos = self.match.end()
            # ``[-1:]`` rather than ``[-1]`` so an empty match is harmless.
            if self.match.group()[-1:] == "\n":
                self._update_prefix()
            self.match = None

    def anchor(self) -> None:
        """Pin the current parsing position."""
        self._anchor = self.pos

    def reset(self) -> None:
        """Reset the position to the last anchor."""
        self.pos = self._anchor

    def _update_prefix(self) -> None:
        """Replace ``_prefix`` with ``_second_prefix`` on states that have one."""
        for s in self._states:
            if hasattr(s, "_second_prefix"):
                s._prefix = s._second_prefix  # type: ignore