A few threaded scheduler fixups #8143
base: main
Conversation
Previously there was a bug where the number of threads used by the threaded scheduler was different if `dask.compute` was called in the main thread vs in a background thread. While fixing this, I also noticed that the logic around maintaining a cache of executors could be done in a simpler way that avoided the need for a lock or active management (we could instead rely on GC behavior). This patch includes fixes for both.
```python
if not hasattr(_EXECUTORS, "pool"):
    _EXECUTORS.pool = _ExecutorPool()
if not num_workers:  # treat both 0 and None the same
    # TODO: if num_workers is 1 should we still use more threads?
```
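The snippet above references `_EXECUTORS` and `_ExecutorPool`, which aren't shown in this diff hunk. As a rough, hypothetical sketch of the thread-local caching pattern under discussion (names invented here, not dask's actual code), the idea of relying on GC rather than a lock looks something like:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sketch, not dask's actual implementation: cache one
# executor per (calling thread, num_workers) in a threading.local().
# Because the cache lives on a thread-local, each thread sees only its
# own pools, so no lock is needed, and abandoned pools can simply be
# garbage collected rather than actively torn down.
_local = threading.local()

def get_executor(num_workers: int) -> ThreadPoolExecutor:
    cache = getattr(_local, "executors", None)
    if cache is None:
        cache = _local.executors = {}
    executor = cache.get(num_workers)
    if executor is None:
        executor = cache[num_workers] = ThreadPoolExecutor(num_workers)
    return executor
```

With this shape, repeated calls from the same thread with the same `num_workers` reuse one pool, while a background thread gets its own, independent pool.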
Right now we default to `num_workers == CPU_COUNT`. Do we want to set a minimum threshold here? This came up in a binder session where `CPU_COUNT` was 1, so the threaded scheduler wasn't resulting in any parallelism on a simple `time.sleep` example.

`multiprocessing.pool.ThreadPool` defaults to `os.cpu_count()`, but `concurrent.futures.ThreadPoolExecutor` does set a minimum default of `cpu_count + 4`.
I was wondering: since we are using `concurrent.futures.ThreadPoolExecutor`, shouldn't we try to be consistent with its defaults? Checking the docs: when `max_workers=None`, since version 3.8 the default is `min(32, os.cpu_count() + 4)`, and when `max_workers=0` it raises a `ValueError` instead of treating it the same as `None`.
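The two behaviors described in the docs can be checked directly. Note that `_max_workers` is a private attribute of `ThreadPoolExecutor`, read here only for illustration:

```python
import os
from concurrent.futures import ThreadPoolExecutor

# Default when max_workers=None (Python 3.8+): min(32, os.cpu_count() + 4).
expected = min(32, (os.cpu_count() or 1) + 4)
ex = ThreadPoolExecutor()
# _max_workers is private API; inspected purely to illustrate the default.
print(ex._max_workers == expected)  # prints True
ex.shutdown()

# max_workers=0 raises ValueError rather than being treated like None.
try:
    ThreadPoolExecutor(max_workers=0)
    raised = False
except ValueError:
    raised = True
print(raised)  # prints True
```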
Note we should still be using the custom `dask.system.cpu_count` function rather than `os.cpu_count` directly, as it handles things like CPU affinity and cgroups.
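To illustrate why such a helper differs from `os.cpu_count`, here is a rough sketch of the idea (this is not `dask.system.cpu_count` itself; the cgroup v1 file paths are an assumption and only exist on some Linux systems):

```python
import math
import os

def cpu_count_sketch() -> int:
    """Illustrative only: core count capped by affinity and cgroup quota."""
    # Start from the raw core count.
    count = os.cpu_count() or 1
    # Respect the process's CPU affinity mask where the platform exposes it.
    if hasattr(os, "sched_getaffinity"):
        count = min(count, len(os.sched_getaffinity(0)))
    # Respect a cgroup v1 CPU quota if one is configured (e.g. in containers).
    try:
        with open("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") as f:
            quota = int(f.read())
        with open("/sys/fs/cgroup/cpu/cpu.cfs_period_us") as f:
            period = int(f.read())
        if quota > 0 and period > 0:
            count = min(count, math.ceil(quota / period))
    except (OSError, ValueError):
        pass  # no cgroup v1 quota available on this system
    return max(1, count)
```

On an unconstrained machine this returns the same value as `os.cpu_count()`; inside a container with a CPU quota or a restricted affinity mask it returns something smaller.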
IMHO using `CPU_COUNT` as a default still makes sense.

Likely the instances Binder uses lack multiple cores (and likely live in a multitenant deployment). If there is a better value to use there, it would make sense to document it in notebooks or other docs so users know how to override it.

We could potentially check an environment variable and use that for `CPU_COUNT` (if the Binder folks would like to override this generally). We have done similar things with conda-build and CI in conda-forge, which has worked well.

One other thought on the use of a single thread in the multithreaded case: we could issue a warning so the user is aware. This likely requires some thought to ensure the warning is not overly noisy (or entirely missed).
This is already settable with an environment variable (`DASK_NUM_WORKERS`), but not every user knows that.

I don't think we need to match the heuristics of other Python threadpool-like things; I was mainly wondering if a threaded scheduler that may end up using only 1 thread by default on some systems would be confusing to new users. But since dask is mainly used for compute-heavy work (rather than IO-heavy work), using more threads than cores outside of a demo context makes less sense. Fine to keep things as is.
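The env-var override being discussed reduces to something like the following generic sketch (`effective_num_workers` is an invented name; dask's real handling of `DASK_NUM_WORKERS` goes through its config system):

```python
import os

def effective_num_workers(detected_cpu_count: int, env=None) -> int:
    """Prefer an explicit DASK_NUM_WORKERS setting over the detected count."""
    env = os.environ if env is None else env
    raw = env.get("DASK_NUM_WORKERS")
    if raw is not None:
        try:
            return int(raw)
        except ValueError:
            pass  # malformed value: fall back to the detected count
    return detected_cpu_count
```

Passing a dict as `env` makes the lookup easy to test without touching the real environment.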
Ah right. Good point.
Yeah I think it is ok to leave as-is. We could maybe have a warning, but we could also handle that as a separate issue ( #8152 ).
cc @ncclementi
We also seem to be having problems with dask/dask/tests/test_threaded.py, line 83 (as of f79a400).
This one has slipped through the cracks. Is there something to salvage?
Would think so as well. Generally this seemed reasonable. Think it had CI failures (potentially unrelated?) at the time. The PR has also fallen out-of-date. Maybe fixing conflicts and merging with
I can take on a cleanup effort @jcrist if that's ok with you.
Co-authored-by: jakirkham <[email protected]>
Seeing this Windows test failure on CI:

```python
def test_threaded_within_thread():
    L = []

    def f(i):
        result = get({"x": (lambda: i,)}, "x", num_workers=2)
        L.append(result)

    before = threading.active_count()

    for i in range(20):
        t = threading.Thread(target=f, args=(1,))
        t.daemon = True
        t.start()
        t.join()
        assert L == [1]
        del L[:]

    start = time()  # wait for most threads to join
    while threading.active_count() > before + 10:
        sleep(0.01)
>       assert time() < start + 5
E       assert 1648825023.2966466 < (1648825018.2903678 + 5)
E        +  where 1648825023.2966466 = time()

dask\tests\test_threaded.py:144: AssertionError
```

Similar failures show up on all Windows CI runs. Maybe we should just bump
No, this is likely an actual Windows-specific bug (in the new code), not something timing-specific.