
New Scheduler #278

Merged

fwbrasil merged 59 commits into main from scheduler-review on Apr 24, 2024

Conversation

fwbrasil (Collaborator)

Kyo's current scheduler is functional and has been serving the project's needs well, but the code favors simplicity, relying on global objects, and has only indirect test coverage via tests that use fibers. This PR introduces a new version of the scheduler with several major improvements:

  • The scheduler is now in a separate module with no dependencies on other Kyo modules. This allows users to adopt the scheduler in isolation, for example with another effect system like ZIO.
  • The code was refactored to be more testable via function composition and constructor injection. Its test coverage is quite high now.
  • A new Regulator abstraction was introduced to replace the concurrency management previously done by Coordinator, which has been removed.
  • In addition to the concurrency regulator, a new Admission regulator probes the scheduler's queuing delay to provide a backpressure signal. The signal isn't integrated into other modules yet, but it could be used to reject requests in kyo-tapir, for example.
  • The Task interface is now a first-class public API that provides built-in runtime tracking, which simplified IOTask.

I realize this PR is too large to review. I apologize for that but isolating changes would slow things down too much, which doesn't seem necessary in the current phase of the project. Please let me know if this kind of change is getting too intrusive, though. I'm also planning to eventually write a separate readme for the scheduler but I want to have some experience with it first.
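As an illustration of the isolated-adoption point, standalone usage could look roughly like the sketch below. Scheduler.get appears later in this PR, and the scheduler wires up ExecutionContext.fromExecutor(asExecutor) internally, but the asExecutionContext entry point shown here is a hypothetical name for illustration:

import scala.concurrent.{ExecutionContext, Future}
import kyo.scheduler.Scheduler

// Hypothetical standalone adoption: run Future-based (or ZIO) workloads
// on kyo-scheduler's work-stealing pool. `asExecutionContext` is an
// assumed entry point and may differ from the actual public API.
given ExecutionContext = Scheduler.get.asExecutionContext

Future {
    println(s"running on ${Thread.currentThread().getName}")
}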

@@ -145,12 +145,12 @@ object core:
end Handler

trait Safepoint[-E]:
-    def check(): Boolean
+    def preempt(): Boolean
Collaborator Author

Minor refactoring for clarity.

@@ -129,7 +129,7 @@ private[kyo] class IOPromise[T](state: State[T])
promise.get() match
case _: Pending[T] @unchecked =>
IOs {
-    Scheduler.flush()
+    Scheduler.get.flush()
Collaborator Author

Scheduler isn't an object anymore. The global instance returned by get isn't lazily initialized because that would require a memory barrier to read the field. It uses a regular val that is initialized when the companion object's class is loaded.
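A minimal sketch of the resulting pattern (constructor arguments elided):

object Scheduler:
    // A plain `val`: initialized once when the companion object's class
    // is loaded, so reads of `get` don't pay for the memory barrier a
    // `lazy val` would require on every access.
    val get = new Scheduler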

-    @volatile private var state: Int // Math.abs(state) => runtime; state < 0 => preempting
-    ) extends IOPromise[T] with Task
+    initialRuntime: Int
+    ) extends IOPromise[T] with Task(initialRuntime)
Collaborator Author

Runtime and preemption management are now done by Task.

import java.util.concurrent.Executor
import java.util.concurrent.locks.LockSupport

final private class InternalClock(executor: Executor):
Collaborator Author

Task's runtime tracking has to measure execution times, and obtaining the time from the system clock is prohibitively expensive. This internal clock provides approximate time measurements by updating a volatile field every ~1ms. Readers only pay the price of the read barrier and occasional cache misses when the field is updated.
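A sketch of the idea; the millis field and the currentMillis() signature match snippets later in this thread, while the update loop shown here is an assumption:

import java.util.concurrent.Executor
import java.util.concurrent.locks.LockSupport

final private class InternalClock(executor: Executor):
    @volatile private var millis = System.currentTimeMillis()

    // Assumed update loop: refresh the volatile field roughly every 1ms
    // so readers get approximate time without a system call per read.
    executor.execute { () =>
        while true do
            millis = System.currentTimeMillis()
            LockSupport.parkNanos(1000000L) // ~1ms
    }

    // Readers pay only for the volatile read (and an occasional cache
    // miss right after the writer updates the field).
    def currentMillis(): Long = millis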

Collaborator

Nice!

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

abstract private class InternalTimer:
Collaborator Author

Introduced for testability.


private val queue = PriorityQueue[T]()

@volatile private var items = 0
Collaborator Author

This is a micro-optimization. If items is marked as volatile, methods like add have to use memory barriers to update it within a modify/tryModify block, which is unnecessary since there's already a write barrier at the end to release the lock.


def size(): Int =
VarHandle.acquireFence()
items
Collaborator Author

Force a read barrier so all threads see the latest value, since the field isn't volatile anymore.
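A sketch combining both micro-optimizations. The Queue signature in the next hunk shows the class extends AtomicBoolean, which is used as the lock here; the spin-lock modify helper itself is an assumption:

import java.lang.invoke.VarHandle
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.PriorityQueue

final private class Queue[T](using ord: Ordering[T]) extends AtomicBoolean:
    private val queue = PriorityQueue[T]()
    private var items = 0 // plain field: published by the unlock below

    // Assumed lock helper: the inherited AtomicBoolean is the lock, and
    // set(false) issues the releasing write that also publishes `items`.
    private def modify[A](f: => A): A =
        while !compareAndSet(false, true) do ()
        try f
        finally set(false)

    def add(t: T): Unit = modify {
        items += 1
        queue.addOne(t)
    }

    // Readers force an acquire fence so they observe the `items` value
    // published by the last unlock, without making the field volatile.
    def size(): Int =
        VarHandle.acquireFence()
        items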

@@ -61,7 +62,7 @@ final private[kyo] class Queue[T](using ord: Ordering[T]) extends AtomicBoolean:
!isEmpty() && to.isEmpty() && to.tryModify {
t = queue.dequeue()
val s = size() - 1
-    var i = s - (s / 2)
+    var i = s - Math.ceil(s.toDouble / 2).intValue()
Collaborator Author

When a worker had 3 tasks, this code was stealing 2, for example. The new logic makes it steal only 1. Leaving a worker with only one task increases the likelihood that it'll soon go to sleep again. If a thief is successful, it'll naturally try stealing again when it runs out of tasks.
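Worked example, assuming size() here already reflects the initial dequeue() into t: with 3 tasks, the thief takes one via dequeue(), so s = size() - 1 = 2 - 1 = 1. The old formula transferred i = 1 - 1/2 = 1 extra task (2 stolen in total, leaving the victim 1), while the new one transfers i = 1 - ceil(1/2) = 0 extra tasks (1 stolen in total, leaving the victim 2).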

items = 0
queue.dequeueAll
}
tasks.foreach(f)
Collaborator Author

This code was unnecessarily holding the lock while executing the drain function.
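A sketch of the fix, reusing the assumed modify helper from the earlier sketch: the queue is drained to a snapshot under the lock, and the function now runs after the lock is released:

def drain(f: T => Unit): Unit =
    // Snapshot the pending tasks under the lock...
    val tasks = modify {
        items = 0
        queue.dequeueAll
    }
    // ...then run the user-supplied function outside of it, so a slow
    // `f` can no longer block producers and thieves.
    tasks.foreach(f)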

@@ -0,0 +1,39 @@
package kyo.scheduler.util

final private[kyo] class MovingStdDev(window: Int):
Collaborator Author

Since this class isn't used in a very hot path anymore and is called only by regulators that execute periodically, it was updated to favor precision over performance.
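A minimal sketch of a precision-first moving standard deviation under that constraint (the actual implementation may differ):

final private[kyo] class MovingStdDev(window: Int):
    private val values = new Array[Double](window)
    private var idx    = 0L

    def observe(v: Double): Unit =
        values((idx % window).toInt) = v
        idx += 1

    // Recompute mean and variance from the raw window on every read:
    // more expensive than an incremental update, but precise, which is
    // fine for regulators that only sample periodically.
    def dev(): Double =
        val n = math.min(idx, window.toLong).toInt
        if n <= 1 then 0d
        else
            val sample   = values.take(n)
            val mean     = sample.sum / n
            val variance = sample.map(x => (x - mean) * (x - mean)).sum / (n - 1)
            math.sqrt(variance)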


val a1, a2, a3, a4, a5, a6, a7 = 0L // padding

@volatile private var cycles = 0L
Collaborator Author

This field used to be in Coordinator.

ExecutionContext.fromExecutor(asExecutor)

@tailrec
private def schedule(t: Task, submitter: Worker): Unit =
Collaborator Author

These scheduling methods have not changed.

workers(idx) = new Worker(idx, pool, schedule, steal, () => cycles, clock)
allocatedWorkers = 1

private val cycleTask =
Collaborator Author

Moved from Coordinator

val w = workers(i)
if w != null then
if i >= maxConcurrency then
w.drain()
Collaborator Author

Improved the logic to keep cycling all workers while also draining workers that have been stopped (those above the concurrency limit).


val a1, a2, a3, a4, a5, a6, a7 = 0L // padding

@volatile private var running = false
Collaborator Author

This class hasn't changed much, but I've made running a volatile field that is updated using a VarHandle as a micro-optimization to avoid the pointer chasing of AtomicBoolean, since the flag is used in the hot path of task execution.
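A sketch of the pattern, with illustrative names; the VarHandle lookup assumes the backing field keeps the name running:

import java.lang.invoke.{MethodHandles, VarHandle}

final private class Worker:
    @volatile private var running = false

    // CAS directly on the field through a VarHandle: AtomicBoolean
    // semantics without the separate object allocation and the pointer
    // chase on the task-execution hot path.
    def tryTransitionToRunning(): Boolean =
        Worker.runningHandle.compareAndSet(this, false, true)

    def stop(): Unit = running = false

object Worker:
    private val runningHandle: VarHandle =
        MethodHandles
            .privateLookupIn(classOf[Worker], MethodHandles.lookup())
            .findVarHandle(classOf[Worker], "running", classOf[Boolean])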

val state = m.getState().ordinal()
state == Thread.State.BLOCKED.ordinal() ||
state == Thread.State.WAITING.ordinal() ||
state == Thread.State.TIMED_WAITING.ordinal()
Collaborator Author

The previous implementation was missing some of these states.

@hearnadam (Collaborator) left a comment

Looks like a huge improvement! Nice work!

@@ -3,13 +3,19 @@ package kyo.scheduler
import scala.scalajs.concurrent.JSExecutionContext

object Scheduler:
lazy val get = new Scheduler
Collaborator

I don't think this needs to be lazy

Collaborator Author

fixed!


def scheduleOnce(delay: Duration)(f: => Unit): TimerTask =
val future = executor.schedule((() => f): Runnable, delay.toNanos, TimeUnit.NANOSECONDS)
new TimerTask:
Collaborator

Is it possible to avoid this allocation?

Collaborator Author

You're getting used to Kyo's focus on performance :) In this case, I don't think it's necessary because only two tasks are submitted when the scheduler starts. It'd also be more difficult to mock this class in tests if we used an opaque type to alias ScheduledFuture.

end if
catch
case ex if NonFatal(ex) =>
log.error(s"🙈 !!Kyo Scheduler Bug!! ${getClass.getSimpleName()} regulator's probe collection has failed.", ex)
Collaborator

Now that we are using a logger instance for this class, we can remove getClass.getSimpleName().

Collaborator Author

good catch!

Collaborator Author

Ah, actually I'm not using Kyo's logging because the idea is for the scheduler module to be isolated, without a dependency on kyo-core. This is the regular slf4j logger.

Collaborator

Right, but you are using an SLF4J instance for each class, which should already include the class name. See line 92:

private[Regulator] val log = LoggerFactory.getLogger(getClass)

This applies throughout.

Collaborator Author

The log in this case would be Regulator, not the subclass. But I've decided to remove the slf4j dependency since logging is used only for bugs. There's the LoomSupport warning, but I think it's ok to log that with a regular println. The module now has zero dependencies, which should help avoid issues in case it's used in isolation without other Kyo modules.


Cool emoji!


fwbrasil commented Apr 23, 2024

Benchmark results with the new scheduler:

[benchmark results image]

The results are mixed and the RandomBench improvement seems to be just noise. I think the results are reasonable, though. The scheduler now does much more: handling blocking, adjusting concurrency, and providing a backpressure signal. Not having a significant regression is already a good win, and we can keep optimizing it over time.

@hearnadam would you be able to check out this branch and run this main? It checks that the regulators are working properly; it'd be nice to get data from other machines as well. I'm doing some more testing using containers and CPU quotas to validate the behavior with CPU throttling, and I'm planning to merge this PR if everything looks ok.

@hearnadam (Collaborator)

@fwbrasil will take a look at running this tonight.

@fwbrasil (Collaborator Author)

Self-check is ok with CPU quotas as well:

fwbrasil@flavios-mbp kyo % docker run --rm --cpus=1 -v /Users/fwbrasil/workspace/kyo/kyo-scheduler/jvm/target/scala-3.4.1:/app adoptopenjdk/openjdk11 java -cp /app/kyo-scheduler-assembly-0.9.2+71-82298105+20240423-1138-SNAPSHOT.jar kyo.scheduler.util.SelfCheckMain
Map(clients -> 0, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 1, rejectionPercent -> 0.02040816326530612, rejectionThreshold -> 0.2)
Map(clients -> 2, rejectionPercent -> 0.8846153846153846, rejectionThreshold -> 0.2)
Success
fwbrasil@flavios-mbp kyo % docker run --rm --cpus=2 -v /Users/fwbrasil/workspace/kyo/kyo-scheduler/jvm/target/scala-3.4.1:/app adoptopenjdk/openjdk11 java -cp /app/kyo-scheduler-assembly-0.9.2+71-82298105+20240423-1138-SNAPSHOT.jar kyo.scheduler.util.SelfCheckMain
Map(clients -> 0, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 1, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 2, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 3, rejectionPercent -> 1.2727272727272727, rejectionThreshold -> 0.2)
Success
fwbrasil@flavios-mbp kyo % docker run --rm --cpus=3 -v /Users/fwbrasil/workspace/kyo/kyo-scheduler/jvm/target/scala-3.4.1:/app adoptopenjdk/openjdk11 java -cp /app/kyo-scheduler-assembly-0.9.2+71-82298105+20240423-1138-SNAPSHOT.jar kyo.scheduler.util.SelfCheckMain
Map(clients -> 0, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 1, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 2, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 3, rejectionPercent -> 0.06521739130434782, rejectionThreshold -> 0.2)
Map(clients -> 4, rejectionPercent -> 0.21951219512195122, rejectionThreshold -> 0.2)
Success
fwbrasil@flavios-mbp kyo % docker run --rm --cpus=4 -v /Users/fwbrasil/workspace/kyo/kyo-scheduler/jvm/target/scala-3.4.1:/app adoptopenjdk/openjdk11 java -cp /app/kyo-scheduler-assembly-0.9.2+71-82298105+20240423-1138-SNAPSHOT.jar kyo.scheduler.util.SelfCheckMain
Map(clients -> 0, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 1, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 2, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 3, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 4, rejectionPercent -> 0.06521739130434782, rejectionThreshold -> 0.2)
Map(clients -> 5, rejectionPercent -> 0.21951219512195122, rejectionThreshold -> 0.2)
Success
fwbrasil@flavios-mbp kyo % docker run --rm --cpus=5 -v /Users/fwbrasil/workspace/kyo/kyo-scheduler/jvm/target/scala-3.4.1:/app adoptopenjdk/openjdk11 java -cp /app/kyo-scheduler-assembly-0.9.2+71-82298105+20240423-1138-SNAPSHOT.jar kyo.scheduler.util.SelfCheckMain
Map(clients -> 0, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 1, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 2, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 3, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 4, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 5, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 6, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 7, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 8, rejectionPercent -> 0.020833333333333332, rejectionThreshold -> 0.2)
Map(clients -> 9, rejectionPercent -> 1.5, rejectionThreshold -> 0.2)
Success
fwbrasil@flavios-mbp kyo % docker run --rm --cpus=6 -v /Users/fwbrasil/workspace/kyo/kyo-scheduler/jvm/target/scala-3.4.1:/app adoptopenjdk/openjdk11 java -cp /app/kyo-scheduler-assembly-0.9.2+71-82298105+20240423-1138-SNAPSHOT.jar kyo.scheduler.util.SelfCheckMain
Map(clients -> 0, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 1, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 2, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 3, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 4, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 5, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 6, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 7, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 8, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 9, rejectionPercent -> 1.380952380952381, rejectionThreshold -> 0.2)
Success
fwbrasil@flavios-mbp kyo % docker run --rm --cpus=7 -v /Users/fwbrasil/workspace/kyo/kyo-scheduler/jvm/target/scala-3.4.1:/app adoptopenjdk/openjdk11 java -cp /app/kyo-scheduler-assembly-0.9.2+71-82298105+20240423-1138-SNAPSHOT.jar kyo.scheduler.util.SelfCheckMain
Map(clients -> 0, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 1, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 2, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 3, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 4, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 5, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 6, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 7, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 8, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 9, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 10, rejectionPercent -> 0.1951219512195122, rejectionThreshold -> 0.2)
Map(clients -> 11, rejectionPercent -> 0.7241379310344828, rejectionThreshold -> 0.2)
Success
fwbrasil@flavios-mbp kyo % docker run --rm --cpus=8 -v /Users/fwbrasil/workspace/kyo/kyo-scheduler/jvm/target/scala-3.4.1:/app adoptopenjdk/openjdk11 java -cp /app/kyo-scheduler-assembly-0.9.2+71-82298105+20240423-1138-SNAPSHOT.jar kyo.scheduler.util.SelfCheckMain
Map(clients -> 0, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 1, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 2, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 3, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 4, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 5, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 6, rejectionPercent -> 0.041666666666666664, rejectionThreshold -> 0.2)
Map(clients -> 7, rejectionPercent -> 0.53125, rejectionThreshold -> 0.2)
Success
fwbrasil@flavios-mbp kyo % docker run --rm --cpus=9 -v /Users/fwbrasil/workspace/kyo/kyo-scheduler/jvm/target/scala-3.4.1:/app adoptopenjdk/openjdk11 java -cp /app/kyo-scheduler-assembly-0.9.2+71-82298105+20240423-1138-SNAPSHOT.jar kyo.scheduler.util.SelfCheckMain
Map(clients -> 0, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 1, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 2, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 3, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 4, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 5, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 6, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 7, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 8, rejectionPercent -> 0.0425531914893617, rejectionThreshold -> 0.2)
Map(clients -> 9, rejectionPercent -> 1.0, rejectionThreshold -> 0.2)
Success
fwbrasil@flavios-mbp kyo % docker run --rm --cpus=10 -v /Users/fwbrasil/workspace/kyo/kyo-scheduler/jvm/target/scala-3.4.1:/app adoptopenjdk/openjdk11 java -cp /app/kyo-scheduler-assembly-0.9.2+71-82298105+20240423-1138-SNAPSHOT.jar kyo.scheduler.util.SelfCheckMain
Map(clients -> 0, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 1, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 2, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 3, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 4, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 5, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 6, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 7, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
Map(clients -> 8, rejectionPercent -> 0.041666666666666664, rejectionThreshold -> 0.2)
Map(clients -> 9, rejectionPercent -> 0.16666666666666666, rejectionThreshold -> 0.2)
Map(clients -> 10, rejectionPercent -> 1.2727272727272727, rejectionThreshold -> 0.2)
Success

@fwbrasil (Collaborator Author)

I've made a few new optimizations and the benchmarks are now reporting consistently better results:

[benchmark results image]

The main cause of the regression in ForkSpawnBench is that the fiber's path to check for the preemption signal involves more code now that Task manages preemption instead of IOTask. Since Task is a trait, Scala encodes its methods in the companion object, and accessing the state field has to go through a getter.
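A simplified sketch of the encoding issue (illustrative only):

// A field declared in a trait is compiled to a getter/setter pair, with
// the backing field living in the implementing class, so every access to
// `state` from trait code is a virtual accessor call rather than a
// direct field load.
trait Task(initialRuntime: Int):
    private var state = initialRuntime // Math.abs(state) => runtime; state < 0 => preempting

    final def doPreempt(): Unit =
        val s = state            // compiled as a getter call
        if s > 0 then state = -s // compiled as a setter call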


hearnadam commented Apr 24, 2024

@fwbrasil

sbt:kyo-scheduler> runMain kyo.scheduler.util.SelfCheckMain
[info] running (fork) kyo.scheduler.util.SelfCheckMain 
[info] Map(clients -> 0, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 1, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 2, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 3, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 4, rejectionPercent -> 0.02127659574468085, rejectionThreshold -> 0.2)
[info] Map(clients -> 5, rejectionPercent -> 0.75, rejectionThreshold -> 0.2)
[info] Failure: Expected between 6.4 and 14.0 clients for 8 cores but found 5.
[success] Total time: 25 s, completed Apr 23, 2024, 5:45:49 PM
sbt:kyo-scheduler> runMain kyo.scheduler.util.SelfCheckMain
[info] running (fork) kyo.scheduler.util.SelfCheckMain 
[info] Map(clients -> 0, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 1, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 2, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 3, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 4, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 5, rejectionPercent -> 0.14285714285714285, rejectionThreshold -> 0.2)
[info] Map(clients -> 6, rejectionPercent -> 2.0625, rejectionThreshold -> 0.2)
[info] Failure: Expected between 6.4 and 14.0 clients for 8 cores but found 6.
[success] Total time: 30 s, completed Apr 23, 2024, 5:47:04 PM
sbt:kyo-scheduler> runMain kyo.scheduler.util.SelfCheckMain
[info] running (fork) kyo.scheduler.util.SelfCheckMain 
[info] Map(clients -> 0, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 1, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 2, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 3, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 4, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 5, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 6, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 7, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 8, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 9, rejectionPercent -> 0.0, rejectionThreshold -> 0.2)
[info] Map(clients -> 10, rejectionPercent -> 0.1951219512195122, rejectionThreshold -> 0.2)
[info] Map(clients -> 11, rejectionPercent -> 4.333333333333333, rejectionThreshold -> 0.2)
[info] Success

On: 6e838b14047e9eb0f57caab112c34ba49196ef2b

@fwbrasil (Collaborator Author)

@hearnadam can you check if you have other processes that could be using some of the CPU?

Comment on lines 8 to 9
private def bug(msg: String, ex: Throwable) =
    (new Exception("🙈 !!Kyo Scheduler Bug!! " + msg, ex)).printStackTrace(System.err)
Contributor

Suggested change
-private def bug(msg: String, ex: Throwable) =
-    (new Exception("🙈 !!Kyo Scheduler Bug!! " + msg, ex)).printStackTrace(System.err)
+import java.util.logging.*
+private lazy val logger = Logger.getLogger(getClass.getName)
+private def bug(msg: String, ex: Throwable) =
+    logger.log(Level.SEVERE, "🙈 !!Kyo Scheduler Bug!!", new Exception(msg, ex))

Since you want 0 dependencies, we can still use java.util.logging, since that's part of the standard library. This will give people better interoperability with other logging frameworks or other tools hooked on logging, like Sentry.

Collaborator Author

good idea! 🙏


You can define your own logging interface, and then there can be an adapter.

Netty 5 is using slf4j again.

@fwbrasil (Collaborator Author)

I've changed the code to use java.util.logging as @sideeffffect suggested. I've also tried another optimization, moving the preemption handling from Task to Worker, but the results weren't good because obtaining the current worker is more expensive.

I'm merging this PR since it keeps getting larger, but please feel free to provide feedback on the change!

@fwbrasil merged commit fd554d5 into main on Apr 24, 2024
3 checks passed
@fwbrasil deleted the scheduler-review branch on April 24, 2024 17:38
@nowarn
final class InternalClock(executor: Executor = null):

def currentMillis(): Long = System.currentTimeMillis()

Should it be nanoTime?

Collaborator Author

System.nanoTime isn't monotonic across threads. Using millisecond granularity also helps to hide the imprecision of the clock.


@volatile private var millis = System.currentTimeMillis()

val b1, b2, b3, b4, b5, b6, b7 = 0L // padding

Cool padding

i = 1
end while
if worker != null then
worker.steal(thief)

worker.steal(thief) reads as the worker stealing from the thief, not the thief stealing from the worker?

Collaborator Author

That's the thief taking tasks from the worker, but yeah, the API isn't very clear.

@He-Pin Apr 24, 2024

Maybe stealingBy or divisionBy


classOf[Executors]
.getMethod("newThreadPerTaskExecutor", classOf[ThreadFactory])
.invoke(null, factory).asInstanceOf[Executor]

Cool, I just did the same too: https://github.com/apache/rocketmq/pull/8063


So you don't plan to pool the virtual threads, right?

@fwbrasil (Collaborator Author) Apr 24, 2024

Cool! I tried pooling, but putting virtual threads to sleep is too expensive and it's cheaper to allocate new ones. Virtual threads are allocated only when workers start, though. A worker mounted on a virtual thread can execute several tasks until there's no more pending work, which avoids the cost of virtual thread creation on each task execution, unlike a plain newThreadPerTaskExecutor.
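A sketch of that worker loop (names are illustrative):

// Illustrative: one submission to the virtual-thread executor mounts a
// worker that keeps executing tasks until its queue is empty, amortizing
// the thread's creation cost across many tasks.
def runWorker(queue: java.util.Queue[Runnable]): Unit =
    var task = queue.poll()
    while task != null do
        task.run()
        task = queue.poll()
    // no pending work left: return and let the virtual thread terminate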


Yeah, if we move entirely to non-pooled threads, we'll need to change a lot of the current codebase at work, where the thread pool is also used as a kind of concurrency limiter.

"kyo" :: "scheduler" :: path.toList

private def bug(msg: String, ex: Throwable) =
log.severe(s"🙈 !!Kyo Scheduler Bug!! $msg \n Caused by: ${stackTrace(ex)}")
Contributor

Just curious, why not use the overload of this method where you can pass the Throwable as an argument?

I was hoping the logging backend would print the Throwable's stack trace for us, instead of us having to do it manually with our own ad hoc method (private def stackTrace(ex: Throwable)).

Supplying an intact Throwable instance to the logger will also have better interop with 3rd parties. (You can always turn a Throwable into a String, but you can't go the other way around.)
