BrokeredMessage is automatically disposed after calling OnMessage()

I'm trying to queue up items from the Azure Service Bus so I can process them in bulk. I know Azure Service Bus has ReceiveBatch(), but it seems problematic for the following reasons:

  • I can get a maximum of 256 messages at a time, and even that may vary with the size of the messages.
  • Even if I peek to see how many messages are waiting, I don't know how many ReceiveBatch calls to make, because I don't know how many messages each call will return. Since messages keep arriving, I can't just keep making requests until the queue is empty, because it never will be.

I decided to just use a message listener, which is cheaper than doing wasted peeks and gives me more control.

Basically, I'm trying to let a number of messages build up and then process them all at once. I use a timer to force a delay, but I need to be able to queue up my items as they arrive.

Given my timer requirement, a blocking collection did not seem like a good option, so I am trying to use ConcurrentBag.

var batchingQueue = new ConcurrentBag<BrokeredMessage>();
myQueueClient.OnMessage((m) =>
{
    Console.WriteLine("Queueing message");
    batchingQueue.Add(m);
});

while (true)
{
    var sw = WaitableStopwatch.StartNew();
    BrokeredMessage msg;
    while (batchingQueue.TryTake(out msg)) // <== Object is already disposed
    {
        // ...do this until I have a thousand ready to be written to the DB in a batch
        Console.WriteLine("Completing message");
        msg.Complete(); // <== ERRORS HERE
    }

    sw.Wait(MINIMUM_DELAY);
}

      

However, as soon as I access the message outside of OnMessage, it reports that the BrokeredMessage has already been disposed.

I think this must be some automatic behavior of OnMessage, and I don't see any way to do anything with a message other than process it right away, which I don't want to do.

+3




5 answers


This is incredibly easy to do with BlockingCollection.

var batchingQueue = new BlockingCollection<BrokeredMessage>();

myQueueClient.OnMessage((m) =>
{
    Console.WriteLine("Queueing message");
    batchingQueue.Add(m);
});

      

And your consumer thread:

foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
    Console.WriteLine("Completing message");
    msg.Complete();
}

      

GetConsumingEnumerable returns an iterator that removes items from the queue until IsCompleted is True, that is, until CompleteAdding has been called and the queue is empty. If the queue is empty but IsCompleted is False, it does a non-busy wait for the next item.

To stop the consumer thread (i.e. to shut down the program), stop adding things to the queue and have the main thread call batchingQueue.CompleteAdding. The consumer will empty the queue, see that IsCompleted is True, and exit.

Using BlockingCollection here is better than ConcurrentBag or ConcurrentQueue because the BlockingCollection interface is easier to work with. In particular, using GetConsumingEnumerable saves you from having to worry about checking the count or doing busy waits (polling loops). It just works.

Also note that ConcurrentBag has some rather strange removal behavior. In particular, the order in which items are removed differs depending on which thread removes them. The thread that created the bag removes items in a different order than other threads do. See Using the ConcurrentBag Collection for details.
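The shutdown sequence described above can be sketched without Service Bus at all. The following is a minimal illustration, not the answer's original code: it uses plain integers in place of BrokeredMessage, the variable names are made up for the example, and it assumes C# 9 or later for top-level statements.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for the Service Bus callback: plain ints instead of BrokeredMessage.
var queue = new BlockingCollection<int>();
var consumed = new List<int>();

// Consumer: waits (without spinning) until an item arrives or adding is completed.
var consumer = Task.Run(() =>
{
    foreach (var item in queue.GetConsumingEnumerable())
    {
        consumed.Add(item);
    }
});

// Producer: add a few items, then signal that no more will come.
for (var i = 0; i < 5; i++)
{
    queue.Add(i);
}
queue.CompleteAdding();

// The consumer drains what is left, sees that the collection is completed, and exits.
consumer.Wait();
Console.WriteLine($"Consumed {consumed.Count} items; IsCompleted = {queue.IsCompleted}");
```

Calling Add after CompleteAdding throws InvalidOperationException, so in a real listener the callback would need to stop producing before the main thread completes the collection.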



You haven't said why you want to batch items on input. Unless there is an overriding performance reason to do so, it is not a good idea to complicate your code with this batching logic.

If you want to do batch writes to the database, I would suggest using a simple List<T> to buffer the items. If you need to process items before writing them to the database, use the technique shown above to process them. Then, instead of writing directly to the database, add the item to the list. When the list reaches 1000 items or the specified amount of time has elapsed, allocate a new list and start a task to write the old list to the database. Like this:

// at class scope

// Flush every 5 minutes.
private readonly TimeSpan FlushDelay = TimeSpan.FromMinutes(5);
private const int MaxBufferItems = 1000;

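// Timer for the buffer flush. Create it in the constructor, because a field
// initializer cannot reference the instance method TimedFlush:
//   _flushTimer = new System.Threading.Timer(TimedFlush, null, FlushDelay, Timeout.InfiniteTimeSpan);
private readonly System.Threading.Timer _flushTimer;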

// A lock for the list. Unless you're getting hundreds of thousands
// of items per second, this will not be a performance problem.
object _listLock = new Object();

List<BrokeredMessage> _recordBuffer = new List<BrokeredMessage>();

      

Then your consumer:

foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
    // process the message
    Console.WriteLine("Completing message");
    msg.Complete();
    lock (_listLock)
    {
        _recordBuffer.Add(msg);
        if (_recordBuffer.Count >= MaxBufferItems)
        {
            // Stop the timer
            _flushTimer.Change(Timeout.Infinite, Timeout.Infinite);

            // Save the old list and allocate a new one
            var myList = _recordBuffer;
            _recordBuffer = new List<BrokeredMessage>();

            // Start a task to write to the database
            Task.Factory.StartNew(() => FlushBuffer(myList));

            // Restart the timer
            _flushTimer.Change(FlushDelay, Timeout.InfiniteTimeSpan);
        }
    }
}

private void TimedFlush(object state)
{
    bool lockTaken = false;
    List<BrokeredMessage> myList = null;

    try
    {
        // This overload takes lockTaken by ref and returns void.
        Monitor.TryEnter(_listLock, 0, ref lockTaken);
        if (lockTaken)
        {
            // Save the old list and allocate a new one
            myList = _recordBuffer;
            _recordBuffer = new List<BrokeredMessage>();
        }
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(_listLock);
        }
    }

    if (myList != null)
    {
        FlushBuffer(myList);
    }

    // Restart the timer
    _flushTimer.Change(FlushDelay, Timeout.InfiniteTimeSpan);
}

      

The idea here is that you grab the old list, allocate a new list so that processing can continue, and then write the old list's items to the database. The lock prevents the timer and the record counter from stepping on each other. Without the lock, everything would most likely appear to work fine for a while, and then you would get weird crashes at unpredictable times.

I like this design because it eliminates polling by the consumer. The only thing I don't like is that the consumer has to be aware of the timer (i.e. it has to stop and then restart the timer). With a little more thought, I could eliminate that requirement. But it works well as written.
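One possible way to remove that coupling is to hide the timer behind a single Add operation, so the consumer never touches it. The sketch below is only an illustration under several assumptions: it uses strings instead of BrokeredMessage, an in-memory list (batches) stands in for the database, every name is hypothetical, the timer callback takes the lock unconditionally rather than using Monitor.TryEnter, and it relies on C# 9 top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

const int MaxBufferItems = 3;                // tiny, so the demo flushes quickly
var flushDelay = TimeSpan.FromMinutes(5);
var listLock = new object();
var buffer = new List<string>();
var batches = new List<List<string>>();      // hypothetical stand-in for the database
Timer flushTimer = null;

// Swap the current list for a fresh one and hand back the old one.
// The caller must hold listLock.
List<string> Swap()
{
    var old = buffer;
    buffer = new List<string>();
    return old;
}

// Stand-in for the database write; empty batches are skipped.
void WriteBatch(List<string> batch)
{
    if (batch.Count == 0) return;
    lock (batches) { batches.Add(batch); }
}

// Timer callback: flush whatever has accumulated, then re-arm the timer.
void TimedFlush(object state)
{
    List<string> old;
    lock (listLock) { old = Swap(); }
    WriteBatch(old);
    flushTimer.Change(flushDelay, Timeout.InfiniteTimeSpan);
}

// The only operation the consumer calls; the timer handling stays in here.
void Add(string item)
{
    List<string> full = null;
    lock (listLock)
    {
        buffer.Add(item);
        if (buffer.Count >= MaxBufferItems)
        {
            full = Swap();
            // Restart the countdown so a timed flush does not follow immediately.
            flushTimer.Change(flushDelay, Timeout.InfiniteTimeSpan);
        }
    }
    if (full != null) WriteBatch(full);      // write outside the lock
}

flushTimer = new Timer(TimedFlush, null, flushDelay, Timeout.InfiniteTimeSpan);

for (var i = 1; i <= 7; i++) Add($"record {i}");
Console.WriteLine($"{batches.Count} full batches written, {buffer.Count} record(s) still buffered");
flushTimer.Dispose();
```

With seven records and a limit of three, two full batches are written and one record stays buffered until the timer fires. The consumer loop would then reduce to calling Add for each processed item.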

+3




Switching to OnMessageAsync solved the problem for me



_queueClient.OnMessageAsync(async receivedMessage =>

      

+2




I reached out to Microsoft on MSDN about the BrokeredMessage being disposed. This is the response:

A very basic rule, and I'm not sure whether it is documented: the received message must be processed within the lifetime of the callback function. In your case, the messages are disposed when the async callback completes, which is why your Complete attempts fail with an ObjectDisposedException on another thread.

I really don't see how queuing messages for further processing helps throughput. It will put more burden on the client. Try to handle the message in the async callback, which should be performant enough.

In my case, this means that I cannot use Service Bus the way I wanted to, and I need to rethink how I want everything to work. Bummer.

+1




I had the same problem when I started working with Azure Service Bus.

I found that the OnMessage method always disposes the BrokeredMessage object. The approach suggested by Jim Michel did not work for me (but it was very interesting to read - thanks!).

After some investigation, I found that the whole approach was flawed. Let me explain the correct way to do what you want.

  • Use the BrokeredMessage.Complete() method only inside the OnMessage handler.
  • If you need to process the message outside of this handler, use the QueueClient.Complete(Guid lockToken) method. LockToken is a property of the BrokeredMessage object.

Example:

 var messageOptions = new OnMessageOptions {
      AutoComplete       = false,
      AutoRenewTimeout   = TimeSpan.FromMinutes( 5 ),
      MaxConcurrentCalls = 1
 };
 var buffer = new Dictionary<string, Guid>();

 // get message from queue
 myQueueClient.OnMessage(
      m => buffer.Add(key: m.GetBody<string>(), value: m.LockToken),
      messageOptions // AutoComplete = false keeps the message locked in the queue until we complete it
 );

 foreach(var item in buffer){
    try {
        Console.WriteLine($"Process item: {item.Key}");
        myQueueClient.Complete(item.Value); // you can also use CompleteBatch(...) to improve performance
    }
    catch {
        // Release the lock so the message can be delivered to another listener.
        // (Abandon releases the lock; Defer would set the message aside instead.)
        myQueueClient.Abandon(item.Value);
    }
 }

      

+1




My solution was to get the message's SequenceNumber, then defer the message and add the SequenceNumber to a BlockingCollection. Once the BlockingCollection consumer picks up a new item, it can receive the deferred message by its SequenceNumber and mark the message complete. If for some reason the BlockingCollection never processes the SequenceNumber, the message remains in the queue as deferred, so it can be picked up later when the process is restarted. This protects against message loss if the process terminates abnormally while there are still items in the BlockingCollection.

BlockingCollection<long> queueSequenceNumbers = new BlockingCollection<long>();

//This finds any deferred/unfinished messages on startup. 
BrokeredMessage existingMessage = client.Peek();
while (existingMessage != null)
{
    if (existingMessage.State == MessageState.Deferred)
    {
        queueSequenceNumbers.Add(existingMessage.SequenceNumber);
    }
    existingMessage = client.Peek();
}


//setup the message handler
Action<BrokeredMessage> processMessage = new Action<BrokeredMessage>((message) =>
{
    try
    {
        //skip deferred messages if they are already in the queueSequenceNumbers collection.
        if (message.State != MessageState.Deferred || (message.State == MessageState.Deferred && !queueSequenceNumbers.Any(x => x == message.SequenceNumber)))
        {
            message.Defer();
            queueSequenceNumbers.Add(message.SequenceNumber);
        }

    }
    catch (Exception ex)
    {
         // Indicates a problem, unlock message in queue
         message.Abandon();
    }

});


// Callback to handle newly received messages
client.OnMessage(processMessage, new OnMessageOptions() { AutoComplete = false, MaxConcurrentCalls = 1 });            

//start the blocking loop to process messages as they are added to the collection
foreach (var queueSequenceNumber in queueSequenceNumbers.GetConsumingEnumerable())
{
     var message = client.Receive(queueSequenceNumber);
     //mark the message as complete so it is removed from the queue
     message.Complete();                 
     //do something with the message       
}

      

0

