Update precommit #8852

Merged · 1 commit · Sep 2, 2024

2 changes: 2 additions & 0 deletions .flake8
@@ -22,6 +22,8 @@ ignore =
     B028
     # do not compare types, for exact checks use `is` / `is not`, for instance checks use `isinstance()`
     E721
+    # multiple statements on one line; required for black compat
+    E701, E704

 per-file-ignores =
     **/tests/*:
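E701 ("multiple statements on one line (colon)") and E704 ("statement on same line as def") need to be ignored because black 24.x collapses stub bodies onto a single line, as the hunks in distributed/actor.py, distributed/broker.py and distributed/comm/registry.py below show. A minimal sketch of the kind of code black 24 now produces and these two ignores allow (the `Closeable` protocol and `_Todo` class here are hypothetical, not part of this PR):

from typing import Protocol


class Closeable(Protocol):
    def close(self) -> None: ...  # flake8 would report E704 without the new ignore


class _Todo: ...  # flake8 would report E701 without the new ignore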
12 changes: 6 additions & 6 deletions .pre-commit-config.yaml
@@ -12,37 +12,37 @@ repos:
       - id: isort
         language_version: python3
   - repo: https://github.com/asottile/pyupgrade
-    rev: v3.15.0
+    rev: v3.17.0
     hooks:
       - id: pyupgrade
         args:
           - --py39-plus
   - repo: https://github.com/psf/black
-    rev: 23.12.1
+    rev: 24.8.0
     hooks:
       - id: black
         language_version: python3
         args:
           - --target-version=py39
   - repo: https://github.com/pycqa/flake8
-    rev: 7.0.0
+    rev: 7.1.1
     hooks:
       - id: flake8
         language_version: python3
         additional_dependencies:
           # NOTE: autoupdate does not pick up flake8-bugbear since it is a transitive
           # dependency. Make sure to update flake8-bugbear manually on a regular basis.
-          - flake8-bugbear==23.12.2
+          - flake8-bugbear==24.8.19
   - repo: https://github.com/codespell-project/codespell
-    rev: v2.2.6
+    rev: v2.3.0
     hooks:
       - id: codespell
         additional_dependencies:
           - tomli
         types_or: [rst, markdown]
         files: docs
   - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: v1.8.0
+    rev: v1.11.2
     hooks:
       - id: mypy
         # Override default --ignore-missing-imports
3 changes: 1 addition & 2 deletions continuous_integration/scripts/host_info.py
@@ -38,8 +38,7 @@ def main() -> None:
     else:
         print("CPU frequency:")
         for freq in freqs:
-            # FIXME types-psutil
-            print(f" - current={freq.current}, min={freq.min}, max={freq.max}")  # type: ignore
+            print(f" - current={freq.current}, min={freq.min}, max={freq.max}")

     mem = psutil.virtual_memory()
     print("Memory:")
1 change: 1 addition & 0 deletions continuous_integration/scripts/parse_stdout.py
@@ -1,6 +1,7 @@
 """On Windows, pytest-timeout kills off the whole test suite, leaving no junit report
 behind. Parse the stdout of pytest to generate one.
 """
+
 from __future__ import annotations

 import html
6 changes: 3 additions & 3 deletions distributed/_concurrent_futures_thread.py
@@ -31,9 +31,9 @@
 # workers to exit when their work queues are empty and then waits until the
 # threads finish.

-_threads_queues: weakref.WeakKeyDictionary[
-    threading.Thread, queue.Queue
-] = weakref.WeakKeyDictionary()
+_threads_queues: weakref.WeakKeyDictionary[threading.Thread, queue.Queue] = (
+    weakref.WeakKeyDictionary()
+)
 _shutdown = False


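This is the first of several hunks (see also distributed/active_memory_manager.py and distributed/client.py below) where black 24 prefers to split a long annotated assignment on the right-hand side, parenthesizing the value, instead of breaking inside the subscripted annotation. A hypothetical sketch of the same transformation (the `_pending_callbacks` name is made up for illustration):

from __future__ import annotations

import threading
import weakref

# Too long for one line. black 23.x would have split inside the subscript:
#     _pending_callbacks: weakref.WeakValueDictionary[
#         threading.Thread, object
#     ] = weakref.WeakValueDictionary()
# black 24.x keeps the annotation intact and parenthesizes the value instead:
_pending_callbacks: weakref.WeakValueDictionary[threading.Thread, object] = (
    weakref.WeakValueDictionary()
)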
13 changes: 7 additions & 6 deletions distributed/active_memory_manager.py
@@ -4,6 +4,7 @@
 See also :mod:`distributed.worker_memory` and :mod:`distributed.spill`, which implement
 spill/pause/terminate mechanics on the Worker side.
 """
+
 from __future__ import annotations

 import abc
@@ -392,12 +393,12 @@ def _enact_suggestions(self) -> None:
         logger.debug("Enacting suggestions for %d tasks:", len(self.pending))

         validate = self.scheduler.validate
-        drop_by_worker: (
-            defaultdict[scheduler_module.WorkerState, list[Key]]
-        ) = defaultdict(list)
-        repl_by_worker: (
-            defaultdict[scheduler_module.WorkerState, list[Key]]
-        ) = defaultdict(list)
+        drop_by_worker: defaultdict[scheduler_module.WorkerState, list[Key]] = (
+            defaultdict(list)
+        )
+        repl_by_worker: defaultdict[scheduler_module.WorkerState, list[Key]] = (
+            defaultdict(list)
+        )

         for ts, (pending_repl, pending_drop) in self.pending.items():
             if not ts.who_has:
6 changes: 2 additions & 4 deletions distributed/actor.py
@@ -245,12 +245,10 @@ class BaseActorFuture(abc.ABC, Awaitable[_T]):
     """

     @abc.abstractmethod
-    def result(self, timeout: str | timedelta | float | None = None) -> _T:
-        ...
+    def result(self, timeout: str | timedelta | float | None = None) -> _T: ...

     @abc.abstractmethod
-    def done(self) -> bool:
-        ...
+    def done(self) -> bool: ...

     def __repr__(self) -> Literal["<ActorFuture>"]:
         return "<ActorFuture>"
6 changes: 2 additions & 4 deletions distributed/broker.py
@@ -84,14 +84,12 @@ def _send_to_subscribers(self, topic: str, event: Any) -> None:
         self._scheduler.send_all(client_msgs, worker_msgs={})

     @overload
-    def get_events(self, topic: str) -> tuple[tuple[float, Any], ...]:
-        ...
+    def get_events(self, topic: str) -> tuple[tuple[float, Any], ...]: ...

     @overload
     def get_events(
         self, topic: None = None
-    ) -> dict[str, tuple[tuple[float, Any], ...]]:
-        ...
+    ) -> dict[str, tuple[tuple[float, Any], ...]]: ...

     def get_events(
         self, topic: str | None = None
2 changes: 1 addition & 1 deletion distributed/cfexecutor.py
@@ -30,7 +30,7 @@ def _cascade_future(future, cf_future):
         try:
             typ, exc, tb = result
             raise exc.with_traceback(tb)
-        except BaseException as exc:
+        except BaseException as exc:  # noqa: B036
             cf_future.set_exception(exc)


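The `# noqa: B036` added here (and twice in distributed/comm/ucx.py below) appears to address flake8-bugbear's B036 check, "except BaseException without re-raising", which becomes active with the bump to flake8-bugbear 24.8.19; the alternative fix, re-raising after logging, is used in distributed/client.py below. A hypothetical sketch of the two remedies (helper names invented for illustration):

from concurrent.futures import Future


def _forward_to_future(cf_future: Future, fn) -> None:
    try:
        cf_future.set_result(fn())
    except BaseException as exc:  # noqa: B036 -- handed to the Future, not swallowed
        cf_future.set_exception(exc)


def _log_and_reraise(fn) -> None:
    try:
        fn()
    except BaseException:
        print("callback failed")  # log, then re-raise, which satisfies B036
        raise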
8 changes: 5 additions & 3 deletions distributed/cli/dask_worker.py
@@ -406,9 +406,11 @@ async def run():
                 host=host,
                 dashboard=dashboard,
                 dashboard_address=dashboard_address,
-                name=name
-                if n_workers == 1 or name is None or name == ""
-                else str(name) + "-" + str(i),
+                name=(
+                    name
+                    if n_workers == 1 or name is None or name == ""
+                    else str(name) + "-" + str(i)
+                ),
                 **kwargs,
                 **port_kwargs_i,
             )
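The `name=` argument above is the first instance of black 24 parenthesizing a conditional expression that has to be split across lines; the same rewrite shows up in distributed/dashboard/components/scheduler.py and distributed/deploy/ssh.py further down. A hypothetical sketch modeled on the dashboard hunk (names invented for illustration; black only adds the parentheses when the expression does not fit on one line):

def render_labels(measurements: list[float], show_percentages: bool) -> list[str]:
    # black 23.x split the conditional element without parentheses;
    # black 24.x wraps the split expression in parentheses.
    return [
        (
            f"{measurement * 100:.1f}%"
            if show_percentages and index % 2
            else f"{measurement:.3f} s"
        )
        for index, measurement in enumerate(measurements)
    ]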
5 changes: 3 additions & 2 deletions distributed/cli/tests/test_dask_worker.py
@@ -610,8 +610,9 @@ async def test_set_lifetime_stagger_via_env_var(c, s):
     env = os.environ.copy()
     env["DASK_DISTRIBUTED__WORKER__LIFETIME__DURATION"] = "10 seconds"
     env["DASK_DISTRIBUTED__WORKER__LIFETIME__STAGGER"] = "2 seconds"
-    with popen(["dask", "worker", s.address], env=env), popen(
-        ["dask", "worker", s.address], env=env
+    with (
+        popen(["dask", "worker", s.address], env=env),
+        popen(["dask", "worker", s.address], env=env),
     ):
         await c.wait_for_workers(2)
         [lifetime1, lifetime2] = (
21 changes: 11 additions & 10 deletions distributed/client.py
@@ -133,9 +133,9 @@

 logger = logging.getLogger(__name__)

-_global_clients: weakref.WeakValueDictionary[
-    int, Client
-] = weakref.WeakValueDictionary()
+_global_clients: weakref.WeakValueDictionary[int, Client] = (
+    weakref.WeakValueDictionary()
+)
 _global_client_index = [0]

 _current_client: ContextVar[Client | None] = ContextVar("_current_client", default=None)
@@ -483,6 +483,7 @@ def execute_callback(fut):
                 fn(fut)
             except BaseException:
                 logger.exception("Error in callback %s of %s:", fn, fut)
+                raise

         self.client.loop.add_callback(
             done_callback, self, partial(cls._cb_executor.submit, execute_callback)
@@ -3873,13 +3874,13 @@ async def _restart_workers(
         name_to_addr = {meta["name"]: addr for addr, meta in info["workers"].items()}
         worker_addrs = [name_to_addr.get(w, w) for w in workers]

-        out: dict[
-            str, Literal["OK", "removed", "timed out"]
-        ] = await self.scheduler.restart_workers(
-            workers=worker_addrs,
-            timeout=timeout,
-            on_error="raise" if raise_for_error else "return",
-            stimulus_id=f"client-restart-workers-{time()}",
+        out: dict[str, Literal["OK", "removed", "timed out"]] = (
+            await self.scheduler.restart_workers(
+                workers=worker_addrs,
+                timeout=timeout,
+                on_error="raise" if raise_for_error else "return",
+                stimulus_id=f"client-restart-workers-{time()}",
+            )
         )
         # Map keys back to original `workers` input names/addresses
         out = {w: out[w_addr] for w, w_addr in zip(workers, worker_addrs)}
3 changes: 1 addition & 2 deletions distributed/comm/registry.py
@@ -7,8 +7,7 @@


 class _EntryPoints(Protocol):
-    def __call__(self, **kwargs: str) -> Iterable[importlib.metadata.EntryPoint]:
-        ...
+    def __call__(self, **kwargs: str) -> Iterable[importlib.metadata.EntryPoint]: ...


 _entry_points: _EntryPoints = importlib.metadata.entry_points  # type: ignore[assignment]
5 changes: 3 additions & 2 deletions distributed/comm/ucx.py
@@ -5,6 +5,7 @@

 .. _UCX: https://github.com/openucx/ucx
 """
+
 from __future__ import annotations

 import functools
@@ -360,7 +361,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
             await self.ep.recv(header)
             header = struct.unpack(header_fmt, header)
             cuda_frames, sizes = header[:nframes], header[nframes:]
-        except BaseException as e:
+        except BaseException as e:  # noqa: B036
             # In addition to UCX exceptions, may be CancelledError or another
             # "low-level" exception. The only safe thing to do is to abort.
             # (See also https://github.com/dask/distributed/pull/6574).
@@ -390,7 +391,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
             try:
                 for each_frame in recv_frames:
                     await self.ep.recv(each_frame)
-            except BaseException as e:
+            except BaseException as e:  # noqa: B036
                 # In addition to UCX exceptions, may be CancelledError or another
                 # "low-level" exception. The only safe thing to do is to abort.
                 # (See also https://github.com/dask/distributed/pull/6574).
8 changes: 5 additions & 3 deletions distributed/dashboard/components/scheduler.py
@@ -3923,9 +3923,11 @@ def update(self):

         # Format event loop as time and GIL (if configured) as %
         self.data["text"] = [
-            f"{x * 100:.1f}%"
-            if i % 2 and s.monitor.monitor_gil_contention
-            else format_time(x)
+            (
+                f"{x * 100:.1f}%"
+                if i % 2 and s.monitor.monitor_gil_contention
+                else format_time(x)
+            )
             for i, x in enumerate(self.data["values"])
         ]
         update(self.source, self.data)
30 changes: 18 additions & 12 deletions distributed/deploy/ssh.py
@@ -434,28 +434,34 @@ def SSHCluster(
         "cls": Scheduler,
         "options": {
             "address": hosts[0],
-            "connect_options": connect_options
-            if isinstance(connect_options, dict)
-            else connect_options[0],
+            "connect_options": (
+                connect_options
+                if isinstance(connect_options, dict)
+                else connect_options[0]
+            ),
             "kwargs": scheduler_options,
-            "remote_python": remote_python[0]
-            if isinstance(remote_python, list)
-            else remote_python,
+            "remote_python": (
+                remote_python[0] if isinstance(remote_python, list) else remote_python
+            ),
         },
     }
     workers = {
         i: {
             "cls": Worker,
             "options": {
                 "address": host,
-                "connect_options": connect_options
-                if isinstance(connect_options, dict)
-                else connect_options[i + 1],
+                "connect_options": (
+                    connect_options
+                    if isinstance(connect_options, dict)
+                    else connect_options[i + 1]
+                ),
                 "kwargs": worker_options,
                 "worker_class": worker_class,
-                "remote_python": remote_python[i + 1]
-                if isinstance(remote_python, list)
-                else remote_python,
+                "remote_python": (
+                    remote_python[i + 1]
+                    if isinstance(remote_python, list)
+                    else remote_python
+                ),
             },
         }
         for i, host in enumerate(hosts[1:])
36 changes: 21 additions & 15 deletions distributed/deploy/tests/test_adaptive.py
@@ -189,14 +189,17 @@ async def test_adapt_quickly():
     Instead we want to wait a few beats before removing a worker in case the
     user is taking a brief pause between work
     """
-    async with LocalCluster(
-        n_workers=0,
-        asynchronous=True,
-        processes=False,
-        silence_logs=False,
-        dashboard_address=":0",
-        threads_per_worker=1,
-    ) as cluster, Client(cluster, asynchronous=True) as client:
+    async with (
+        LocalCluster(
+            n_workers=0,
+            asynchronous=True,
+            processes=False,
+            silence_logs=False,
+            dashboard_address=":0",
+            threads_per_worker=1,
+        ) as cluster,
+        Client(cluster, asynchronous=True) as client,
+    ):
         adapt = cluster.adapt(interval="20 ms", wait_count=5, maximum=10)
         future = client.submit(slowinc, 1, delay=0.100)
         await wait(future)
@@ -240,13 +243,16 @@ async def test_adapt_quickly():
 @gen_test()
 async def test_adapt_down():
     """Ensure that redefining adapt with a lower maximum removes workers"""
-    async with LocalCluster(
-        n_workers=0,
-        asynchronous=True,
-        processes=False,
-        silence_logs=False,
-        dashboard_address=":0",
-    ) as cluster, Client(cluster, asynchronous=True) as client:
+    async with (
+        LocalCluster(
+            n_workers=0,
+            asynchronous=True,
+            processes=False,
+            silence_logs=False,
+            dashboard_address=":0",
+        ) as cluster,
+        Client(cluster, asynchronous=True) as client,
+    ):
         cluster.adapt(interval="20ms", maximum=5)

         futures = client.map(slowinc, range(1000), delay=0.1)
19 changes: 12 additions & 7 deletions distributed/deploy/tests/test_cluster.py
@@ -10,9 +10,11 @@

 @gen_test()
 async def test_eq():
-    async with Cluster(asynchronous=True, name="A") as clusterA, Cluster(
-        asynchronous=True, name="A2"
-    ) as clusterA2, Cluster(asynchronous=True, name="B") as clusterB:
+    async with (
+        Cluster(asynchronous=True, name="A") as clusterA,
+        Cluster(asynchronous=True, name="A2") as clusterA2,
+        Cluster(asynchronous=True, name="B") as clusterB,
+    ):
         assert clusterA != "A"
         assert not (clusterA == "A")
         assert clusterA == clusterA
@@ -75,8 +77,11 @@ def test_exponential_backoff():
 @gen_test()
 async def test_sync_context_manager_used_with_async_cluster():
     async with Cluster(asynchronous=True, name="A") as cluster:
-        with pytest.raises(
-            TypeError,
-            match=r"Used 'with' with asynchronous class; please use 'async with'",
-        ), cluster:
+        with (
+            pytest.raises(
+                TypeError,
+                match=r"Used 'with' with asynchronous class; please use 'async with'",
+            ),
+            cluster,
+        ):
             pass
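The test hunks above (and the one in distributed/cli/tests/test_dask_worker.py earlier) rewrite multi-item `with`/`async with` statements so the context managers sit inside parentheses, one per line. This appears to be black 24 taking advantage of parenthesized context managers, which the configured --target-version=py39 already accepts. A hypothetical sketch of the before/after shape (the function and file names are invented for illustration):

def checksum_pair(path_a: str, path_b: str) -> tuple[int, int]:
    # black 23.x split a long with-statement inside the first call:
    #     with open(path_a, encoding="utf-8") as first_handle, open(
    #         path_b, encoding="utf-8"
    #     ) as second_handle:
    # black 24.x parenthesizes the whole context-manager list instead:
    with (
        open(path_a, encoding="utf-8") as first_handle,
        open(path_b, encoding="utf-8") as second_handle,
    ):
        return len(first_handle.read()), len(second_handle.read())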