
executor: rewrite the work-stealing thread pool #1657

Merged: 39 commits, Oct 19, 2019

Conversation

@carllerche (Member) commented Oct 14, 2019

This patch is a ground-up rewrite of the existing work-stealing thread
pool. The goal is to reduce overhead while simplifying code where
possible.

At a high level, the following architectural changes were made:

  • The local run queues were switched to bounded circular buffer queues.
  • Cross-thread synchronization was reduced.
  • Task constructs were refactored to use a single allocation and to always
    include a join handle (Refine Executor trait #887).
  • The logic around putting workers to sleep and waking them up was simplified.

This article goes into the details of the implementation and is helpful reading when reviewing this PR: https://tokio.rs/blog/2019-10-scheduler/

Local run queues

Move away from crossbeam's implementation of the Chase-Lev deque. That
implementation carries unnecessary overhead because it supports
capabilities that the work-stealing thread pool does not need. Instead,
a fixed-size circular buffer is used for the local queue. When the local
queue is full, half of the tasks it contains are moved to the global run
queue.
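
As an illustration only, here is a minimal single-threaded sketch of this overflow behavior. The names (`LocalQueue`, `LOCAL_CAPACITY`, the `inject` argument) are made up for the sketch, and the real queue is a lock-free fixed-size ring rather than a `VecDeque`:

```rust
use std::collections::VecDeque;

// Capacity value is illustrative, not the one used by the pool.
const LOCAL_CAPACITY: usize = 256;

struct LocalQueue<T> {
    buffer: VecDeque<T>, // stands in for the fixed-size ring buffer
}

impl<T> LocalQueue<T> {
    fn push(&mut self, task: T, inject: &mut VecDeque<T>) {
        if self.buffer.len() == LOCAL_CAPACITY {
            // The local queue is full: move half of its tasks to the global
            // (injection) queue so this worker keeps a fast local path.
            for _ in 0..LOCAL_CAPACITY / 2 {
                if let Some(t) = self.buffer.pop_front() {
                    inject.push_back(t);
                }
            }
        }
        self.buffer.push_back(task);
    }
}
```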

Reduce cross-thread synchronization

This is done via many small improvements. Primarily, an upper bound is
placed on the number of concurrent stealers; limiting the number of
stealers results in lower contention. Secondly, the rate at which
workers are notified and woken up is throttled, which also reduces
contention by preventing many threads from racing to steal work.
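
A rough sketch of the stealer bound, using hypothetical names (`MAX_STEALERS`, `try_steal`) rather than the actual types in this PR:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// How many workers may be stealing at once; the value is illustrative.
const MAX_STEALERS: usize = 2;

// Count of workers currently attempting to steal.
static NUM_STEALERS: AtomicUsize = AtomicUsize::new(0);

/// Attempt a steal only if fewer than `MAX_STEALERS` workers are already
/// stealing; otherwise back off and let the caller look elsewhere for work.
fn try_steal(steal: impl FnOnce()) -> bool {
    if NUM_STEALERS.fetch_add(1, Ordering::AcqRel) >= MAX_STEALERS {
        // Too many stealers already; skip this attempt to reduce contention.
        NUM_STEALERS.fetch_sub(1, Ordering::AcqRel);
        return false;
    }
    steal();
    NUM_STEALERS.fetch_sub(1, Ordering::AcqRel);
    true
}
```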

Refactor task structure

Now that Tokio is able to target a Rust version that supports
`std::alloc` as well as `std::task`, the pool is able to optimize how
the task structure is laid out. Now, only a single allocation per task
is required, and a join handle is always provided, enabling the spawner
to retrieve the result of the task (#887).
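
A hedged sketch of the single-allocation idea; `Cell`, `JoinHandle`, and `allocate` here are illustrative stand-ins, and a real task cell manages its reference count instead of leaking:

```rust
use std::future::Future;
use std::ptr::NonNull;
use std::sync::atomic::AtomicUsize;

// All per-task state lives in one heap cell: lifecycle state, the future
// itself, and a slot where its output is stored for the join handle.
struct Cell<F: Future> {
    state: AtomicUsize,        // reference count and lifecycle bits
    future: Option<F>,         // dropped once the future completes
    output: Option<F::Output>, // taken by the join handle
}

// The join handle is just a pointer back into the same allocation.
struct JoinHandle<F: Future> {
    cell: NonNull<Cell<F>>,
}

fn allocate<F: Future>(future: F) -> JoinHandle<F> {
    // A single Box allocation holds everything. A real implementation
    // tracks the reference count in `state` and frees the cell once the
    // scheduler and the join handle are both done; this sketch leaks it.
    let cell = Box::new(Cell {
        state: AtomicUsize::new(1),
        future: Some(future),
        output: None,
    });
    JoinHandle {
        cell: NonNull::from(Box::leak(cell)),
    }
}
```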

Simplifying logic

Where possible, complexity is reduced in the implementation. This is
done by using locks and other simpler constructs in cold paths. The set
of sleeping workers is now represented as a `Mutex<VecDeque<usize>>`.
Instead of optimizing access to this structure, we reduce how often the
pool must access it.
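
For illustration, a minimal sketch of such an idle-worker set behind a `Mutex`; the `Idle` type and method names are assumptions, not the PR's actual API:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Set of sleeping workers, touched only on the cold path (a worker going
/// to sleep, or waking a peer), so a plain Mutex is acceptable.
struct Idle {
    sleepers: Mutex<VecDeque<usize>>, // worker indices, FIFO order
}

impl Idle {
    /// Called by a worker right before it parks.
    fn transition_to_sleeping(&self, worker: usize) {
        self.sleepers.lock().unwrap().push_back(worker);
    }

    /// Called when new work arrives; returns a worker to unpark, if any.
    fn pick_sleeper(&self) -> Option<usize> {
        self.sleepers.lock().unwrap().pop_front()
    }
}
```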

Secondly, we have (temporarily) removed `threadpool::blocking`. This
capability will come back later, but the original implementation was
more complicated than necessary.

Results

The thread pool benchmarks have improved significantly:

Old thread pool

test chained_spawn ... bench:   2,019,796 ns/iter (+/- 302,168)
test ping_pong     ... bench:   1,279,948 ns/iter (+/- 154,365)
test spawn_many    ... bench:  10,283,608 ns/iter (+/- 1,284,275)
test yield_many    ... bench:  21,450,748 ns/iter (+/- 1,201,337)

New thread pool

test chained_spawn ... bench:     168,854 ns/iter (+/- 8,339)
test ping_pong     ... bench:     562,659 ns/iter (+/- 34,410)
test spawn_many    ... bench:   7,320,737 ns/iter (+/- 264,620)
test yield_many    ... bench:  14,638,563 ns/iter (+/- 1,573,678)

Real-world benchmarks improve significantly as well. This is testing the hyper hello world server using `wrk -t1 -c50 -d10`:

Old scheduler

Running 10s test @ http://127.0.0.1:3000
  1 threads and 50 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   371.53us   99.05us   1.97ms   60.53%
    Req/Sec   114.61k     8.45k  133.85k    67.00%
  1139307 requests in 10.00s, 95.61MB read
Requests/sec: 113923.19
Transfer/sec:      9.56MB

New scheduler

Running 10s test @ http://127.0.0.1:3000
  1 threads and 50 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   275.05us   69.81us   1.09ms   73.57%
    Req/Sec   153.17k    10.68k  171.51k    71.00%
  1522671 requests in 10.00s, 127.79MB read
Requests/sec: 152258.70
Transfer/sec:     12.78MB

@hawkw (Member) left a comment

This is great overall and the associated blog post is a really good explanation. I commented on some non-blocking nits.

Resolved review threads:
  • tokio-executor/src/loom/mod.rs
  • tokio-executor/src/task/error.rs (outdated)
  • tokio-executor/src/task/harness.rs (outdated)
  • tokio-executor/src/task/harness.rs
  • tokio-executor/src/task/harness.rs (outdated)
  • tokio-executor/src/thread_pool/current.rs (outdated)
  • tokio-executor/src/thread_pool/park.rs (outdated)
  • tokio-executor/src/thread_pool/queue/local.rs (outdated)
  • tokio-executor/src/thread_pool/worker.rs (outdated)
  • tokio-executor/src/task/list.rs (outdated)
  • tokio-executor/src/task/stack.rs (outdated)
  • tokio-executor/src/task/waker.rs (outdated)
  • tokio-executor/src/thread_pool/park.rs (outdated)
  • tokio-executor/src/thread_pool/park.rs (outdated)
  • tokio-executor/src/thread_pool/set.rs (outdated)
  • tokio-executor/src/thread_pool/thread_pool.rs (outdated)
  • tokio/src/runtime/threadpool/builder.rs
@seanmonstar (Member)

Can the mod task stuff be optional? Seems like a lot of code if you just want to depend on the traits (or current-thread, though I assume that executor will take advantage of this stuff eventually).

@sfackler (Contributor)

I tried out the branch with some tokio-postgres benchmarks and hit this panic:

thread 'tokio-runtime-worker-0' panicked at 'assertion failed: res.is_complete() || res.is_canceled()', /home/sfackler/.cargo/git/checkouts/tokio-377c595163f99a10/3a941e9/tokio-executor/src/task/harness.rs:182:9
stack backtrace:
   0: backtrace::backtrace::libunwind::trace
             at /cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.37/src/backtrace/libunwind.rs:88
   1: backtrace::backtrace::trace_unsynchronized
             at /cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.37/src/backtrace/mod.rs:66
   2: std::sys_common::backtrace::_print_fmt
             at src/libstd/sys_common/backtrace.rs:76
   3: <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt
             at src/libstd/sys_common/backtrace.rs:60
   4: core::fmt::write
             at src/libcore/fmt/mod.rs:1030
   5: std::io::Write::write_fmt
             at src/libstd/io/mod.rs:1412
   6: std::sys_common::backtrace::_print
             at src/libstd/sys_common/backtrace.rs:64
   7: std::sys_common::backtrace::print
             at src/libstd/sys_common/backtrace.rs:49
   8: std::panicking::default_hook::{{closure}}
             at src/libstd/panicking.rs:196
   9: std::panicking::default_hook
             at src/libstd/panicking.rs:210
  10: std::panicking::rust_panic_with_hook
             at src/libstd/panicking.rs:473
  11: std::panicking::begin_panic
  12: tokio_executor::task::harness::Harness<T,S>::drop_task
  13: tokio_executor::task::raw::drop_task
  14: std::thread::local::LocalKey<T>::with
  15: tokio_executor::task::harness::Harness<T,S>::wake_by_val
  16: tokio_executor::task::waker::wake_by_val
  17: <tokio_net::driver::reactor::Inner as core::ops::drop::Drop>::drop
             at /home/sfackler/.cargo/git/checkouts/tokio-377c595163f99a10/3a941e9/tokio-net/src/driver/reactor.rs:510
  18: core::ptr::real_drop_in_place
             at /rustc/fa5c2f3e5724bce07bf1b70020e5745e7b693a57/src/libcore/ptr/mod.rs:175
  19: core::ptr::drop_in_place
             at /rustc/fa5c2f3e5724bce07bf1b70020e5745e7b693a57/src/libcore/ptr/mod.rs:165
  20: alloc::sync::Arc<T>::drop_slow
             at /rustc/fa5c2f3e5724bce07bf1b70020e5745e7b693a57/src/liballoc/sync.rs:703
  21: <alloc::sync::Arc<T> as core::ops::drop::Drop>::drop
             at /rustc/fa5c2f3e5724bce07bf1b70020e5745e7b693a57/src/liballoc/sync.rs:1225
  22: core::ptr::real_drop_in_place
             at /rustc/fa5c2f3e5724bce07bf1b70020e5745e7b693a57/src/libcore/ptr/mod.rs:175
  23: core::ptr::real_drop_in_place
             at /rustc/fa5c2f3e5724bce07bf1b70020e5745e7b693a57/src/libcore/ptr/mod.rs:175
  24: core::ptr::real_drop_in_place
             at /rustc/fa5c2f3e5724bce07bf1b70020e5745e7b693a57/src/libcore/ptr/mod.rs:175
  25: core::ptr::real_drop_in_place
             at /rustc/fa5c2f3e5724bce07bf1b70020e5745e7b693a57/src/libcore/ptr/mod.rs:175
  26: core::ptr::real_drop_in_place
             at /rustc/fa5c2f3e5724bce07bf1b70020e5745e7b693a57/src/libcore/ptr/mod.rs:175
  27: tokio_executor::thread_pool::builder::Builder::build_with_park::{{closure}}
             at /home/sfackler/.cargo/git/checkouts/tokio-377c595163f99a10/3a941e9/tokio-executor/src/thread_pool/builder.rs:192
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

@carllerche (Member, Author)

@sfackler can you give me repro steps?

@sfackler (Contributor)

I hit it running cargo bench with tokio patched to this branch at sfackler/rust-postgres@9d2ec74.

It seems like you can probably run into it by just dropping a runtime with a spawned future still alive, though.
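
A minimal sketch of that kind of repro (the runtime construction and spawn calls are assumptions based on the 0.2 alphas, not taken from this branch):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// A future that never completes, so it is still alive when the runtime
// is dropped.
struct Forever;

impl Future for Forever {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

fn main() {
    // Runtime APIs here are from memory of the 0.2 alphas and may not
    // match this branch exactly.
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.spawn(Forever);

    // Dropping the runtime while `Forever` is still pending exercises the
    // shutdown path that panicked above.
    drop(rt);
}
```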

@carllerche (Member, Author)

@sfackler thanks... looking at your backtrace, it looks like there is a bug when the reactor notifies tasks (which is internal to the thread pool) while dropping... I don't think I thought of testing that case. ❤️ the QA

@carllerche (Member, Author)

I have reproed @sfackler's reported bug. It is indeed a shutdown bug. That is what I get for not using loom to test the shutdown process :'( Adding a loom shutdown test catches it.

@lhecker commented Oct 17, 2019

I'm sorry to barge in like that. I'd just like to cross-post what I wrote on Reddit over here, where it might be a better fit. 🙂

Nice work!
@zonyitoo and I actually also ported Go's runtime over to Rust a long time ago.
Please feel free to take our shabby code as another point of reference. 😄

In particular I'd like to mention the RandomProcessorOrder struct and fetch_foreign_coroutines method you can find there, the former of which I couldn't find in this PR.
Following Go's runtime, it better randomizes the order in which victims are chosen for work stealing. 🙂

@carllerche (Member, Author)

@lhecker thanks for the references. There are definitely still areas to improve on in this PR. I’ll be taking a look. Feel free to leave whatever other feedback you might have.

When shutting down the scheduler, the first step is to close the
global (injection) queue. Once this is done, it is possible for a task
to be notified from *outside* of a scheduler thread. This will attempt
to push the task onto the injection queue but fail. When this happens,
the task must be explicitly shut down or bad things will happen.
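
A hedged sketch of that fallback; `InjectQueue`, `ShutdownTask`, and `schedule_from_outside` are hypothetical names, not the types used in this PR:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

struct InjectQueue<T> {
    inner: Mutex<Inner<T>>,
}

struct Inner<T> {
    closed: bool,
    queue: VecDeque<T>,
}

impl<T> InjectQueue<T> {
    /// Push a task, or hand it back if the queue was closed during shutdown.
    fn push(&self, task: T) -> Result<(), T> {
        let mut inner = self.inner.lock().unwrap();
        if inner.closed {
            return Err(task);
        }
        inner.queue.push_back(task);
        Ok(())
    }
}

trait ShutdownTask {
    fn shutdown(self);
}

/// A task notified from outside a scheduler thread after shutdown began
/// cannot be queued, so it must be shut down explicitly.
fn schedule_from_outside<T: ShutdownTask>(task: T, inject: &InjectQueue<T>) {
    if let Err(task) = inject.push(task) {
        // The injection queue is closed: run the task's shutdown path so
        // its resources are released instead of being leaked.
        task.shutdown();
    }
}
```
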
@carllerche (Member, Author)

@sfackler I fixed the bug you reported. By adding a loom test, I found some additional bugs related to shutdown. I applied a temporary fix, but it needs some cleanup.

tokio-executor/src/task/core.rs
pub(super) executor: CausalCell<Option<NonNull<S>>>,

/// Pointer to next task, used for misc task linked lists.
pub(crate) queue_next: UnsafeCell<*const Header<S>>,
Reviewer comment (Member):

Should this be changed to an `Option<NonNull<Header<S>>>` as well, if only for consistency with the owned list?

let (pool, mut w, mock_park) = pool!(!2);
(pool, w.remove(0), w.remove(0), mock_park)
}};
(! $n:expr) => {{
Reviewer comment (Member):

TIOLI: I think typically `@` is used in macros for "inner" match arms?
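
A small illustrative example of that convention (the macro below is made up, not the one in this PR):

```rust
// `@`-prefixed arms mark a macro's internal helper rules so they cannot
// collide with the public-facing patterns.
macro_rules! pool {
    // Public arms forward to the internal arm.
    (one) => { pool!(@build 1) };
    (two) => { pool!(@build 2) };
    // Internal arm, conventionally prefixed with `@`.
    (@build $n:expr) => {
        format!("pool with {} workers", $n)
    };
}

fn main() {
    assert_eq!(pool!(two), "pool with 2 workers");
}
```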


for (_, join) in map.iter_mut() {
let mut cx = Context::from_waker(noop_waker_ref());
match Pin::new(join).poll(&mut cx) {
Reviewer comment (Member):

Nit/TIOLI: Could be simplified to

if let Ready(n) = Pin::new(join).poll(&mut cx) {
    num = Some(n.unwrap());
    break;
}

tokio-executor/src/task/core.rs (outdated)
tokio-executor/src/task/core.rs (outdated)