Skip to content

Commit

Permalink
Fix code formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
mayuki committed Jan 11, 2024
1 parent ef13557 commit df1d749
Showing 1 changed file with 76 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,41 26,40 @@ public sealed partial class GrpcChannelx : ChannelBase, IMagicOnionAwareGrpcChan
, IGrpcChannelxDiagnosticsInfo
#endif
{
private readonly Action<GrpcChannelx> _onDispose;
private readonly Dictionary<IStreamingHubMarker, (Func<Task> DisposeAsync, ManagedStreamingHubInfo StreamingHubInfo)> _streamingHubs = new Dictionary<IStreamingHubMarker, (Func<Task>, ManagedStreamingHubInfo)>();
private readonly ChannelBase _channel;
readonly Action<GrpcChannelx> onDispose;
readonly Dictionary<IStreamingHubMarker, (Func<Task> DisposeAsync, ManagedStreamingHubInfo StreamingHubInfo)> streamingHubs = new Dictionary<IStreamingHubMarker, (Func<Task>, ManagedStreamingHubInfo)>();
readonly ChannelBase channel;

private bool _disposed;
private bool _shutdownRequested;
bool disposed;
bool shutdownRequested;

public Uri TargetUri { get; }
public int Id { get; }


#if UNITY_EDITOR || MAGICONION_ENABLE_CHANNEL_DIAGNOSTICS
private readonly string _stackTrace;
private readonly ChannelStats _channelStats;
private readonly GrpcChannelOptionsBag _channelOptions;

string IGrpcChannelxDiagnosticsInfo.StackTrace => _stackTrace;
ChannelStats IGrpcChannelxDiagnosticsInfo.Stats => _channelStats;
GrpcChannelOptionsBag IGrpcChannelxDiagnosticsInfo.ChannelOptions => _channelOptions;
ChannelBase IGrpcChannelxDiagnosticsInfo.UnderlyingChannel => _channel;
readonly string stackTrace;
readonly ChannelStats channelStats;
readonly GrpcChannelOptionsBag channelOptions;

string IGrpcChannelxDiagnosticsInfo.StackTrace => stackTrace;
ChannelStats IGrpcChannelxDiagnosticsInfo.Stats => channelStats;
GrpcChannelOptionsBag IGrpcChannelxDiagnosticsInfo.ChannelOptions => channelOptions;
ChannelBase IGrpcChannelxDiagnosticsInfo.UnderlyingChannel => channel;
#endif

public GrpcChannelx(int id, Action<GrpcChannelx> onDispose, ChannelBase channel, Uri targetUri, GrpcChannelOptionsBag channelOptions)
: base(targetUri.ToString())
{
Id = id;
TargetUri = targetUri;
_onDispose = onDispose;
_channel = channel;
_disposed = false;
this.onDispose = onDispose;
this.channel = channel;
this.disposed = false;

#if UNITY_EDITOR || MAGICONION_ENABLE_CHANNEL_DIAGNOSTICS
_stackTrace = new System.Diagnostics.StackTrace().ToString();
_channelStats = new ChannelStats();
_channelOptions = channelOptions;
this.stackTrace = new System.Diagnostics.StackTrace().ToString();
this.channelStats = new ChannelStats();
this.channelOptions = channelOptions;
#endif
}

Expand Down Expand Up @@ -116,7 115,7 @@ public override CallInvoker CreateCallInvoker()
{
ThrowIfDisposed();
#if UNITY_EDITOR || MAGICONION_ENABLE_CHANNEL_DIAGNOSTICS
return new ChannelStats.WrappedCallInvoker(((IGrpcChannelxDiagnosticsInfo)this).Stats, _channel.CreateCallInvoker());
return new ChannelStats.WrappedCallInvoker(((IGrpcChannelxDiagnosticsInfo)this).Stats, channel.CreateCallInvoker());
#else
return _channel.CreateCallInvoker();
#endif
Expand Down Expand Up @@ -149,18 148,18 @@ public async Task ConnectAsync(DateTime? deadline = null)
/// <inheritdoc />
IReadOnlyCollection<ManagedStreamingHubInfo> IMagicOnionAwareGrpcChannel.GetAllManagedStreamingHubs()
{
lock (_streamingHubs)
lock (streamingHubs)
{
return _streamingHubs.Values.Select(x => x.StreamingHubInfo).ToArray();
return streamingHubs.Values.Select(x => x.StreamingHubInfo).ToArray();
}
}

/// <inheritdoc />
void IMagicOnionAwareGrpcChannel.ManageStreamingHubClient(Type streamingHubType, IStreamingHubMarker streamingHub, Func<Task> disposeAsync, Task waitForDisconnect)
{
lock (_streamingHubs)
lock (streamingHubs)
{
_streamingHubs.Add(streamingHub, (disposeAsync, new ManagedStreamingHubInfo(streamingHubType, streamingHub)));
streamingHubs.Add(streamingHub, (disposeAsync, new ManagedStreamingHubInfo(streamingHubType, streamingHub)));

// When the channel is disconnected, unregister it.
Forget(WaitForDisconnectAndDisposeAsync(streamingHub, waitForDisconnect));
Expand All @@ -175,9 174,9 @@ private async Task WaitForDisconnectAndDisposeAsync(IStreamingHubMarker streamin

private void DisposeStreamingHubClient(IStreamingHubMarker streamingHub)
{
lock (_streamingHubs)
lock (streamingHubs)
{
if (_streamingHubs.TryGetValue(streamingHub, out var disposeAsyncAndStreamingHubInfo))
if (streamingHubs.TryGetValue(streamingHub, out var disposeAsyncAndStreamingHubInfo))
{
try
{
Expand All @@ -188,7 187,7 @@ private void DisposeStreamingHubClient(IStreamingHubMarker streamingHub)
Debug.LogException(e);
}

_streamingHubs.Remove(streamingHub);
streamingHubs.Remove(streamingHub);
}
}

Expand All @@ -207,9 206,9 @@ async void Forget(Task t)

private void DisposeAllManagedStreamingHubs()
{
lock (_streamingHubs)
lock (streamingHubs)
{
foreach (var streamingHub in _streamingHubs.Keys.ToArray() /* Snapshot */)
foreach (var streamingHub in streamingHubs.Keys.ToArray() /* Snapshot */)
{
DisposeStreamingHubClient(streamingHub);
}
Expand All @@ -218,47 217,47 @@ private void DisposeAllManagedStreamingHubs()

public void Dispose()
{
if (_disposed) return;
if (disposed) return;

_disposed = true;
disposed = true;
try
{
DisposeAllManagedStreamingHubs();
Forget(ShutdownInternalAsync());
}
finally
{
_onDispose(this);
onDispose(this);
}
}

public async Task DisposeAsync()
{
if (_disposed) return;
if (disposed) return;

_disposed = true;
disposed = true;
try
{
DisposeAllManagedStreamingHubs();
await ShutdownInternalAsync();
}
finally
{
_onDispose(this);
onDispose(this);
}
}

private async Task ShutdownInternalAsync()
{
if (_shutdownRequested) return;
_shutdownRequested = true;
await _channel.ShutdownAsync().ConfigureAwait(false);
if (shutdownRequested) return;
shutdownRequested = true;

await channel.ShutdownAsync().ConfigureAwait(false);
}

private void ThrowIfDisposed()
{
if (_disposed) throw new ObjectDisposedException(nameof(GrpcChannelx));
if (disposed) throw new ObjectDisposedException(nameof(GrpcChannelx));
}

private static async void Forget(Task t)
Expand All @@ -276,47 275,47 @@ private static async void Forget(Task t)
#if UNITY_EDITOR || MAGICONION_ENABLE_CHANNEL_DIAGNOSTICS
public class ChannelStats
{
private int _sentBytes = 0;
private int _receivedBytes = 0;
int sentBytes = 0;
int receivedBytes = 0;

private int _indexSentBytes;
private int _indexReceivedBytes;
private DateTime _prevSentBytesAt;
private DateTime _prevReceivedBytesAt;
private readonly int[] _sentBytesHistory = new int[10];
private readonly int[] _receivedBytesHistory = new int[10];
int indexSentBytes;
int indexReceivedBytes;
DateTime prevSentBytesAt;
DateTime prevReceivedBytesAt;
readonly int[] sentBytesHistory = new int[10];
readonly int[] receivedBytesHistory = new int[10];

public int SentBytes => _sentBytes;
public int ReceivedBytes => _receivedBytes;
public int SentBytes => sentBytes;
public int ReceivedBytes => receivedBytes;

public int SentBytesPerSecond
{
get
{
AddValue(ref _prevSentBytesAt, ref _indexSentBytes, _sentBytesHistory, DateTime.Now, 0);
return _sentBytesHistory.Sum();
AddValue(ref prevSentBytesAt, ref indexSentBytes, sentBytesHistory, DateTime.Now, 0);
return sentBytesHistory.Sum();
}
}

public int ReceiveBytesPerSecond
{
get
{
AddValue(ref _prevReceivedBytesAt, ref _indexReceivedBytes, _receivedBytesHistory, DateTime.Now, 0);
return _receivedBytesHistory.Sum();
AddValue(ref prevReceivedBytesAt, ref indexReceivedBytes, receivedBytesHistory, DateTime.Now, 0);
return receivedBytesHistory.Sum();
}
}

internal void AddSentBytes(int bytesLength)
{
Interlocked.Add(ref _sentBytes, bytesLength);
AddValue(ref _prevSentBytesAt, ref _indexSentBytes, _sentBytesHistory, DateTime.Now, bytesLength);
Interlocked.Add(ref sentBytes, bytesLength);
AddValue(ref prevSentBytesAt, ref indexSentBytes, sentBytesHistory, DateTime.Now, bytesLength);
}

internal void AddReceivedBytes(int bytesLength)
{
Interlocked.Add(ref _receivedBytes, bytesLength);
AddValue(ref _prevReceivedBytesAt, ref _indexReceivedBytes, _receivedBytesHistory, DateTime.Now, bytesLength);
Interlocked.Add(ref receivedBytes, bytesLength);
AddValue(ref prevReceivedBytesAt, ref indexReceivedBytes, receivedBytesHistory, DateTime.Now, bytesLength);
}

private void AddValue(ref DateTime prev, ref int index, int[] values, DateTime d, int value)
Expand Down Expand Up @@ -347,44 346,44 @@ private void AddValue(ref DateTime prev, ref int index, int[] values, DateTime d

internal class WrappedCallInvoker : CallInvoker
{
private readonly CallInvoker _baseCallInvoker;
private readonly ChannelStats _channelStats;
readonly CallInvoker baseCallInvoker;
readonly ChannelStats channelStats;


public WrappedCallInvoker(ChannelStats channelStats, CallInvoker callInvoker)
{
_channelStats = channelStats;
_baseCallInvoker = callInvoker;
this.channelStats = channelStats;
this.baseCallInvoker = callInvoker;
}

public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options, TRequest request)
{
//Debug.Log($"Unary(Blocking): {method.FullName}");
return _baseCallInvoker.BlockingUnaryCall(WrapMethod(method), host, options, request);
return baseCallInvoker.BlockingUnaryCall(WrapMethod(method), host, options, request);
}

public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options, TRequest request)
{
//Debug.Log($"Unary: {method.FullName}");
return _baseCallInvoker.AsyncUnaryCall(WrapMethod(method), host, options, request);
return baseCallInvoker.AsyncUnaryCall(WrapMethod(method), host, options, request);
}

public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options, TRequest request)
{
//Debug.Log($"ServerStreaming: {method.FullName}");
return _baseCallInvoker.AsyncServerStreamingCall(WrapMethod(method), host, options, request);
return baseCallInvoker.AsyncServerStreamingCall(WrapMethod(method), host, options, request);
}

public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options)
{
//Debug.Log($"ClientStreaming: {method.FullName}");
return _baseCallInvoker.AsyncClientStreamingCall(WrapMethod(method), host, options);
return baseCallInvoker.AsyncClientStreamingCall(WrapMethod(method), host, options);
}

public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options)
{
//Debug.Log($"DuplexStreaming: {method.FullName}");
return _baseCallInvoker.AsyncDuplexStreamingCall(WrapMethod(method), host, options);
return baseCallInvoker.AsyncDuplexStreamingCall(WrapMethod(method), host, options);
}

private Method<TRequest, TResponse> WrapMethod<TRequest, TResponse>(Method<TRequest, TResponse> method)
Expand All @@ -397,11 396,11 @@ private Method<TRequest, TResponse> WrapMethod<TRequest, TResponse>(Method<TRequ
{
var wrapper = new SerializationContextWrapper(context);
method.RequestMarshaller.ContextualSerializer(request, context);
_channelStats.AddSentBytes(wrapper.Written);
channelStats.AddSentBytes(wrapper.Written);
}, (context) => method.RequestMarshaller.ContextualDeserializer(context)),
new Marshaller<TResponse>((request, context) => method.ResponseMarshaller.ContextualSerializer(request, context), x =>
{
_channelStats.AddReceivedBytes(x.PayloadLength);
channelStats.AddReceivedBytes(x.PayloadLength);
return method.ResponseMarshaller.ContextualDeserializer(x);
})
);
Expand All @@ -412,31 411,31 @@ private Method<TRequest, TResponse> WrapMethod<TRequest, TResponse>(Method<TRequ

private class SerializationContextWrapper : SerializationContext, IBufferWriter<byte>
{
private readonly SerializationContext _inner;
private IBufferWriter<byte>? _bufferWriter;
readonly SerializationContext inner;
IBufferWriter<byte>? bufferWriter;
public int Written { get; private set; }

public SerializationContextWrapper(SerializationContext inner)
{
_inner = inner;
this.inner = inner;
}

public override IBufferWriter<byte> GetBufferWriter()
=> _bufferWriter ?? (_bufferWriter = _inner.GetBufferWriter());
=> bufferWriter ?? (bufferWriter = inner.GetBufferWriter());

public override void Complete(byte[] payload)
{
Written = payload.Length;
_inner.Complete(payload);
inner.Complete(payload);
}

public override void Complete()
=> _inner.Complete();
=> inner.Complete();

public override void SetPayloadLength(int payloadLength)
{
Written = payloadLength;
_inner.SetPayloadLength(payloadLength);
inner.SetPayloadLength(payloadLength);
}

public void Advance(int count)
Expand Down

0 comments on commit df1d749

Please sign in to comment.