React ZMQ, Router to Dealer, doesn't work. ZMQ 4.0.5

I want to build an asynchronous messaging router with React, but it doesn't work. The code at http://zguide.zeromq.org/php:rtdealer works, and I can't figure out what I'm doing differently. I am using libzmq 4.0.5.

Here is my code:

$loop = React\EventLoop\Factory::create();   // event loop, created as in the edited version below
$context = new React\ZMQ\Context($loop);

$worker = $context->getSocket(\ZMQ::SOCKET_DEALER);
$worker->setSockOpt(\ZMQ::SOCKOPT_IDENTITY, 'A');
$worker->connect('tcp://127.0.0.1:5556');
$worker->send('END');

$worker->on('error', function ($e) {
    var_dump($e->getMessage());
});

$worker->on('messages', function($msg) use ($worker) {
    echo 'Dealer messages'. PHP_EOL;
    var_dump($msg);
});

$worker->on('message', function($msg) use ($worker) {
    echo 'Dealer message'. PHP_EOL;
    var_dump($msg);
});

$router = $context->getSocket(\ZMQ::SOCKET_ROUTER);
$router->bind('tcp://127.0.0.1:5556');

$i = 0;
$loop->addPeriodicTimer(1, function (React\EventLoop\Timer\Timer $timer) use (&$i, $router) {
    echo 'Time to send!'. PHP_EOL;
    $i++;
    $router->send('A', \ZMQ::MODE_SNDMORE);
    $router->send('END');
});

$router->on('messages', function($msg) use ($router) {
    echo 'Router messages'. PHP_EOL;
    var_dump($msg);
});

$router->on('message', function($msg) {
    echo 'Router message'. PHP_EOL;
    var_dump($msg);
});

$loop->run();

The problem is that only the dealer's first "END" message gets through. The router then tries to send messages, but the dealer never receives them.

Also, this function seems to be called only once:

// \React\ZMQ\SocketWrapper
public function handleReadEvent()
{
    $messages = $this->socket->recvmulti(ZMQ::MODE_NOBLOCK);
    echo 'Receiving...';    // Added
    var_dump($messages);    // Added

    if ($messages !== false) {
        if (count($messages) === 1) {
            $this->emit('message', array($messages[0]));
        }

        $this->emit('messages', array($messages));
    }
}

Output:

Receiving...array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router messages
array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Time to send!
Time to send!
Time to send!
Time to send!
Time to send!
...

Edit:

I changed the code so that the router binds before the dealer connects to it, but the problem still happens:

$loop = React\EventLoop\Factory::create();

$context = new React\ZMQ\Context($loop);

$router = $context->getSocket(\ZMQ::SOCKET_ROUTER);
$router->bind('tcp://127.0.0.1:5556');

$loop->addPeriodicTimer(10, function (React\EventLoop\Timer\Timer $timer) use ($router) {
    echo 'Router sending messages with an interval of 10 seconds'. PHP_EOL;
    $router->send('A', \ZMQ::MODE_SNDMORE);
    $router->send('END');
});

$router->on('messages', function($msg) use ($router) {
    echo 'Router messages'. PHP_EOL;
    var_dump($msg);
});

$router->on('message', function($msg) {
    echo 'Router message'. PHP_EOL;
    var_dump($msg);
});

$worker = $context->getSocket(\ZMQ::SOCKET_DEALER);

$loop->addPeriodicTimer(5, function (React\EventLoop\Timer\Timer $timer) use ($worker) {
    echo 'After 5 seconds from router binding, connect the dealer and send something'. PHP_EOL;
    $worker->setSockOpt(\ZMQ::SOCKOPT_IDENTITY, 'A');
    $worker->connect('tcp://127.0.0.1:5556');
    $worker->send('END');
    $timer->getLoop()->cancelTimer($timer);     // Cancel the timer after connecting
});

$worker->on('error', function ($e) {
    var_dump($e->getMessage());
});

$worker->on('messages', function($msg) use ($worker) {
    echo 'Dealer messages'. PHP_EOL;
    var_dump($msg);
});

$worker->on('message', function($msg) use ($worker) {
    echo 'Dealer message'. PHP_EOL;
    var_dump($msg);
});

$loop->run();

This is the terminal output:

After 5 seconds from router binding, connect the dealer and send something
Receiving...array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router messages
array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds

1 answer


I finally solved the problem. Prompted by @Jason's comment ("Didn't use PHP / React, are you sure your namespace is correct?"), I looked at the send method. It looks like this:

// \React\ZMQ\SocketWrapper

public function send($message)
{
    echo 'Inside send, sending message'. PHP_EOL; //This line was added by myself
    $this->buffer->send($message);
}

Then I thought: OK, what happens if I call this method like this:

$router->send('A', \ZMQ::MODE_SNDMORE);

The second argument is never passed on, because send declares only the $message parameter.
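To illustrate why the extra argument disappears, here is a minimal sketch in plain PHP. FakeBuffer, FakeWrapper and FLAG_SNDMORE are hypothetical stand-ins invented for this example, not the real React\ZMQ classes or the real ZMQ constant. PHP accepts surplus arguments to a user-defined method without complaint, so nothing signals that the flag was dropped.

```php
<?php
// Hypothetical stand-in for a send-more flag; not the real ZMQ constant.
const FLAG_SNDMORE = 2;

// Hypothetical stand-in for the buffer the wrapper delegates to.
class FakeBuffer
{
    public $queued = array();

    public function send($message)
    {
        $this->queued[] = $message;
    }
}

// Hypothetical wrapper whose send() declares a single parameter,
// mirroring the shape of the send method quoted above.
class FakeWrapper
{
    public $argCounts = array();
    private $buffer;

    public function __construct(FakeBuffer $buffer)
    {
        $this->buffer = $buffer;
    }

    public function send($message)
    {
        // PHP still knows how many arguments were supplied,
        // but only $message is forwarded to the buffer.
        $this->argCounts[] = func_num_args();
        $this->buffer->send($message);
    }
}

$buffer  = new FakeBuffer();
$wrapper = new FakeWrapper($buffer);

$wrapper->send('A', FLAG_SNDMORE); // the flag is accepted by PHP, then ignored
$wrapper->send('END');

var_dump($buffer->queued);     // two independent entries, no flag attached to either
var_dump($wrapper->argCounts); // how many arguments each call actually supplied
```

In this sketch the buffer ends up with two unrelated single messages, which matches the symptom in the question: the "send more" intent never reaches the layer that talks to the socket.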

Then I saw that sendmulti is not implemented in SocketWrapper, but SocketWrapper does implement __call, which forwards the call to the method implemented in the ZMQ API for PHP. So I tried calling sendmulti with the two parameters wrapped in an array. It worked. I then tried passing the same array to send and, although I don't know why, that worked too. So the trick was to call:

$router->send(array('A', \ZMQ::MODE_SNDMORE));
$router->send(array('END'));

instead of:

$router->send('A', \ZMQ::MODE_SNDMORE);
$router->send('END');

Also, be careful with the ZMQ PHP API in the GitHub repository: it is wrong. PhpStorm downloaded a different ZMQ PHP API from somewhere, which seems to be correct. In that API, the prototype of sendmulti looks like this:

public function sendmulti(array $message, $mode = 0)

Surprisingly, the following calls to this method give the same results:

$router->send(array('A', \ZMQ::MODE_SNDMORE));
$router->send(array('A'), \ZMQ::MODE_SNDMORE);
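
The __call forwarding mentioned earlier can be sketched in plain PHP as follows. FakeSocket and ForwardingWrapper are hypothetical names made up for this illustration; only the sendmulti prototype is taken from the answer. The sketch assumes the wrapper simply hands any method it does not define to the underlying socket object, and it passes the routing identity and the payload as the frames of one array.

```php
<?php
// Hypothetical stand-in for the underlying socket; it only records calls.
class FakeSocket
{
    public $sent = array();

    // Same prototype as the sendmulti quoted above.
    public function sendmulti(array $message, $mode = 0)
    {
        $this->sent[] = array('frames' => $message, 'mode' => $mode);
        return $this;
    }
}

// Hypothetical wrapper that does not define sendmulti itself.
class ForwardingWrapper
{
    private $socket;

    public function __construct($socket)
    {
        $this->socket = $socket;
    }

    // Any method missing on the wrapper is forwarded, with all of its
    // arguments, to the wrapped socket object.
    public function __call($method, $args)
    {
        return call_user_func_array(array($this->socket, $method), $args);
    }
}

$socket  = new FakeSocket();
$wrapper = new ForwardingWrapper($socket);

// Identity frame first, payload second, sent as one multipart message.
$wrapper->sendmulti(array('A', 'END'));

var_dump($socket->sent);
```

Unlike a one-parameter send, a forwarded call keeps every argument, so an optional $mode would also reach the socket if one were supplied.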

      
