forked from python/mypy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdmypy_server.py
1110 lines (972 loc) · 43.7 KB
/
dmypy_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Server for mypy daemon mode.
This implements a daemon process which keeps useful state in memory
to enable fine-grained incremental reprocessing of changes.
"""
from __future__ import annotations
import argparse
import base64
import io
import json
import os
import pickle
import subprocess
import sys
import time
import traceback
from contextlib import redirect_stderr, redirect_stdout
from typing import AbstractSet, Any, Callable, Final, List, Sequence, Tuple
from typing_extensions import TypeAlias as _TypeAlias
import mypy.build
import mypy.errors
import mypy.main
from mypy.dmypy_util import WriteToConn, receive, send
from mypy.find_sources import InvalidSourceList, create_source_list
from mypy.fscache import FileSystemCache
from mypy.fswatcher import FileData, FileSystemWatcher
from mypy.inspections import InspectionEngine
from mypy.ipc import IPCServer
from mypy.modulefinder import BuildSource, FindModuleCache, SearchPaths, compute_search_paths
from mypy.options import Options
from mypy.server.update import FineGrainedBuildManager, refresh_suppressed_submodules
from mypy.suggestions import SuggestionEngine, SuggestionFailure
from mypy.typestate import reset_global_state
from mypy.util import FancyFormatter, count_stats
from mypy.version import __version__
MEM_PROFILE: Final = False # If True, dump memory profile after initialization
if sys.platform == "win32":
from subprocess import STARTUPINFO
def daemonize(
options: Options, status_file: str, timeout: int | None = None, log_file: str | None = None
) -> int:
"""Create the daemon process via "dmypy daemon" and pass options via command line
When creating the daemon grandchild, we create it in a new console, which is
started hidden. We cannot use DETACHED_PROCESS since it will cause console windows
to pop up when starting. See
https://github.com/python/cpython/pull/4150#issuecomment-340215696
for more on why we can't have nice things.
It also pickles the options to be unpickled by mypy.
"""
command = [sys.executable, "-m", "mypy.dmypy", "--status-file", status_file, "daemon"]
pickled_options = pickle.dumps(options.snapshot())
command.append(f'--options-data="{base64.b64encode(pickled_options).decode()}"')
if timeout:
command.append(f"--timeout={timeout}")
if log_file:
command.append(f"--log-file={log_file}")
info = STARTUPINFO()
info.dwFlags = 0x1 # STARTF_USESHOWWINDOW aka use wShowWindow's value
info.wShowWindow = 0 # SW_HIDE aka make the window invisible
try:
subprocess.Popen(command, creationflags=0x10, startupinfo=info) # CREATE_NEW_CONSOLE
return 0
except subprocess.CalledProcessError as e:
return e.returncode
else:
def _daemonize_cb(func: Callable[[], None], log_file: str | None = None) -> int:
"""Arrange to call func() in a grandchild of the current process.
Return 0 for success, exit status for failure, negative if
subprocess killed by signal.
"""
# See https://stackoverflow.com/questions/473620/how-do-you-create-a-daemon-in-python
sys.stdout.flush()
sys.stderr.flush()
pid = os.fork()
if pid:
# Parent process: wait for child in case things go bad there.
npid, sts = os.waitpid(pid, 0)
sig = sts & 0xFF
if sig:
print("Child killed by signal", sig)
return -sig
sts = sts >> 8
if sts:
print("Child exit status", sts)
return sts
# Child process: do a bunch of UNIX stuff and then fork a grandchild.
try:
os.setsid() # Detach controlling terminal
os.umask(0o27)
devnull = os.open("/dev/null", os.O_RDWR)
os.dup2(devnull, 0)
os.dup2(devnull, 1)
os.dup2(devnull, 2)
os.close(devnull)
pid = os.fork()
if pid:
# Child is done, exit to parent.
os._exit(0)
# Grandchild: run the server.
if log_file:
sys.stdout = sys.stderr = open(log_file, "a", buffering=1)
fd = sys.stdout.fileno()
os.dup2(fd, 2)
os.dup2(fd, 1)
func()
finally:
# Make sure we never get back into the caller.
os._exit(1)
def daemonize(
options: Options, status_file: str, timeout: int | None = None, log_file: str | None = None
) -> int:
"""Run the mypy daemon in a grandchild of the current process
Return 0 for success, exit status for failure, negative if
subprocess killed by signal.
"""
return _daemonize_cb(Server(options, status_file, timeout).serve, log_file)
# Server code.
CONNECTION_NAME: Final = "dmypy"
def process_start_options(flags: list[str], allow_sources: bool) -> Options:
_, options = mypy.main.process_options(
["-i"] + flags, require_targets=False, server_options=True
)
if options.report_dirs:
print("dmypy: Ignoring report generation settings. Start/restart cannot generate reports.")
if options.junit_xml:
print(
"dmypy: Ignoring report generation settings. "
"Start/restart does not support --junit-xml. Pass it to check/recheck instead"
)
options.junit_xml = None
if not options.incremental:
sys.exit("dmypy: start/restart should not disable incremental mode")
if options.follow_imports not in ("skip", "error", "normal"):
sys.exit("dmypy: follow-imports=silent not supported")
return options
def ignore_suppressed_imports(module: str) -> bool:
"""Can we skip looking for newly unsuppressed imports to module?"""
# Various submodules of 'encodings' can be suppressed, since it
# uses module-level '__getattr__'. Skip them since there are many
# of them, and following imports to them is kind of pointless.
return module.startswith("encodings.")
ModulePathPair: _TypeAlias = Tuple[str, str]
ModulePathPairs: _TypeAlias = List[ModulePathPair]
ChangesAndRemovals: _TypeAlias = Tuple[ModulePathPairs, ModulePathPairs]
class Server:
# NOTE: the instance is constructed in the parent process but
# serve() is called in the grandchild (by daemonize()).
def __init__(self, options: Options, status_file: str, timeout: int | None = None) -> None:
"""Initialize the server with the desired mypy flags."""
self.options = options
# Snapshot the options info before we muck with it, to detect changes
self.options_snapshot = options.snapshot()
self.timeout = timeout
self.fine_grained_manager: FineGrainedBuildManager | None = None
if os.path.isfile(status_file):
os.unlink(status_file)
self.fscache = FileSystemCache()
options.raise_exceptions = True
options.incremental = True
options.fine_grained_incremental = True
options.show_traceback = True
if options.use_fine_grained_cache:
# Using fine_grained_cache implies generating and caring
# about the fine grained cache
options.cache_fine_grained = True
else:
options.cache_dir = os.devnull
# Fine-grained incremental doesn't support general partial types
# (details in https://github.com/python/mypy/issues/4492)
options.local_partial_types = True
self.status_file = status_file
# Since the object is created in the parent process we can check
# the output terminal options here.
self.formatter = FancyFormatter(sys.stdout, sys.stderr, options.hide_error_codes)
def _response_metadata(self) -> dict[str, str]:
py_version = f"{self.options.python_version[0]}_{self.options.python_version[1]}"
return {"platform": self.options.platform, "python_version": py_version}
def serve(self) -> None:
"""Serve requests, synchronously (no thread or fork)."""
command = None
server = IPCServer(CONNECTION_NAME, self.timeout)
orig_stdout = sys.stdout
orig_stderr = sys.stderr
try:
with open(self.status_file, "w") as f:
json.dump({"pid": os.getpid(), "connection_name": server.connection_name}, f)
f.write("\n") # I like my JSON with a trailing newline
while True:
with server:
data = receive(server)
sys.stdout = WriteToConn(server, "stdout", sys.stdout.isatty())
sys.stderr = WriteToConn(server, "stderr", sys.stderr.isatty())
resp: dict[str, Any] = {}
if "command" not in data:
resp = {"error": "No command found in request"}
else:
command = data["command"]
if not isinstance(command, str):
resp = {"error": "Command is not a string"}
else:
command = data.pop("command")
try:
resp = self.run_command(command, data)
except Exception:
# If we are crashing, report the crash to the client
tb = traceback.format_exception(*sys.exc_info())
resp = {"error": "Daemon crashed!\n" + "".join(tb)}
resp.update(self._response_metadata())
resp["final"] = True
send(server, resp)
raise
resp["final"] = True
try:
resp.update(self._response_metadata())
send(server, resp)
except OSError:
pass # Maybe the client hung up
if command == "stop":
reset_global_state()
sys.exit(0)
finally:
# Revert stdout/stderr so we can see any errors.
sys.stdout = orig_stdout
sys.stderr = orig_stderr
# If the final command is something other than a clean
# stop, remove the status file. (We can't just
# simplify the logic and always remove the file, since
# that could cause us to remove a future server's
# status file.)
if command != "stop":
os.unlink(self.status_file)
try:
server.cleanup() # try to remove the socket dir on Linux
except OSError:
pass
exc_info = sys.exc_info()
if exc_info[0] and exc_info[0] is not SystemExit:
traceback.print_exception(*exc_info)
def run_command(self, command: str, data: dict[str, object]) -> dict[str, object]:
"""Run a specific command from the registry."""
key = "cmd_" + command
method = getattr(self.__class__, key, None)
if method is None:
return {"error": f"Unrecognized command '{command}'"}
else:
if command not in {"check", "recheck", "run"}:
# Only the above commands use some error formatting.
del data["is_tty"]
del data["terminal_width"]
ret = method(self, **data)
assert isinstance(ret, dict)
return ret
# Command functions (run in the server via RPC).
def cmd_status(self, fswatcher_dump_file: str | None = None) -> dict[str, object]:
"""Return daemon status."""
res: dict[str, object] = {}
res.update(get_meminfo())
if fswatcher_dump_file:
data = self.fswatcher.dump_file_data() if hasattr(self, "fswatcher") else {}
# Using .dumps and then writing was noticeably faster than using dump
s = json.dumps(data)
with open(fswatcher_dump_file, "w") as f:
f.write(s)
return res
def cmd_stop(self) -> dict[str, object]:
"""Stop daemon."""
# We need to remove the status file *before* we complete the
# RPC. Otherwise a race condition exists where a subsequent
# command can see a status file from a dying server and think
# it is a live one.
os.unlink(self.status_file)
return {}
def cmd_run(
self,
version: str,
args: Sequence[str],
export_types: bool,
is_tty: bool,
terminal_width: int,
) -> dict[str, object]:
"""Check a list of files, triggering a restart if needed."""
stderr = io.StringIO()
stdout = io.StringIO()
try:
# Process options can exit on improper arguments, so we need to catch that and
# capture stderr so the client can report it
with redirect_stderr(stderr):
with redirect_stdout(stdout):
sources, options = mypy.main.process_options(
["-i"] + list(args),
require_targets=True,
server_options=True,
fscache=self.fscache,
program="mypy-daemon",
header=argparse.SUPPRESS,
)
# Signal that we need to restart if the options have changed
if not options.compare_stable(self.options_snapshot):
return {"restart": "configuration changed"}
if __version__ != version:
return {"restart": "mypy version changed"}
if self.fine_grained_manager:
manager = self.fine_grained_manager.manager
start_plugins_snapshot = manager.plugins_snapshot
_, current_plugins_snapshot = mypy.build.load_plugins(
options, manager.errors, sys.stdout, extra_plugins=()
)
if current_plugins_snapshot != start_plugins_snapshot:
return {"restart": "plugins changed"}
except InvalidSourceList as err:
return {"out": "", "err": str(err), "status": 2}
except SystemExit as e:
return {"out": stdout.getvalue(), "err": stderr.getvalue(), "status": e.code}
return self.check(sources, export_types, is_tty, terminal_width)
def cmd_check(
self, files: Sequence[str], export_types: bool, is_tty: bool, terminal_width: int
) -> dict[str, object]:
"""Check a list of files."""
try:
sources = create_source_list(files, self.options, self.fscache)
except InvalidSourceList as err:
return {"out": "", "err": str(err), "status": 2}
return self.check(sources, export_types, is_tty, terminal_width)
def cmd_recheck(
self,
is_tty: bool,
terminal_width: int,
export_types: bool,
remove: list[str] | None = None,
update: list[str] | None = None,
) -> dict[str, object]:
"""Check the same list of files we checked most recently.
If remove/update is given, they modify the previous list;
if all are None, stat() is called for each file in the previous list.
"""
t0 = time.time()
if not self.fine_grained_manager:
return {"error": "Command 'recheck' is only valid after a 'check' command"}
sources = self.previous_sources
if remove:
removals = set(remove)
sources = [s for s in sources if s.path and s.path not in removals]
if update:
# Sort list of file updates by extension, so *.pyi files are first.
update.sort(key=lambda f: os.path.splitext(f)[1], reverse=True)
known = {s.path for s in sources if s.path}
added = [p for p in update if p not in known]
try:
added_sources = create_source_list(added, self.options, self.fscache)
except InvalidSourceList as err:
return {"out": "", "err": str(err), "status": 2}
sources = sources + added_sources # Make a copy!
t1 = time.time()
manager = self.fine_grained_manager.manager
manager.log(f"fine-grained increment: cmd_recheck: {t1 - t0:.3f}s")
old_export_types = self.options.export_types
self.options.export_types = self.options.export_types or export_types
if not self.following_imports():
messages = self.fine_grained_increment(
sources, remove, update, explicit_export_types=export_types
)
else:
assert remove is None and update is None
messages = self.fine_grained_increment_follow_imports(
sources, explicit_export_types=export_types
)
res = self.increment_output(messages, sources, is_tty, terminal_width)
self.flush_caches()
self.update_stats(res)
self.options.export_types = old_export_types
return res
def check(
self, sources: list[BuildSource], export_types: bool, is_tty: bool, terminal_width: int
) -> dict[str, Any]:
"""Check using fine-grained incremental mode.
If is_tty is True format the output nicely with colors and summary line
(unless disabled in self.options). Also pass the terminal_width to formatter.
"""
old_export_types = self.options.export_types
self.options.export_types = self.options.export_types or export_types
if not self.fine_grained_manager:
res = self.initialize_fine_grained(sources, is_tty, terminal_width)
else:
if not self.following_imports():
messages = self.fine_grained_increment(sources, explicit_export_types=export_types)
else:
messages = self.fine_grained_increment_follow_imports(
sources, explicit_export_types=export_types
)
res = self.increment_output(messages, sources, is_tty, terminal_width)
self.flush_caches()
self.update_stats(res)
self.options.export_types = old_export_types
return res
def flush_caches(self) -> None:
self.fscache.flush()
if self.fine_grained_manager:
self.fine_grained_manager.flush_cache()
def update_stats(self, res: dict[str, Any]) -> None:
if self.fine_grained_manager:
manager = self.fine_grained_manager.manager
manager.dump_stats()
res["stats"] = manager.stats
manager.stats = {}
def following_imports(self) -> bool:
"""Are we following imports?"""
# TODO: What about silent?
return self.options.follow_imports == "normal"
def initialize_fine_grained(
self, sources: list[BuildSource], is_tty: bool, terminal_width: int
) -> dict[str, Any]:
self.fswatcher = FileSystemWatcher(self.fscache)
t0 = time.time()
self.update_sources(sources)
t1 = time.time()
try:
result = mypy.build.build(sources=sources, options=self.options, fscache=self.fscache)
except mypy.errors.CompileError as e:
output = "".join(s + "\n" for s in e.messages)
if e.use_stdout:
out, err = output, ""
else:
out, err = "", output
return {"out": out, "err": err, "status": 2}
messages = result.errors
self.fine_grained_manager = FineGrainedBuildManager(result)
original_sources_len = len(sources)
if self.following_imports():
sources = find_all_sources_in_build(self.fine_grained_manager.graph, sources)
self.update_sources(sources)
self.previous_sources = sources
# If we are using the fine-grained cache, build hasn't actually done
# the typechecking on the updated files yet.
# Run a fine-grained update starting from the cached data
if result.used_cache:
t2 = time.time()
# Pull times and hashes out of the saved_cache and stick them into
# the fswatcher, so we pick up the changes.
for state in self.fine_grained_manager.graph.values():
meta = state.meta
if meta is None:
continue
assert state.path is not None
self.fswatcher.set_file_data(
state.path,
FileData(st_mtime=float(meta.mtime), st_size=meta.size, hash=meta.hash),
)
changed, removed = self.find_changed(sources)
changed += self.find_added_suppressed(
self.fine_grained_manager.graph,
set(),
self.fine_grained_manager.manager.search_paths,
)
# Find anything that has had its dependency list change
for state in self.fine_grained_manager.graph.values():
if not state.is_fresh():
assert state.path is not None
changed.append((state.id, state.path))
t3 = time.time()
# Run an update
messages = self.fine_grained_manager.update(changed, removed)
if self.following_imports():
# We need to do another update to any new files found by following imports.
messages = self.fine_grained_increment_follow_imports(sources)
t4 = time.time()
self.fine_grained_manager.manager.add_stats(
update_sources_time=t1 - t0,
build_time=t2 - t1,
find_changes_time=t3 - t2,
fg_update_time=t4 - t3,
files_changed=len(removed) + len(changed),
)
else:
# Stores the initial state of sources as a side effect.
self.fswatcher.find_changed()
if MEM_PROFILE:
from mypy.memprofile import print_memory_profile
print_memory_profile(run_gc=False)
__, n_notes, __ = count_stats(messages)
status = 1 if messages and n_notes < len(messages) else 0
# We use explicit sources length to match the logic in non-incremental mode.
messages = self.pretty_messages(messages, original_sources_len, is_tty, terminal_width)
return {"out": "".join(s + "\n" for s in messages), "err": "", "status": status}
def fine_grained_increment(
self,
sources: list[BuildSource],
remove: list[str] | None = None,
update: list[str] | None = None,
explicit_export_types: bool = False,
) -> list[str]:
"""Perform a fine-grained type checking increment.
If remove and update are None, determine changed paths by using
fswatcher. Otherwise, assume that only these files have changes.
Args:
sources: sources passed on the command line
remove: paths of files that have been removed
update: paths of files that have been changed or created
explicit_export_types: --export-type was passed in a check command
(as opposite to being set in dmypy start)
"""
assert self.fine_grained_manager is not None
manager = self.fine_grained_manager.manager
t0 = time.time()
if remove is None and update is None:
# Use the fswatcher to determine which files were changed
# (updated or added) or removed.
self.update_sources(sources)
changed, removed = self.find_changed(sources)
else:
# Use the remove/update lists to update fswatcher.
# This avoids calling stat() for unchanged files.
changed, removed = self.update_changed(sources, remove or [], update or [])
if explicit_export_types:
# If --export-types is given, we need to force full re-checking of all
# explicitly passed files, since we need to visit each expression.
add_all_sources_to_changed(sources, changed)
changed += self.find_added_suppressed(
self.fine_grained_manager.graph, set(), manager.search_paths
)
manager.search_paths = compute_search_paths(sources, manager.options, manager.data_dir)
t1 = time.time()
manager.log(f"fine-grained increment: find_changed: {t1 - t0:.3f}s")
messages = self.fine_grained_manager.update(changed, removed)
t2 = time.time()
manager.log(f"fine-grained increment: update: {t2 - t1:.3f}s")
manager.add_stats(
find_changes_time=t1 - t0,
fg_update_time=t2 - t1,
files_changed=len(removed) + len(changed),
)
self.previous_sources = sources
return messages
def fine_grained_increment_follow_imports(
self, sources: list[BuildSource], explicit_export_types: bool = False
) -> list[str]:
"""Like fine_grained_increment, but follow imports."""
t0 = time.time()
# TODO: Support file events
assert self.fine_grained_manager is not None
fine_grained_manager = self.fine_grained_manager
graph = fine_grained_manager.graph
manager = fine_grained_manager.manager
orig_modules = list(graph.keys())
self.update_sources(sources)
changed_paths = self.fswatcher.find_changed()
manager.search_paths = compute_search_paths(sources, manager.options, manager.data_dir)
t1 = time.time()
manager.log(f"fine-grained increment: find_changed: {t1 - t0:.3f}s")
seen = {source.module for source in sources}
# Find changed modules reachable from roots (or in roots) already in graph.
changed, new_files = self.find_reachable_changed_modules(
sources, graph, seen, changed_paths
)
if explicit_export_types:
# Same as in fine_grained_increment().
add_all_sources_to_changed(sources, changed)
sources.extend(new_files)
# Process changes directly reachable from roots.
messages = fine_grained_manager.update(changed, [], followed=True)
# Follow deps from changed modules (still within graph).
worklist = changed.copy()
while worklist:
module = worklist.pop()
if module[0] not in graph:
continue
sources2 = self.direct_imports(module, graph)
# Filter anything already seen before. This prevents
# infinite looping if there are any self edges. (Self
# edges are maybe a bug, but...)
sources2 = [source for source in sources2 if source.module not in seen]
changed, new_files = self.find_reachable_changed_modules(
sources2, graph, seen, changed_paths
)
self.update_sources(new_files)
messages = fine_grained_manager.update(changed, [], followed=True)
worklist.extend(changed)
t2 = time.time()
def refresh_file(module: str, path: str) -> list[str]:
return fine_grained_manager.update([(module, path)], [], followed=True)
for module_id, state in list(graph.items()):
new_messages = refresh_suppressed_submodules(
module_id, state.path, fine_grained_manager.deps, graph, self.fscache, refresh_file
)
if new_messages is not None:
messages = new_messages
t3 = time.time()
# There may be new files that became available, currently treated as
# suppressed imports. Process them.
while True:
new_unsuppressed = self.find_added_suppressed(graph, seen, manager.search_paths)
if not new_unsuppressed:
break
new_files = [BuildSource(mod[1], mod[0], followed=True) for mod in new_unsuppressed]
sources.extend(new_files)
self.update_sources(new_files)
messages = fine_grained_manager.update(new_unsuppressed, [], followed=True)
for module_id, path in new_unsuppressed:
new_messages = refresh_suppressed_submodules(
module_id, path, fine_grained_manager.deps, graph, self.fscache, refresh_file
)
if new_messages is not None:
messages = new_messages
t4 = time.time()
# Find all original modules in graph that were not reached -- they are deleted.
to_delete = []
for module_id in orig_modules:
if module_id not in graph:
continue
if module_id not in seen:
module_path = graph[module_id].path
assert module_path is not None
to_delete.append((module_id, module_path))
if to_delete:
messages = fine_grained_manager.update([], to_delete)
fix_module_deps(graph)
self.previous_sources = find_all_sources_in_build(graph)
self.update_sources(self.previous_sources)
# Store current file state as side effect
self.fswatcher.find_changed()
t5 = time.time()
manager.log(f"fine-grained increment: update: {t5 - t1:.3f}s")
manager.add_stats(
find_changes_time=t1 - t0,
fg_update_time=t2 - t1,
refresh_suppressed_time=t3 - t2,
find_added_supressed_time=t4 - t3,
cleanup_time=t5 - t4,
)
return messages
def find_reachable_changed_modules(
self,
roots: list[BuildSource],
graph: mypy.build.Graph,
seen: set[str],
changed_paths: AbstractSet[str],
) -> tuple[list[tuple[str, str]], list[BuildSource]]:
"""Follow imports within graph from given sources until hitting changed modules.
If we find a changed module, we can't continue following imports as the imports
may have changed.
Args:
roots: modules where to start search from
graph: module graph to use for the search
seen: modules we've seen before that won't be visited (mutated here!!)
changed_paths: which paths have changed (stop search here and return any found)
Return (encountered reachable changed modules,
unchanged files not in sources_set traversed).
"""
changed = []
new_files = []
worklist = roots.copy()
seen.update(source.module for source in worklist)
while worklist:
nxt = worklist.pop()
if nxt.module not in seen:
seen.add(nxt.module)
new_files.append(nxt)
if nxt.path in changed_paths:
assert nxt.path is not None # TODO
changed.append((nxt.module, nxt.path))
elif nxt.module in graph:
state = graph[nxt.module]
for dep in state.dependencies:
if dep not in seen:
seen.add(dep)
worklist.append(BuildSource(graph[dep].path, graph[dep].id, followed=True))
return changed, new_files
def direct_imports(
self, module: tuple[str, str], graph: mypy.build.Graph
) -> list[BuildSource]:
"""Return the direct imports of module not included in seen."""
state = graph[module[0]]
return [BuildSource(graph[dep].path, dep, followed=True) for dep in state.dependencies]
def find_added_suppressed(
self, graph: mypy.build.Graph, seen: set[str], search_paths: SearchPaths
) -> list[tuple[str, str]]:
"""Find suppressed modules that have been added (and not included in seen).
Args:
seen: reachable modules we've seen before (mutated here!!)
Return suppressed, added modules.
"""
all_suppressed = set()
for state in graph.values():
all_suppressed |= state.suppressed_set
# Filter out things that shouldn't actually be considered suppressed.
#
# TODO: Figure out why these are treated as suppressed
all_suppressed = {
module
for module in all_suppressed
if module not in graph and not ignore_suppressed_imports(module)
}
# Optimization: skip top-level packages that are obviously not
# there, to avoid calling the relatively slow find_module()
# below too many times.
packages = {module.split(".", 1)[0] for module in all_suppressed}
packages = filter_out_missing_top_level_packages(packages, search_paths, self.fscache)
# TODO: Namespace packages
finder = FindModuleCache(search_paths, self.fscache, self.options)
found = []
for module in all_suppressed:
top_level_pkg = module.split(".", 1)[0]
if top_level_pkg not in packages:
# Fast path: non-existent top-level package
continue
result = finder.find_module(module, fast_path=True)
if isinstance(result, str) and module not in seen:
# When not following imports, we only follow imports to .pyi files.
if not self.following_imports() and not result.endswith(".pyi"):
continue
found.append((module, result))
seen.add(module)
return found
def increment_output(
self, messages: list[str], sources: list[BuildSource], is_tty: bool, terminal_width: int
) -> dict[str, Any]:
status = 1 if messages else 0
messages = self.pretty_messages(messages, len(sources), is_tty, terminal_width)
return {"out": "".join(s + "\n" for s in messages), "err": "", "status": status}
def pretty_messages(
self,
messages: list[str],
n_sources: int,
is_tty: bool = False,
terminal_width: int | None = None,
) -> list[str]:
use_color = self.options.color_output and is_tty
fit_width = self.options.pretty and is_tty
if fit_width:
messages = self.formatter.fit_in_terminal(
messages, fixed_terminal_width=terminal_width
)
if self.options.error_summary:
summary: str | None = None
n_errors, n_notes, n_files = count_stats(messages)
if n_errors:
summary = self.formatter.format_error(
n_errors, n_files, n_sources, use_color=use_color
)
elif not messages or n_notes == len(messages):
summary = self.formatter.format_success(n_sources, use_color)
if summary:
# Create new list to avoid appending multiple summaries on successive runs.
messages = messages + [summary]
if use_color:
messages = [self.formatter.colorize(m) for m in messages]
return messages
def update_sources(self, sources: list[BuildSource]) -> None:
paths = [source.path for source in sources if source.path is not None]
if self.following_imports():
# Filter out directories (used for namespace packages).
paths = [path for path in paths if self.fscache.isfile(path)]
self.fswatcher.add_watched_paths(paths)
def update_changed(
self, sources: list[BuildSource], remove: list[str], update: list[str]
) -> ChangesAndRemovals:
changed_paths = self.fswatcher.update_changed(remove, update)
return self._find_changed(sources, changed_paths)
def find_changed(self, sources: list[BuildSource]) -> ChangesAndRemovals:
changed_paths = self.fswatcher.find_changed()
return self._find_changed(sources, changed_paths)
def _find_changed(
self, sources: list[BuildSource], changed_paths: AbstractSet[str]
) -> ChangesAndRemovals:
# Find anything that has been added or modified
changed = [
(source.module, source.path)
for source in sources
if source.path and source.path in changed_paths
]
# Now find anything that has been removed from the build
modules = {source.module for source in sources}
omitted = [source for source in self.previous_sources if source.module not in modules]
removed = []
for source in omitted:
path = source.path
assert path
removed.append((source.module, path))
# Always add modules that were (re-)added, since they may be detected as not changed by
# fswatcher (if they were actually not changed), but they may still need to be checked
# in case they had errors before they were deleted from sources on previous runs.
previous_modules = {source.module for source in self.previous_sources}
changed_set = set(changed)
changed.extend(
[
(source.module, source.path)
for source in sources
if source.path
and source.module not in previous_modules
and (source.module, source.path) not in changed_set
]
)
# Find anything that has had its module path change because of added or removed __init__s
last = {s.path: s.module for s in self.previous_sources}
for s in sources:
assert s.path
if s.path in last and last[s.path] != s.module:
# Mark it as removed from its old name and changed at its new name
removed.append((last[s.path], s.path))
changed.append((s.module, s.path))
return changed, removed
def cmd_inspect(
self,
show: str,
location: str,
verbosity: int = 0,
limit: int = 0,
include_span: bool = False,
include_kind: bool = False,
include_object_attrs: bool = False,
union_attrs: bool = False,
force_reload: bool = False,
) -> dict[str, object]:
"""Locate and inspect expression(s)."""
if not self.fine_grained_manager:
return {
"error": 'Command "inspect" is only valid after a "check" command'
" (that produces no parse errors)"
}
engine = InspectionEngine(
self.fine_grained_manager,
verbosity=verbosity,
limit=limit,
include_span=include_span,
include_kind=include_kind,
include_object_attrs=include_object_attrs,
union_attrs=union_attrs,
force_reload=force_reload,
)
old_inspections = self.options.inspections
self.options.inspections = True
try:
if show == "type":
result = engine.get_type(location)
elif show == "attrs":
result = engine.get_attrs(location)
elif show == "definition":
result = engine.get_definition(location)
else:
assert False, "Unknown inspection kind"
finally:
self.options.inspections = old_inspections
if "out" in result:
assert isinstance(result["out"], str)
result["out"] += "\n"
return result
def cmd_suggest(self, function: str, callsites: bool, **kwargs: Any) -> dict[str, object]:
"""Suggest a signature for a function."""
if not self.fine_grained_manager:
return {
"error": "Command 'suggest' is only valid after a 'check' command"
" (that produces no parse errors)"
}
engine = SuggestionEngine(self.fine_grained_manager, **kwargs)
try:
if callsites:
out = engine.suggest_callsites(function)
else:
out = engine.suggest(function)
except SuggestionFailure as err:
return {"error": str(err)}
else:
if not out:
out = "No suggestions\n"
elif not out.endswith("\n"):
out += "\n"
return {"out": out, "err": "", "status": 0}
finally:
self.flush_caches()
def cmd_hang(self) -> dict[str, object]:
"""Hang for 100 seconds, as a debug hack."""
time.sleep(100)
return {}
# Misc utilities.
MiB: Final = 2**20
def get_meminfo() -> dict[str, Any]:
res: dict[str, Any] = {}