Learn how to create, manage, and clean up RabbitMQ queues dynamically with MassTransit — ideal for adaptive, message-driven architectures.
Foreword
In modern distributed systems, queues are essential for scalable, decoupled communication. But static queue configurations cannot keep up with dynamic workloads or evolving message flows.
This article explains how to connect and disconnect queues dynamically using MassTransit and RabbitMQ, allowing services to create, use, and clean up queues on demand — building a more responsive and efficient messaging architecture.
In a Nutshell
Normally, all queues are registered during the application’s startup phase. However, to make your application more dynamic and flexible, you can manage endpoints at runtime by injecting an IReceiveEndpointConnector. This interface provides the ConnectReceiveEndpoint method, which allows you to connect queues on demand.
The ConnectReceiveEndpoint method returns a HostReceiveEndpointHandle object, which can later be used to stop or disconnect the dynamically added queue. Let’s see how this works in practice:
using System.Collections.Concurrent;
using MassTransit;

public class MassTransitQueueManager
{
    private readonly IReceiveEndpointConnector _receiveEndpointConnector;

    // Handles of the endpoints connected at runtime, keyed by queue name.
    private readonly ConcurrentDictionary<string, HostReceiveEndpointHandle> _handles = new();

    public MassTransitQueueManager(IReceiveEndpointConnector receiveEndpointConnector)
    {
        _receiveEndpointConnector = receiveEndpointConnector;
    }

    public void ConnectEndpoint<TConsumer>(string queueName)
        where TConsumer : class, IConsumer
    {
        var handle = _receiveEndpointConnector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
        {
            cfg.ConfigureConsumer(context, typeof(TConsumer));
        });

        _handles[queueName] = handle;
    }

    public async Task DisconnectEndpointAsync(string queueName, CancellationToken cancellationToken = default)
    {
        // TryRemove takes the handle out atomically, so two concurrent callers cannot stop the same endpoint twice.
        if (_handles.TryRemove(queueName, out var handle))
        {
            await handle.StopAsync(cancellationToken);
        }
    }
}
Inside the configuration callback of ConnectReceiveEndpoint, you can define your consumer configurations — including batching, fault handling, and other options.
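As a rough illustration of what the callback can hold, the sketch below sets a prefetch limit and a retry policy before configuring the consumer. The helper name `ConnectWithRetry`, the prefetch count, and the retry values are assumptions chosen for the example, not recommendations.

```csharp
using System;
using MassTransit;

public static class EndpointConfigurationSketch
{
    // Hypothetical helper: connects a queue with a prefetch limit and a retry policy.
    public static HostReceiveEndpointHandle ConnectWithRetry<TConsumer>(
        IReceiveEndpointConnector connector, string queueName)
        where TConsumer : class, IConsumer
    {
        return connector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
        {
            // Limit how many unacknowledged messages the broker delivers at once (example value).
            cfg.PrefetchCount = 16;

            // Retry a failed message up to 3 times, 5 seconds apart (example values).
            cfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));

            cfg.ConfigureConsumer<TConsumer>(context);
        });
    }
}
```

Middleware such as the retry policy is added before the consumer is configured, so it wraps the consumer in the endpoint's pipeline.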
One prerequisite for this approach is that the consumer must still be registered during the application’s startup phase to enable dependency injection.
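builder.Services.AddMassTransit(x =>
{
    // Registers the consumer with the container. No receive endpoint is
    // configured here, so the consumer is not bound to any queue yet.
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
    });
});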
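To show how the pieces might fit together, the fragment below registers the manager as a singleton and connects a queue from an ASP.NET Core minimal API endpoint. It assumes a web application whose `builder` is the one `AddMassTransit` was called on; the routes, the `tenantId` parameter, and the queue naming scheme are hypothetical.

```csharp
// Continues Program.cs after the AddMassTransit call.

// Singleton, so the dictionary of handles outlives individual requests.
builder.Services.AddSingleton<MassTransitQueueManager>();

var app = builder.Build();

// Hypothetical route: connect a per-tenant queue on demand.
app.MapPost("/queues/{tenantId}", (string tenantId, MassTransitQueueManager manager) =>
{
    manager.ConnectEndpoint<SubmitOrderConsumer>($"submit-order-{tenantId}");
    return Results.Ok();
});

// Hypothetical route: stop the endpoint for that tenant again.
app.MapDelete("/queues/{tenantId}", async (string tenantId, MassTransitQueueManager manager, CancellationToken ct) =>
{
    await manager.DisconnectEndpointAsync($"submit-order-{tenantId}", ct);
    return Results.NoContent();
});

app.Run();
```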
Bonus Material: Deleting Dynamically Added Queues
If you want dynamically added queues to be deleted automatically once their consumers are gone (that is, when the endpoint is stopped), enable RabbitMQ’s AutoDelete option while configuring the endpoint. RabbitMQ removes an auto-delete queue when its last consumer unsubscribes.
public void ConnectEndpoint<TConsumer>(string queueName)
    where TConsumer : class, IConsumer
{
    var handle = _receiveEndpointConnector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
    {
        if (cfg is IRabbitMqReceiveEndpointConfigurator rabbitCfg)
            rabbitCfg.AutoDelete = true; // deletes queue automatically

        cfg.ConfigureConsumer(context, typeof(TConsumer));
    });

    _handles[queueName] = handle;
}
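For queues that are meant to be temporary, auto-delete is often paired with a non-durable queue so that nothing is kept across broker restarts. The variation below is a sketch under that assumption. The helper name `ConnectTemporary` is hypothetical, and whether a non-durable queue suits your workload depends on how much message loss you can tolerate.

```csharp
using MassTransit;

public static class TemporaryEndpointSketch
{
    // Hypothetical helper: connects a queue that RabbitMQ can discard when it is no longer used.
    public static HostReceiveEndpointHandle ConnectTemporary<TConsumer>(
        IReceiveEndpointConnector connector, string queueName)
        where TConsumer : class, IConsumer
    {
        return connector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
        {
            // The RabbitMQ-specific options are only reachable when the RabbitMQ transport is in use.
            if (cfg is IRabbitMqReceiveEndpointConfigurator rabbitCfg)
            {
                rabbitCfg.AutoDelete = true;  // broker removes the queue after its last consumer is gone
                rabbitCfg.Durable = false;    // queue definition does not survive a broker restart
            }

            cfg.ConfigureConsumer<TConsumer>(context);
        });
    }
}
```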
Optimization
The above code can be improved in two ways.
- Register the manager as a singleton, so the collection of active handles outlives individual requests and endpoints can still be stopped later.
- Guard connect and disconnect operations with semaphores, so that concurrent callers connect or stop a given queue only once.
The revised version below applies both changes and adds argument validation and cleanup on disposal.
public class MassTransitQueueManager : IMassTransitQueueManager, IAsyncDisposable
{
    private readonly IReceiveEndpointConnector _receiveEndpointConnector;
    private readonly ConcurrentDictionary<string, HostReceiveEndpointHandle> _handles = new();
    private readonly SemaphoreSlim _connectSemaphore = new(1, 1);
    private readonly SemaphoreSlim _disconnectSemaphore = new(1, 1);

    public MassTransitQueueManager(IReceiveEndpointConnector receiveEndpointConnector)
    {
        _receiveEndpointConnector = receiveEndpointConnector ?? throw new ArgumentNullException(nameof(receiveEndpointConnector));
    }

    public bool IsEndpointConnected(string queueName)
    {
        if (string.IsNullOrWhiteSpace(queueName))
            throw new ArgumentException("Queue name cannot be null or whitespace.", nameof(queueName));

        return _handles.ContainsKey(queueName);
    }

    public async Task ConnectEndpointAsync<TConsumer>(string queueName, CancellationToken cancellationToken = default)
        where TConsumer : class, IConsumer
    {
        if (string.IsNullOrWhiteSpace(queueName))
            throw new ArgumentException("Queue name cannot be null or whitespace.", nameof(queueName));

        await _connectSemaphore.WaitAsync(cancellationToken);
        try
        {
            if (IsEndpointConnected(queueName))
                return;

            var handle = _receiveEndpointConnector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
            {
                if (cfg is IRabbitMqReceiveEndpointConfigurator rabbitCfg)
                {
                    rabbitCfg.AutoDelete = true;
                }

                cfg.ConfigureConsumer<TConsumer>(context);
            });

            _handles[queueName] = handle;
        }
        finally
        {
            _connectSemaphore.Release();
        }
    }

    public async Task DisconnectEndpointAsync(string queueName, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(queueName))
            throw new ArgumentException("Queue name cannot be null or whitespace.", nameof(queueName));

        await _disconnectSemaphore.WaitAsync(cancellationToken);
        try
        {
            if (_handles.TryRemove(queueName, out var handle))
            {
                await handle.StopAsync(cancellationToken);
            }
        }
        finally
        {
            _disconnectSemaphore.Release();
        }
    }

    public async ValueTask DisposeAsync()
    {
        foreach (var (_, handle) in _handles)
        {
            await handle.StopAsync();
        }

        _handles.Clear();
        _connectSemaphore.Dispose();
        _disconnectSemaphore.Dispose();
    }
}
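The class above implements `IMassTransitQueueManager`, which the article does not show. A plausible shape, inferred from the public members of the class and therefore an assumption, might look like this, together with the singleton registration mentioned earlier:

```csharp
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Assumed interface, derived from the public methods of MassTransitQueueManager.
public interface IMassTransitQueueManager
{
    // True if an endpoint for this queue was connected through the manager and not yet stopped.
    bool IsEndpointConnected(string queueName);

    // Connects the queue and binds TConsumer to it; does nothing if it is already connected.
    Task ConnectEndpointAsync<TConsumer>(string queueName, CancellationToken cancellationToken = default)
        where TConsumer : class, IConsumer;

    // Stops the endpoint for the queue if the manager holds a handle for it.
    Task DisconnectEndpointAsync(string queueName, CancellationToken cancellationToken = default);
}

// Registration at startup (assumes the same builder used for AddMassTransit):
// builder.Services.AddSingleton<IMassTransitQueueManager, MassTransitQueueManager>();
```

Depending on the interface rather than the concrete class keeps callers easy to test with a fake manager.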
Conclusion
Dynamic queue management adds flexibility and control to message-driven systems. By connecting and disconnecting queues at runtime, you can build more adaptive and efficient applications while keeping your messaging infrastructure clean and responsive.