RabbitMQ implementation

Here's a little about the setup I have.

  • A REST API accepts data (POST) and publishes it into a queue
  • A consumer on that queue is always running and publishes to an exchange
  • The exchange routes to multiple other queues (20+)
  • Each of the 20+ queues performs a specific task (these consumers are also always running)
  • A cron job runs to check whether all 20+ tasks have completed, and then a final queue is issued

I'm not sure I like having consumers running all the time, since each consumer uses about 300MB of RAM (I think it's MB; the numbers aren't in front of me at the moment), and I'm looking for another implementation.

    M <-- Message coming from REST API
    |
    |
    +-First Queue
    |
    |
    | <-- The Exchange
   /|\
  / | \
 /  |  \ <-- bind to multiple queues ( 20+ )
Q1  Q2  Q3 <-- Each Queue is a task that must be completed


    | <-- CRON runs to check if all queues above have completed
    |
    |
    Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
    |
    C <-- Consumer 
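The cron step at the bottom of the diagram boils down to a completion check like the following (a broker-free sketch; the function name and the task-state array are assumptions of mine — in the real setup the states would come from a database or a status store):

```php
<?php
// Hypothetical sketch of the completion check the cron job performs:
// Q4 may only start once every fan-out task for a job reports "done".
function allTasksComplete(array $taskStates): bool
{
    foreach ($taskStates as $task => $done) {
        if (!$done) {
            return false; // at least one of the 20+ tasks is still running
        }
    }
    return true;
}

// Example run:
$states = ['q1' => true, 'q2' => true, 'q3' => false];
var_dump(allTasksComplete($states)); // bool(false) - q3 is still busy
$states['q3'] = true;
var_dump(allTasksComplete($states)); // bool(true) - Q4 can be dispatched
```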

      

In the question linked below, it was suggested to use RPC, but the problem with RPC (in my opinion) is that it will have multiple instances. The setup is resource-intensive as it is, and I think adding RPC calls will just load the server until it stops responding (please correct me if I'm wrong).

Another approach that was suggested is the aggregator pattern, which looks like exactly what I need, but I found the documentation on it to be limited. Has anyone implemented this pattern?

My question is this: I am not happy with how this is currently implemented and I am looking for ways to improve the process. Specifically, I would like to get rid of cron, introduce a new pattern, and stop running consumers constantly.

The process currently only supports one instance of each consumer. A queue may have multiple consumers, but the way we implemented it, we only wanted one at a time.

This is implemented in PHP, on the Symfony2 framework, using the RabbitMQBundle.

Related question:


1 answer


OldSound here, author of the RabbitMQBundle.

The bundle itself does not support the aggregator pattern out of the box, but you can implement it on top of the underlying php-amqplib.

To perform aggregation, you need to publish messages with a correlation ID and propagate that ID along the processing chain. The aggregator then waits for X messages, where X depends on how many different workers must process that particular task. One way to wait for messages is to keep an array of received messages, indexed by correlation ID.

So whenever you get an incoming message, you do:

$correlation_id = $msg->get('correlation_id');
$this->receivedMessages[$correlation_id]['msgs'][] = $msg;

And then somewhere else you check:

if ($someNumber == count($this->receivedMessages[$correlation_id]['msgs'])) {
    // proceed to the next step
}
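To make that concrete, here is a minimal, broker-free sketch of such an aggregator (the class and method names are my own assumptions; in a real consumer $msg would be a php-amqplib AMQPMessage, and completion would trigger a publish to the next queue):

```php
<?php
// Minimal aggregator sketch (hypothetical names; no broker involved).
// Collects messages by correlation ID and reports completion once the
// expected number of task results has arrived.
class Aggregator
{
    private $expected;      // how many task results complete one job
    private $received = []; // correlation ID => list of received messages

    public function __construct(int $expected)
    {
        $this->expected = $expected;
    }

    // Returns true once all results for this correlation ID are in.
    public function collect(string $correlationId, $msg): bool
    {
        $this->received[$correlationId][] = $msg;
        if (count($this->received[$correlationId]) === $this->expected) {
            unset($this->received[$correlationId]); // free memory for finished jobs
            return true;
        }
        return false;
    }
}

// Simulate three task results sharing one correlation ID:
$agg = new Aggregator(3);
var_dump($agg->collect('job-42', 'result-a')); // bool(false)
var_dump($agg->collect('job-42', 'result-b')); // bool(false)
var_dump($agg->collect('job-42', 'result-c')); // bool(true)
```

Note the unset() once a job completes: since the aggregator is a long-running consumer, finished correlation IDs have to be evicted or the array grows without bound.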

      

I am actually working on a Symfony Workflow bundle right now, and I plan to open-source it soon. It could be used to implement the example you present in a very simple way (i.e. you would only need to provide a service for each task).

Now, I wonder why each consumer takes 300MB of RAM. Do you need to run the full framework stack inside them? If possible, create a separate Symfony kernel for your consumer app and load only what you need, to reduce the overhead.
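A sketch of what such a slim kernel could look like (the kernel class name, config path, and trimmed bundle list are assumptions; adjust to whatever your consumers actually use):

```php
<?php
// Hypothetical slim Symfony2 kernel for the consumer processes: only the
// bundles the consumers need are registered, instead of the full web stack.
use Symfony\Component\HttpKernel\Kernel;
use Symfony\Component\Config\Loader\LoaderInterface;

class ConsumerKernel extends Kernel
{
    public function registerBundles()
    {
        return array(
            new Symfony\Bundle\FrameworkBundle\FrameworkBundle(),
            new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
            // ...only what the consumers need; no Twig, Assetic, etc.
        );
    }

    public function registerContainerConfiguration(LoaderInterface $loader)
    {
        // A trimmed-down config file just for consumers (hypothetical path)
        $loader->load(__DIR__.'/config/config_consumer.yml');
    }
}
```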
