Unique keys runs slow with time #5965

Open
ericman93 opened this issue Mar 20, 2022 · 6 comments

Comments

@ericman93

I have a DAG that I want to run in parallel multiple times, each time with different input parameters.
Using the same key for all of the parallel runs doesn't work, because the DAG only runs once, with the first run's parameters.
My solution was to generate a GUID as the key for each task.
The problem is that using GUIDs as keys creates a huge number of tasks, and the scheduler becomes slower with time.
Using a pool of keys solved the issue, but that isn't the solution I want.

Once a task finishes, I don't care about it anymore, so if deleting the keys is an option and would solve the issue, that would be great.

import asyncio
from random import randint
from uuid import uuid4
import time
from dask.distributed import Client

def add(x, y):
    return x + y

async def do_once(n):
    def get_key(name):
        # return name  # doesn't work: the graph runs only once
        # return f'{name}_{uuid4().hex}'  # becomes slow after a few minutes
        return f'{name}_{n}'  # works good

    async with Client(address='0.0.0.0:8786', asynchronous=True) as client:
        n_key = get_key('n')
        x_key = get_key('x')
        y_key = get_key('y')
        z_key = get_key('z')
        ran = randint(0, 100)

        dsk = {
            n_key: n,
            x_key: (add, n_key, 1),
            y_key: (add, n_key, ran),
            z_key: (add, x_key, y_key)
        }

        futures = client.get(dsk, keys=[z_key], sync=False)
        res = await futures[0]

        assert res == ((n + 1) + (n + ran)), f"{res} != {((n + 1) + (n + ran))}"

async def main():
    while True:
        start = time.time()
        futures = [do_once(j) for j in range(20)]
        await asyncio.gather(*futures)

        print(f'iteration took {time.time() - start}')

asyncio.run(main())

Environment:

  • Dask version: 2022.02.1
  • Python version: 3.8.7
  • Operating System: MacOS
  • Install method (conda, pip, source): poetry
@ericman93
Author

I was looking at the code and saw that when tasks are entered they are grouped by prefix (the key is split on -),
so I changed the get_key method to

    def get_key(name):
        return f'{name}-{n}'

and got much better results.

But can I clear the stats somehow?
Let's say the scheduler has been running for 4 days; I don't want the old stats.
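
A minimal sketch of why the separator matters, assuming the grouping behaves like a split on the first `-` as described above. `simple_prefix` is a hypothetical stand-in written for illustration, not Dask's own helper, which handles more cases.

```python
def simple_prefix(key: str) -> str:
    """Simplified stand-in: keep everything before the first '-'."""
    return key.split("-", 1)[0]


# Keys built with '_' contain no '-', so each key is its own group.
underscore_keys = [f"x_{n}" for n in range(20)]
# Keys built with '-' all share the group 'x'.
dash_keys = [f"x-{n}" for n in range(20)]

underscore_groups = {simple_prefix(k) for k in underscore_keys}
dash_groups = {simple_prefix(k) for k in dash_keys}

print(len(underscore_groups))  # 20
print(len(dash_groups))        # 1
```

Under this assumption, a UUID suffix joined with `_` would add a new group on every run, while the same suffix joined with `-` would keep the number of groups fixed.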

@fjetter
Member

fjetter commented Mar 21, 2022

  1. The tasks should all be deleted as soon as you close your internal client (i.e. leave the context manager) or the future instances are garbage collected
  2. Instead of writing your own graph, I would suggest using Client.submit

e.g.

async with Client(address='0.0.0.0:8786', asynchronous=True) as client:
    ran = randint(0, 100)
    f1 = client.submit(add, n, 1)
    f2 = client.submit(add, n, ran)
    f3 = client.submit(add, f1, f2)
    res = await f3

    assert res == ((n + 1) + (n + ran)), f"{res} != {((n + 1) + (n + ran))}"

Do you have a specific reason why you are building the graph yourself? The submit/map API is much more user friendly.

This will reuse keys since, by default, we assume functions to be pure, i.e. side-effect free, deterministic, and therefore cacheable. If this is not true, use the keyword pure=False for all your submit calls and we will generate unique keys for you.
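
The difference between the two modes can be sketched without a cluster. The helper below is hypothetical and is not how Dask computes keys; it only illustrates the idea that a pure call gets a key derived from its inputs, while an impure call gets a fresh key every time.

```python
import hashlib
import uuid


def add(x, y):
    return x + y


def sketch_key(func, *args, pure=True):
    """Hypothetical helper, NOT Dask's tokenization; for illustration only."""
    if pure:
        # Deterministic: same function and same arguments give the same key,
        # so a repeated call can reuse the earlier result.
        digest = hashlib.sha1(repr((func.__name__, args)).encode()).hexdigest()
    else:
        # Impure: every call gets a fresh key, so nothing is reused.
        digest = uuid.uuid4().hex
    return f"{func.__name__}-{digest}"


same = sketch_key(add, 1, 2) == sketch_key(add, 1, 2)
fresh = sketch_key(add, 1, 2, pure=False) != sketch_key(add, 1, 2, pure=False)
print(same, fresh)
```

With the real API, the equivalent switch is the pure=False keyword on submit mentioned above.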

@ericman93
Author

ericman93 commented Mar 22, 2022

Should the tasks be deleted from the client or from the scheduler?

I used a graph because I wanted to build it only once and reuse it each time with different parameters.
In that case, I would just change the value of the n key in the dict and run it.
Another issue I had is that a function starts running as soon as it is submitted, and I wanted all functions to start together.

My use case has changed and I'm now building the DAG every time, so I'll try going back to submit.
Are there performance differences between graph and submit? My tasks are not pure by the way
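
One way to get the "build once, run many times" shape while still giving every run its own keys is a small graph factory. This is only a sketch: `build_graph` and `run_id` are hypothetical names, and the `-` separator follows the observation earlier in the thread that it gave better results.

```python
from random import randint
from uuid import uuid4


def add(x, y):
    return x + y


def build_graph(n, ran, run_id):
    """Return (graph, output_key) with keys that are unique to this run."""
    def key(name):
        # Same names (n, x, y, z) on every run; only the suffix changes.
        return f"{name}-{run_id}"

    dsk = {
        key("n"): n,
        key("x"): (add, key("n"), 1),
        key("y"): (add, key("n"), ran),
        key("z"): (add, key("x"), key("y")),
    }
    return dsk, key("z")


g1, out1 = build_graph(3, randint(0, 100), uuid4().hex)
g2, out2 = build_graph(3, randint(0, 100), uuid4().hex)
print(set(g1).isdisjoint(g2))  # True: the two runs share no keys
```

The returned graph and output key can then be passed to client.get(dsk, keys=[out], sync=False) exactly as in the original example.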

@gjoseph92
Collaborator

One thing to notice: looks like you're opening 20 different clients at once, then submitting the "same" (same keys) operation from each of them. That should work, it's just unusual.

Could you share the results of your iteration took statement? What kind of increase in runtime are we talking about here?

the scheduler becomes slower with time

feels related to #5960. We've noticed this exact behavior before (re-running the same operation is slower and slower each time) but never figured out why: #4987 (comment).

Are there performance differences between graph and submit?

Graph will actually be a tiny bit more efficient, since it's one network call instead of 4. Using dask.delayed would be the more common thing here though; writing a graph by hand and calling client.get like you're doing is supported, just unusual.

My tasks are not pure by the way

In that case, giving them new keys every time would be the semantically correct thing to do.

so I change get_key method

There's no method named get_key exactly; are you talking about key_split or key_split_group?

@ericman93
Author

Could you share the results of your iteration took statement? What kind of increase in runtime are we talking about here?

~ ~ ~ ~ ~ ~ ~ ~ ~ ~ 
0 minutes
~ ~ ~ ~ ~ ~ ~ ~ ~ ~ 
iteration took 0.23832011222839355
iteration took 0.18767070770263672
iteration took 0.25737690925598145
iteration took 0.2599179744720459
iteration took 0.201430082321167
iteration took 0.20307374000549316
~ ~ ~ ~ ~ ~ ~ ~ ~ ~ 
3 minutes
~ ~ ~ ~ ~ ~ ~ ~ ~ ~ 
iteration took 0.267427921295166
iteration took 0.3653082847595215
iteration took 0.5170252323150635
iteration took 0.6326770782470703
iteration took 0.758781909942627
iteration took 0.640618085861206
~ ~ ~ ~ ~ ~ ~ ~ ~ ~ 
6 minutes
~ ~ ~ ~ ~ ~ ~ ~ ~ ~ 
iteration took 2.01904296875
iteration took 2.1777050495147705
iteration took 2.5627808570861816
iteration took 2.125898838043213
iteration took 2.4273059368133545
iteration took 2.8562822341918945
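
To put a number on the slowdown, the samples above can be averaged per checkpoint. A small sketch, with the timings copied from the output and rounded to four decimals:

```python
from statistics import mean

# Iteration times in seconds, grouped by how long the loop had been running.
samples = {
    "0 minutes": [0.2383, 0.1877, 0.2574, 0.2599, 0.2014, 0.2031],
    "3 minutes": [0.2674, 0.3653, 0.5170, 0.6327, 0.7588, 0.6406],
    "6 minutes": [2.0190, 2.1777, 2.5628, 2.1259, 2.4273, 2.8563],
}

means = {label: mean(values) for label, values in samples.items()}
for label, value in means.items():
    print(f"{label}: {value:.3f}s per iteration")

slowdown = means["6 minutes"] / means["0 minutes"]
print(f"slowdown after 6 minutes: {slowdown:.1f}x")
```

On these samples, the mean iteration time grows roughly tenfold within six minutes.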

One thing to notice: looks like you're opening 20 different clients at once, then submitting the "same" (same keys) operation from each of them. That should work, it's just unusual.

I'm generating a new key each time, but with the same prefix.
Generating the same key was just a test.
Should I create the client once and reuse it?

There's no method named get_key exactly; are you talking about key_split or key_split_group?

I meant the get_key from my example

@ericman93
Author

ericman93 commented Jun 1, 2022

Continuing the discussion from the Dask forum.

A few questions about the dashboard:

  • Do all of a DAG's tasks stay in the dashboard until the DAG is completed? For example, will GetDataStep stay until all EnrichmentSteps are finished?
  • Is in-memory the number of finished tasks that are waiting for the whole DAG to be completed?
  • Can I change the aggregation function in the complete-time per task page? I want to see the average and not the sum.

image

All of my EnrichmentStep tasks can run in parallel. I understand that they run on the same worker as an optimization, but can I make them run in parallel?
Maybe if I have more workers per dask-worker and change the scheduler to use threads, they will run more in parallel? (I have 1 worker per pod.)

By the way, I see a lot of unmanaged memory.
image


3 participants