Skip to content

Commit

Permalink
Adjust to client changes embracing the new concurrency setting
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmarbach committed Aug 25, 2020
1 parent 9f4d144 commit 23c7b88
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 48,8 @@ public ConnectionFactory(string endpointName, ConnectionConfiguration connection
Password = connectionConfiguration.Password,
RequestedHeartbeat = heartbeatInterval ?? connectionConfiguration.RequestedHeartbeat,
NetworkRecoveryInterval = networkRecoveryInterval ?? connectionConfiguration.RetryDelay,
UseBackgroundThreadsForIO = true
UseBackgroundThreadsForIO = true,
DispatchConsumersAsync = true
};

connectionFactory.Ssl.ServerName = connectionConfiguration.Host;
Expand Down Expand Up @@ -82,12 83,14 @@ public ConnectionFactory(string endpointName, ConnectionConfiguration connection

public IConnection CreateAdministrationConnection() => CreateConnection($"{endpointName} Administration", false);

public IConnection CreateConnection(string connectionName, bool automaticRecoveryEnabled = true)
public IConnection CreateConnection(string connectionName, bool automaticRecoveryEnabled = true, int consumerDispatchConcurrency = 1)
{
lock (lockObject)
{
connectionFactory.AutomaticRecoveryEnabled = automaticRecoveryEnabled;
connectionFactory.ClientProperties["connected"] = DateTime.Now.ToString("G");
// it is OK to modify as long as we are under a lock
connectionFactory.ConsumerDispatchConcurrency = consumerDispatchConcurrency;

var connection = connectionFactory.CreateConnection(connectionName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 12,7 @@
<PackageReference Include="Obsolete.Fody" Version="5.2.1" PrivateAssets="All" />
<PackageReference Include="Particular.CodeRules" Version="0.3.0" PrivateAssets="All" />
<PackageReference Include="Particular.Packaging" Version="0.9.0" PrivateAssets="All" />
<PackageReference Include="RabbitMQ.Client" Version="[6.1.0, 7.0.0)" />
<PackageReference Include="RabbitMQ.Client" Version="[6.2.1, 7.0.0)" />
</ItemGroup>

<PropertyGroup>
Expand Down
75 changes: 22 additions & 53 deletions src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 30,13 @@ sealed class MessagePump : IPushMessages, IDisposable
PushSettings settings;
CriticalError criticalError;
MessagePumpConnectionFailedCircuitBreaker circuitBreaker;
TaskScheduler exclusiveScheduler;

// Start
int maxConcurrency;
SemaphoreSlim semaphore;
long numberOfExecutingReceives;
CancellationTokenSource messageProcessing;
IConnection connection;
EventingBasicConsumer consumer;
AsyncEventingBasicConsumer consumer;

// Stop
TaskCompletionSource<bool> connectionShutdownCompleted;
Expand All @@ -63,8 62,6 @@ public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<E

circuitBreaker = new MessagePumpConnectionFailedCircuitBreaker($"'{settings.InputQueue} MessagePump'", timeToWaitBeforeTriggeringCircuitBreaker, criticalError);

exclusiveScheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

if (settings.PurgeOnStartup)
{
queuePurger.Purge(settings.InputQueue);
Expand All @@ -76,10 73,9 @@ public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<E
public void Start(PushRuntimeSettings limitations)
{
maxConcurrency = limitations.MaxConcurrency;
semaphore = new SemaphoreSlim(limitations.MaxConcurrency, limitations.MaxConcurrency);
messageProcessing = new CancellationTokenSource();

connection = connectionFactory.CreateConnection($"{settings.InputQueue} MessagePump");
connection = connectionFactory.CreateConnection($"{settings.InputQueue} MessagePump", consumerDispatchConcurrency: maxConcurrency);

var channel = connection.CreateModel();

Expand All @@ -102,7 98,7 @@ public void Start(PushRuntimeSettings limitations)

channel.BasicQos(0, (ushort)Math.Min(prefetchCount, ushort.MaxValue), false);

consumer = new EventingBasicConsumer(channel);
consumer = new AsyncEventingBasicConsumer(channel);

consumer.Registered = Consumer_Registered;
connection.ConnectionShutdown = Connection_ConnectionShutdown;
Expand All @@ -117,7 113,7 @@ public async Task Stop()
consumer.Received -= Consumer_Received;
messageProcessing.Cancel();

while (semaphore.CurrentCount != maxConcurrency)
while (Interlocked.Read(ref numberOfExecutingReceives) > 0)
{
await Task.Delay(50).ConfigureAwait(false);
}
Expand All @@ -136,9 132,10 @@ public async Task Stop()
await connectionShutdownCompleted.Task.ConfigureAwait(false);
}

void Consumer_Registered(object sender, ConsumerEventArgs e)
Task Consumer_Registered(object sender, ConsumerEventArgs e)
{
circuitBreaker.Success();
return Task.CompletedTask;
}

void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
Expand All @@ -153,10 150,16 @@ void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
}
}

async void Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
{
var eventRaisingThreadId = Thread.CurrentThread.ManagedThreadId;
if (messageProcessing.Token.IsCancellationRequested)
{
return;
}

Interlocked.Increment(ref numberOfExecutingReceives);

// technically we don't need this anymore
var messageBody = eventArgs.Body.ToArray();

var eventArgsCopy = new BasicDeliverEventArgs(
Expand All @@ -171,49 174,16 @@ async void Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)

try
{
await semaphore.WaitAsync(messageProcessing.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return;
}

try
{
// The current thread will be the event-raising thread if either:
//
// a) the semaphore was entered synchronously (did not have to wait).
// b) the event was raised on a thread pool thread,
// and the semaphore was entered asynchronously (had to wait),
// and the continuation happened to be scheduled back onto the same thread.
if (Thread.CurrentThread.ManagedThreadId == eventRaisingThreadId)
{
// In RabbitMQ.Client 4.1.0, the event is raised by reusing a single, explicitly created thread,
// so we are in scenario (a) described above.
// We must yield to allow the thread to raise more events while we handle this one,
// otherwise we will never process messages concurrently.
//
// If a future version of RabbitMQ.Client changes its threading model, then either:
//
// 1) we are in scenario (a), but we *may not* need to yield.
// E.g. the client may raise the event on a new, explicitly created thread each time.
// 2) we cannot tell whether we are in scenario (a) or scenario (b).
// E.g. the client may raise the event on a thread pool thread.
//
// In both cases, we cannot tell whether we need to yield or not, so we must yield.
await Task.Yield();
}

await Process(eventArgsCopy, messageBody).ConfigureAwait(false);
}
catch (Exception ex)
{
Logger.Warn("Failed to process message. Returning message to queue...", ex);
await consumer.Model.BasicRejectAndRequeueIfOpen(eventArgs.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);
await consumer.Model.BasicRejectAndRequeueIfOpen(eventArgs.DeliveryTag).ConfigureAwait(false);
}
finally
{
semaphore.Release();
Interlocked.Decrement(ref numberOfExecutingReceives);
}
}

Expand Down Expand Up @@ -286,7 256,7 @@ async Task Process(BasicDeliverEventArgs message, byte[] messageBody)
catch (Exception ex)
{
criticalError.Raise($"Failed to execute recoverability policy for message with native ID: `{messageId}`", ex);
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag).ConfigureAwait(false);

return;
}
Expand All @@ -295,13 265,13 @@ async Task Process(BasicDeliverEventArgs message, byte[] messageBody)

if (processed && tokenSource.IsCancellationRequested)
{
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag).ConfigureAwait(false);
}
else
{
try
{
await consumer.Model.BasicAckSingle(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);
await consumer.Model.BasicAckSingle(message.DeliveryTag).ConfigureAwait(false);
}
catch (AlreadyClosedException ex)
{
Expand Down Expand Up @@ -329,14 299,14 @@ async Task MovePoisonMessage(BasicDeliverEventArgs message, string queue)
catch (Exception ex)
{
Logger.Error($"Failed to move poison message to queue '{queue}'. Returning message to original queue...", ex);
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag).ConfigureAwait(false);

return;
}

try
{
await consumer.Model.BasicAckSingle(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);
await consumer.Model.BasicAckSingle(message.DeliveryTag).ConfigureAwait(false);
}
catch (AlreadyClosedException ex)
{
Expand All @@ -347,7 317,6 @@ async Task MovePoisonMessage(BasicDeliverEventArgs message, string queue)
public void Dispose()
{
circuitBreaker?.Dispose();
semaphore?.Dispose();
messageProcessing?.Dispose();
connection?.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 13,17 @@ class MessageState
public ulong DeliveryTag { get; set; }
}

public static Task BasicAckSingle(this IModel channel, ulong deliveryTag, TaskScheduler scheduler) =>
public static Task BasicAckSingle(this IModel channel, ulong deliveryTag) =>
TaskEx.StartNew(
new MessageState { Channel = channel, DeliveryTag = deliveryTag },
state =>
{
var messageState = (MessageState)state;
messageState.Channel.BasicAck(messageState.DeliveryTag, false);
},
scheduler);
TaskScheduler.Default);

public static Task BasicRejectAndRequeueIfOpen(this IModel channel, ulong deliveryTag, TaskScheduler scheduler) =>
public static Task BasicRejectAndRequeueIfOpen(this IModel channel, ulong deliveryTag) =>
TaskEx.StartNew(
new MessageState { Channel = channel, DeliveryTag = deliveryTag },
state =>
Expand All @@ -41,6 41,6 @@ public static Task BasicRejectAndRequeueIfOpen(this IModel channel, ulong delive
}
}
},
scheduler);
TaskScheduler.Default);
}
}

0 comments on commit 23c7b88

Please sign in to comment.