-
-
Notifications
You must be signed in to change notification settings - Fork 287
/
Copy pathvdobj.py
168 lines (139 loc) · 5.37 KB
/
vdobj.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from functools import wraps
import curses
import visidata
__all__ = ['ENTER', 'ALT', 'ESC', 'asyncthread', 'VisiData']
ENTER='Enter'
ALT=ESC='^['
# define @asyncthread for potentially long-running functions
# when function is called, instead launches a thread
def asyncthread(func):
'Function decorator, to make calls to `func()` spawn a separate thread if available.'
@wraps(func)
def _execAsync(*args, **kwargs):
if args and isinstance(args[0], visidata.BaseSheet): #1136: allow cancel of async methods on Sheet
if 'sheet' not in kwargs:
kwargs['sheet'] = args[0]
return visidata.vd.execAsync(func, *args, **kwargs)
return _execAsync
class VisiData(visidata.Extensible):
allPrefixes = ['g', 'z', 'Alt+'] # embig'g'en, 'z'mallify, Alt/Esc=User
@classmethod
def global_api(cls, func):
'Make global func() and identical vd.func()'
def _vdfunc(*args, **kwargs):
return getattr(visidata.vd, func.__name__)(*args, **kwargs)
visidata.vd.addGlobals({func.__name__: func})
setattr(cls, func.__name__, func)
return wraps(func)(_vdfunc)
def __init__(self):
self.sheets = [] # list of BaseSheet; all sheets on the sheet stack
self.allSheets = [] # list of all non-precious sheets ever pushed
self.lastErrors = []
self.pendingKeys = []
self.keystrokes = ''
self.scrFull = None
self._cmdlog = None
self.currentReplay = None
self.contexts = [self] # objects whose attributes are in the fallback context for eval/exec.
self.importingModule = None
self.importedModules = []
@property
def cursesEnabled(self):
return bool(self.scrFull)
def sheetstack(self, pane=0):
'Return list of sheets in given *pane*. pane=0 is the active pane. pane=-1 is the inactive pane.'
if pane == -1:
return list(vs for vs in self.sheets if vs.pane and (vs.pane != self.activePane))
else:
return list(vs for vs in self.sheets if vs.pane == (pane or self.activePane))
@property
def stackedSheets(self):
return list(vs for vs in self.sheets if vs.pane)
@property
def activeSheet(self):
'Return top sheet on sheets stack, or cmdlog if no sheets.'
for vs in self.sheets:
if vs.pane and vs.pane == self.activePane:
return vs
for vs in self.sheets:
if vs.pane and vs.pane != self.activePane:
return vs
return self._cmdlog
@property
def activeStack(self):
return self.sheetstack() or self.sheetstack(-1)
def __copy__(self):
'Dummy method for Extensible.init()'
pass
def finalInit(self):
'Initialize members specified in other modules with init()'
pass
@classmethod
def init(cls, membername, initfunc, **kwargs):
'Overload Extensible.init() to call finalInit instead of __init__'
oldinit = cls.finalInit
def newinit(self, *args, **kwargs):
oldinit(self, *args, **kwargs)
setattr(self, membername, initfunc())
cls.finalInit = newinit
super().init(membername, lambda: None, **kwargs)
def clearCaches(self):
'Invalidate internal caches between command inputs.'
visidata.Extensible.clear_all_caches()
def resetVisiData(self):
self.clearCaches() # we want vd to return a new VisiData object for each command
vd = visidata.vd # get the new vd
vd.cmdlog.rows = []
vd.sheets = []
vd.allSheets = []
return vd
def get_wch(self, scr):
try:
return scr.get_wch()
except AttributeError: #192 some packages don't have wide chars
k = scr.getch()
if k == -1: # mimic get_wch behavior
raise curses.error('no char ready')
return k
def drainPendingKeys(self, scr):
'''Call scr.get_wch() until no more keypresses are available. Return True if any keypresses are pending.'''
scr.timeout(0)
try:
while True:
k = self.get_wch(scr)
if k:
self.pendingKeys.append(k)
else:
break
except curses.error:
pass
finally:
scr.timeout(self.curses_timeout)
return bool(self.pendingKeys)
def getkeystroke(self, scr, vs=None):
'Get keystroke and display it on status bar.'
self.drainPendingKeys(scr)
k = None
if self.pendingKeys:
k = self.pendingKeys.pop(0)
else:
curses.reset_prog_mode() #1347
try:
scr.refresh()
k = self.get_wch(scr)
vs = vs or self.activeSheet
if vs:
self.drawRightStatus(vs._scr, vs) # continue to display progress %
except curses.error:
return '' # curses timeout
if isinstance(k, str):
if ord(k) >= 32 and ord(k) != 127: # 127 == DEL or ^?
return k
k = ord(k)
return curses.keyname(k).decode('utf-8')
@property
def screenHeight(self):
return self.scrFull.getmaxyx()[0] if self.scrFull else 25
@property
def screenWidth(self):
return self.scrFull.getmaxyx()[1] if self.scrFull else 80