.Net. Subscriber Subscriber.Next.RabbitMQ

I am using RabbitMQ.net client on windows service. I have millions of messages going into a volume that are then processed and then the output is put on another queue. I create a factory connection with a heartbeat of 30 and then create a connection whenever the connection or caller is lost. In production, my code probably works most of the time. However, in my integration tests, I know it doesn't work most of the time. Here is my code:

public void ReceiveAll(Func<IDictionary<ulong, byte[]>, IOnStreamWatchResult> onReceiveAllCallback, int batchSize, CancellationToken cancellationToken)
{
    IModel channel = null;
    Subscription subscription = null;

    while (!cancellationToken.IsCancellationRequested)
    {
        if (subscription == null || subscription.Model.IsClosed)
        {
            channel = _channelFactory.CreateChannel(ref _connection, _messageQueueConfig, _connectionFactory);
            // This instructs the channel to not prefetch more than batch count into shared queue
            channel.BasicQos(0, Convert.ToUInt16(batchSize), false);
            subscription = new Subscription(channel, _messageQueueConfig.Queue, false);
        }

        try
        {
            BasicDeliverEventArgs message;
            var dequeuedMessages = new Dictionary<ulong, byte[]>();
            do
            {
                if (subscription.Next(_messageQueueConfig.DequeueTimeout.Milliseconds, out message))
                {
                    if (message == null)
                    {
                        // This means channel is closed and the messages in shared queue would get moved back to ready state
                        DisposeChannelAndSubcription(ref channel, ref subscription);
                        ReceiveAll(onReceiveAllCallback, batchSize, cancellationToken);
                    }
                    else
                    {
                        dequeuedMessages.Add(message.DeliveryTag, message.Body);
                    }
                }
            } while (message != null && batchSize > dequeuedMessages.Count && !cancellationToken.IsCancellationRequested);

            if (cancellationToken.IsCancellationRequested)
            {
                if (dequeuedMessages.Any())
                {
                    NackUnProcessedMessages(subscription, dequeuedMessages.Keys);
                }
                DisposeChannelAndSubcription(ref channel, ref subscription);
                dequeuedMessages.Clear();
                break;
            }

            try
            {
                var onStreamWatchResult = onReceiveAllCallback(dequeuedMessages);
                AckProcessedMessages(subscription, onStreamWatchResult.Processed);
                NackUnProcessedMessages(subscription, onStreamWatchResult.UnProcessed);
                dequeuedMessages.Clear();
            }
            catch(Exception unhandledException)
            {
                NackUnProcessedMessages(subscription, dequeuedMessages.Keys);
            }
        }
        catch (EndOfStreamException endOfStreamException)
        {
            DisposeChannelAndSubcription(ref channel, ref subscription);
        }
        catch (OperationInterruptedException operationInterruptedException)
        {
            DisposeChannelAndSubcription(ref channel, ref subscription);
        }
    }
}

      

The batch size is set to 4 because I put 4 posts in my integration test, which is just a windows service that I started after running the unit tests.

The problem is that almost always the caller preselects 4 messages, as expected, returns true for the first two. Iterates the next text, but returns false after that. I believe this is happening because my messages are not being received immediately. In my integration test, I will force 2 and nack 2 messages and then read 2 times again in the nacked messages to clear the queue. However, after nacking, the messages do not return to the ready state and hence the test hangs. What am I doing wrong here? Can't I understand something from the documentation? Here is my nacking code:

subscription.Model.BasicNack(deliveryTag, false, true);

      

+3


source to share





All Articles