Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update_who_has can remove workers #6342

Merged
merged 12 commits into from
Jun 1, 2022
Merged

Conversation

crusaderky
Copy link
Collaborator

@crusaderky crusaderky commented May 13, 2022

Contributes to #5896

  • Update_who_has can now remove workers from who_has and not just add them, thus avoiding unnecessary requests to workers that don't have the data or are dead.
  • Fix bug where the worker's own address would end up in distributed.worker.TaskState.who_has after a handle_compute call of a dependent key

@crusaderky crusaderky self-assigned this May 13, 2022
@crusaderky crusaderky force-pushed the WSMR/update_who_has branch from b97a134 to c39288a Compare May 13, 2022 21:52
@github-actions
Copy link
Contributor

github-actions bot commented May 14, 2022

Unit Test Results

       15 files  ±  0         15 suites  ±0   6h 34m 9s ⏱️ +15s
  2 823 tests +  4    2 739 ✔️ +  3    81 💤 +1  3 ±0 
20 924 runs  +28  19 977 ✔️ +19  944 💤 +9  3 ±0 

For more details on these failures, see this check.

Results for commit e8f5ef5. ± Comparison against base commit 5feb171.

♻️ This comment has been updated with latest results.

@crusaderky crusaderky force-pushed the WSMR/update_who_has branch 2 times, most recently from 8c3617b to b67f5c2 Compare May 15, 2022 20:32
@crusaderky crusaderky marked this pull request as ready for review May 16, 2022 09:03
@crusaderky
Copy link
Collaborator Author

crusaderky commented May 16, 2022

This is ready for review and merge.
CC @fjetter @gjoseph92 @graingert @mrocklin

distributed/tests/test_worker_state_machine.py Outdated Show resolved Hide resolved
distributed/tests/test_worker_state_machine.py Outdated Show resolved Hide resolved
distributed/tests/test_worker_state_machine.py Outdated Show resolved Hide resolved
distributed/worker.py Outdated Show resolved Hide resolved
distributed/worker.py Outdated Show resolved Hide resolved
distributed/tests/test_worker_state_machine.py Outdated Show resolved Hide resolved
distributed/tests/test_worker_state_machine.py Outdated Show resolved Hide resolved
distributed/worker.py Outdated Show resolved Hide resolved
distributed/worker.py Outdated Show resolved Hide resolved
distributed/worker.py Show resolved Hide resolved
distributed/worker.py Show resolved Hide resolved
distributed/worker.py Show resolved Hide resolved
distributed/worker.py Outdated Show resolved Hide resolved
@crusaderky
Copy link
Collaborator Author

All review comments addressed

Copy link
Collaborator

@gjoseph92 gjoseph92 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good, just two small things

Comment on lines 3532 to 3534
# All workers which previously held a replica of the key are either
# in flight or busy. Kick off ensure_communicating to try fetching
# the data from the new worker.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# All workers which previously held a replica of the key are either
# in flight or busy. Kick off ensure_communicating to try fetching
# the data from the new worker.
# We were holding back from fetching this key, possibly because we
# didn't know of any workers holding it (that weren't busy, or didn't
# already have in-flight data requests).
# In case having a new worker available changes things,
# kick off ensure_communicating to try fetching the data
# from this new worker.

There are a few other reasons why the task might still be in fetch

  1. we're over the total_out_connections limit
  2. we're over the comm_threshold_bytes limits
  3. we're paused

Just to make it clear that we're being a little optimistic here, not necessarily testing for all cases in which ensure_communicating = True will actually have an effect (which is fine).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated comment

if ensure_communicating:
recs, instructions = merge_recs_instructions(
(recs, instructions),
self._ensure_communicating(stimulus_id=stimulus_id),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like I said in cbccc86#r73797288, it seems odd to me to call _ensure_communicating directly. Why not just append an EnsureCommunicatingAfterTransitions instruction on L3535?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair. We'll probably revert it soon as per our recent chat with @fjetter , but it makes sense to have an organic behaviour for now.

@crusaderky
Copy link
Collaborator Author

@fjetter are you happy to merge this?

Comment on lines 398 to 403
# Do not use handle_missing_data, since it would cause the scheduler to call
# handle_free_keys(["y"]) on a
s.remove_replica(ts=s.tasks["x"], ws=s.workers[b.address])
# We used a scheduler internal call, thus corrupting its state.
# Don't crash at the end of the test.
s.validate = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an anti pattern and we should not intentionally put the scheduler in a corrupt state.

Is the condition this test is provoking even real? I would argue that it is not even intended for the key to get into a missing state here but the scheduler should rather ensure the task is released

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's real. I rewrote the test.

Comment on lines 430 to 431
# Wipe x from the worker and lose the message that would inform the scheduler
a.handle_remove_replicas(keys=["x"], stimulus_id="test")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would this request ever be triggered if the key is the only one on the cluster?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scheduler thinks w1 and w2 hold a replica.
w2 is unresponsive, but the scheduler doesn't know yet as it didn't time out.
The AMM decides to ask w1 to drop its replica.

Adding a comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does it mean "w2 is unresponsive"? It is dead? If so, why not kill the worker? Just being unresponsive, i.e. not hearbeating for a while, doesn't trigger any action. If anything this would trigger the removal of the worker.

Is there a way to trigger this using scheduler API? handle_remove_replica is a signal that's always originating from the scheduler, calling it here out of the blue doesn't seem right

distributed/worker.py Outdated Show resolved Hide resolved
distributed/tests/test_worker_state_machine.py Outdated Show resolved Hide resolved
Comment on lines 442 to 448
# Lose the message that would inform the scheduler.
# In theory, this should not happen.
# In practice, in case of TPC connection fault, BatchedSend can drop messages.
assert a.batched_stream.buffer == [
{"key": "x", "stimulus_id": "test", "op": "release-worker-data"}
]
a.batched_stream.buffer.clear()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the connection fails, there are a couple of other mechanisms around that should recover from any lost messages. This clearly doesn't work well which is why we're likely removing the worker reconnect (#6361)

Apart from this failure scenario, we should always assume that a message goes through. Is there any other way to trigger this condition? I don't feel comfortable with clearing the buffer for a test

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned above: I cannot think of how to send it out of sync in real life, short of a reconnect.
The question is: is not being able to think of anything from the top of our head a reason good enough not to have a failsafe? (not a loaded question)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is: is not being able to think of anything from the top of our head a reason good enough not to have a failsafe? (not a loaded question)

Well, I think there is no universally true answer to this question.

Why I care so much about this specific case is because this test apparently motivates an instruction in update_who_has, see https://github.com/dask/distributed/pull/6342/files#r873561939

The argument, as it stands right now, is that we need this instruction because otherwise this test would deadlock. However, this test is an artificial case that intentionally corrupts state and I don't believe we should be able to recover from any kind of state corruption automatically (that's just unrealistic, I believe)

  1. If we cannot reach this state "naturally" we should remove the instruction and the entire code branch that leads up to it because it is effectively dead code and additional complexity.
  2. If this code branch is not dead and can be reached naturally, we should invest the time to construct this naturally occurring state to make sure the instruction is indeed doing what it is supposed to and doesn't have unexpected side effects.

If neither of these two options are satisfying, I would propose to instead go for

  1. keep the code path leading up the the MissingDataMsg but do not issue this instruction but rather fail hard. This will be uncomfortable but we'll know why and how this case is hit and can fix it reliably

Comment on lines 438 to 440
# 2. b is unresponsive, but the scheduler doesn't know yet as it didn't time out.
# 3. The AMM decides to ask a to drop its replica.
a.handle_remove_replicas(keys=["x"], stimulus_id="test")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. AMM would not request the removal of a replica if there is only one replica available
  2. If a worker is "unresponsive" this would mean that some kind of worker is notifying the scheduler about this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. AMM will not request the removal of a replica if it believes it's the only one replica available. Hence the unresponsive worker that hasn't reached timeout.
  2. unresponsive as in, it just froze but it didn't trigger any timeouts yet. When it hits timeouts later, we'll lose all of its contents.

some kind of worker is notifying the scheduler about this.

First of all, you might as well be in the condition where theworker that holds the other replica of x has been unresponsive for less than 30s, which means that nobody knows yet.

Second, this is more nuanced than you picture it.
If a heartbeat doesn't reach the scheduler for 5 minutes, the scheduler will remove the worker and will mark all of the keys that were in memory on that worker as lost.
If a gather_dep doesn't receive any data for 30 seconds, the waiting worker will remove all of the keys from has_what that it knows about and send a {op: missing-data} for the specific keys that it was trying to fetch at the time. What will likely happen soon afterwards is that the scheduler asks it to fetch more keys from the same unresponsive worker, thus hitting another timeout, and then another and again and again, for the next 5 minutes.

This design is clearly bad. We should have a design session about it - the most obvious option (but it may be too aggressive) would be to have a {op: missing-worker} call where, as soon as any worker times out on gather-dep, the whole worker is removed from the cluster, together with all of the keys that it has in memory, including those the reporting worker doesn't know about.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a heartbeat doesn't reach the scheduler for 5 minutes, the scheduler will remove the worker and will mark all of the keys that were in memory on that worker as lost.

It will not only mark it as lost but it will try to close the worker, ungracefully

If a gather_dep doesn't receive any data for 30 seconds, the waiting worker will remove all of the keys from has_what that it knows about

Not if it doesn't receive any data but if the connection attempt fails, assuming there is no pooled connection.


I'm struggling a bit with your usage of the term unresponsive. I don't know if this means a dead worker, high latencies, a blocked GIL / event loop, etc.

The way I understand unresponsiveness is if the worker doesn't respond to messages in a given time and the only place where we expect the worker application code to respond in a certain time is during handshakes upon establishment of new RPC connections.

If a heartbeat doesn't reach the scheduler for 5 minutes, the scheduler will remove the worker and will mark all of the keys that were in memory on that worker as lost.
If a gather_dep doesn't receive any data for 30 seconds, the waiting worker will remove all of the keys from has_what that it knows about and send a {op: missing-data} for the specific keys that it was trying to fetch at the time. What will likely happen soon afterwards is that the scheduler asks it to fetch more keys from the same unresponsive worker, thus hitting another timeout, and then another and again and again, for the next 5 minutes.

if the scheduler receives a missing-data(ts, errant_worker), it will call a Scheduler.remove_replica(self, ts, ws). If ws is the only worker with a replica, the scheduler will reschedule the task, not ask workers to try to fetch the data again.


I understand that there are various problems around but this all doesn't explain how the worker would receive a remove-replica signal if the scheduler only knows of one worker. This is basically a corruption of the cluster-wide state which is why you need to flush the buffer (#6342 (comment)) to recover.

Comment on lines +389 to +399
6. the scheduler responds {"x": []}, because w1 in the meantime has lost the key.
7. x is transitioned fetch -> missing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would the scheduler respond with this message? Why would the worker need to transition to missing? I think the update_who_has should not do anything in this case but rather the scheduler should eventually tell the worker to release the entire task.

If the "update who_has" was not via a dedicated RPC this entire scenario would even be impossible, wouldn't it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the scheduler should eventually tell the worker to release the entire task.

This will happen for dependencies tracked by {op: compute-task}, but not for keys fetched through {op: acquire-replicas}.

Why would the scheduler respond with this message?

  1. Because a third worker recently sent to the scheduler {op: missing-data, errant_worker: b, key: x}
  2. Because the network between b and the scheduler is malfunctioning, so the scheduler hasn't received a heartbeat from b for 5 minutes, but for whatever reason the network between workers a and b is still functioning. This second use case is reproduced in the test (lines 404-407). I could have done the first one, but it would have been more tedious to implement due to time sensitivity.
    Because it hasn't received any heartbeat f

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because a third worker recently sent to the scheduler {op: missing-data, errant_worker: b, key: x}

For a dependency this should still be followed by an appropriate free-keys message such that the key will properly released by this worker. It feels like AMM should keep track of the workers it asked to fetch a key to allow for the same mechanism. I would feel more comfortable if both "types" of tasks are handled the same on worker side

Because the network between b and the scheduler is malfunctioning

If the network between a worker and the scheduler is down, we're closing the worker

crusaderky added a commit to crusaderky/distributed that referenced this pull request May 20, 2022
@crusaderky crusaderky mentioned this pull request May 20, 2022
@crusaderky crusaderky linked an issue May 20, 2022 that may be closed by this pull request
8 tasks
crusaderky added a commit to crusaderky/distributed that referenced this pull request May 21, 2022
crusaderky added a commit to crusaderky/distributed that referenced this pull request May 21, 2022
Copy link
Member

@fjetter fjetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had some review comments pending. Will give it another look shortly

Comment on lines 438 to 440
# 2. b is unresponsive, but the scheduler doesn't know yet as it didn't time out.
# 3. The AMM decides to ask a to drop its replica.
a.handle_remove_replicas(keys=["x"], stimulus_id="test")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a heartbeat doesn't reach the scheduler for 5 minutes, the scheduler will remove the worker and will mark all of the keys that were in memory on that worker as lost.

It will not only mark it as lost but it will try to close the worker, ungracefully

If a gather_dep doesn't receive any data for 30 seconds, the waiting worker will remove all of the keys from has_what that it knows about

Not if it doesn't receive any data but if the connection attempt fails, assuming there is no pooled connection.


I'm struggling a bit with your usage of the term unresponsive. I don't know if this means a dead worker, high latencies, a blocked GIL / event loop, etc.

The way I understand unresponsiveness is if the worker doesn't respond to messages in a given time and the only place where we expect the worker application code to respond in a certain time is during handshakes upon establishment of new RPC connections.

If a heartbeat doesn't reach the scheduler for 5 minutes, the scheduler will remove the worker and will mark all of the keys that were in memory on that worker as lost.
If a gather_dep doesn't receive any data for 30 seconds, the waiting worker will remove all of the keys from has_what that it knows about and send a {op: missing-data} for the specific keys that it was trying to fetch at the time. What will likely happen soon afterwards is that the scheduler asks it to fetch more keys from the same unresponsive worker, thus hitting another timeout, and then another and again and again, for the next 5 minutes.

if the scheduler receives a missing-data(ts, errant_worker), it will call a Scheduler.remove_replica(self, ts, ws). If ws is the only worker with a replica, the scheduler will reschedule the task, not ask workers to try to fetch the data again.


I understand that there are various problems around but this all doesn't explain how the worker would receive a remove-replica signal if the scheduler only knows of one worker. This is basically a corruption of the cluster-wide state which is why you need to flush the buffer (#6342 (comment)) to recover.

Comment on lines 442 to 448
# Lose the message that would inform the scheduler.
# In theory, this should not happen.
# In practice, in case of TPC connection fault, BatchedSend can drop messages.
assert a.batched_stream.buffer == [
{"key": "x", "stimulus_id": "test", "op": "release-worker-data"}
]
a.batched_stream.buffer.clear()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is: is not being able to think of anything from the top of our head a reason good enough not to have a failsafe? (not a loaded question)

Well, I think there is no universally true answer to this question.

Why I care so much about this specific case is because this test apparently motivates an instruction in update_who_has, see https://github.com/dask/distributed/pull/6342/files#r873561939

The argument, as it stands right now, is that we need this instruction because otherwise this test would deadlock. However, this test is an artificial case that intentionally corrupts state and I don't believe we should be able to recover from any kind of state corruption automatically (that's just unrealistic, I believe)

  1. If we cannot reach this state "naturally" we should remove the instruction and the entire code branch that leads up to it because it is effectively dead code and additional complexity.
  2. If this code branch is not dead and can be reached naturally, we should invest the time to construct this naturally occurring state to make sure the instruction is indeed doing what it is supposed to and doesn't have unexpected side effects.

If neither of these two options are satisfying, I would propose to instead go for

  1. keep the code path leading up the the MissingDataMsg but do not issue this instruction but rather fail hard. This will be uncomfortable but we'll know why and how this case is hit and can fix it reliably

distributed/worker.py Outdated Show resolved Hide resolved
Copy link
Member

@fjetter fjetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High level, I still have two problems with this PR

  1. The tests are pretty artificial and I'm not convinced they are testing real use cases. This worries me a bit since I'm concerned we're maintaining more complex code just to enable these cases.
  2. I don't feel comfortable for update_who_has to generate RecsInstrs. This requires us to put these complicated constructions in everywhere and I feel the cases that are handled by update_who_has should be handled by the transition system. I'm still struggling to understand why it has or should be handled there
        recommendations, instructions = merge_recs_instructions(
            (recommendations, []),
            self._update_who_has(who_has, stimulus_id=stimulus_id),
        )
        self.transitions(recommendations, stimulus_id=stimulus_id)
        self._handle_instructions(instructions)

Comment on lines 3459 to 3460
list(ts.who_has),
list(workers),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should log all of this. This list can be pretty large in a real world scenario and this is called frequently. We only limit the number of log entries but not the byte size of the log in total.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing it to show only a length-constrained version of the differences

Comment on lines +312 to +325
assert_story(
b.log,
# FIXME: This log should be replaced with a StateMachineEvent log
[
(f1.key, "ensure-task-exists", "released"),
(f1.key, "released", "fetch", "fetch", {}),
(f1.key, "compute-task", "fetch"),
(f1.key, "put-in-memory"),
],
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this log necessary now? It wasn't there before

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To show evidence that recomputing f1 was in fact what happened, as opposed to a.close() not working as intended in the test which would cause a successful fetch->flight->memory cycle.

Comment on lines +389 to +399
6. the scheduler responds {"x": []}, because w1 in the meantime has lost the key.
7. x is transitioned fetch -> missing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because a third worker recently sent to the scheduler {op: missing-data, errant_worker: b, key: x}

For a dependency this should still be followed by an appropriate free-keys message such that the key will properly released by this worker. It feels like AMM should keep track of the workers it asked to fetch a key to allow for the same mechanism. I would feel more comfortable if both "types" of tasks are handled the same on worker side

Because the network between b and the scheduler is malfunctioning

If the network between a worker and the scheduler is down, we're closing the worker

Comment on lines +404 to +416
# Sever connection between b and s, but not between b and a.
# If a tries fetching from b after this, b will keep responding {status: busy}.
b.periodic_callbacks["heartbeat"].stop()
await s.remove_worker(b.address, close=False, stimulus_id="test")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC you are preparing the scheduler state such that it believes the worker is dead. This is only partially true after this statement because the stream is still open. This is flagged in #6390 as a problem and it suggest we should close the stream as well, i.e. remove the close keyword.

Why this bothers me a bit is because I believe in a real world setup where this connection is severed, the cluster would be able to self heal because the connection between A and B would also close. This would be a different mechanism than what this test is specifically testing.



@gen_cluster(client=True, nthreads=[("", 1)])
async def test_self_denounce_missing_data(c, s, a):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I slightly modified the current version of this test to verify it against main. I didn't like the clearing of the batched send but understand that LockedCommPool does not work since the BatchedSend is initialized before the pool can be installed.
I modified it such that we re-submit the cleared message and I added code to ensure that the ordering is still as expected. indeed the cluster unblocks after we resubmit the message. Therefore this test only deadlocks on main because we're dropping messages intentionally which should never happen in a real setup.

The below passes on main

@gen_cluster(client=True, nthreads=[("", 1)])
async def test_self_denounce_missing_data(c, s, a):
    x = c.submit(inc, 1, key="x")
    await x

    # Wipe x from the worker. This simulates the following condition:
    # 1. The scheduler thinks a and b hold a replica of x.
    # 2. b is unresponsive, but the scheduler doesn't know yet as it didn't time out.
    # 3. The AMM decides to ask a to drop its replica.
    a.handle_remove_replicas(keys=["x"], stimulus_id="test")

    # Lose the message that would inform the scheduler.
    # In theory, this should not happen.
    # In practice, in case of TPC connection fault, BatchedSend can drop messages.
    count = a.batched_stream.batch_count
    buf = list(a.batched_stream.buffer)
    assert buf == [
        {"key": "x", "stimulus_id": "test", "op": "release-worker-data"}
    ]
    a.batched_stream.buffer.clear()

    y = c.submit(inc, x, key="y")
    # The scheduler tries computing y, but a responds that x is not available.
    # The scheduler kicks off the computation of x and then y from scratch.
    async def check_result():
        assert await y == 3

    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(check_result(), 0.5)

    # There were no further messages over the batched send.
    # WE need to assert on this to make sure the ordering of events is not corrupted
    assert a.batched_stream.batch_count == count
    a.batched_stream.send(*buf)
    await check_result()
    assert_story(
        a.story("compute-task"),
        [
            ("x", "compute-task", "released"),
            # The scheduler tries computing y a first time and fails.
            # This line would not be here if we didn't lose the
            # {"op": "release-worker-data"} message earlier.
            ("y", "compute-task", "released"),
            # The scheduler receives the {"op": "missing-data"} message from the
            # worker. This makes the computation of y to fail. The scheduler reschedules
            # x and then y.
            ("x", "compute-task", "released"),
            ("y", "compute-task", "released"),
        ],
        strict=True,
    )

    del x
    while "x" in a.data:
        await asyncio.sleep(0.01)
    assert a.tasks["x"].state == "released"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the above version you can see that we're indeed blocked fetching y (check_result) but after the message is submitted to the scheduler, everything works.

crusaderky added a commit to crusaderky/distributed that referenced this pull request May 25, 2022
@crusaderky
Copy link
Collaborator Author

@fjetter added test for fetch->flight transition, as discussed yesterday

@@ -2098,8 +2098,7 @@ def transition_released_waiting(
if dep_ts.state != "memory":
ts.waiting_for_data.add(dep_ts)
dep_ts.waiters.add(ts)
if dep_ts.state not in {"fetch", "flight"}:
recommendations[dep_ts] = "fetch"
recommendations[dep_ts] = "fetch"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetch->fetch is a no-op.
flight->fetch is a no-op if not ts.done.

This aligns the logic here and in handle_acquire_replicas.

@crusaderky crusaderky changed the title Overhaul update_who_has update_who_has can remove workers May 25, 2022
@crusaderky
Copy link
Collaborator Author

All test failures are unrelated.

Copy link
Member

@fjetter fjetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still worried about test_self_denounce_missing_data. It is a very specific test and I do not think it is realistic. It deliberately corrupts cluster-wide state and expects the system to heal. I consider this to be an unrealistic expectation.

If we are able to detect state corruption, I think we should raise hard but this is out of scope for this PR. However, I also do think we should not encode behavior in our tests we do not actually expect from the system.

Comment on lines +3453 to +3454
if ts.state == "fetch":
self.data_needed_per_worker[worker].push(ts)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ts.state == "fetch":
self.data_needed_per_worker[worker].push(ts)
self.data_needed_per_worker[worker].push(dep_ts)

I'm a bit on the fence with this.

Why this bothers me is because this is the only place in the entire worker where we're modifying Worker state depending on what TaskState.state is outside of actual transition functions.

I understand the reasoning but it feels a bit odd

Copy link
Collaborator Author

@crusaderky crusaderky May 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the change? There's no reason to push it into the heap if state=memory or flight.
data_needed_per_worker should be a variant of data_needed, whose only purpose is to be able to fetch out of order.

See

def _add_to_data_needed(self, ts: TaskState, stimulus_id: str) -> RecsInstrs:
self.data_needed.push(ts)
for w in ts.who_has:
self.data_needed_per_worker[w].push(ts)
# This is the same as `return self._ensure_communicating()`, except that when
# many tasks transition to fetch at the same time, e.g. from a single
# compute-task or acquire-replicas command from the scheduler, it allows
# clustering the transfers into less GatherDep instructions; see
# _select_keys_for_gather().
return {}, [EnsureCommunicatingAfterTransitions(stimulus_id=stimulus_id)]

which is a helper function exclusively of

  • transition_missing_fetch
  • transition_released_fetch
  • transition_flight_fetch

(the other two transition_*_fetch functions don't call it because they don't set the state to fetch).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as I said, it feels odd but I understand. Let's move on

Comment on lines 477 to 479
# Wipe x from the worker. This simulates the scheduler calling
# delete_worker_data(a.address, ["x"]), but the RPC call has not returned yet.
a.handle_free_keys(keys=["x"], stimulus_id="test")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If anything this means that delete_worker_data should not work this way. I don't think we should free any keys via direct RPC calls.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scheduler.delete_worker_data is only used in replicate and rebalance. Both methods are flagged as being unreliable if the cluster is not at rest

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels unnatural for the scheduler to issue any free-keys message if there is only one replica around, even for a rebalance or replicate.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the whole test and slapped a pragma: nocover on the code path. Let me know if I interpreted your opinion correctly.

@fjetter fjetter merged commit 715d7be into dask:main Jun 1, 2022
@fjetter
Copy link
Member

fjetter commented Jun 1, 2022

Thanks for your hard work on this @crusaderky !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Migrate ensure_communicating transitions to new WorkerState event mechanism
3 participants