Skip to content

Commit

Permalink
Centralise more locks (#8389)
Browse files Browse the repository at this point in the history
Refactor: Centralize lock management

This change centralizes the management of locks and conditions by introducing a centralised Locks registry. This allows for easier debugging and potential future improvements to lock usage.

The `withLock` and `newLockCondition` functions are now invoked through the `Locks` object, providing a central point of access.
  • Loading branch information
yschimke authored Nov 17, 2024
1 parent 02d69bf commit 3e56089
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 25,9 @@ import java.util.logging.Level
import java.util.logging.LogManager
import java.util.logging.LogRecord
import java.util.logging.Logger
import kotlin.concurrent.withLock
import okhttp3.internal.buildConnectionPool
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.Locks.withLock
import okhttp3.internal.connection.RealConnectionPool
import okhttp3.internal.http2.Http2
import okhttp3.internal.taskRunnerInternal
Expand Down Expand Up @@ -234,7 234,7 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
// a test timeout failure.
val waitTime = (entryTime 1_000_000_000L - System.nanoTime())
if (!queue.idleLatch().await(waitTime, TimeUnit.NANOSECONDS)) {
TaskRunner.INSTANCE.lock.withLock {
TaskRunner.INSTANCE.withLock {
TaskRunner.INSTANCE.cancelAll()
}
fail<Unit>("Queue still active after 1000 ms")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")

package okhttp3.internal.concurrent

import assertk.assertThat
Expand All @@ -23,9 25,9 @@ import java.util.concurrent.BlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.logging.Logger
import kotlin.concurrent.withLock
import okhttp3.OkHttpClient
import okhttp3.TestUtil.threadFactory
import okhttp3.internal.connection.Locks.withLock

/**
* Runs a [TaskRunner] in a controlled environment so that everything is sequential and
Expand Down Expand Up @@ -170,7 172,7 @@ class TaskFaker : Closeable {
fun advanceUntil(newTime: Long) {
taskRunner.assertThreadDoesntHoldLock()

taskRunner.lock.withLock {
taskRunner.withLock {
check(currentTask == TestThreadSerialTask)
nanoTime = newTime
yieldUntil(ResumePriority.AfterOtherTasks)
Expand All @@ -181,7 183,7 @@ class TaskFaker : Closeable {
fun assertNoMoreTasks() {
taskRunner.assertThreadDoesntHoldLock()

taskRunner.lock.withLock {
taskRunner.withLock {
assertThat(activeThreads).isEqualTo(0)
}
}
Expand Down Expand Up @@ -211,7 213,7 @@ class TaskFaker : Closeable {
fun runNextTask() {
taskRunner.assertThreadDoesntHoldLock()

taskRunner.lock.withLock {
taskRunner.withLock {
val contextSwitchCountBefore = contextSwitchCount
yieldUntil(ResumePriority.BeforeOtherTasks) {
contextSwitchCount > contextSwitchCountBefore
Expand All @@ -221,7 223,7 @@ class TaskFaker : Closeable {

/** Sleep until [durationNanos] elapses. For use by the task threads. */
fun sleep(durationNanos: Long) {
taskRunner.lock.withLock {
taskRunner.withLock {
val sleepUntil = nanoTime durationNanos
yieldUntil { nanoTime >= sleepUntil }
}
Expand All @@ -233,7 235,7 @@ class TaskFaker : Closeable {
*/
fun yield() {
taskRunner.assertThreadDoesntHoldLock()
taskRunner.lock.withLock {
taskRunner.withLock {
yieldUntil()
}
}
Expand Down Expand Up @@ -332,7 334,7 @@ class TaskFaker : Closeable {
runnable.run()
require(currentTask == this) { "unexpected current task: $currentTask" }
} finally {
taskRunner.lock.withLock {
taskRunner.withLock {
activeThreads--
startNextTask()
}
Expand All @@ -358,7 360,7 @@ class TaskFaker : Closeable {
timeout: Long,
unit: TimeUnit,
): T? {
taskRunner.lock.withLock {
taskRunner.withLock {
val waitUntil = nanoTime unit.toNanos(timeout)
while (true) {
val result = poll()
Expand All @@ -371,7 373,7 @@ class TaskFaker : Closeable {
}

override fun put(element: T) {
taskRunner.lock.withLock {
taskRunner.withLock {
delegate.put(element)
editCount
}
Expand Down
14 changes: 7 additions & 7 deletions okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 18,8 @@ package okhttp3.internal.concurrent
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import okhttp3.internal.assertNotHeld
import okhttp3.internal.connection.Locks.withLock
import okhttp3.internal.okHttpName

/**
Expand All @@ -32,7 32,7 @@ class TaskQueue internal constructor(
internal val taskRunner: TaskRunner,
internal val name: String,
) {
val lock: ReentrantLock = ReentrantLock()
internal val lock: ReentrantLock = ReentrantLock()

internal var shutdown = false

Expand All @@ -50,7 50,7 @@ class TaskQueue internal constructor(
* currently-executing task unless it is also scheduled for future execution.
*/
val scheduledTasks: List<Task>
get() = taskRunner.lock.withLock { futureTasks.toList() }
get() = taskRunner.withLock { futureTasks.toList() }

/**
* Schedules [task] for execution in [delayNanos]. A task may only have one future execution
Expand All @@ -66,7 66,7 @@ class TaskQueue internal constructor(
task: Task,
delayNanos: Long = 0L,
) {
taskRunner.lock.withLock {
taskRunner.withLock {
if (shutdown) {
if (task.cancelable) {
taskRunner.logger.taskLog(task, this) { "schedule canceled (queue is shutdown)" }
Expand Down Expand Up @@ -126,7 126,7 @@ class TaskQueue internal constructor(

/** Returns a latch that reaches 0 when the queue is next idle. */
fun idleLatch(): CountDownLatch {
taskRunner.lock.withLock {
taskRunner.withLock {
// If the queue is already idle, that's easy.
if (activeTask == null && futureTasks.isEmpty()) {
return CountDownLatch(0)
Expand Down Expand Up @@ -208,7 208,7 @@ class TaskQueue internal constructor(
fun cancelAll() {
lock.assertNotHeld()

taskRunner.lock.withLock {
taskRunner.withLock {
if (cancelAllAndDecide()) {
taskRunner.kickCoordinator(this)
}
Expand All @@ -218,7 218,7 @@ class TaskQueue internal constructor(
fun shutdown() {
lock.assertNotHeld()

taskRunner.lock.withLock {
taskRunner.withLock {
shutdown = true
if (cancelAllAndDecide()) {
taskRunner.kickCoordinator(this)
Expand Down
17 changes: 9 additions & 8 deletions okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 23,11 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock
import java.util.logging.Logger
import kotlin.concurrent.withLock
import okhttp3.internal.addIfAbsent
import okhttp3.internal.assertHeld
import okhttp3.internal.concurrent.TaskRunner.Companion.INSTANCE
import okhttp3.internal.connection.Locks.newLockCondition
import okhttp3.internal.connection.Locks.withLock
import okhttp3.internal.okHttpName
import okhttp3.internal.threadFactory

Expand All @@ -45,8 46,8 @@ class TaskRunner(
val backend: Backend,
internal val logger: Logger = TaskRunner.logger,
) {
val lock: ReentrantLock = ReentrantLock()
val condition: Condition = lock.newCondition()
internal val lock: ReentrantLock = ReentrantLock()
val condition: Condition = lock.newLockCondition()

private var nextQueueName = 10000
private var coordinatorWaiting = false
Expand Down Expand Up @@ -75,7 76,7 @@ class TaskRunner(
var incrementedRunCallCount = false
while (true) {
val task =
this@TaskRunner.lock.withLock {
this@TaskRunner.withLock {
if (!incrementedRunCallCount) {
incrementedRunCallCount = true
runCallCount
Expand All @@ -91,7 92,7 @@ class TaskRunner(
} finally {
// If the task is crashing start another thread to service the queues.
if (!completedNormally) {
lock.withLock {
this@TaskRunner.withLock {
startAnotherThread()
}
}
Expand Down Expand Up @@ -139,7 140,7 @@ class TaskRunner(
try {
delayNanos = task.runOnce()
} finally {
lock.withLock {
this.withLock {
afterRun(task, delayNanos)
}
currentThread.name = oldName
Expand Down Expand Up @@ -264,7 265,7 @@ class TaskRunner(
}

fun newQueue(): TaskQueue {
val name = lock.withLock { nextQueueName }
val name = this.withLock { nextQueueName }
return TaskQueue(this, "Q$name")
}

Expand All @@ -273,7 274,7 @@ class TaskRunner(
* necessarily track queues that have no tasks scheduled.
*/
fun activeQueues(): List<TaskQueue> {
lock.withLock {
this.withLock {
return busyQueues readyQueues
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 26,6 @@ import java.security.cert.X509Certificate
import java.util.concurrent.TimeUnit
import javax.net.ssl.SSLPeerUnverifiedException
import javax.net.ssl.SSLSocket
import kotlin.concurrent.withLock
import okhttp3.CertificatePinner
import okhttp3.ConnectionSpec
import okhttp3.Handshake
Expand Down
46 changes: 39 additions & 7 deletions okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 17,15 @@

package okhttp3.internal.connection

import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import okhttp3.Dispatcher
import okhttp3.internal.concurrent.TaskQueue
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.http2.Http2Connection
import okhttp3.internal.http2.Http2Stream
import okhttp3.internal.http2.Http2Writer
Expand All @@ -32,34 36,62 @@ import okhttp3.internal.http2.Http2Writer
internal object Locks {
inline fun <T> Dispatcher.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.withLock(action)
return lock.runWithLock(action)
}

inline fun <T> RealConnection.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.withLock(action)
return lock.runWithLock(action)
}

inline fun <T> RealCall.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.withLock(action)
return lock.runWithLock(action)
}

inline fun <T> Http2Connection.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.withLock(action)
return lock.runWithLock(action)
}

inline fun <T> Http2Stream.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.withLock(action)
return lock.runWithLock(action)
}

inline fun <T> Http2Writer.withLock(action: () -> T): T {
inline fun <T> TaskRunner.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.runWithLock(action)
}

inline fun <T> TaskQueue.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.runWithLock(action)
}

inline fun <T> Http2Writer.withLock(action: () -> T): T {
// TODO can we assert we don't have the connection lock?

return lock.withLock(action)
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.runWithLock(action)
}

/**
* A no cost (inlined) alias to [ReentrantLock#newCondition] for an OkHttp Lock.
* No function on its own but places a central place that all conditions go through to allow
* temporary debugging.
*/
internal fun ReentrantLock.newLockCondition(): Condition {
return this.newCondition()
}

/**
* A no cost (inlined) alias to [ReentrantLock#withLock] for an OkHttp Lock.
* No function on its own but places a central place that all locks go through to allow
* temporary debugging.
*/
inline fun <T> ReentrantLock.runWithLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return withLock(action)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 26,6 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.locks.ReentrantLock
import javax.net.ssl.SSLPeerUnverifiedException
import javax.net.ssl.SSLSocket
import kotlin.concurrent.withLock
import okhttp3.Address
import okhttp3.Connection
import okhttp3.ConnectionListener
Expand Down Expand Up @@ -335,7 334,7 @@ class RealConnection(
return http2Connection.isHealthy(nowNs)
}

val idleDurationNs = lock.withLock { nowNs - idleAtNs }
val idleDurationNs = this.withLock { nowNs - idleAtNs }
if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) {
return socket.isHealthy(source)
}
Expand All @@ -354,7 353,7 @@ class RealConnection(
connection: Http2Connection,
settings: Settings,
) {
lock.withLock {
this.withLock {
val oldLimit = allocationLimit
allocationLimit = settings.getMaxConcurrentStreams()

Expand Down Expand Up @@ -398,7 397,7 @@ class RealConnection(
e: IOException?,
) {
var noNewExchangesEvent = false
lock.withLock {
this.withLock {
if (e is StreamResetException) {
when {
e.errorCode == ErrorCode.REFUSED_STREAM -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 27,7 @@ import okhttp3.internal.EMPTY_HEADERS
import okhttp3.internal.assertThreadDoesntHoldLock
import okhttp3.internal.closeQuietly
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.Locks.newLockCondition
import okhttp3.internal.connection.Locks.withLock
import okhttp3.internal.http2.ErrorCode.REFUSED_STREAM
import okhttp3.internal.http2.Settings.Companion.DEFAULT_INITIAL_WINDOW_SIZE
Expand Down Expand Up @@ -56,7 57,7 @@ import okio.source
@Suppress("NAME_SHADOWING")
class Http2Connection internal constructor(builder: Builder) : Closeable {
internal val lock: ReentrantLock = ReentrantLock()
internal val condition: Condition = lock.newCondition()
internal val condition: Condition = lock.newLockCondition()

// Internal state of this connection is guarded by 'lock'. No blocking operations may be
// performed while holding this lock!
Expand Down
Loading

0 comments on commit 3e56089

Please sign in to comment.