AMQP Rabbitmq Nodejs is an extra publisher created every time a subscriber goes down

I currently have the following situation:


publisher.js:

var queue_connection = new amqp.createConnection( { host: config.rabbitmq.host, port: config.rabbitmq.port }  );

queue_connection.on( 'ready', function () {

    var exchange = queue_connection.exchange( 'http_worker', { type: 'topic' } );
        exchange.on( 'open', function(){
            setInterval(function(){
                exchange.publish( 'AZ', 'This is test message' );
                logger.info( 'AZ message pushed' );
            }, 1000);
    })
});

      


subscriber.js:

var connection = amqp.createConnection({ host: config.rabbitmq.host, port: config.rabbitmq.port });

connection.on( 'ready', function () {

    connection.queue( 'AZ', function ( q ) {

        q.bind( 'http_worker', 'AZ' );

        q.subscribe(function ( message ) {
            console.log(unescape( message.data ))
        });
    });

});

      


The example above works great. When I run publisher.js and subscriber. Js, the subscriber will receive messages without any problem. But if I close the subscriber and start it again (without closing or starting publisher.js), the subscriber starts receiving 3 identical messages at once.

  • I'm guessing this is due to 3 callbacks in publisher.js, but I don't understand why these callbacks are called every time the caller goes down.
  • How can I prevent this? Are there any options for rabbitmq / amqp to make the publisher wait for the caller to reconnect.
  • Also, is it possible to achieve this without losing queued messages that are queued when the caller is not working?
  • Is there a difference when you start subscriber first and then publisher (and vice versa). How does it work, who needs to create the queue, subscriber or publisher?

Editorial staff:

Based:

By default, both queues and exchanges are automatically removed when there are no consumers (for queues) or queues (for exchanges) associated with them, which is what happens in your situation.

adding "autoDelete: false" fixed the problem.

+3


source to share


1 answer


By default, both queues and exchanges are automatically removed when there are no consumers (for queues) or queues (for exchanges) associated with them, which is what happens in your situation.

You can see that when adding an error event handler:

queue_connection.on('error', console.log);

      

When this happens, the connection is closed by RabbitMQ, the driver amqp

reconnects, it re-dispatches the event ready

, and you re-create the exchange.



However, the previous instance of the exchange (from the first connection) is still running and continues to post messages (which seems to be the case, although I suspect the message has been deleted).

So, after the first reconnection, you have two publishers sending messages. Disconnect the subscriber, get an error, reconnect, and the third publisher will start. Etc.

You probably need to keep track of the state of the connection, so make sure you clear any exchange instances or queues when the connection is reset.

+1


source







All Articles