From cc1a969788c6c0485d9e45f466fb552fd9d6acf9 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 21 Jun 2024 13:58:23 -0700 Subject: [PATCH 1/4] Make `BasicProperties` a class Follow-up to #1607 As suggested by @bollhals --- .../RabbitMQ.Client/PublicAPI.Shipped.txt | 2 +- .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 1 + .../client/api/BasicProperties.cs | 44 ++++++++++--------- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 65fd1f869e..73cb3d6d2a 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -122,7 +122,7 @@ RabbitMQ.Client.BasicProperties RabbitMQ.Client.BasicProperties.AppId.get -> string RabbitMQ.Client.BasicProperties.AppId.set -> void RabbitMQ.Client.BasicProperties.BasicProperties() -> void -RabbitMQ.Client.BasicProperties.BasicProperties(in RabbitMQ.Client.ReadOnlyBasicProperties input) -> void +RabbitMQ.Client.BasicProperties.BasicProperties(RabbitMQ.Client.ReadOnlyBasicProperties input) -> void RabbitMQ.Client.BasicProperties.ClearAppId() -> void RabbitMQ.Client.BasicProperties.ClearClusterId() -> void RabbitMQ.Client.BasicProperties.ClearContentEncoding() -> void diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index e69de29bb2..6f9bcba8f2 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -0,0 +1 @@ +RabbitMQ.Client.BasicProperties.BasicProperties(RabbitMQ.Client.ReadOnlyBasicProperties! input) -> void \ No newline at end of file diff --git a/projects/RabbitMQ.Client/client/api/BasicProperties.cs b/projects/RabbitMQ.Client/client/api/BasicProperties.cs index f93128ed8c..eb95a18a95 100644 --- a/projects/RabbitMQ.Client/client/api/BasicProperties.cs +++ b/projects/RabbitMQ.Client/client/api/BasicProperties.cs @@ -40,7 +40,7 @@ namespace RabbitMQ.Client /// /// AMQP specification content header properties for content class "basic". /// - public struct BasicProperties : IBasicProperties, IAmqpHeader + public sealed class BasicProperties : IBasicProperties, IAmqpHeader { public string? ContentType { get; set; } public string? ContentEncoding { get; set; } @@ -59,7 +59,7 @@ public struct BasicProperties : IBasicProperties, IAmqpHeader public bool Persistent { - readonly get + get { return DeliveryMode == DeliveryModes.Persistent; } @@ -72,7 +72,7 @@ readonly get public PublicationAddress? ReplyToAddress { - readonly get + get { PublicationAddress.TryParse(ReplyTo, out PublicationAddress result); return result; @@ -81,7 +81,11 @@ readonly get set { ReplyTo = value?.ToString(); } } - public BasicProperties(in ReadOnlyBasicProperties input) + public BasicProperties() + { + } + + public BasicProperties(ReadOnlyBasicProperties input) { ContentType = input.ContentType; ContentEncoding = input.ContentEncoding; @@ -114,20 +118,20 @@ public BasicProperties(in ReadOnlyBasicProperties input) public void ClearAppId() => AppId = default; public void ClearClusterId() => ClusterId = default; - public readonly bool IsContentTypePresent() => ContentType != default; - public readonly bool IsContentEncodingPresent() => ContentEncoding != default; - public readonly bool IsHeadersPresent() => Headers != default; - public readonly bool IsDeliveryModePresent() => DeliveryMode != default; - public readonly bool IsPriorityPresent() => Priority != default; - public readonly bool IsCorrelationIdPresent() => CorrelationId != default; - public readonly bool IsReplyToPresent() => ReplyTo != default; - public readonly bool IsExpirationPresent() => Expiration != default; - public readonly bool IsMessageIdPresent() => MessageId != default; - public readonly bool IsTimestampPresent() => Timestamp != default; - public readonly bool IsTypePresent() => Type != default; - public readonly bool IsUserIdPresent() => UserId != default; - public readonly bool IsAppIdPresent() => AppId != default; - public readonly bool IsClusterIdPresent() => ClusterId != default; + public bool IsContentTypePresent() => ContentType != default; + public bool IsContentEncodingPresent() => ContentEncoding != default; + public bool IsHeadersPresent() => Headers != default; + public bool IsDeliveryModePresent() => DeliveryMode != default; + public bool IsPriorityPresent() => Priority != default; + public bool IsCorrelationIdPresent() => CorrelationId != default; + public bool IsReplyToPresent() => ReplyTo != default; + public bool IsExpirationPresent() => Expiration != default; + public bool IsMessageIdPresent() => MessageId != default; + public bool IsTimestampPresent() => Timestamp != default; + public bool IsTypePresent() => Type != default; + public bool IsUserIdPresent() => UserId != default; + public bool IsAppIdPresent() => AppId != default; + public bool IsClusterIdPresent() => ClusterId != default; ushort IAmqpHeader.ProtocolClassId => ClassConstants.Basic; @@ -153,7 +157,7 @@ public BasicProperties(in ReadOnlyBasicProperties input) internal const byte AppIdBit = 3; internal const byte ClusterIdBit = 2; - readonly int IAmqpWriteable.WriteTo(Span span) + int IAmqpWriteable.WriteTo(Span span) { int offset = 2; ref byte bitValue = ref span.GetStart(); @@ -247,7 +251,7 @@ readonly int IAmqpWriteable.WriteTo(Span span) return offset; } - readonly int IAmqpWriteable.GetRequiredBufferSize() + int IAmqpWriteable.GetRequiredBufferSize() { int bufferSize = 2; // number of presence fields (14) in 2 bytes blocks if (IsContentTypePresent()) { bufferSize += 1 + WireFormatting.GetByteCount(ContentType); } // _contentType in bytes From 004ce32301615417f01f81277dbb0665b891af67 Mon Sep 17 00:00:00 2001 From: bollhals Date: Mon, 24 Jun 2024 01:10:59 +0200 Subject: [PATCH 2/4] * Change InboundFrame to a class * Improve `TakeoverPayload` * Use `RentedMemory` and define more constants * Use explicit types instead of `var` --- .../client/api/ReadonlyBasicProperties.cs | 5 + .../client/impl/CommandAssembler.cs | 77 +++++------- .../client/impl/Connection.Receive.cs | 20 ++- projects/RabbitMQ.Client/client/impl/Frame.cs | 116 +++++++++--------- .../client/impl/IFrameHandler.cs | 4 +- .../RabbitMQ.Client/client/impl/ISession.cs | 2 +- .../client/impl/MainSession.cs | 4 +- .../RabbitMQ.Client/client/impl/Session.cs | 11 +- .../client/impl/SessionBase.cs | 2 +- .../client/impl/SocketFrameHandler.cs | 8 +- 10 files changed, 120 insertions(+), 129 deletions(-) diff --git a/projects/RabbitMQ.Client/client/api/ReadonlyBasicProperties.cs b/projects/RabbitMQ.Client/client/api/ReadonlyBasicProperties.cs index 0b07a5e91c..98010a3184 100644 --- a/projects/RabbitMQ.Client/client/api/ReadonlyBasicProperties.cs +++ b/projects/RabbitMQ.Client/client/api/ReadonlyBasicProperties.cs @@ -84,6 +84,11 @@ public PublicationAddress? ReplyToAddress public ReadOnlyBasicProperties(ReadOnlySpan span) { + if (span.IsEmpty) + { + return; + } + int offset = 2; ref readonly byte bits = ref span[0]; if (bits.IsBitSet(BasicProperties.ContentTypeBit)) { offset += WireFormatting.ReadShortstr(span.Slice(offset), out _contentType); } diff --git a/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs b/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs index 76f4d7efde..5c09bf5319 100644 --- a/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs +++ b/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs @@ -45,12 +45,9 @@ internal sealed class CommandAssembler private const int MaxArrayOfBytesSize = 2_147_483_591; private ProtocolCommandId _commandId; - private ReadOnlyMemory _methodMemory; - private byte[]? _rentedMethodArray; - private ReadOnlyMemory _headerMemory; - private byte[]? _rentedHeaderArray; - private ReadOnlyMemory _bodyMemory; - private byte[]? _rentedBodyArray; + private RentedMemory _methodMemory; + private RentedMemory _headerMemory; + private RentedMemory _bodyMemory; private int _remainingBodyByteCount; private int _offset; private AssemblyState _state; @@ -66,61 +63,49 @@ public CommandAssembler(uint maxBodyLength) private void Reset() { _commandId = default; - _methodMemory = ReadOnlyMemory.Empty; - _rentedMethodArray = null; - _headerMemory = ReadOnlyMemory.Empty; - _rentedHeaderArray = null; - _bodyMemory = ReadOnlyMemory.Empty; - _rentedBodyArray = null; + _methodMemory = default; + _headerMemory = default; + _bodyMemory = default; _remainingBodyByteCount = 0; _offset = 0; _state = AssemblyState.ExpectingMethod; } - public bool HandleFrame(in InboundFrame frame, out IncomingCommand command) + public void HandleFrame(InboundFrame frame, out IncomingCommand command) { - bool shallReturn = true; switch (_state) { case AssemblyState.ExpectingMethod: - ParseMethodFrame(in frame); - shallReturn = false; + ParseMethodFrame(frame); break; case AssemblyState.ExpectingContentHeader: - shallReturn = ParseHeaderFrame(in frame); + ParseHeaderFrame(frame); break; case AssemblyState.ExpectingContentBody: - shallReturn = ParseBodyFrame(in frame); + ParseBodyFrame(frame); break; } if (_state != AssemblyState.Complete) { command = IncomingCommand.Empty; - return shallReturn; + return; } RabbitMqClientEventSource.Log.CommandReceived(); - - var method = new RentedMemory(_methodMemory, _rentedMethodArray); - var header = new RentedMemory(_headerMemory, _rentedHeaderArray); - var body = new RentedMemory(_bodyMemory, _rentedBodyArray); - - command = new IncomingCommand(_commandId, method, header, body); + command = new IncomingCommand(_commandId, _methodMemory, _headerMemory, _bodyMemory); Reset(); - return shallReturn; } - private void ParseMethodFrame(in InboundFrame frame) + private void ParseMethodFrame(InboundFrame frame) { if (frame.Type != FrameType.FrameMethod) { throw new UnexpectedFrameException(frame.Type); } - _rentedMethodArray = frame.TakeoverPayload(); _commandId = (ProtocolCommandId)NetworkOrderDeserializer.ReadUInt32(frame.Payload.Span); - _methodMemory = frame.Payload.Slice(4); + _methodMemory = frame.TakeoverPayload(Framing.Method.ArgumentsOffset); switch (_commandId) { @@ -136,7 +121,7 @@ private void ParseMethodFrame(in InboundFrame frame) } } - private bool ParseHeaderFrame(in InboundFrame frame) + private void ParseHeaderFrame(InboundFrame frame) { if (frame.Type != FrameType.FrameHeader) { @@ -150,7 +135,7 @@ private bool ParseHeaderFrame(in InboundFrame frame) throw new UnknownClassOrMethodException(classId, 0); } - ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4)); + ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(Framing.Header.BodyLengthOffset)); if (totalBodyBytes > MaxArrayOfBytesSize) { throw new UnexpectedFrameException(frame.Type); @@ -162,16 +147,21 @@ private bool ParseHeaderFrame(in InboundFrame frame) throw new MalformedFrameException(message: msg, canShutdownCleanly: false); } - _rentedHeaderArray = totalBodyBytes != 0 ? frame.TakeoverPayload() : Array.Empty(); - - _headerMemory = frame.Payload.Slice(12); + // There are always at least 2 bytes, even for empty ones + if (frame.Payload.Length <= Framing.Header.HeaderArgumentOffset + 2) + { + frame.TryReturnPayload(); + } + else + { + _headerMemory = frame.TakeoverPayload(Framing.Header.HeaderArgumentOffset); + } _remainingBodyByteCount = (int)totalBodyBytes; UpdateContentBodyState(); - return _rentedHeaderArray.Length == 0; } - private bool ParseBodyFrame(in InboundFrame frame) + private void ParseBodyFrame(InboundFrame frame) { if (frame.Type != FrameType.FrameBody) { @@ -184,27 +174,26 @@ private bool ParseBodyFrame(in InboundFrame frame) throw new MalformedFrameException($"Overlong content body received - {_remainingBodyByteCount} bytes remaining, {payloadLength} bytes received"); } - if (_rentedBodyArray is null) + if (_bodyMemory.RentedArray is null) { // check for single frame payload for an early exit if (payloadLength == _remainingBodyByteCount) { - _rentedBodyArray = frame.TakeoverPayload(); - _bodyMemory = frame.Payload; + _bodyMemory = frame.TakeoverPayload(0); _state = AssemblyState.Complete; - return false; + return; } // Is returned by IncomingCommand.ReturnPayload in Session.HandleFrame - _rentedBodyArray = ArrayPool.Shared.Rent(_remainingBodyByteCount); - _bodyMemory = new ReadOnlyMemory(_rentedBodyArray, 0, _remainingBodyByteCount); + var rentedBodyArray = ArrayPool.Shared.Rent(_remainingBodyByteCount); + _bodyMemory = new RentedMemory(new ReadOnlyMemory(rentedBodyArray, 0, _remainingBodyByteCount), rentedBodyArray); } - frame.Payload.Span.CopyTo(_rentedBodyArray.AsSpan(_offset)); + frame.Payload.Span.CopyTo(_bodyMemory.RentedArray.AsSpan(_offset)); + frame.TryReturnPayload(); _remainingBodyByteCount -= payloadLength; _offset += payloadLength; UpdateContentBodyState(); - return true; } private void UpdateContentBodyState() diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs index 3789208fd1..9fd75db08f 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs @@ -123,11 +123,13 @@ await FinishCloseAsync(cts.Token) private async Task ReceiveLoopAsync(CancellationToken mainLoopCancellationToken) { + InboundFrame frame = new InboundFrame(); + while (false == _closed) { mainLoopCancellationToken.ThrowIfCancellationRequested(); - while (_frameHandler.TryReadFrame(out InboundFrame frame)) + while (_frameHandler.TryReadFrame(frame)) { NotifyHeartbeatListener(); await ProcessFrameAsync(frame, mainLoopCancellationToken) @@ -135,17 +137,16 @@ await ProcessFrameAsync(frame, mainLoopCancellationToken) } // Done reading frames synchronously, go async - InboundFrame asyncFrame = await _frameHandler.ReadFrameAsync(mainLoopCancellationToken) + await _frameHandler.ReadFrameAsync(frame, mainLoopCancellationToken) .ConfigureAwait(false); NotifyHeartbeatListener(); - await ProcessFrameAsync(asyncFrame, mainLoopCancellationToken) - .ConfigureAwait(false); + await ProcessFrameAsync(frame, mainLoopCancellationToken) + .ConfigureAwait(false); } } private async Task ProcessFrameAsync(InboundFrame frame, CancellationToken cancellationToken) { - bool shallReturnPayload = true; if (frame.Channel == 0) { if (frame.Type == FrameType.FrameHeartbeat) @@ -164,7 +165,7 @@ private async Task ProcessFrameAsync(InboundFrame frame, CancellationToken cance // quiescing situation, even though technically we // should be ignoring everything except // connection.close-ok. - shallReturnPayload = await _session0.HandleFrameAsync(frame, cancellationToken) + await _session0.HandleFrameAsync(frame, cancellationToken) .ConfigureAwait(false); } } @@ -182,15 +183,12 @@ private async Task ProcessFrameAsync(InboundFrame frame, CancellationToken cance // Session itself may be quiescing this particular // channel, but that's none of our concern.) ISession session = _sessionManager.Lookup(frame.Channel); - shallReturnPayload = await session.HandleFrameAsync(frame, cancellationToken) + await session.HandleFrameAsync(frame, cancellationToken) .ConfigureAwait(false); } } - if (shallReturnPayload) - { - frame.ReturnPayload(); - } + frame.TryReturnPayload(); } /// diff --git a/projects/RabbitMQ.Client/client/impl/Frame.cs b/projects/RabbitMQ.Client/client/impl/Frame.cs index d9695fd1ac..9ed3e3bded 100644 --- a/projects/RabbitMQ.Client/client/impl/Frame.cs +++ b/projects/RabbitMQ.Client/client/impl/Frame.cs @@ -36,7 +36,6 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; - using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; using RabbitMQ.Client.Logging; @@ -57,21 +56,22 @@ internal static class Framing internal static class Method { /* +----------+-----------+-----------+ - * | CommandId (combined) | Arguments | + * | CommandId (combined) | Arguments | * | Class Id | Method Id | | * +----------+-----------+-----------+ * | 4 bytes (combined) | x bytes | * | 2 bytes | 2 bytes | | * +----------+-----------+-----------+ */ public const int FrameSize = BaseFrameSize + 2 + 2; + public const int ArgumentsOffset = 4; [MethodImpl(MethodImplOptions.AggressiveInlining)] public static int WriteTo(Span span, ushort channel, ref T method) where T : struct, IOutgoingAmqpMethod { const int StartClassId = StartPayload; - const int StartMethodArguments = StartClassId + 4; + const int StartMethodArguments = StartPayload + ArgumentsOffset; - int payloadLength = method.WriteTo(span.Slice(StartMethodArguments)) + 4; + int payloadLength = ArgumentsOffset + method.WriteTo(span.Slice(StartMethodArguments)); NetworkOrderSerializer.WriteUInt64(ref span.GetStart(), ((ulong)Constants.FrameMethod << 56) | ((ulong)channel << 40) | ((ulong)payloadLength << 8)); NetworkOrderSerializer.WriteUInt32(ref span.GetOffset(StartClassId), (uint)method.ProtocolCommandId); span[payloadLength + StartPayload] = Constants.FrameEnd; @@ -87,15 +87,17 @@ internal static class Header * | 2 bytes | 2 bytes | 8 bytes | x bytes | * +----------+----------+-------------------+-----------+ */ public const int FrameSize = BaseFrameSize + 2 + 2 + 8; + public const int BodyLengthOffset = 4; + public const int HeaderArgumentOffset = 12; [MethodImpl(MethodImplOptions.AggressiveInlining)] public static int WriteTo(Span span, ushort channel, ref T header, int bodyLength) where T : IAmqpHeader { const int StartClassId = StartPayload; - const int StartBodyLength = StartPayload + 4; - const int StartHeaderArguments = StartPayload + 12; + const int StartBodyLength = StartPayload + BodyLengthOffset; + const int StartHeaderArguments = StartPayload + HeaderArgumentOffset; - int payloadLength = 12 + header.WriteTo(span.Slice(StartHeaderArguments)); + int payloadLength = HeaderArgumentOffset + header.WriteTo(span.Slice(StartHeaderArguments)); NetworkOrderSerializer.WriteUInt64(ref span.GetStart(), ((ulong)Constants.FrameHeader << 56) | ((ulong)channel << 40) | ((ulong)payloadLength << 8)); NetworkOrderSerializer.WriteUInt32(ref span.GetOffset(StartClassId), (uint)header.ProtocolClassId << 16); // The last 16 bytes (Weight) aren't used NetworkOrderSerializer.WriteUInt64(ref span.GetOffset(StartBodyLength), (ulong)bodyLength); @@ -205,20 +207,13 @@ private static int GetBodyFrameCount(int maxPayloadBytes, int length) } } - internal readonly struct InboundFrame +#nullable enable + internal sealed class InboundFrame { - public readonly FrameType Type; - public readonly int Channel; - public readonly ReadOnlyMemory Payload; - private readonly byte[] _rentedArray; - - private InboundFrame(FrameType type, int channel, ReadOnlyMemory payload, byte[] rentedArray) - { - Type = type; - Channel = channel; - Payload = payload; - _rentedArray = rentedArray; - } + public FrameType Type { get; private set; } + public int Channel { get; private set; } + public ReadOnlyMemory Payload { get; private set; } + private byte[]? _rentedArray; private static void ProcessProtocolHeader(ReadOnlySequence buffer) { @@ -254,26 +249,23 @@ private static void ProcessProtocolHeader(ReadOnlySequence buffer) } } - internal static async ValueTask ReadFromPipeAsync(PipeReader reader, - uint maxInboundMessageBodySize, - CancellationToken mainLoopCancellationToken) + internal static async ValueTask ReadFromPipeAsync(PipeReader reader, + uint maxInboundMessageBodySize, InboundFrame frame, CancellationToken mainLoopCancellationToken) { - ReadResult result = await reader.ReadAsync(mainLoopCancellationToken) - .ConfigureAwait(false); + ReadResult result = await reader.ReadAsync(mainLoopCancellationToken).ConfigureAwait(false); ReadOnlySequence buffer = result.Buffer; MaybeThrowEndOfStream(result, buffer); - InboundFrame frame; // Loop until we have enough data to read an entire frame, or until the pipe is completed. - while (!TryReadFrame(ref buffer, maxInboundMessageBodySize, out frame)) + while (!TryReadFrame(ref buffer, maxInboundMessageBodySize, frame)) { reader.AdvanceTo(buffer.Start, buffer.End); // Not enough data, read a bit more result = await reader.ReadAsync(mainLoopCancellationToken) - .ConfigureAwait(false); + .ConfigureAwait(false); MaybeThrowEndOfStream(result, buffer); @@ -281,11 +273,10 @@ internal static async ValueTask ReadFromPipeAsync(PipeReader reade } reader.AdvanceTo(buffer.Start); - return frame; } internal static bool TryReadFrameFromPipe(PipeReader reader, - uint maxInboundMessageBodySize, out InboundFrame frame) + uint maxInboundMessageBodySize, InboundFrame frame) { if (reader.TryRead(out ReadResult result)) { @@ -293,7 +284,7 @@ internal static bool TryReadFrameFromPipe(PipeReader reader, MaybeThrowEndOfStream(result, buffer); - if (TryReadFrame(ref buffer, maxInboundMessageBodySize, out frame)) + if (TryReadFrame(ref buffer, maxInboundMessageBodySize, frame)) { reader.AdvanceTo(buffer.Start); return true; @@ -304,16 +295,14 @@ internal static bool TryReadFrameFromPipe(PipeReader reader, } // Failed to synchronously read sufficient data from the pipe. We'll need to go async. - frame = default; return false; } internal static bool TryReadFrame(ref ReadOnlySequence buffer, - uint maxInboundMessageBodySize, out InboundFrame frame) + uint maxInboundMessageBodySize, InboundFrame frame) { if (buffer.Length < 7) { - frame = default; return false; } @@ -332,8 +321,8 @@ internal static bool TryReadFrame(ref ReadOnlySequence buffer, ProcessProtocolHeader(buffer); } - FrameType type = (FrameType)firstByte; - int channel = NetworkOrderDeserializer.ReadUInt16(buffer.Slice(1)); + frame.Type = (FrameType)firstByte; + frame.Channel = NetworkOrderDeserializer.ReadUInt16(buffer.Slice(1)); int payloadSize = NetworkOrderDeserializer.ReadInt32(buffer.Slice(3)); if ((maxInboundMessageBodySize > 0) && (payloadSize > maxInboundMessageBodySize)) { @@ -345,40 +334,51 @@ internal static bool TryReadFrame(ref ReadOnlySequence buffer, // Is returned by InboundFrame.ReturnPayload in Connection.MainLoopIteration int readSize = payloadSize + EndMarkerLength; - if ((buffer.Length - 7) < readSize) + if (buffer.Length - 7 < readSize) { - frame = default; return false; } - else - { - byte[] payloadBytes = ArrayPool.Shared.Rent(readSize); - ReadOnlySequence framePayload = buffer.Slice(7, readSize); - framePayload.CopyTo(payloadBytes); - if (payloadBytes[payloadSize] != Constants.FrameEnd) - { - byte frameEndMarker = payloadBytes[payloadSize]; - ArrayPool.Shared.Return(payloadBytes); - throw new MalformedFrameException($"Bad frame end marker: {frameEndMarker}"); - } + byte[] payloadBytes = ArrayPool.Shared.Rent(readSize); + ReadOnlySequence framePayload = buffer.Slice(7, readSize); + framePayload.CopyTo(payloadBytes); - RabbitMqClientEventSource.Log.DataReceived(payloadSize + Framing.BaseFrameSize); - frame = new InboundFrame(type, channel, new ReadOnlyMemory(payloadBytes, 0, payloadSize), payloadBytes); - // Advance the buffer - buffer = buffer.Slice(7 + readSize); - return true; + if (payloadBytes[payloadSize] != Constants.FrameEnd) + { + byte frameEndMarker = payloadBytes[payloadSize]; + ArrayPool.Shared.Return(payloadBytes); + throw new MalformedFrameException($"Bad frame end marker: {frameEndMarker}"); } + + RabbitMqClientEventSource.Log.DataReceived(payloadSize + Framing.BaseFrameSize); + frame._rentedArray = payloadBytes; + frame.Payload = new ReadOnlyMemory(payloadBytes, 0, payloadSize); + + // Advance the buffer + buffer = buffer.Slice(7 + readSize); + return true; } - public byte[] TakeoverPayload() + public RentedMemory TakeoverPayload(int sliceOffset) { - return _rentedArray; + byte[]? array = _rentedArray ?? throw new InvalidOperationException("Payload was already taken over or returned."); + ReadOnlyMemory payload = Payload.Slice(sliceOffset); + Payload = ReadOnlyMemory.Empty; + _rentedArray = null; + return new RentedMemory(payload, array); } - public void ReturnPayload() + public void TryReturnPayload() { - ArrayPool.Shared.Return(_rentedArray); + byte[]? array = _rentedArray; + if (array is null) + { + return; + } + + ArrayPool.Shared.Return(array); + Payload = ReadOnlyMemory.Empty; + _rentedArray = null; } public override string ToString() diff --git a/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs index ed59a9a286..0f2b07ec48 100644 --- a/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs @@ -61,12 +61,12 @@ internal interface IFrameHandler ///Read a frame from the underlying ///transport. Returns null if the read operation timed out ///(see Timeout property). - ValueTask ReadFrameAsync(CancellationToken cancellationToken); + ValueTask ReadFrameAsync(InboundFrame frame, CancellationToken cancellationToken); ///Try to synchronously read a frame from the underlying transport. ///Returns false if connection buffer contains insufficient data. /// - bool TryReadFrame(out InboundFrame frame); + bool TryReadFrame(InboundFrame frame); Task SendProtocolHeaderAsync(CancellationToken cancellationToken); diff --git a/projects/RabbitMQ.Client/client/impl/ISession.cs b/projects/RabbitMQ.Client/client/impl/ISession.cs index bf715e756f..ce29e81b63 100644 --- a/projects/RabbitMQ.Client/client/impl/ISession.cs +++ b/projects/RabbitMQ.Client/client/impl/ISession.cs @@ -74,7 +74,7 @@ internal interface ISession void Close(ShutdownEventArgs reason, bool notify); - Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken); + Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken); void Notify(); diff --git a/projects/RabbitMQ.Client/client/impl/MainSession.cs b/projects/RabbitMQ.Client/client/impl/MainSession.cs index 2e01349a66..edb82ec773 100644 --- a/projects/RabbitMQ.Client/client/impl/MainSession.cs +++ b/projects/RabbitMQ.Client/client/impl/MainSession.cs @@ -54,7 +54,7 @@ public MainSession(Connection connection, uint maxBodyLength) { } - public override Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken) + public override Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken) { if (_closing) { @@ -76,7 +76,7 @@ public override Task HandleFrameAsync(InboundFrame frame, CancellationToke // Either a non-method frame, or not what we were looking // for. Ignore it - we're quiescing. - return Task.FromResult(true); + return Task.CompletedTask; } return base.HandleFrameAsync(frame, cancellationToken); diff --git a/projects/RabbitMQ.Client/client/impl/Session.cs b/projects/RabbitMQ.Client/client/impl/Session.cs index 0631226ffd..a85d36c1bf 100644 --- a/projects/RabbitMQ.Client/client/impl/Session.cs +++ b/projects/RabbitMQ.Client/client/impl/Session.cs @@ -46,17 +46,16 @@ public Session(Connection connection, ushort channelNumber, uint maxBodyLength) _assembler = new CommandAssembler(maxBodyLength); } - public override async Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken) + public override Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken) { - bool shallReturnFramePayload = _assembler.HandleFrame(in frame, out IncomingCommand cmd); + _assembler.HandleFrame(frame, out IncomingCommand cmd); - if (!cmd.IsEmpty) + if (cmd.IsEmpty) { - await CommandReceived.Invoke(cmd, cancellationToken) - .ConfigureAwait(false); + return Task.CompletedTask; } - return shallReturnFramePayload; + return CommandReceived.Invoke(cmd, cancellationToken); } } } diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index 1980d09053..50ec2a17f5 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -118,7 +118,7 @@ public void Close(ShutdownEventArgs reason, bool notify) } } - public abstract Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken); + public abstract Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken); public void Notify() { diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs index 6393b7efc0..f0c4822ca1 100644 --- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs @@ -278,16 +278,16 @@ await _pipeReader.CompleteAsync() } } - public ValueTask ReadFrameAsync(CancellationToken mainLoopCancellationToken) + public ValueTask ReadFrameAsync(InboundFrame frame, CancellationToken mainLoopCancellationToken) { return InboundFrame.ReadFromPipeAsync(_pipeReader, - _amqpTcpEndpoint.MaxInboundMessageBodySize, mainLoopCancellationToken); + _amqpTcpEndpoint.MaxInboundMessageBodySize, frame, mainLoopCancellationToken); } - public bool TryReadFrame(out InboundFrame frame) + public bool TryReadFrame(InboundFrame frame) { return InboundFrame.TryReadFrameFromPipe(_pipeReader, - _amqpTcpEndpoint.MaxInboundMessageBodySize, out frame); + _amqpTcpEndpoint.MaxInboundMessageBodySize, frame); } public async Task SendProtocolHeaderAsync(CancellationToken cancellationToken) From 5ccfd7fcc390324b973270b8e1ef85b6d036dbdd Mon Sep 17 00:00:00 2001 From: Luiz Carlos Faria Date: Sun, 23 Jun 2024 06:51:31 -0300 Subject: [PATCH 3/4] Add `DispatchConsumersAsyncEnabled` property on `IConnection` (#1611) * Name new property `DispatchConsumersAsyncEnabled`. * Add a check on `BasicConsume` for when a regular dispatcher is used, and an async consumer passed. * Test the new `DispatchConsumersAsyncEnabled` property. --- .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 3 ++- .../RabbitMQ.Client/client/api/IConnection.cs | 6 +++++ .../client/impl/AutorecoveringConnection.cs | 2 ++ .../client/impl/ChannelBase.cs | 10 ++++++- .../RabbitMQ.Client/client/impl/Connection.cs | 2 ++ .../Test/Integration/TestAsyncConsumer.cs | 16 ++++++++++++ projects/Test/Integration/TestConsumer.cs | 26 +++++++++++++++++++ 7 files changed, 63 insertions(+), 2 deletions(-) diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index 6f9bcba8f2..98fa9fc672 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -1 +1,2 @@ -RabbitMQ.Client.BasicProperties.BasicProperties(RabbitMQ.Client.ReadOnlyBasicProperties! input) -> void \ No newline at end of file +RabbitMQ.Client.BasicProperties.BasicProperties(RabbitMQ.Client.ReadOnlyBasicProperties! input) -> void +RabbitMQ.Client.IConnection.DispatchConsumersAsyncEnabled.get -> bool \ No newline at end of file diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index a699b9ce27..286b810570 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -126,6 +126,11 @@ public interface IConnection : INetworkConnection, IDisposable /// IEnumerable ShutdownReport { get; } + /// + /// Returns true if the connection is set to use asynchronous consumer dispatchers. + /// + public bool DispatchConsumersAsyncEnabled { get; } + /// /// Application-specific connection name, will be displayed in the management UI /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot @@ -236,5 +241,6 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo /// /// Cancellation token Task CreateChannelAsync(CancellationToken cancellationToken = default); + } } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index d09becd74f..298e3066c2 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -176,6 +176,8 @@ public event EventHandler RecoveringConsumer public IProtocol Protocol => Endpoint.Protocol; + public bool DispatchConsumersAsyncEnabled => _config.DispatchConsumersAsync; + public async ValueTask CreateNonRecoveringChannelAsync(CancellationToken cancellationToken) { ISession session = InnerConnection.CreateSession(); diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index f13d9e66b2..db66cce057 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -975,12 +975,20 @@ public async Task BasicConsumeAsync(string queue, bool autoAck, string c { if (ConsumerDispatcher is AsyncConsumerDispatcher) { - if (!(consumer is IAsyncBasicConsumer)) + if (false == (consumer is IAsyncBasicConsumer)) { throw new InvalidOperationException("When using an AsyncConsumerDispatcher, the consumer must implement IAsyncBasicConsumer"); } } + if (ConsumerDispatcher is ConsumerDispatcher) + { + if (consumer is IAsyncBasicConsumer) + { + throw new InvalidOperationException("When using an ConsumerDispatcher, the consumer must not implement IAsyncBasicConsumer"); + } + } + // NOTE: // Maybe don't dispose this instance because the CancellationToken must remain // valid for processing the response. diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 54a84fe062..ad1aeb859d 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -101,6 +101,8 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) public int LocalPort => _frameHandler.LocalPort; public int RemotePort => _frameHandler.RemotePort; + public bool DispatchConsumersAsyncEnabled => _config.DispatchConsumersAsync; + public IDictionary? ServerProperties { get; private set; } public IEnumerable ShutdownReport => _shutdownReport; diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index b871eb7332..ef4cd879a1 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -52,6 +52,8 @@ public TestAsyncConsumer(ITestOutputHelper output) [Fact] public async Task TestBasicRoundtripConcurrent() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + AddCallbackExceptionHandlers(); _channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output); @@ -145,6 +147,8 @@ public async Task TestBasicRoundtripConcurrent() [Fact] public async Task TestBasicRoundtripConcurrentManyMessages() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + AddCallbackExceptionHandlers(); _channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output); @@ -320,6 +324,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages() [Fact] public async Task TestBasicRejectAsync() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + string queueName = GenerateQueueName(); var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -485,6 +491,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, [Fact] public async Task TestBasicNackAsync() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _conn.ConnectionShutdown += (o, ea) => @@ -558,6 +566,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, [Fact] public async Task NonAsyncConsumerShouldThrowInvalidOperationException() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + bool sawException = false; QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, false); await _channel.BasicPublishAsync(string.Empty, q.QueueName, GetRandomBody(1024)); @@ -576,6 +586,8 @@ public async Task NonAsyncConsumerShouldThrowInvalidOperationException() [Fact] public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0); var tasks = new List(); for (int i = 0; i < 256; i++) @@ -596,6 +608,8 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() [Fact] public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + string exchangeName = GenerateExchangeName(); string queue1Name = GenerateQueueName(); string queue2Name = GenerateQueueName(); @@ -663,6 +677,8 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() [Fact] public async Task TestCloseWithinEventHandler_GH1567() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); QueueDeclareOk q = await _channel.QueueDeclareAsync(); diff --git a/projects/Test/Integration/TestConsumer.cs b/projects/Test/Integration/TestConsumer.cs index d747a51b3e..3b95c14677 100644 --- a/projects/Test/Integration/TestConsumer.cs +++ b/projects/Test/Integration/TestConsumer.cs @@ -49,9 +49,31 @@ public TestConsumer(ITestOutputHelper output) : base(output) { } + [Fact] + public async Task AsyncConsumerShouldThrowInvalidOperationException() + { + Assert.False(_conn.DispatchConsumersAsyncEnabled); + + bool sawException = false; + QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, false); + await _channel.BasicPublishAsync(string.Empty, q.QueueName, GetRandomBody(1024)); + var consumer = new AsyncEventingBasicConsumer(_channel); + try + { + string consumerTag = await _channel.BasicConsumeAsync(q.QueueName, false, string.Empty, false, false, null, consumer); + } + catch (InvalidOperationException) + { + sawException = true; + } + Assert.True(sawException, "did not see expected InvalidOperationException"); + } + [Fact] public async Task TestBasicRoundtrip() { + Assert.False(_conn.DispatchConsumersAsyncEnabled); + TimeSpan waitSpan = TimeSpan.FromSeconds(2); QueueDeclareOk q = await _channel.QueueDeclareAsync(); await _channel.BasicPublishAsync("", q.QueueName, _body); @@ -77,6 +99,8 @@ public async Task TestBasicRoundtrip() [Fact] public async Task TestBasicRoundtripNoWait() { + Assert.False(_conn.DispatchConsumersAsyncEnabled); + QueueDeclareOk q = await _channel.QueueDeclareAsync(); await _channel.BasicPublishAsync("", q.QueueName, _body); var consumer = new EventingBasicConsumer(_channel); @@ -101,6 +125,8 @@ public async Task TestBasicRoundtripNoWait() [Fact] public async Task ConcurrentEventingTestForReceived() { + Assert.False(_conn.DispatchConsumersAsyncEnabled); + const int NumberOfThreads = 4; const int NumberOfRegistrations = 5000; From 870a1f45f0d74c3bba6891a23360d8b06e1a3697 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 27 Jun 2024 10:29:14 -0700 Subject: [PATCH 4/4] Ensure that recovery uses copies of argument dicts Reported here: https://groups.google.com/g/rabbitmq-users/c/hk5pJ4cKF0c * Ensure that arguments passed to a queue, exchange, binding, or consumer are copied when being recorded. --- .../client/impl/RecordedBinding.cs | 9 ++ .../client/impl/RecordedConsumer.cs | 10 +- .../client/impl/RecordedExchange.cs | 10 +- .../client/impl/RecordedQueue.cs | 10 +- .../TestRecoveringConsumerEventHandlers.cs | 18 ++-- .../TestQueueRecoveryWithArguments.cs | 92 +++++++++++++++++++ 6 files changed, 140 insertions(+), 9 deletions(-) create mode 100644 projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs diff --git a/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs b/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs index ab2d72a1a3..71cd60f020 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs @@ -57,6 +57,15 @@ public RecordedBinding(bool isQueueBinding, string destination, string source, s _source = source; _routingKey = routingKey; _arguments = arguments; + + if (arguments is null) + { + _arguments = null; + } + else + { + _arguments = new Dictionary(arguments); + } } public RecordedBinding(string destination, in RecordedBinding old) diff --git a/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs b/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs index a2c789c72d..2d7402cb5d 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs @@ -84,7 +84,15 @@ public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer, _autoAck = autoAck; _exclusive = exclusive; - _arguments = arguments; + + if (arguments is null) + { + _arguments = null; + } + else + { + _arguments = new Dictionary(arguments); + } } public AutorecoveringChannel Channel => _channel; diff --git a/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs b/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs index a5650c81a1..fd492a3b28 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs @@ -56,7 +56,15 @@ public RecordedExchange(string name, string type, bool durable, bool autoDelete, _type = type; _durable = durable; _autoDelete = autoDelete; - _arguments = arguments; + + if (arguments is null) + { + _arguments = null; + } + else + { + _arguments = new Dictionary(arguments); + } } public Task RecoverAsync(IChannel channel, CancellationToken cancellationToken) diff --git a/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs b/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs index d4148a2b9c..959a12b488 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs @@ -59,7 +59,15 @@ public RecordedQueue(string name, bool isServerNamed, bool durable, bool exclusi _durable = durable; _exclusive = exclusive; _autoDelete = autoDelete; - _arguments = arguments; + + if (arguments is null) + { + _arguments = null; + } + else + { + _arguments = new Dictionary(arguments); + } } public RecordedQueue(string newName, in RecordedQueue old) diff --git a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs index 66d08e9b03..d7db66feed 100644 --- a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs @@ -69,10 +69,17 @@ public async Task TestRecoveringConsumerEventHandlers_Called(int iterations) [Fact] public async Task TestRecoveringConsumerEventHandler_EventArgumentsArePassedDown() { - var myArgs = new Dictionary { { "first-argument", "some-value" } }; + const string key = "first-argument"; + const string value = "some-value"; + + IDictionary arguments = new Dictionary + { + { key, value } + }; + RabbitMQ.Client.QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false); var cons = new EventingBasicConsumer(_channel); - string expectedCTag = await _channel.BasicConsumeAsync(cons, q, arguments: myArgs); + string expectedCTag = await _channel.BasicConsumeAsync(cons, q, arguments: arguments); bool ctagMatches = false; bool consumerArgumentMatches = false; @@ -82,15 +89,14 @@ public async Task TestRecoveringConsumerEventHandler_EventArgumentsArePassedDown // passed to a CallbackExceptionHandler, instead of failing the test. Instead, we have to do this trick // and assert in the test function. ctagMatches = args.ConsumerTag == expectedCTag; - consumerArgumentMatches = (string)args.ConsumerArguments["first-argument"] == "some-value"; - args.ConsumerArguments["first-argument"] = "event-handler-set-this-value"; + consumerArgumentMatches = (string)args.ConsumerArguments[key] == value; }; await CloseAndWaitForRecoveryAsync(); Assert.True(ctagMatches, "expected consumer tag to match"); Assert.True(consumerArgumentMatches, "expected consumer arguments to match"); - string actualVal = (string)Assert.Contains("first-argument", myArgs as IDictionary); - Assert.Equal("event-handler-set-this-value", actualVal); + string actualVal = (string)Assert.Contains(key, arguments); + Assert.Equal(value, actualVal); } } } diff --git a/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs b/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs new file mode 100644 index 0000000000..6dcfd25962 --- /dev/null +++ b/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs @@ -0,0 +1,92 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Threading.Tasks; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Xunit; +using Xunit.Abstractions; + +namespace Test.Integration.ConnectionRecovery +{ + public class TestQueueRecoveryWithArguments : TestConnectionRecoveryBase + { + public TestQueueRecoveryWithArguments(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task TestQueueRecoveryWithDlxArgument_RabbitMQUsers_hk5pJ4cKF0c() + { + string tdiWaitExchangeName = GenerateExchangeName(); + string tdiRetryExchangeName = GenerateExchangeName(); + string testRetryQueueName = GenerateQueueName(); + string testQueueName = GenerateQueueName(); + + await _channel.ExchangeDeclareAsync(exchange: tdiWaitExchangeName, + type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); + await _channel.ExchangeDeclareAsync(exchange: tdiRetryExchangeName, + type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); + + var arguments = new Dictionary + { + { "x-dead-letter-exchange", "tdi.retry.exchange" }, + { "x-dead-letter-routing-key", "QueueTest" } + }; + + await _channel.QueueDeclareAsync(testRetryQueueName, durable: false, exclusive: false, autoDelete: false, arguments); + + arguments["x-dead-letter-exchange"] = "tdi.wait.exchange"; + arguments["x-dead-letter-routing-key"] = "QueueTest"; + + await _channel.QueueDeclareAsync(testQueueName, durable: false, exclusive: false, autoDelete: false, arguments); + + arguments.Remove("x-dead-letter-exchange"); + arguments.Remove("x-dead-letter-routing-key"); + + await _channel.QueueBindAsync(testRetryQueueName, tdiWaitExchangeName, testQueueName); + + await _channel.QueueBindAsync(testQueueName, tdiRetryExchangeName, testQueueName); + + var consumerAsync = new EventingBasicConsumer(_channel); + await _channel.BasicConsumeAsync(queue: testQueueName, autoAck: false, consumer: consumerAsync); + + await CloseAndWaitForRecoveryAsync(); + + QueueDeclareOk q0 = await _channel.QueueDeclarePassiveAsync(testRetryQueueName); + Assert.Equal(testRetryQueueName, q0.QueueName); + + QueueDeclareOk q1 = await _channel.QueueDeclarePassiveAsync(testQueueName); + Assert.Equal(testQueueName, q1.QueueName); + } + } +}