Qt with ZeroMQ publish/subscribe pattern

I would like to use ZeroMQ (4.1.2) with Qt (5.2.1). The idea is to use ZeroMQ pub/sub, where the publisher (server) is an external process and the subscriber is a Qt application. Currently the receive handler in the Qt application fires only once. Can someone help? Does receiving from ZeroMQ need to be done in some other way?

My code currently looks like this:

mainwindow.h

// Forward declaration of Ui::MainWindow so this header can hold a pointer to
// it without including its definition (presumably the uic-generated form
// class -- the definition is not shown here).
namespace Ui {
class MainWindow;
}

// Main window of the subscriber application. Besides the UI it owns the
// ZeroMQ context and SUB socket, plus the QSocketNotifier used to get a Qt
// signal when the socket's file descriptor becomes readable.
class MainWindow : public QMainWindow
{
Q_OBJECT

public:
    // Builds the UI and sets up the ZeroMQ subscriber.
    explicit MainWindow(QWidget *parent = 0);
    // Closes the ZeroMQ socket and context and frees the UI.
    ~MainWindow();

private slots:
    // Presumably auto-connected to a widget named "pushButton"; the
    // implementation is not part of this listing.
    void on_pushButton_clicked();

    // Connected to the notifier's activated(int) signal; reads from the
    // subscriber socket.
    void readZMQData();

private:
    // UI object allocated in the constructor and deleted in the destructor.
    Ui::MainWindow *ui;
    // Notifier watching the descriptor obtained from the socket via ZMQ_FD.
    QSocketNotifier *qsn;
    // ZeroMQ context returned by zmq_ctx_new().
    void *context;
    // ZeroMQ socket of type ZMQ_SUB created from the context.
    void *subscriber;

};

      

mainwindow.cpp

/*
 * Builds the UI, then creates a ZeroMQ SUB socket connected to
 * tcp://localhost:5556, subscribes to every message (empty filter) and hooks
 * the socket's notification descriptor into the Qt event loop through a
 * QSocketNotifier that triggers readZMQData().
 *
 * Every ZeroMQ call is checked. On failure a warning is logged and the rest
 * of the ZeroMQ setup is skipped; the window itself is still usable and the
 * notifier pointer stays null.
 */
MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow),
    qsn(0),
    context(0),
    subscriber(0)
{
    ui->setupUi(this);

    /***** ZMQ *****/

    context = zmq_ctx_new ();
    if (!context) {
        qWarning("zmq_ctx_new failed: %s", zmq_strerror(zmq_errno()));
        return;
    }

    subscriber = zmq_socket (context, ZMQ_SUB);
    if (!subscriber) {
        qWarning("zmq_socket failed: %s", zmq_strerror(zmq_errno()));
        return;
    }

    int rc = zmq_connect (subscriber, "tcp://localhost:5556");
    if (rc != 0) {
        qWarning("zmq_connect failed: %s", zmq_strerror(zmq_errno()));
        return;
    }

    // An empty prefix subscribes to all messages. The literal must be bound
    // to a pointer-to-const.
    const char *filter = "";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
    if (rc != 0) {
        qWarning("zmq_setsockopt(ZMQ_SUBSCRIBE) failed: %s", zmq_strerror(zmq_errno()));
        return;
    }

    // ZMQ_FD yields an int on POSIX systems and a SOCKET on Windows. The
    // buffer must have exactly that type, otherwise zmq_getsockopt() can
    // fail with EINVAL (notably on 64-bit Windows).
#ifdef _WIN32
    SOCKET fd = 0;
#else
    int fd = 0;
#endif
    size_t fd_size = sizeof(fd);
    rc = zmq_getsockopt(subscriber, ZMQ_FD, &fd, &fd_size);
    if (rc != 0) {
        qWarning("zmq_getsockopt(ZMQ_FD) failed: %s", zmq_strerror(zmq_errno()));
        return;
    }

    qsn = new QSocketNotifier(fd, QSocketNotifier::Read, this);
    connect(qsn, SIGNAL(activated(int)), this, SLOT(readZMQData()), Qt::DirectConnection);

}

/*
 * Tears down the subscriber and the UI.
 *
 * The notifier is a child QObject of this window, so Qt would only delete it
 * after this destructor body has run -- i.e. after the ZeroMQ socket (and the
 * descriptor the notifier is watching) has already been closed. Delete it
 * explicitly first so it never monitors a closed descriptor.
 */
MainWindow::~MainWindow()
{
    delete qsn;   // deleting a null pointer is a no-op
    qsn = 0;

    zmq_close (this->subscriber);
    zmq_ctx_destroy (this->context);
    delete ui;
}


/*
 * Slot run when the socket notifier reports activity on the ZeroMQ
 * descriptor. The notifier is switched off for the duration of the call.
 * If ZMQ_EVENTS reports the socket as readable, a single message is fetched
 * with s_recv(), logged and freed.
 */
void MainWindow::readZMQData()
{
    qsn->setEnabled(false);
    qDebug() << "Got data!";

    // Ask ZeroMQ which events are actually pending on the socket.
    int pending = 0;
    std::size_t pendingLen = sizeof(pending);
    zmq_getsockopt(subscriber, ZMQ_EVENTS, &pending, &pendingLen);

    const bool readable = (pending & ZMQ_POLLIN) != 0;
    if (readable) {
        qDebug() << " ======  Data to read ======";

        // s_recv() hands back a heap-allocated C string that we must free.
        char *payload = s_recv(subscriber);
        qDebug() << "DATA: " << payload;
        free(payload);
    }

    qsn->setEnabled(true);
}

      

And the server application (from the ZeroMQ examples):

#include "zhelpers.h"

int main (void)
{
    //  Create the context and bind a PUB socket on TCP port 5556
    void *ctx = zmq_ctx_new ();
    void *pub = zmq_socket (ctx, ZMQ_PUB);
    int bind_rc = zmq_bind (pub, "tcp://*:5556");
    assert (bind_rc == 0);

    //  Seed the random number generator from the current time
    srandom ((unsigned) time (NULL));
    for (;;) {
        //  Make up one random weather reading
        int zipcode = randof (100000);
        int temperature = randof (215) - 80;
        int relhumidity = randof (50) + 10;

        //  Format the reading as text and publish it to every subscriber
        char line [20];
        sprintf (line, "%05d %d %d", zipcode, temperature, relhumidity);
        s_send (pub, line);
    }
    //  The loop above never exits, so this cleanup is not reached
    zmq_close (pub);
    zmq_ctx_destroy (ctx);
    return 0;
}

      

+3


source to share


3 answers


First, thanks for the help.

I found the issue: when ZeroMQ notifies you that there is a message to read, you need to read all pending messages, not just the first one.



    void MainWindow::readZMQData(int fd)
{
    // Keep the notifier quiet while the socket is being serviced.
    qsn->setEnabled(false);

    int pending = 0;
    std::size_t pendingLen = sizeof(pending);
    zmq_getsockopt(subscriber, ZMQ_EVENTS, &pending, &pendingLen);

    if ((pending & ZMQ_POLLIN) != 0) {
        qDebug() << " ======  Data to read ======";

        // The important part: keep receiving until no message is left.
        // s_recv_nb() is not shown here -- presumably a non-blocking
        // receive that returns NULL once nothing is queued.
        for (char *msg = s_recv_nb(subscriber); msg != NULL; msg = s_recv_nb(subscriber)) {
            qDebug() << "DATA: " << msg;
            free(msg);
        }
    }

    qsn->setEnabled(true);
}

      

+3


source


The socket notifier looks like it should work. Have you read the docs on how to handle it correctly? Especially if you are on Windows, it looks like there are special steps to follow when reading: disabling the notifier, reading, re-enabling it, etc.

http://doc.qt.io/qt-5/qsocketnotifier.html#details



Hope it helps.

+1


source


As it is unclear what s_recv_nb(zmq::socket_t & socket) is, I'll provide my - slightly more verbose - implementation:

      void MainWindow::readZMQData()
      {
       qsn->setEnabled(false);

       int events = 0;
       std::size_t eventsSize = sizeof(events);
       zmq_getsockopt(subscriber,ZMQ_EVENTS, &events, &eventsSize);
       if(events & ZMQ_POLLIN){
       qDebug() << " ======  Data to read ======";

       char *string;
       int64_t more;
       size_t more_size = sizeof (more);

       do {
           /* Create an empty ØMQ message to hold the message part */
           zmq_msg_t part;
           int rc = zmq_msg_init (&part);
           assert (rc == 0);
           rc = zmq_msg_recv (&part, subscriber, 0);
           assert (rc != -1);
           /* Determine if more message parts are to follow */
           rc = zmq_getsockopt (subscriber, ZMQ_RCVMORE, &more, &more_size);
           assert (rc == 0);
           string = (char*) zmq_msg_data(&part);
           qDebug() << QString(string) ; // << "more" << more;

           zmq_msg_close (&part); 
          } while (more);
       }

       qsn->setEnabled(true);
     }

      

And one more note: on 64-bit Windows the file descriptor fd should be a uint64_t, as described in the question "zmq_getsockopt returns EINVAL on Windows x64 when local address of ZMQ_FD option_val passed".

0


source







All Articles