New Scheduler #278
@@ -145,12 +145,12 @@ object core:
     end Handler

     trait Safepoint[-E]:
-        def check(): Boolean
+        def preempt(): Boolean

Minor refactoring for clarity.
@@ -129,7 +129,7 @@ private[kyo] class IOPromise[T](state: State[T])
         promise.get() match
             case _: Pending[T] @unchecked =>
                 IOs {
-                    Scheduler.flush()
+                    Scheduler.get.flush()

`Scheduler` isn't an object anymore. The global instance returned by `get` isn't lazily initialized because it'd require a memory barrier to read the field. It uses a regular `val` loaded when the companion object class is loaded.
-    @volatile private var state: Int // Math.abs(state) => runtime; state < 0 => preempting
-) extends IOPromise[T] with Task
+    initialRuntime: Int
+) extends IOPromise[T] with Task(initialRuntime)

Runtime and preemption management are now done by `Task`.
import java.util.concurrent.Executor
import java.util.concurrent.locks.LockSupport

final private class InternalClock(executor: Executor):

`Task`'s runtime tracking has to measure execution times, and obtaining the time from the system clock is prohibitively expensive. This internal clock provides approximate time measurements by updating a volatile field every ~1ms. Readers only pay the price of the read barrier and an occasional cache miss when the field is updated.

Nice!
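The idea in the comment above can be sketched roughly as follows. This is not the PR's `InternalClock`: the class name `CoarseClock`, the `running` flag and the `stop()` method are hypothetical and exist only to keep the example self-contained.

```scala
import java.util.concurrent.Executor
import java.util.concurrent.locks.LockSupport

// Hypothetical sketch: a background loop refreshes a volatile field roughly
// every millisecond so that readers never call the system clock themselves.
final class CoarseClock(executor: Executor):

    @volatile private var millis  = System.currentTimeMillis()
    @volatile private var running = true

    executor.execute { () =>
        while running do
            millis = System.currentTimeMillis()
            LockSupport.parkNanos(1000000L) // park for ~1ms between updates
    }

    // Readers only perform a volatile read
    def currentMillis(): Long = millis

    // Assumption: a way to stop the loop, added for the example only
    def stop(): Unit = running = false
end CoarseClock
```

The value returned by `currentMillis()` can lag behind the real clock by about a millisecond plus scheduling jitter, which is the trade-off the comment describes.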
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

abstract private class InternalTimer:

Introduced for testability.

private val queue = PriorityQueue[T]()

@volatile private var items = 0

This is a micro optimization. If `items` is marked as `volatile`, methods like `add` have to use memory barriers to update it within a `modify`/`tryModify` block, which is unnecessary since there's already a write barrier at the end to release the lock.

def size(): Int =
    VarHandle.acquireFence()
    items

Forces a read barrier so that all threads see the latest value, since the field isn't volatile anymore.
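A minimal sketch of the pattern discussed in the last two comments, assuming a spin lock built on the `AtomicBoolean` superclass. `CountedBag` and its `modify` helper are hypothetical names, not the PR's `Queue`.

```scala
import java.lang.invoke.VarHandle
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch: the AtomicBoolean superclass acts as a spin lock.
final class CountedBag extends AtomicBoolean:

    // Plain (non-volatile) field, written only while the lock is held
    private var items = 0

    private def modify[A](f: => A): A =
        while !compareAndSet(false, true) do Thread.onSpinWait()
        try f
        finally set(false) // volatile write: releases the lock and publishes `items`

    def add(): Unit = modify { items += 1 }

    def size(): Int =
        VarHandle.acquireFence() // read barrier before the plain read, as in the diff's size()
        items
end CountedBag
```

Writers rely on the volatile write that releases the lock, so the counter itself doesn't need its own barrier on every update. Only lock-free readers such as `size()` add an explicit fence.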
@@ -61,7 +62,7 @@ final private[kyo] class Queue[T](using ord: Ordering[T]) extends AtomicBoolean:
         !isEmpty() && to.isEmpty() && to.tryModify {
             t = queue.dequeue()
             val s = size() - 1
-            var i = s - (s / 2)
+            var i = s - Math.ceil(s.toDouble / 2).intValue()

When a worker had 3 tasks, this code was stealing 2, for example. The new logic makes it steal only 1. Leaving a worker with only one task increases the likelihood that it'll soon go to sleep again. If a thief is successful, it'll naturally try stealing again when it's out of tasks.
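To make the arithmetic concrete, here is a small sketch comparing the two expressions for `i`. It assumes that `i` determines how many tasks move to the thief, which the snippet itself doesn't show; the function names are hypothetical.

```scala
// `i` before the change: s - floor(s / 2), which equals ceil(s / 2)
def oldI(s: Int): Int = s - (s / 2)

// `i` after the change: s - ceil(s / 2), which equals floor(s / 2)
def newI(s: Int): Int = s - Math.ceil(s.toDouble / 2).toInt

// Side-by-side values as (s, old, new) for s = 1 to maxS
def compare(maxS: Int): Seq[(Int, Int, Int)] =
    (1 to maxS).map(s => (s, oldI(s), newI(s)))
```

For `s = 3` the old expression yields 2 and the new one yields 1, in line with the comment. The two only differ for odd `s`, where the new version rounds down instead of up.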
        items = 0
        queue.dequeueAll
    }
tasks.foreach(f)

This code was unnecessarily holding the lock when executing the drain function.
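The fix described here is the usual "snapshot under the lock, run callbacks outside it" pattern. A rough sketch, with a hypothetical `DrainableQueue` class and `modify` helper standing in for the real `Queue`:

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// Hypothetical sketch: the AtomicBoolean superclass acts as a spin lock.
final class DrainableQueue[T](using Ordering[T]) extends AtomicBoolean:

    private val queue = mutable.PriorityQueue[T]()

    private def modify[A](f: => A): A =
        while !compareAndSet(false, true) do Thread.onSpinWait()
        try f
        finally set(false)

    def add(t: T): Unit = modify { queue.enqueue(t) }

    def drain(f: T => Unit): Unit =
        // Only the removal happens while the lock is held
        val tasks: Seq[T] = modify { queue.dequeueAll }
        // The callback runs after the lock has been released
        tasks.foreach(f)
end DrainableQueue
```

Running `f` outside the lock means a slow or re-entrant callback can't stall other threads that are trying to add to or steal from the same queue.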
@@ -0,0 +1,39 @@
+package kyo.scheduler.util
+
+final private[kyo] class MovingStdDev(window: Int):

Since this class isn't used in a very hot path anymore and is called only by regulators that execute periodically, it was updated to favor precision over performance.
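One way to favor precision over speed is to keep the raw window and recompute the deviation from scratch on every read, instead of maintaining running sums. The sketch below illustrates that trade-off only. It is not the PR's `MovingStdDev`, and the class and method names are hypothetical.

```scala
// Hypothetical sketch: sample standard deviation over the last `window` values,
// recomputed with a two-pass algorithm on each call to dev().
final class WindowStdDev(window: Int):

    private val values = new Array[Long](window)
    private var count  = 0L // total number of observations so far

    def observe(v: Long): Unit =
        values((count % window).toInt) = v // overwrite the oldest slot
        count += 1

    def dev(): Double =
        val n = Math.min(count, window.toLong).toInt
        if n <= 1 then 0d
        else
            var sum = 0d
            var i   = 0
            while i < n do
                sum += values(i)
                i += 1
            val mean = sum / n
            var squares = 0d
            i = 0
            while i < n do
                val d = values(i) - mean
                squares += d * d
                i += 1
            Math.sqrt(squares / (n - 1))
        end if
    end dev
end WindowStdDev
```

Each `dev()` call costs O(window), which is acceptable for a caller that runs periodically but would be too slow on a per-task hot path.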

val a1, a2, a3, a4, a5, a6, a7 = 0L // padding

@volatile private var cycles = 0L

This field used to be in `Coordinator`.
ExecutionContext.fromExecutor(asExecutor)

@tailrec
private def schedule(t: Task, submitter: Worker): Unit =

These scheduling methods have not changed.
workers(idx) = new Worker(idx, pool, schedule, steal, () => cycles, clock)
allocatedWorkers = 1

private val cycleTask =

Moved from `Coordinator`.
val w = workers(i)
if w != null then
    if i >= maxConcurrency then
        w.drain()

Improved the logic to keep cycling all workers while also draining workers that have been stopped (those above the concurrency limit).

val a1, a2, a3, a4, a5, a6, a7 = 0L // padding

@volatile private var running = false

This class hasn't changed much, but I've made `running` a `volatile` that is updated using `VarHandle` as a micro optimization to avoid the pointer chasing with `AtomicBoolean`, since the flag is used in the hot path of task execution.
val state = m.getState().ordinal()
state == Thread.State.BLOCKED.ordinal() ||
state == Thread.State.WAITING.ordinal() ||
state == Thread.State.TIMED_WAITING.ordinal()

The previous implementation was missing some of these states.

Looks like a huge improvement! Nice work!
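For reference, the same three-state check can be written as a pattern match on `Thread.State`. This is a sketch with a hypothetical helper name, not the code from the PR, which compares ordinals.

```scala
// Hypothetical helper: true when the thread is not making progress because it
// is waiting on a monitor, parked, or sleeping.
def isBlocked(thread: Thread): Boolean =
    thread.getState match
        case Thread.State.BLOCKED | Thread.State.WAITING | Thread.State.TIMED_WAITING => true
        case _                                                                        => false
```

`BLOCKED` covers monitor contention, `WAITING` covers untimed waits such as `LockSupport.park()` or `Object.wait()`, and `TIMED_WAITING` covers their timed variants and `Thread.sleep`.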
@@ -3,13 +3,19 @@ package kyo.scheduler
import scala.scalajs.concurrent.JSExecutionContext

object Scheduler:
    lazy val get = new Scheduler

I don't think this needs to be lazy

fixed!

def scheduleOnce(delay: Duration)(f: => Unit): TimerTask =
    val future = executor.schedule((() => f): Runnable, delay.toNanos, TimeUnit.NANOSECONDS)
    new TimerTask:

Is it possible to avoid this allocation?

You're getting used to Kyo's focus on performance :) In this case, I don't think it's necessary because only two tasks are submitted when the scheduler starts. It'd also be more difficult to mock this class in tests if we used an opaque type to alias `ScheduledFuture`.
    end if
catch
    case ex if NonFatal(ex) =>
        log.error(s"🙈 !!Kyo Scheduler Bug!! ${getClass.getSimpleName()} regulator's probe collection has failed.", ex)

Now that we are using a logger instance for this class, we can remove `getClass.getSimpleName()`.

good catch!

Ah, actually I'm not using Kyo's logging because the idea is for the scheduler module to be isolated, without a dependency on kyo-core. This is the regular slf4j logger.

Right, but you are using an SLF4J instance for each class, which should include the class name. See line 92:
private[Regulator] val log = LoggerFactory.getLogger(getClass)
This goes throughout.

The `log` in this case would be `Regulator`, not the subclass. But I've decided to remove the `slf4j` dependency since logging is used only for bugs. There's the `LoomSupport` warning but I think it's ok to log that with a regular `println`. The module now has zero dependencies, which should help avoid issues in case it's used in isolation without other Kyo modules.

Cool emoji!

Benchmark results with the new scheduler:

The results are mixed and the

@hearnadam would you be able to check out this branch and run this main? It checks that the regulators are working properly. It'd be nice to get data from other machines as well.

I'm doing some more testing using containers and CPU quotas to validate the behavior with CPU throttling and I'm planning to merge this PR if everything looks ok.

@fwbrasil will take a look at running this tonight.

Self-check is ok with CPU quotas as well:

I've made a few new optimizations and the benchmark results are now more consistently good:

The main cause for the regression in

On:

@hearnadam can you check if you have other processes that could be using some of the CPU?

private def bug(msg: String, ex: Throwable) =
    (new Exception("🙈 !!Kyo Scheduler Bug!! " + msg, ex)).printStackTrace(System.err)

-private def bug(msg: String, ex: Throwable) =
-    (new Exception("🙈 !!Kyo Scheduler Bug!! " + msg, ex)).printStackTrace(System.err)
+import java.util.logging.*
+private lazy val logger = Logger.getLogger(getClass.getName)
+private def bug(msg: String, ex: Throwable) =
+    logger.log(Level.SEVERE, "🙈 !!Kyo Scheduler Bug!!", new Exception(msg, ex))

Since you want zero dependencies, we can still use `java.util.logging`, which is part of the standard library. This will give people better interoperability with other logging frameworks or other tools hooked on logging, like Sentry.

good idea! 🙏

You can define your own logging and then there will be an adapter.
Netty 5 is using slf4j again.

I've changed the code to use Java logging as @sideeffffect suggested. I've also tried another optimization moving the preemption handling from

I'm merging this PR since it keeps getting larger but please feel free to provide feedback on the change!

@nowarn
final class InternalClock(executor: Executor = null):

    def currentMillis(): Long = System.currentTimeMillis()

Should it be nano time?

`System.nanoTime` isn't monotonic across threads. It also helps to hide the imprecision of the clock.

@volatile private var millis = System.currentTimeMillis()

val b1, b2, b3, b4, b5, b6, b7 = 0L // padding

Cool padding
    i += 1
end while
if worker != null then
    worker.steal(thief)

Worker stealing thief not thief stealing worker?

That's `thief` taking tasks from `worker`, but yeah, the API isn't very clear.

Maybe stealingBy or divisionBy

classOf[Executors]
    .getMethod("newThreadPerTaskExecutor", classOf[ThreadFactory])
    .invoke(null, factory).asInstanceOf[Executor]

Cool, I just did the same too: https://github.com/apache/rocketmq/pull/8063

So you don't plan to pool the virtual threads, right?

Cool! I tried pooling, but putting virtual threads to sleep is too expensive and it's cheaper to allocate new ones. Virtual threads are allocated only when workers start, though. A worker mounted on a virtual thread can execute several tasks until there's no more pending work, which avoids the cost of creating a virtual thread on each task execution like a plain `newThreadPerTaskExecutor` does.
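The reflective lookup in the snippet lets the code compile against older JDKs while still using `Executors.newThreadPerTaskExecutor` when it exists. Below is a sketch of that approach. The helper name and the fallback for JDKs without the method are assumptions made for the example, not part of the PR.

```scala
import java.util.concurrent.{Executor, Executors, ThreadFactory}
import scala.util.Try

// Hypothetical helper: resolves newThreadPerTaskExecutor reflectively so the
// code still compiles and runs on JDKs that don't have the method.
def threadPerTaskExecutor(factory: ThreadFactory): Executor =
    // Assumed fallback: start one thread per task using the given factory
    val fallback: Executor = task => factory.newThread(task).start()
    Try {
        classOf[Executors]
            .getMethod("newThreadPerTaskExecutor", classOf[ThreadFactory])
            .invoke(null, factory)
            .asInstanceOf[Executor]
    }.getOrElse(fallback)
```

Passing a virtual-thread factory would give one virtual thread per submitted runnable. As the reply above explains, the scheduler only submits a runnable when a worker starts, so a single thread then serves many tasks.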

Yeah, if we totally move to non-pooling, we'll need to change a lot of the current code base at work, where the thread pool is also used as a kind of concurrency limiter.

"kyo" :: "scheduler" :: path.toList

private def bug(msg: String, ex: Throwable) =
    log.severe(s"🙈 !!Kyo Scheduler Bug!! $msg \n Caused by: ${stackTrace(ex)}")

Just curious, why not use the overload of this method where you can pass the `Throwable` as an argument?
I was hoping that the logging backend would do the printing of the Throwable's stack trace for us, instead of us having to do it manually with our own ad hoc method (`private def stackTrace(ex: Throwable)`).
Supplying an intact Throwable instance to the logger will also have better interop with 3rd parties. (You can always turn a Throwable to String, but you can't go the other way around.)
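The overload mentioned in this comment is `Logger.log(Level, String, Throwable)` from `java.util.logging`. A minimal sketch of how it could be used; the object name and the logger name are hypothetical.

```scala
import java.util.logging.{Level, Logger}

object SchedulerLog:
    // Hypothetical logger name, chosen for the example
    val log: Logger = Logger.getLogger("kyo.scheduler.example")

    def bug(msg: String, ex: Throwable): Unit =
        // The Throwable is handed over intact, so handlers decide how to render
        // the stack trace and can read it via LogRecord.getThrown
        log.log(Level.SEVERE, s"🙈 !!Kyo Scheduler Bug!! $msg", ex)
end SchedulerLog
```

With this overload, any installed `Handler` or logging bridge receives the original exception object rather than a pre-formatted string.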

Kyo's current scheduler is functional and has been serving the project's needs well, but the code is focused on simplicity, using global objects and with only indirect coverage via tests that use fibers. This PR introduces a new version of the scheduler with several major improvements:

- `Regulator` abstraction was introduced to replace the concurrency management by `Coordinator`, which has been removed.
- `Admission` regulator probes the scheduler's queuing delay in order to provide a back pressure signal. The signal isn't integrated in other modules yet but it should be used to reject requests in `kyo-tapir`, for example.
- `Task` interface is now a first-class public API that provides built-in runtime tracking, which simplified `IOTask`.

I realize this PR is too large to review. I apologize for that, but isolating changes would slow things down too much, which doesn't seem necessary in the current phase of the project. Please let me know if this kind of change is getting too intrusive, though. I'm also planning to eventually write a separate readme for the scheduler but I want to have some experience with it first.