
A few threaded scheduler fixups #8143

Open · jcrist wants to merge 3 commits into main

Conversation

jcrist
Member

@jcrist jcrist commented Sep 14, 2021

Previously there was a bug where the number of threads used by the
threaded scheduler was different if `dask.compute` was called in the
main thread vs in a background thread. While fixing this, I also noticed
that the logic around maintaining a cache of executors could be done in
a simpler way that avoided the need for a lock or active management (we
could instead rely on GC behavior). This patch includes fixes for both.
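
To illustrate the GC-based approach described above, here is a minimal, hypothetical sketch (not the actual code in this PR) of a per-thread executor cache kept on a `threading.local`. The `_ExecutorPool` and `_get_executor` names are illustrative; the diff context shown just below uses the same `_EXECUTORS` thread-local idea.

    import threading
    from concurrent.futures import ThreadPoolExecutor

    _EXECUTORS = threading.local()  # one cache per calling thread


    class _ExecutorPool:
        """Illustrative per-thread cache of executors keyed by worker count."""

        def __init__(self):
            self._pools = {}

        def get(self, num_workers):
            # Reuse an existing executor for this thread when possible.
            if num_workers not in self._pools:
                self._pools[num_workers] = ThreadPoolExecutor(num_workers)
            return self._pools[num_workers]


    def _get_executor(num_workers):
        # Lazily create the cache the first time this thread calls in.
        if not hasattr(_EXECUTORS, "pool"):
            _EXECUTORS.pool = _ExecutorPool()
        return _EXECUTORS.pool.get(num_workers)

When the calling thread exits, its thread-local data (and the executors cached there) become unreachable and can be garbage collected; in CPython, idle `ThreadPoolExecutor` worker threads shut down once their executor has been collected, which is presumably the GC behavior the description refers to. No lock or explicit bookkeeping is needed.
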
    if not hasattr(_EXECUTORS, "pool"):
        _EXECUTORS.pool = _ExecutorPool()
    if not num_workers:  # treat both 0 and None the same
        # TODO: if num_workers is 1 should we still use more threads?
Member Author

Right now we default to `num_workers == CPU_COUNT`. Do we want to set a minimum threshold here? This came up in a Binder session where CPU_COUNT was 1, so the threaded scheduler wasn't resulting in any parallelism on a simple `time.sleep` example.

`multiprocessing.pool.ThreadPool` defaults to `os.cpu_count()`, but `concurrent.futures.ThreadPoolExecutor` does set a larger default of `cpu_count + 4`.

Member

I was wondering, since we are using `concurrent.futures.ThreadPoolExecutor`, shouldn't we try to be consistent with its defaults?
Checking the docs: since version 3.8, when `max_workers=None` it sets the default to `min(32, os.cpu_count() + 4)`, and when `max_workers=0` it raises a `ValueError` instead of treating it the same as `None`.

See: https://github.com/python/cpython/blob/9ccdc90488302b212bd3405d10dc5c22052e9b4c/Lib/concurrent/futures/thread.py#L128-L138
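
For reference, a quick snippet (assuming Python 3.8+; the exact formula has shifted slightly in newer versions) demonstrating the stdlib behavior described above; it peeks at the private `_max_workers` attribute purely for illustration:

    import os
    from concurrent.futures import ThreadPoolExecutor

    # On Python 3.8+, max_workers=None resolves to min(32, os.cpu_count() + 4)
    with ThreadPoolExecutor() as ex:
        print(ex._max_workers, min(32, (os.cpu_count() or 1) + 4))

    # max_workers=0 is rejected rather than being treated like None
    try:
        ThreadPoolExecutor(max_workers=0)
    except ValueError as exc:
        print(exc)  # "max_workers must be greater than 0"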


Note we should still be using the custom `dask.system.cpu_count` function rather than `os.cpu_count` directly, as it handles things like CPU affinity and cgroups.
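
As a small illustration (assuming a dask version where `dask.system` exposes `cpu_count()` and the `CPU_COUNT` constant), the two counts can differ, for example inside a container with a cgroup CPU quota:

    import os
    from dask.system import CPU_COUNT, cpu_count

    print(os.cpu_count())  # all cores the OS reports
    print(cpu_count())     # respects CPU affinity masks and cgroup limits
    print(CPU_COUNT)       # module-level constant computed at import time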

Member

IMHO using CPU_COUNT as a default still makes sense.

Likely the instances Binder uses lack multiple cores (and likely live in a multitenant deployment). If there is a better value to use there, it would make sense to document in notebooks or other docs so users know how to override it.

We could potentially check an environment variable and use that for CPU_COUNT (if Binder folks would like to override this generally). Have done similar things with conda-build and CI in conda-forge, which has worked well.

One other thought: in the case where the threaded scheduler ends up using only a single thread, we could issue a warning so the user is aware. This likely requires some thought to ensure the warning is not overly noisy (or entirely missed).
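
A rough, hypothetical sketch of what such a warning could look like (not something this PR implements; the helper name and message wording are made up):

    import warnings

    def _maybe_warn_single_thread(num_workers):
        # Hypothetical helper: warn when the threaded scheduler would run
        # with a single worker thread and therefore provide no parallelism.
        if num_workers == 1:
            warnings.warn(
                "The threaded scheduler is running with a single worker thread; "
                "no parallelism will be used. Set num_workers (or the "
                "DASK_NUM_WORKERS environment variable) to override.",
                stacklevel=3,
            )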

Member Author

This is already settable with an environment variable (DASK_NUM_WORKERS), but not every user knows that.

I don't think we need to match the heuristics of other Python threadpool-like things. I was mainly wondering if having a threaded scheduler that may end up using only 1 thread by default on some systems would be confusing to new users. But since dask is mainly used for compute-heavy work (rather than IO-heavy work), using more threads than cores outside of a demo context makes less sense. Fine to keep things as is.
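
For anyone who does want to override the default, a couple of hedged examples (assuming the `num_workers` keyword and config key behave as described in this thread, with `DASK_NUM_WORKERS=4` in the environment being equivalent to the config setting):

    import dask

    @dask.delayed
    def inc(x):
        return x + 1

    total = dask.delayed(sum)([inc(i) for i in range(8)])

    # Per-call keyword, forwarded to the threaded scheduler
    total.compute(scheduler="threads", num_workers=4)

    # Global config; DASK_NUM_WORKERS=4 in the environment does the same
    with dask.config.set(num_workers=4):
        total.compute(scheduler="threads")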

Member

Ah right. Good point.

Yeah I think it is ok to leave as-is. We could maybe have a warning, but we could also handle that as a separate issue (#8152).

@jcrist
Member Author

jcrist commented Sep 14, 2021

cc @ncclementi

@ncclementi
Member

We also seem to be having problems with `test_threaded_within_thread` not passing on Windows. I wonder if it's connected to how `get` works here:

result = get({"x": (lambda: i,)}, "x", num_workers=2)

@github-actions bot added the "needs attention" label ("It's been a while since this was pushed on. Needs attention from the owner or a maintainer.") on Oct 25, 2021
@jsignell
Member

This one has slipped through the cracks. Is there something to salvage?

@jakirkham
Member

Would think so as well. Generally this seemed reasonable.

Think it had CI failures (potentially unrelated?) at the time. The PR has also fallen out-of-date. Maybe fixing conflicts and merging with main to get the latest CI status would be a good first (maybe only) step?

@jsignell
Member

I can take on a cleanup effort @jcrist if that's ok with you.

dask/threaded.py (outdated suggestion, resolved)
Co-authored-by: jakirkham <[email protected]>
@jakirkham
Member

jakirkham commented Apr 1, 2022

Seeing this Windows test failure on CI:

    def test_threaded_within_thread():
        L = []
    
        def f(i):
            result = get({"x": (lambda: i,)}, "x", num_workers=2)
            L.append(result)
    
        before = threading.active_count()
    
        for i in range(20):
            t = threading.Thread(target=f, args=(1,))
            t.daemon = True
            t.start()
            t.join()
            assert L == [1]
            del L[:]
    
        start = time()  # wait for most threads to join
        while threading.active_count() > before + 10:
            sleep(0.01)
>           assert time() < start + 5
E           assert 1648825023.2966466 < (1648825018.2903678 + 5)
E               where 1648825023.2966466 = time()

dask\tests\test_threaded.py:144: AssertionError

Similar failures show up on all Windows CI runs. Maybe we should just bump 5 to 6 or similar?

@jcrist
Member Author

jcrist commented Apr 1, 2022

> Maybe we should just bump 5 to 6 or similar?

No, this is likely an actual Windows-specific bug (in the new code), not something timing-specific.

@github-actions bot removed the "needs attention" label on Sep 23, 2024