Query Azure Service Bus queue from Azure WebJob using Node.js

Trying to poll an Azure service queue using a WebJob written in Node.js. I have created 2 WebJobs. The first is on demand and sends 10 unique messages to the queue. The second job is continuous and polls the queue for messages.

Meet the following problems:

  • Poll SLOW. It takes about 10 minutes on average to receive 10 messages. See below for details. In principle unusable at this speed. The whole delay lies in getting a response from receiveQueueMessage

    . The response time ranges from 0 seconds to ~ 120 seconds, with an average of 60 seconds.

  • Messages are received in random order. Not FIFO.

  • Sometimes messages are received twice, even if they are read in ReceiveAndDelete mode (I tried without the read mode parameter, which should be the default for ReceiveAndDelete, with {isReceiveAndDelete:true}

    and {isPeekLock:false}

    with the same results).

  • When the queue is empty, it should leave the receive request open for the day, but it always returns with no message error after 230 seconds. According to the documentation the max is 24 days, so I don't know where 230 seconds comes from:

The maximum timeout for a lock receive operation on the queue service bus is 24 days. However, REST based timeouts have a maximum value of 55 seconds.

Basically nothing works as advertised. What am I doing wrong?

Send Test Job message:

var uuid = require('node-uuid');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
var messagesToSend = 10;

sendMessage(0);

function sendMessage(count)
{
    var message = {
        body: 'test message',
        customProperties: {
            message_number: count,
            sent_date: new Date
        },
        brokerProperties: {
            MessageId: uuid.v4() //ensure that service bus doesn't think this is a duplicate message
        }
    };

    serviceBus.sendQueueMessage(process.env.busSearchQueueName, message, function(err) {

        if (!err) {
            console.log('sent test message number ' + count.toString());
        } else {
            console.error('error sending message: ' + err);
        }

    });

    //wait 5 seconds to ensure messages are received by service bus in correct order
    if (count < messagesToSend) {
        setTimeout(function(newCount) {
            //send next message
            sendMessage(newCount);
        }, 5000, count+1);
    }
}    

      

Continuous setting of message receive message:

console.log('listener job started');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
listenForMessages(serviceBus);

function listenForMessages(serviceBus)
{
    var start = process.hrtime();
    var timeOut = 60*60*24; //long poll for 1 day
    serviceBus.receiveQueueMessage(process.env.busSearchQueueName, {timeoutIntervalInS: timeOut, isReceiveAndDelete: true}, function(err, message) {

        var end = process.hrtime(start);
        console.log('received a response in %ds seconds', end[0]);

        if (err) {

            console.log('error requesting message: ' + err);
            listenForMessages(serviceBus);

        } else {

            if (message !== null && typeof message === 'object' && 'customProperties' in message && 'message_number' in message.customProperties) {

                console.log('received test message number ' + message.customProperties.message_number.toString());
                listenForMessages(serviceBus);

            } else {

                console.log('invalid message received');
                listenForMessages(serviceBus);

            }

        }

    });
}

      

Output log example:

[05/06/2015 21:50:14 > 8c2504: SYS INFO] Status changed to Running
[05/06/2015 21:50:14 > 8c2504: INFO] listener job started
[05/06/2015 21:51:23 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:23 > 8c2504: INFO] received test message number 0
[05/06/2015 21:51:25 > 8c2504: INFO] received a response in 2s seconds
[05/06/2015 21:51:26 > 8c2504: INFO] received test message number 4
[05/06/2015 21:51:27 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:27 > 8c2504: INFO] received test message number 7
[05/06/2015 21:51:28 > 8c2504: INFO] received a response in 0s seconds
[05/06/2015 21:51:29 > 8c2504: INFO] received test message number 9
[05/06/2015 21:51:49 > 8c2504: INFO] received a response in 20s seconds
[05/06/2015 21:51:49 > 8c2504: INFO] received test message number 1
[05/06/2015 21:53:35 > 8c2504: INFO] received a response in 106s seconds
[05/06/2015 21:53:35 > 8c2504: INFO] received test message number 1
[05/06/2015 21:54:26 > 8c2504: INFO] received a response in 50s seconds
[05/06/2015 21:54:26 > 8c2504: INFO] received test message number 5
[05/06/2015 21:54:35 > 8c2504: INFO] received a response in 9s seconds
[05/06/2015 21:54:35 > 8c2504: INFO] received test message number 9
[05/06/2015 21:55:28 > 8c2504: INFO] received a response in 53s seconds
[05/06/2015 21:55:28 > 8c2504: INFO] received test message number 2
[05/06/2015 21:57:26 > 8c2504: INFO] received a response in 118s seconds
[05/06/2015 21:57:26 > 8c2504: INFO] received test message number 6
[05/06/2015 21:58:28 > 8c2504: INFO] received a response in 61s seconds
[05/06/2015 21:58:28 > 8c2504: INFO] received test message number 8
[05/06/2015 22:00:35 > 8c2504: INFO] received a response in 126s seconds
[05/06/2015 22:00:35 > 8c2504: INFO] received test message number 3
[05/06/2015 22:04:25 > 8c2504: INFO] received a response in 230s seconds
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive
[05/06/2015 22:08:16 > 8c2504: INFO] received a response in 230s seconds    
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive

      

+3


source to share


3 answers


And the problem was that the queue I was using was partitioned (the default option when creating a queue in the Azure portal). Once I created a new queue that was not partitioned, everything worked as expected with no lag (except for a strange 230 second timeout on a long polling attempt). So basically the node.js library doesn't work for partitioned queues. At all. Spent many days to figure this out. Will leave it here for others.



+4


source


Disabling the Service Bus Queue partitioning flag worked for me too.

On a multipartition queue, some messages had delays of more than 30 minutes. A simple DotNet web client was able to download all messages without any delay. However, as soon as nodejs had to load the posts, only the first post was loaded without issue, after which there were delays. Playing with nodejs to change the http keepalive agent and socket timeout parameters did not improve the situation.



After stopping nodejs, I had to wait a few minutes before the DotNet client actually started working without issue. This was reproduced several times. I also found that the simple DotNet web user program showed similar problems after being started and stopped several times in a row.

Anyway, your post showed me the solution: turn off the partitioning flag :)

+1


source


Try using amqp to read messages from an azure service bus split queue and it will work for a split partition / queue and you don't even have to poll much.

const AMQPClient = require('amqp10').Client;
const Policy = require('amqp10').Policy;

const protocol = 'amqps';
const keyName = 'RootManageSharedAccessKey';
const sasKey = 'your_key_goes_here';
const serviceBusHost = 'namespace.servicebus.windows.net';
const uri = `${protocol}://${encodeURIComponent(keyName)}:${encodeURIComponent(sasKey)}@${serviceBusHost}`;
const queueName = 'partitionedQueueName';
const client = new AMQPClient(Policy.ServiceBusQueue);
client.connect(uri)
.then(() => Promise.all([client.createReceiver(queueName)]))
.spread((receiver) => {
    console.log('--------------------------------------------------------------------------');
    receiver.on('errorReceived', (err) => {
        // check for errors
        console.log(err);
    });
    receiver.on('message', (message) => {
        console.log('Received message');
        console.log(message);
        console.log('----------------------------------------------------------------------------');
    });
})
.error((e) => {
    console.warn('connection error: ', e);
});

      

https://www.npmjs.com/package/amqp10

+1


source







All Articles