Skip to content

Commit

Permalink
Various fixes to the benchmark client and server (grpc#1944)
Browse files Browse the repository at this point in the history
Motivation:

The benchmark client and server were implemented before we had a working
transport making them impossible to test. As it turns out there were a
few issues.

Modifications:

- BenchmarkClient:
  - Some RPC types have been removed. All other implementations only
    support unary and streaming and we never run scenarios using the
    other types.
  - The implementation of the streaming RPC has been altered to record
    the latency per request/response message pair. This aligns with
    other implementations.
  - The implementation of the streaming RPC has also been changed to
    send the response message sequence into the request writer. This
    yields a fairly substantial performance improvement (~3.5x) over
    the existing implementation.
  - The streaming RPC now respects the messages per stream config being
    zero (meaning no limit).
  - An 'is shutting down' atomic is used to stop the client from
    initiating new RPCs before closing.
- BenchmarkService:
  - No semantic changes; the typealiases have been desugared following
    from changes in fea1b72
- WorkerService:
  - The state machine has been tightened up a bit to more clearly
    separate state from side effects and to avoid leaking the
    implementation of the state machine into the service.
  - Added logic for creating clients and servers with an HTTP/2
    transport.

Result:

Can run perf tests
  • Loading branch information
glbrntt committed Jun 21, 2024
1 parent e9490d1 commit cd00d5c
Show file tree
Hide file tree
Showing 5 changed files with 490 additions and 349 deletions.
2 changes: 2 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ extension Target {
name: "performance-worker",
dependencies: [
.grpcCore,
.grpcHTTP2Core,
.grpcHTTP2TransportNIOPosix,
.grpcProtobuf,
.nioCore,
.nioFileSystem
Expand Down
257 changes: 130 additions & 127 deletions Sources/performance-worker/BenchmarkClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,68 @@
* limitations under the License.
*/

import Atomics
import Foundation
import GRPCCore
import NIOConcurrencyHelpers

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
struct BenchmarkClient {
private let _isShuttingDown = ManagedAtomic(false)

/// Whether the benchmark client is shutting down. Used to control when to stop sending messages
/// or creating new RPCs.
///
/// This is a relaxed atomic load of `_isShuttingDown`; the flag is set to `true` by `shutdown()`
/// and polled by the RPC loops in `run()` and `streaming(benchmark:)`.
private var isShuttingDown: Bool {
self._isShuttingDown.load(ordering: .relaxed)
}

/// The underlying client.
private var client: GRPCClient
private var rpcNumber: Int32

/// The number of concurrent RPCs to run.
private var concurrentRPCs: Int

/// The type of RPC to make against the server.
private var rpcType: RPCType
private var messagesPerStream: Int32
private var protoParams: Grpc_Testing_SimpleProtoParams

/// The max number of messages to send on a stream before replacing the RPC with a new one. A
/// value of zero means there is no limit.
private var messagesPerStream: Int
private var noMessageLimit: Bool { self.messagesPerStream == 0 }

/// The message to send for all RPC types to the server.
private let message: Grpc_Testing_SimpleRequest

/// Per RPC stats.
private let rpcStats: NIOLockedValueBox<RPCStats>

init(
client: GRPCClient,
rpcNumber: Int32,
concurrentRPCs: Int,
rpcType: RPCType,
messagesPerStream: Int32,
messagesPerStream: Int,
protoParams: Grpc_Testing_SimpleProtoParams,
histogramParams: Grpc_Testing_HistogramParams?
) {
self.client = client
self.rpcNumber = rpcNumber
self.concurrentRPCs = concurrentRPCs
self.messagesPerStream = messagesPerStream
self.protoParams = protoParams
self.rpcType = rpcType
self.message = .with {
$0.responseSize = protoParams.respSize
$0.payload = Grpc_Testing_Payload.with {
$0.body = Data(count: Int(protoParams.reqSize))
}
}

let histogram: RPCStats.LatencyHistogram
if let histogramParams = histogramParams {
histogram = .init(
histogram = RPCStats.LatencyHistogram(
resolution: histogramParams.resolution,
maxBucketStart: histogramParams.maxPossible
)
} else {
histogram = .init()
histogram = RPCStats.LatencyHistogram()
}

self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram))
Expand All @@ -57,9 +84,6 @@ struct BenchmarkClient {
/// The kinds of RPC the benchmark client can drive against the server.
///
/// Only these two kinds are supported; the `switch` in `run()` is exhaustive over exactly
/// these cases.
enum RPCType {
  /// A single request followed by a single response.
  case unary
  /// A bidirectional stream on which request and response messages are ping-ponged.
  case streaming
}

internal var currentStats: RPCStats {
Expand All @@ -69,33 +93,53 @@ struct BenchmarkClient {
}

/// Runs the benchmark until the client is told to shut down.
///
/// The underlying `GRPCClient` is run in one child task while `concurrentRPCs` further tasks
/// each issue RPCs of the configured `rpcType` back to back. Each of those tasks keeps looping
/// until `isShuttingDown` becomes `true`. Once every RPC task has finished the client is closed
/// and the task running it is awaited.
///
/// - Throws: Any error thrown by `GRPCClient.run()` or by one of the child tasks.
internal func run() async throws {
  let benchmarkClient = Grpc_Testing_BenchmarkServiceClient(client: self.client)
  return try await withThrowingTaskGroup(of: Void.self) { clientGroup in
    // Start the client.
    clientGroup.addTask {
      try await self.client.run()
    }

    try await withThrowingTaskGroup(of: Void.self) { rpcsGroup in
      // Start one task for each concurrent RPC and keep looping in that task until indicated
      // to stop.
      for _ in 0 ..< self.concurrentRPCs {
        rpcsGroup.addTask {
          while !self.isShuttingDown {
            switch self.rpcType {
            case .unary:
              await self.unary(benchmark: benchmarkClient)

            case .streaming:
              await self.streaming(benchmark: benchmarkClient)
            }
          }
        }
      }

      try await rpcsGroup.waitForAll()
    }

    // All RPC tasks are done: close the client and wait for the task running it to finish.
    self.client.close()
    try await clientGroup.next()
  }
}

/// Records the outcome of one request/response exchange in the shared RPC stats.
///
/// - Parameters:
///   - latencyNanos: The observed latency, in nanoseconds, added to the latency histogram.
///   - errorCode: The error code the exchange failed with, or `nil` if it succeeded. When
///     non-`nil` the per-code result counter is incremented.
private func record(latencyNanos: Double, errorCode: RPCError.Code?) {
  self.rpcStats.withLockedValue { stats in
    stats.latencyHistogram.record(latencyNanos)
    if let errorCode = errorCode {
      // Increment rather than assign: a plain `= 1` would pin every counter at one no matter
      // how many times the code was observed.
      stats.requestResultCount[errorCode, default: 0] += 1
    }
  }
}

/// Records a failed RPC in the shared RPC stats without touching the latency histogram.
///
/// - Parameter errorCode: The error code the RPC failed with; its result counter is incremented.
private func record(errorCode: RPCError.Code) {
  self.rpcStats.withLockedValue { stats in
    // Increment rather than assign so repeated failures with the same code are all counted.
    stats.requestResultCount[errorCode, default: 0] += 1
  }
}

private func timeIt<R>(
_ body: () async throws -> R
) async rethrows -> (R, nanoseconds: Double) {
Expand All @@ -105,133 +149,92 @@ struct BenchmarkClient {
return (result, nanoseconds: Double(endTime - startTime))
}

/// Makes a single unary RPC and records its latency and outcome.
///
/// The whole call, including reading the response message, is timed with `timeIt`. Failures
/// are not thrown: an `RPCError` is recorded under its own code and any other error is
/// recorded as `.unknown`.
///
/// - Parameter benchmark: The benchmark service client to make the call on.
private func unary(benchmark: Grpc_Testing_BenchmarkServiceClient) async {
  let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
    do {
      try await benchmark.unaryCall(request: ClientRequest.Single(message: self.message)) {
        // Accessing the message throws if the RPC failed.
        _ = try $0.message
      }
      return nil
    } catch let error as RPCError {
      return error.code
    } catch {
      return .unknown
    }
  }

  self.record(latencyNanos: nanoseconds, errorCode: errorCode)
}

/// Makes one streaming RPC on which messages are ping-ponged with the server, recording the
/// latency of every request/response pair.
///
/// The stream ends when the client starts shutting down, when `messagesPerStream` messages
/// have been sent (unless the limit is zero, meaning no limit), when the server ends the
/// response stream, or when an error occurs. Errors are not thrown; they are recorded by code
/// via `record(errorCode:)`.
///
/// - Parameter benchmark: The benchmark service client to make the call on.
private func streaming(benchmark: Grpc_Testing_BenchmarkServiceClient) async {
  // Streaming RPCs ping-pong messages back and forth. To achieve this the response message
  // stream is sent to the request closure, and the request closure indicates the outcome back
  // to the response handler to keep the RPC alive for the appropriate amount of time.
  let status = AsyncStream.makeStream(of: RPCError.self)
  let response = AsyncStream.makeStream(of: RPCAsyncSequence<Grpc_Testing_SimpleResponse>.self)

  let request = ClientRequest.Stream(of: Grpc_Testing_SimpleRequest.self) { writer in
    // Finishing the status stream releases the response handler below, ending the RPC.
    defer { status.continuation.finish() }

    // The time at which the last message was sent.
    var lastMessageSendTime = DispatchTime.now()
    try await writer.write(self.message)

    // Wait for the response stream.
    var iterator = response.stream.makeAsyncIterator()
    guard let responses = await iterator.next() else {
      throw RPCError(code: .internalError, message: "")
    }

    // Record the first latency.
    let now = DispatchTime.now()
    let nanos = now.uptimeNanoseconds - lastMessageSendTime.uptimeNanoseconds
    lastMessageSendTime = now
    self.record(latencyNanos: Double(nanos), errorCode: nil)

    // Now start looping. Only stop when the max messages per stream is hit or told to stop.
    var responseIterator = responses.makeAsyncIterator()
    var messagesSent = 1

    while !self.isShuttingDown && (self.noMessageLimit || messagesSent < self.messagesPerStream) {
      // Count the message about to be sent; the counter must advance or the per-stream limit
      // would never be reached.
      messagesSent += 1
      do {
        if try await responseIterator.next() != nil {
          let now = DispatchTime.now()
          let nanos = now.uptimeNanoseconds - lastMessageSendTime.uptimeNanoseconds
          lastMessageSendTime = now
          self.record(latencyNanos: Double(nanos), errorCode: nil)
          try await writer.write(self.message)
        } else {
          // The server ended the response stream.
          break
        }
      } catch let error as RPCError {
        status.continuation.yield(error)
        break
      } catch {
        status.continuation.yield(RPCError(code: .unknown, message: ""))
        break
      }
    }
  }

  do {
    try await benchmark.streamingCall(request: request) {
      // Hand the response messages to the request closure, which drives the ping-pong.
      response.continuation.yield($0.messages)
      response.continuation.finish()
      // Suspend until the request closure finishes; rethrow the error it reported, if any.
      for await errorCode in status.stream {
        throw errorCode
      }
    }
  } catch let error as RPCError {
    self.record(errorCode: error.code)
  } catch {
    self.record(errorCode: .unknown)
  }
}

/// Stops the benchmark client.
///
/// Sets the 'is shutting down' flag and then closes the underlying client.
internal func shutdown() {
// Set the flag before closing so the RPC loops stop initiating new RPCs.
self._isShuttingDown.store(true, ordering: .relaxed)
self.client.close()
}
}
Loading

0 comments on commit cd00d5c

Please sign in to comment.