Defer QueueTrigger gracefully

I am currently using the WebJobs SDK to consume messages posted to a queue.

My method has one parameter decorated with the attribute [Microsoft.Azure.WebJobs.QueueTrigger(...)] and runs fine. In most cases the method can handle the message, but at times I would prefer it to reject the message until a critical resource becomes available.

I tried throwing an exception in this case, but contrary to what the link says, the queue trigger fires again and again (apparently without waiting for the lease time).

Is there a way to gracefully defer message processing? Would it be safe to just block a thread while waiting for the critical resource?

Any hints would be much appreciated.

+3




3 answers


I don't think you can defer a message in the current version.

Possible workaround

You can re-add the same message with a delay and, to avoid processing it twice, set MaxDequeueCount to 1. With that setting the failed message is moved to the poison queue immediately after the exception:

    JobHostConfiguration configuration = new JobHostConfiguration();
    configuration.Queues.MaxDequeueCount = 1;

      



Then, in the message processor, re-add the message with a delay and throw an exception:

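    // ResourcesNotAvailableException is a custom exception type you define yourself.
    public static void ProcessMessage(
        [QueueTrigger("resource-heavy-queue")] string message,
        [Queue("resource-heavy-queue")] CloudQueue originalQueue)
    {
        if (!IsResourceAvailable()) // hypothetical check, replace with your own
        {
            // Re-add a copy that stays invisible for 10 seconds (default time-to-live).
            var messageToReAdd = new CloudQueueMessage(message);
            originalQueue.AddMessage(messageToReAdd, null, TimeSpan.FromSeconds(10));

            // With MaxDequeueCount = 1 the original goes straight to the poison queue.
            throw new ResourcesNotAvailableException();
        }
    }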

      

This way you can implement a back-off (retry) strategy for your resource. Unfortunately, you have to solve some problems manually:

  • Dealing with poison messages: if you keep re-adding the same message, you may end up in an endless loop, so you need to extend the message model to carry a NumberOfRetries value and increment it every time you re-add the message.
  • The message Id and InsertionTime will be different after each re-add, so you cannot rely on them.
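
The retry counter from the first bullet could be carried in a small envelope around the payload. The following is only a minimal sketch: the `RetryEnvelope` class, its `<count>|<body>` text format, and the `MaxRetries` limit are all hypothetical names chosen for illustration and are not part of the WebJobs SDK.

```csharp
using System;

// Hypothetical envelope serialized as "<retryCount>|<body>". Not part of the WebJobs SDK.
public sealed class RetryEnvelope
{
    public const int MaxRetries = 5; // assumed limit, tune it for your resource

    public int NumberOfRetries { get; }
    public string Body { get; }

    public RetryEnvelope(int numberOfRetries, string body)
    {
        NumberOfRetries = numberOfRetries;
        Body = body;
    }

    // Parses queue text; anything without a valid numeric prefix counts as a first attempt.
    public static RetryEnvelope Parse(string raw)
    {
        int separator = raw.IndexOf('|');
        if (separator > 0 && int.TryParse(raw.Substring(0, separator), out int count))
        {
            return new RetryEnvelope(count, raw.Substring(separator + 1));
        }
        return new RetryEnvelope(0, raw);
    }

    // True while the message may still be re-added with a delay.
    public bool CanRetry => NumberOfRetries < MaxRetries;

    // Returns the envelope to re-add, with the counter incremented.
    public RetryEnvelope Next() => new RetryEnvelope(NumberOfRetries + 1, Body);

    public override string ToString() => NumberOfRetries + "|" + Body;
}
```

In the message processor you would parse the incoming text, re-add `envelope.Next().ToString()` with a delay while `CanRetry` is true, and give up (for example by logging or moving the message elsewhere) once it is false. A first-time payload that itself happens to start with digits followed by `|` would be misread by this simple format, so a real implementation would probably use JSON or a similar structured encoding.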
+1




Sorry, you cannot postpone the trigger. What you want is a multi-trigger (one that fires when both a message and the resource are available), which the Azure WebJobs SDK does not have.

There are several workarounds. You can block a thread (in effect implementing a semaphore) if all messages on that queue require the critical resource. Otherwise scheduling becomes difficult, because the SDK will not process new messages once you reach the maximum number of functions running in parallel.
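
The semaphore idea could look roughly like the sketch below. It is an illustration under stated assumptions, not the answerer's code: `CriticalResourceGate` and `TryRunAsync` are hypothetical names, the resource is assumed to allow one user at a time, and the wait is asynchronous rather than a blocked thread. The waiting function still counts against the SDK's parallel-execution limit, and the semaphore only coordinates functions inside a single host process.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: serializes access to a critical resource within one process.
public static class CriticalResourceGate
{
    // Assumption: at most one function may use the resource at a time.
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(1, 1);

    // Runs the work once the resource is free; returns false if the timeout expires first.
    public static async Task<bool> TryRunAsync(Func<Task> work, TimeSpan timeout)
    {
        if (!await Gate.WaitAsync(timeout))
        {
            return false; // resource still busy; the caller decides what to do next
        }
        try
        {
            await work();
            return true;
        }
        finally
        {
            Gate.Release(); // always free the resource, even if the work throws
        }
    }
}
```

A queue-triggered function could call `TryRunAsync` with its processing logic and treat a `false` result as "resource unavailable".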



Instead of triggering on the queue message, I would trigger on the critical resource. When the resource becomes available, it puts a message on another queue, and only then does the function check whether there are any messages to process that require the resource.

0




What I do in this situation is bind an ICollector to the same queue that triggers the function (a Service Bus queue in this example):

    public static async Task HandleMessagesAsync(
        [ServiceBusTrigger("%QueueName%")] BrokeredMessage message,
        [ServiceBus("%QueueName%")] ICollector<BrokeredMessage> queue,
        TextWriter logger)

      

(The %QueueName% notation makes the SDK read the queue name from app.config.)

Then, in the handler, I do something like this:

    if (needToWait)
    {
        // ExecutionDelay: a TimeSpan field (static, since the handler is static).
        var delayedMessage = new BrokeredMessage(originalMessageBody)
        {
            Label = originalLabel,
            MessageId = originalMessageId,
            ScheduledEnqueueTimeUtc = DateTime.UtcNow + ExecutionDelay
        };
        queue.Add(delayedMessage);
        return;
    }

      

Thus, the current message will be completed, but a new one with the same properties is scheduled to be delivered after a certain timeout.

No threads are blocked with this approach.

0








