RabbitMQ .NET client: Subscription.Next
I am using the RabbitMQ .NET client in a Windows service. Millions of messages go into a queue, are processed, and the output is put on another queue. I create a connection factory with a heartbeat of 30 and then recreate the connection whenever the connection or the subscription is lost. In production, my code probably works most of the time. In my integration tests, however, I know it fails most of the time. Here is my code:
public void ReceiveAll(Func<IDictionary<ulong, byte[]>, IOnStreamWatchResult> onReceiveAllCallback, int batchSize, CancellationToken cancellationToken)
{
    IModel channel = null;
    Subscription subscription = null;
    while (!cancellationToken.IsCancellationRequested)
    {
        if (subscription == null || subscription.Model.IsClosed)
        {
            channel = _channelFactory.CreateChannel(ref _connection, _messageQueueConfig, _connectionFactory);
            // This instructs the channel to not prefetch more than batch count into shared queue
            channel.BasicQos(0, Convert.ToUInt16(batchSize), false);
            subscription = new Subscription(channel, _messageQueueConfig.Queue, false);
        }
        try
        {
            BasicDeliverEventArgs message;
            var dequeuedMessages = new Dictionary<ulong, byte[]>();
            do
            {
                if (subscription.Next(_messageQueueConfig.DequeueTimeout.Milliseconds, out message))
                {
                    if (message == null)
                    {
                        // This means channel is closed and the messages in shared queue would get moved back to ready state
                        DisposeChannelAndSubcription(ref channel, ref subscription);
                        ReceiveAll(onReceiveAllCallback, batchSize, cancellationToken);
                    }
                    else
                    {
                        dequeuedMessages.Add(message.DeliveryTag, message.Body);
                    }
                }
            } while (message != null && batchSize > dequeuedMessages.Count && !cancellationToken.IsCancellationRequested);
            if (cancellationToken.IsCancellationRequested)
            {
                if (dequeuedMessages.Any())
                {
                    NackUnProcessedMessages(subscription, dequeuedMessages.Keys);
                }
                DisposeChannelAndSubcription(ref channel, ref subscription);
                dequeuedMessages.Clear();
                break;
            }
            try
            {
                var onStreamWatchResult = onReceiveAllCallback(dequeuedMessages);
                AckProcessedMessages(subscription, onStreamWatchResult.Processed);
                NackUnProcessedMessages(subscription, onStreamWatchResult.UnProcessed);
                dequeuedMessages.Clear();
            }
            catch(Exception unhandledException)
            {
                NackUnProcessedMessages(subscription, dequeuedMessages.Keys);
            }
        }
        catch (EndOfStreamException endOfStreamException)
        {
            DisposeChannelAndSubcription(ref channel, ref subscription);
        }
        catch (OperationInterruptedException operationInterruptedException)
        {
            DisposeChannelAndSubcription(ref channel, ref subscription);
        }
    }
}
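One detail in the code above that may be worth checking is the timeout passed to subscription.Next. TimeSpan.Milliseconds is only the millisecond component of the value (the part left over after whole seconds), while TimeSpan.TotalMilliseconds is the whole duration. If DequeueTimeout happens to be a whole number of seconds, which is an assumption since its configured value is not shown, the timeout passed would be 0 and Next would presumably return false without waiting. A minimal sketch of the conversion, with a hypothetical helper name:

```csharp
using System;

public static class DequeueTimeoutSketch
{
    // Hypothetical helper: converts a TimeSpan into the whole-duration
    // millisecond count that an int timeout parameter expects.
    public static int ToTimeoutMilliseconds(TimeSpan timeout)
    {
        // TotalMilliseconds covers the full duration;
        // Milliseconds would return only the sub-second component.
        return (int)timeout.TotalMilliseconds;
    }
}
```

For example, for TimeSpan.FromSeconds(5) the Milliseconds property is 0, while ToTimeoutMilliseconds returns 5000.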
The batch size is set to 4 because my integration test publishes 4 messages. The test simply runs against the Windows service, which I start after running the unit tests.
The problem is that, almost always, the subscription prefetches 4 messages as expected and Next returns true for the first two. The loop then asks for the next message, but Next returns false from that point on. I believe this happens because my messages are not being received immediately. In my integration test, I ack 2 messages and nack 2, and then read twice more to pick up the nacked messages and clear the queue. However, after nacking, the messages do not return to the ready state, so the test hangs. What am I doing wrong here? Am I misunderstanding something in the documentation? Here is my nacking code:
subscription.Model.BasicNack(deliveryTag, false, true);
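For reference, the three arguments in the call above are the delivery tag, the multiple flag (false rejects only that one tag) and the requeue flag (true asks the broker to put the message back on the queue). Delivery tags are scoped to the channel that delivered them, so the nack has to go through that same channel. The sketch below shows nacking a set of tags one at a time. INackChannel and RecordingChannel are hypothetical stand-ins that mirror only the BasicNack shape used in the question, so that the sketch runs without a broker; it is not the body of the NackUnProcessedMessages helper, which is not shown.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in exposing only the BasicNack shape used in the question.
public interface INackChannel
{
    void BasicNack(ulong deliveryTag, bool multiple, bool requeue);
}

// Hypothetical test double that records each call instead of talking to a broker.
public sealed class RecordingChannel : INackChannel
{
    public readonly List<string> Calls = new List<string>();

    public void BasicNack(ulong deliveryTag, bool multiple, bool requeue)
    {
        Calls.Add(deliveryTag + ":" + multiple + ":" + requeue);
    }
}

public static class NackSketch
{
    // Nacks each tag individually (multiple: false) and asks for a requeue
    // (requeue: true), always on the channel that was passed in.
    public static void NackAndRequeue(INackChannel channel, IEnumerable<ulong> deliveryTags)
    {
        foreach (var tag in deliveryTags)
        {
            channel.BasicNack(tag, false, true);
        }
    }
}
```

With a real channel the same loop would call subscription.Model.BasicNack, assuming subscription.Model is still the open channel on which the messages were delivered.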