In-code suggestion of when to use split_out in dask.dataframe.groupby #8001

Open
raybellwaves opened this issue Aug 5, 2021 · 6 comments
Labels: dataframe, documentation, needs attention

Comments

raybellwaves (Member) commented Aug 5, 2021

After spending some time working out why my groupby operation was not working, I came across https://examples.dask.org/dataframes/02-groupby.html#Many-groups. If it weren't for the great Dask docs, I would likely have missed this.

The split_out arg really helped.

This made sense as I had ~500,000,000 groups.

I know it may not be easy, but similar to other warning messages (I believe I've seen a suggestion to use Client.scatter at some point, though I can't find where it comes from in the codebase), it would be nice for the code to suggest this to the user once it works out (if possible) how many groups there are, or even to optimize it so it happens under the hood (see #7933).

Example:

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, performance_report

cluster = LocalCluster(local_directory="/tmp")
client = Client(cluster)

df = dd.demo.make_timeseries(
    start="2000-01-01",
    end="2000-01-04",
    dtypes={"x": float, "y": float, "id": int},
    freq="10ms",
    partition_freq="24h",
)

# key of ["x", "y"] gives ~ 25,920,000 groups

# Doesn't finish
with performance_report(filename="ddf-too-many-groups.html"):
    df.groupby(["x", "y"]).sum().compute()

# Finishes
with performance_report(filename="ddf-split-out.html"):
    df.groupby(["x", "y"]).sum(split_out=4).compute()
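As a side note, a rough way to gauge the number of groups up front is an approximate distinct count; this is just a sketch (not benchmarked) using Series.nunique_approx, with the combined string key only for illustration:

# Sketch: estimate the number of (x, y) groups without running the full
# groupby, using a HyperLogLog-based approximate distinct count.
combined_key = df["x"].astype(str) + "_" + df["y"].astype(str)
n_groups_estimate = combined_key.nunique_approx().compute()
print(f"approximately {n_groups_estimate:,.0f} groups")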

Performance reports:

Traceback of df.groupby(["x", "y"]).sum().compute():

KilledWorker                              Traceback (most recent call last)
/var/folders/rf/26llfhwd68x7cftb1z3h000w0000gp/T/ipykernel_39651/3854007570.py in <module>
      1 with performance_report(filename=""):
----> 2     df.groupby(["x", "y"]).sum().compute()

~/miniconda3/envs/main/lib/python3.9/site-packages/dask/base.py in compute(self, **kwargs)
    284         dask.base.compute
    285         """
--> 286         (result,) = compute(self, traverse=False, **kwargs)
    287         return result
    288 

~/miniconda3/envs/main/lib/python3.9/site-packages/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

~/miniconda3/envs/main/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2746                     should_rejoin = False
   2747             try:
-> 2748                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2749             finally:
   2750                 for f in futures.values():

~/miniconda3/envs/main/lib/python3.9/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   2023             else:
   2024                 local_worker = None
-> 2025             return self.sync(
   2026                 self._gather,
   2027                 futures,

~/miniconda3/envs/main/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    864             return future
    865         else:
--> 866             return sync(
    867                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    868             )

~/miniconda3/envs/main/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    324     if error[0]:
    325         typ, exc, tb = error[0]
--> 326         raise exc.with_traceback(tb)
    327     else:
    328         return result[0]

~/miniconda3/envs/main/lib/python3.9/site-packages/distributed/utils.py in f()
    307             if callback_timeout is not None:
    308                 future = asyncio.wait_for(future, callback_timeout)
--> 309             result[0] = yield future
    310         except Exception:
    311             error[0] = sys.exc_info()

~/miniconda3/envs/main/lib/python3.9/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/miniconda3/envs/main/lib/python3.9/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1888                             exc = CancelledError(key)
   1889                         else:
-> 1890                             raise exception.with_traceback(traceback)
   1891                         raise exc
   1892                     if errors == "skip":

KilledWorker: ("('dataframe-groupby-sum-agg-f6215c9bc267c471d79fe7d859ac9f04', 0)", <WorkerState 'tcp://127.0.0.1:60293', name: 0, memory: 0, processing: 1>)
jsignell (Member) commented Aug 5, 2021

That's a really interesting idea @raybellwaves - I like the idea of introducing these kinds of guardrails and suggestions into the codebase (as long as we don't go overboard :) ). I'm wondering how this will work though. After looking at the code for a bit I am actually wondering if it might make sense to default split_out to the same npartitions as the original df. The split_out=1 default seems like a bit of a gotcha.

mrocklin (Member) commented Aug 8, 2021

I am actually wondering if it might make sense to default split_out to the same npartitions as the original df. The split_out=1 default seems like a bit of a gotcha.

In the common case split_out=1 makes sense for groupby aggregations. There are usually a small number of groups (where "small" means less than ten million), and so we want a single output partition. split_out is computationally fairly expensive: it requires something like a mini-shuffle to achieve.
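To illustrate the trade-off, here is a small sketch using dask.datasets.timeseries; the partition counts in the comments assume the split_out=1 default described above:

import dask

ddf = dask.datasets.timeseries()  # columns: id, name, x, y

# A handful of groups (a few dozen distinct names): the default split_out=1
# gives one small output partition, which is usually what you want.
few = ddf.groupby("name").x.mean()
print(few.npartitions)  # 1

# Very many groups (essentially every (x, y) pair is unique): spreading the
# result over several partitions avoids one huge output partition, at the
# cost of the mini-shuffle mentioned above.
many = ddf.groupby(["x", "y"]).sum(split_out=8)
print(many.npartitions)  # 8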

I agree that informative error messages would be good. If we knew the values of the data ahead of time then this is something that we would definitely do, but as things stand we only know about this situation once we're in the middle of a computation. Maybe we can emit such a message during computation in the intermediate steps, but this is going to be more awkward and less effective than in something like Client.scatter (where we know the answer much closer to client code).

I think in general the answer to this is 👍 full support, but it's not clear how best to accomplish it, so if anyone wants to get their hands dirty and investigate that would be welcome.

jsignell (Member) commented:
Yeah that makes sense. I think as a first step we could template in docstrings for split_out wherever it's allowed. As of right now split_out is not listed in the params for the groupby aggregation methods.
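For example, something along these lines, purely as an illustrative sketch (the helper and wording are hypothetical, not dask's actual internals):

# Hypothetical sketch: a shared docstring fragment templated into every
# groupby aggregation method that accepts split_out.
SPLIT_OUT_NOTE = """
split_out : int, default 1
    Number of output partitions for the aggregated result. Increase this
    when there are very many groups (e.g. millions) so the result does not
    have to fit into a single partition.
"""

def document_split_out(method):
    """Append the shared split_out description to a method's docstring."""
    method.__doc__ = (method.__doc__ or "") + SPLIT_OUT_NOTE
    return method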

gjoseph92 (Collaborator) commented:
dask/distributed#5220 could make it easier to send a warning back to users at runtime. Yes, it would be nice to know at call time, but getting a warning at compute time would still be far more helpful than no warning at all!

fjetter (Member) commented Aug 17, 2021

One would still need to put some thought into how to issue the warning with dask/distributed#5220. You wouldn't want every single task to issue a warning, since the client would receive hundreds or even thousands of warnings, which might be a little excessive :) But the infrastructure is there, yes.

gjoseph92 (Collaborator) commented:
With dask/distributed#5217 we could have a warn_once event, which is the same as warn but filters the messages for uniqueness on the client :) A bit inefficient, but easy!
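Roughly, the client side could look like the sketch below; the topic name, event layout, and handler are assumptions about that API rather than its actual shape:

import warnings
from dask.distributed import Client

client = Client()  # or connect to an existing cluster

_seen = set()

def warn_once(event):
    """Emit each distinct warning message only once on the client."""
    # Assumed payload shape: (timestamp, msg); adjust to the real event API.
    _timestamp, msg = event
    text = msg["message"] if isinstance(msg, dict) else str(msg)
    if text not in _seen:
        _seen.add(text)
        warnings.warn(text)

# Hypothetical topic: a task running on a worker could publish to it, e.g.
# get_worker().log_event("dask-warnings", {"message": "too many groups ..."}).
client.subscribe_topic("dask-warnings", warn_once)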

@GenevieveBuckley added the dataframe and documentation labels Oct 14, 2021
@github-actions bot added the needs attention label Sep 23, 2024