Skip to content

Commit

Permalink
Include the timestamp of sending in the heartbeat for server and client
Browse files Browse the repository at this point in the history
  • Loading branch information
mayuki committed Jul 5, 2024
1 parent c9f43a4 commit a9327ef
Show file tree
Hide file tree
Showing 16 changed files with 347 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,29 74,24 @@ public StreamingHubMessageType ReadMessageType()
return (clientRequestMessageId, methodId, data.Slice(offset));
}

public (byte Sequence, ReadOnlyMemory<byte> Metadata) ReadServerHeartbeat()
public (byte Sequence, long ServerSentAt, ReadOnlyMemory<byte> Metadata) ReadServerHeartbeat()
{
//var type = reader.ReadByte(); // Type is already read by ReadMessageType
var sequence = reader.ReadByte(); // Sequence
reader.Skip(); // Dummy (2)
var serverSentAt = reader.ReadInt64(); // ServerSentAt (2)
reader.Skip(); // Dummy (3)

return (sequence, data.Slice((int)reader.Consumed));
return (sequence, serverSentAt, data.Slice((int)reader.Consumed));
}

public (byte Sequence, long SentAt) ReadClientHeartbeatResponse()
public (byte Sequence, long ClientSentAt) ReadClientHeartbeatResponse()
{
//var type = reader.ReadByte(); // Type is already read by ReadMessageType
var sequence = reader.ReadByte(); // Sequence
reader.Skip(); // Dummy (2)
reader.Skip(); // Dummy (3)

// Extra: [SentAt(long)]
var arrayLen = reader.ReadArrayHeader();
if (arrayLen == 0) throw new InvalidOperationException("Invalid client heartbeat response. An extra data is empty.");
var sentAt = reader.ReadInt64();
var clientSentAt = reader.ReadInt64(); // ClientSentAt (2)
reader.Skip(); // Reserved (3)

return (sequence, sentAt);
return (sequence, clientSentAt);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,66 203,66 @@ public static void WriteClientResultResponseMessageForError(IBufferWriter<byte>
/// Writes a server heartbeat message for sending from the server.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteServerHeartbeatMessageHeader(IBufferWriter<byte> bufferWriter, short sequence)
public static void WriteServerHeartbeatMessageHeader(IBufferWriter<byte> bufferWriter, short sequence, DateTimeOffset serverSentAt)
{
// Array(5)[127, Sequence(int8), Nil, Nil, <Metadata>]
// Array(5)[127, Sequence(int8), ServerSentAt(long; UnixTimeMs), Nil, <Metadata>]
var writer = new MessagePackWriter(bufferWriter);
writer.WriteArrayHeader(5);
writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat)
writer.Write(sequence); // Sequence
writer.WriteNil(); // Dummy
writer.WriteNil(); // Dummy
writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat)
writer.Write(sequence); // Sequence
writer.Write(serverSentAt.ToUnixTimeMilliseconds()); // ServerSentAt
writer.WriteNil(); // Dummy
writer.Flush();
// // <Metadata>
// // <Metadata>
}

/// <summary>
/// Writes a server heartbeat message for sending response from the client.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteServerHeartbeatMessageResponse(IBufferWriter<byte> bufferWriter, short sequence)
public static void WriteServerHeartbeatMessageResponse(IBufferWriter<byte> bufferWriter, short sequence, long serverSentAt)
{
// Array(4)[127, Sequence(int8), Nil, Nil]
// Array(4)[127, Sequence(int8), ServerSentAt(long; UnixTimeMs), Nil]
var writer = new MessagePackWriter(bufferWriter);
writer.WriteArrayHeader(4);
writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat)
writer.Write(sequence); // Sequence
writer.WriteNil(); // Dummy
writer.WriteNil(); // Dummy
writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat)
writer.Write(sequence); // Sequence
writer.Write(serverSentAt); // ServerSentAt
writer.WriteNil(); // Dummy
writer.Flush();
}

/// <summary>
/// Writes a client heartbeat message for sending from the client.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteClientHeartbeatMessageHeader(IBufferWriter<byte> bufferWriter, short sequence)
public static void WriteClientHeartbeatMessage(IBufferWriter<byte> bufferWriter, short sequence, DateTimeOffset clientSentAt)
{
// Array(4)[0x7e(126), Sequence(int8), Nil, <Extra>]
// Array(4)[0x7e(126), Sequence(int8), ClientSentAt(long; UnixTimeMs), <Extra>]
var writer = new MessagePackWriter(bufferWriter);
writer.WriteArrayHeader(4);
writer.Write(0x7e); // Type = 0x7e / 126 (ClientHeartbeat)
writer.Write(sequence); // Sequence
writer.WriteNil(); // Dummy
writer.Write(0x7e); // 0:Type = 0x7e / 126 (ClientHeartbeat)
writer.Write(sequence); // 1:Sequence
writer.Write(clientSentAt.ToUnixTimeMilliseconds()); // 2:ClientSentAt
writer.WriteNil(); // 3:Reserved
writer.Flush();
// // <Extra>
}

/// <summary>
/// Writes a client heartbeat message for sending response from the server.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteClientHeartbeatMessageResponseHeader(IBufferWriter<byte> bufferWriter, short sequence)
public static void WriteClientHeartbeatMessageResponse(IBufferWriter<byte> bufferWriter, short sequence, long clientSentAt)
{
// Array(5)[0x7e(126), Sequence(int8), Nil, Nil, <Extra>]
// Array(5)[0x7e(126), Sequence(int8), ClientSentAt(long; UnixTimeMs), Nil, <Extra>]
var writer = new MessagePackWriter(bufferWriter);
writer.WriteArrayHeader(5);
writer.Write(0x7e); // Type = 0x7e / 126 (Heartbeat)
writer.Write(sequence); // Sequence
writer.WriteNil(); // Dummy
writer.WriteNil(); // Dummy
writer.Write(0x7e); // 0:Type = 0x7e / 126 (Heartbeat)
writer.Write(sequence); // 1:Sequence
writer.Write(clientSentAt); // 2:ClientSentAt
writer.WriteNil(); // 3:Reserved
writer.WriteNil(); // 4:Reserved
writer.Flush();
// // <Extra>
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 77,14 @@ public StreamingHubMessageType ReadMessageType()
return (clientResultMessageId, clientMethodId, statusCode, detail, message);
}

public (short Sequence, ReadOnlyMemory<byte> Extra) ReadClientHeartbeat()
public (short Sequence, long ClientSentAt, ReadOnlyMemory<byte> Extra) ReadClientHeartbeat()
{
// [Sequence(int8), Nil, [SentAt(long)]]
// [Sequence(int8), ClientSentAt(long), <Extra>]
var sequence = reader.ReadInt16(); // Sequence
reader.Skip(); // Dummy
var clientSentAt = reader.ReadInt64(); // ClientSentAt
var extra = data.Slice((int)reader.Consumed);

return (sequence, data.Slice((int)reader.Consumed));
return (sequence, clientSentAt, data.Slice((int)reader.Consumed));
}

public short ReadServerHeartbeatResponse()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 22,7 @@ public class StreamingHubClientOptions

public TimeSpan? ClientHeartbeatInterval { get; }
public TimeSpan? ClientHeartbeatTimeout { get; }
public Action<ReadOnlyMemory<byte>>? OnServerHeartbeatReceived { get; }
public Action<ServerHeartbeatEvent>? OnServerHeartbeatReceived { get; }
public Action<ClientHeartbeatEvent>? OnClientHeartbeatResponseReceived { get; }
#if NET8_0_OR_GREATER
public TimeProvider? TimeProvider { get; }
Expand All @@ -38,9 38,9 @@ public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOn
}

#if NET8_0_OR_GREATER
public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action<ReadOnlyMemory<byte>>? onServerHeartbeatReceived, Action<ClientHeartbeatEvent>? onClientHeartbeatResponseReceived,TimeProvider? timeProvider)
public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action<ServerHeartbeatEvent>? onServerHeartbeatReceived, Action<ClientHeartbeatEvent>? onClientHeartbeatResponseReceived,TimeProvider? timeProvider)
#else
public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action<ReadOnlyMemory<byte>>? onServerHeartbeatReceived, Action<ClientHeartbeatEvent>? onClientHeartbeatResponseReceived)
public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action<ServerHeartbeatEvent>? onServerHeartbeatReceived, Action<ClientHeartbeatEvent>? onClientHeartbeatResponseReceived)
#endif
{
Host = host;
Expand Down Expand Up @@ -120,7 120,7 @@ public StreamingHubClientOptions WithClientHeartbeatTimeout(TimeSpan? timeout)
/// </summary>
/// <param name="onServerHeartbeatReceived"></param>
/// <returns></returns>
public StreamingHubClientOptions WithServerHeartbeatReceived(Action<ReadOnlyMemory<byte>>? onServerHeartbeatReceived)
public StreamingHubClientOptions WithServerHeartbeatReceived(Action<ServerHeartbeatEvent>? onServerHeartbeatReceived)
=> new(Host, CallOptions, SerializerProvider, Logger
, ClientHeartbeatInterval, ClientHeartbeatTimeout, onServerHeartbeatReceived, OnClientHeartbeatResponseReceived
#if NET8_0_OR_GREATER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 9,9 @@

namespace MagicOnion.Client
{
/// <summary>
/// Represents a client heartbeat received event.
/// </summary>
public readonly struct ClientHeartbeatEvent
{
/// <summary>
Expand All @@ -22,13 25,35 @@ public ClientHeartbeatEvent(long roundTripTimeMs)
}
}

/// <summary>
/// Represents a server heartbeat received event.
/// </summary>
public readonly struct ServerHeartbeatEvent
{
/// <summary>
/// Gets the server time at when the heartbeat was sent.
/// </summary>
public DateTimeOffset ServerTime { get; }

/// <summary>
/// Gets the metadata data. The data is only available during event processing.
/// </summary>
public ReadOnlyMemory<byte> Metadata { get; }

public ServerHeartbeatEvent(long serverTimeUnixMs, ReadOnlyMemory<byte> metadata)
{
ServerTime = DateTimeOffset.FromUnixTimeMilliseconds(serverTimeUnixMs);
Metadata = metadata;
}
}

internal class StreamingHubClientHeartbeatManager : IDisposable
{
readonly CancellationTokenSource timeoutTokenSource;
readonly CancellationTokenSource shutdownTokenSource;
readonly TimeSpan heartbeatInterval;
readonly TimeSpan timeoutPeriod;
readonly Action<ReadOnlyMemory<byte>>? onServerHeartbeatReceived;
readonly Action<ServerHeartbeatEvent>? onServerHeartbeatReceived;
readonly Action<ClientHeartbeatEvent>? onClientHeartbeatResponseReceived;
readonly SynchronizationContext? synchronizationContext;
readonly ChannelWriter<StreamingHubPayload> writer;
Expand All @@ -48,7 73,7 @@ public StreamingHubClientHeartbeatManager(
ChannelWriter<StreamingHubPayload> writer,
TimeSpan heartbeatInterval,
TimeSpan timeoutPeriod,
Action<ReadOnlyMemory<byte>>? onServerHeartbeatReceived,
Action<ServerHeartbeatEvent>? onServerHeartbeatReceived,
Action<ClientHeartbeatEvent>? onClientHeartbeatResponseReceived,
SynchronizationContext? synchronizationContext,
CancellationToken shutdownToken
Expand Down Expand Up @@ -142,7 167,7 @@ SendOrPostCallback ProcessClientHeartbeatResponseCore(Action<ClientHeartbeatEven
var payload = (StreamingHubPayload)state!;
var reader = new StreamingHubClientMessageReader(payload.Memory);
_ = reader.ReadMessageType();
var (sentSequence, sentAt) = reader.ReadClientHeartbeatResponse();
var (sentSequence, clientSentAt) = reader.ReadClientHeartbeatResponse();
if (sentSequence == (sequence - 1)/* NOTE: Sequence already 1 advanced.*/)
{
Expand All @@ -157,38 182,37 @@ SendOrPostCallback ProcessClientHeartbeatResponseCore(Action<ClientHeartbeatEven
#else
DateTimeOffset.UtcNow;
#endif
var elapsed = now.ToUnixTimeMilliseconds() - sentAt;
var elapsed = now.ToUnixTimeMilliseconds() - clientSentAt;
clientHeartbeatReceivedAction?.Invoke(new ClientHeartbeatEvent(elapsed));
StreamingHubPayloadPool.Shared.Return(payload);
};

SendOrPostCallback ProcessServerHeartbeatCore(Action<ReadOnlyMemory<byte>>? serverHeartbeatReceivedAction) => (state) =>
SendOrPostCallback ProcessServerHeartbeatCore(Action<ServerHeartbeatEvent>? serverHeartbeatReceivedAction) => (state) =>
{
var payload = (StreamingHubPayload)state!;
var reader = new StreamingHubClientMessageReader(payload.Memory);
_ = reader.ReadMessageType();
var (serverSentSequence, metadata) = reader.ReadServerHeartbeat();
var (serverSentSequence, serverSentAt, metadata) = reader.ReadServerHeartbeat();
serverHeartbeatReceivedAction?.Invoke(metadata);
serverHeartbeatReceivedAction?.Invoke(new ServerHeartbeatEvent(serverSentAt, metadata));
// Writes a ServerHeartbeatResponse to the writer queue.
_ = writer.TryWrite(BuildServerHeartbeatMessage(serverSentSequence));
_ = writer.TryWrite(BuildServerHeartbeatMessage(serverSentSequence, serverSentAt));
StreamingHubPayloadPool.Shared.Return(payload);
};

StreamingHubPayload BuildServerHeartbeatMessage(short serverSequence)
StreamingHubPayload BuildServerHeartbeatMessage(short serverSequence, long serverSentAt)
{
using var buffer = ArrayPoolBufferWriter.RentThreadStaticWriter();
StreamingHubMessageWriter.WriteServerHeartbeatMessageResponse(buffer, serverSequence);
StreamingHubMessageWriter.WriteServerHeartbeatMessageResponse(buffer, serverSequence, serverSentAt);
return StreamingHubPayloadPool.Shared.RentOrCreate(buffer.WrittenSpan);
}

StreamingHubPayload BuildClientHeartbeatMessage(short clientSequence)
{
using var buffer = ArrayPoolBufferWriter.RentThreadStaticWriter();
StreamingHubMessageWriter.WriteClientHeartbeatMessageHeader(buffer, clientSequence);

var now =
#if NET8_0_OR_GREATER
Expand All @@ -197,11 221,7 @@ StreamingHubPayload BuildClientHeartbeatMessage(short clientSequence)
DateTimeOffset.UtcNow;
#endif

// Extra: [SentAt(long)]
var writer = new MessagePackWriter(buffer);
writer.WriteArrayHeader(1);
writer.Write(now.ToUnixTimeMilliseconds());
writer.Flush();
StreamingHubMessageWriter.WriteClientHeartbeatMessage(buffer, clientSequence, now);
return StreamingHubPayloadPool.Shared.RentOrCreate(buffer.WrittenSpan);
}

Expand Down
8 changes: 4 additions & 4 deletions src/MagicOnion.Client/StreamingHubClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 22,7 @@ public class StreamingHubClientOptions

public TimeSpan? ClientHeartbeatInterval { get; }
public TimeSpan? ClientHeartbeatTimeout { get; }
public Action<ReadOnlyMemory<byte>>? OnServerHeartbeatReceived { get; }
public Action<ServerHeartbeatEvent>? OnServerHeartbeatReceived { get; }
public Action<ClientHeartbeatEvent>? OnClientHeartbeatResponseReceived { get; }
#if NET8_0_OR_GREATER
public TimeProvider? TimeProvider { get; }
Expand All @@ -38,9 38,9 @@ public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOn
}

#if NET8_0_OR_GREATER
public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action<ReadOnlyMemory<byte>>? onServerHeartbeatReceived, Action<ClientHeartbeatEvent>? onClientHeartbeatResponseReceived,TimeProvider? timeProvider)
public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action<ServerHeartbeatEvent>? onServerHeartbeatReceived, Action<ClientHeartbeatEvent>? onClientHeartbeatResponseReceived,TimeProvider? timeProvider)
#else
public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action<ReadOnlyMemory<byte>>? onServerHeartbeatReceived, Action<ClientHeartbeatEvent>? onClientHeartbeatResponseReceived)
public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action<ServerHeartbeatEvent>? onServerHeartbeatReceived, Action<ClientHeartbeatEvent>? onClientHeartbeatResponseReceived)
#endif
{
Host = host;
Expand Down Expand Up @@ -120,7 120,7 @@ public StreamingHubClientOptions WithClientHeartbeatTimeout(TimeSpan? timeout)
/// </summary>
/// <param name="onServerHeartbeatReceived"></param>
/// <returns></returns>
public StreamingHubClientOptions WithServerHeartbeatReceived(Action<ReadOnlyMemory<byte>>? onServerHeartbeatReceived)
public StreamingHubClientOptions WithServerHeartbeatReceived(Action<ServerHeartbeatEvent>? onServerHeartbeatReceived)
=> new(Host, CallOptions, SerializerProvider, Logger
, ClientHeartbeatInterval, ClientHeartbeatTimeout, onServerHeartbeatReceived, OnClientHeartbeatResponseReceived
#if NET8_0_OR_GREATER
Expand Down
Loading

0 comments on commit a9327ef

Please sign in to comment.