Qt with ZeroMQ publish subscription template
I would like to use ZeroMQ (4.1.2) with Qt (5.2.1). The idea is to have zmq pub / sub (where the server is outside) and sub is a qt application. Currently fetching in Qt application starts once, can someone jump? Does the ZeroMQ receiver need to be done in some other way?
My code currently looks like this:
mainwindow.h
namespace Ui {
class MainWindow;
}
class MainWindow : public QMainWindow
{
Q_OBJECT
public:
explicit MainWindow(QWidget *parent = 0);
~MainWindow();
private slots:
void on_pushButton_clicked();
void readZMQData();
private:
Ui::MainWindow *ui;
QSocketNotifier *qsn;
void *context;
void *subscriber;
};
mainwindow.cpp
MainWindow::MainWindow(QWidget *parent) :
QMainWindow(parent),
ui(new Ui::MainWindow)
{
ui->setupUi(this);
/***** ZMQ *****/
context = zmq_ctx_new ();
subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://localhost:5556");
char *filter = "";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter, strlen (filter));
unsigned int fd=0;
size_t fd_size = sizeof(fd);
rc = zmq_getsockopt(subscriber,ZMQ_FD,&fd,&fd_size);
qsn = new QSocketNotifier(fd, QSocketNotifier::Read, this);
connect(qsn, SIGNAL(activated(int)), this, SLOT(readZMQData()), Qt::DirectConnection);
}
MainWindow::~MainWindow()
{
zmq_close (this->subscriber);
zmq_ctx_destroy (this->context);
delete ui;
}
void MainWindow::readZMQData()
{
qsn->setEnabled(false);
qDebug() << "Got data!";
int events = 0;
std::size_t eventsSize = sizeof(events);
zmq_getsockopt(subscriber,ZMQ_EVENTS, &events, &eventsSize);
if(events & ZMQ_POLLIN){
qDebug() << " ====== Data to read ======";
char *string = s_recv(subscriber);
qDebug() << "DATA: " << string;
free(string);
}
qsn->setEnabled(true);
}
And the server application (from the ZeroMQ examples):
#include "zhelpers.h"
int main (void)
{
// Prepare our context and publisher
void *context = zmq_ctx_new ();
void *publisher = zmq_socket (context, ZMQ_PUB);
int rc = zmq_bind (publisher, "tcp://*:5556");
assert (rc == 0);
// Initialize random number generator
srandom ((unsigned) time (NULL));
while (1) {
// Get values that will fool the boss
int zipcode, temperature, relhumidity;
zipcode = randof (100000);
temperature = randof (215) - 80;
relhumidity = randof (50) + 10;
// Send message to all subscribers
char update [20];
sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
s_send (publisher, update);
}
zmq_close (publisher);
zmq_ctx_destroy (context);
return 0;
}
source to share
First tnx for help,
I found an issue where ZeroMQ notifies that there is a message to read, you need to read them all, not just the first one.
void MainWindow::readZMQData(int fd)
{
qsn->setEnabled(false);
int events = 0;
std::size_t eventsSize = sizeof(events);
zmq_getsockopt(subscriber,ZMQ_EVENTS, &events, &eventsSize);
if(events & ZMQ_POLLIN){
qDebug() << " ====== Data to read ======";
char *string;
// THIS IS THE TRICK! READ UNTIL THERE IS MSG
while((string = s_recv_nb(subscriber)) != NULL){
qDebug() << "DATA: " << string;
free(string);
}
}
qsn->setEnabled(true);
}
source to share
The socket id looks like it should work. Have you read the docs on correct handling? Especially if you're on Windows, it looks like there are special ways to deal with this when reading ... disconnecting, reading, etc.
http://doc.qt.io/qt-5/qsocketnotifier.html#details
Hope it helps.
source to share
As unclear what this is s_recv_nb(zmq::socket_t & socket)
, I'll provide my - slightly more verbose - execution:
void MainWindow::readZMQData()
{
qsn->setEnabled(false);
int events = 0;
std::size_t eventsSize = sizeof(events);
zmq_getsockopt(subscriber,ZMQ_EVENTS, &events, &eventsSize);
if(events & ZMQ_POLLIN){
qDebug() << " ====== Data to read ======";
char *string;
int64_t more;
size_t more_size = sizeof (more);
do {
/* Create an empty ΓMQ message to hold the message part */
zmq_msg_t part;
int rc = zmq_msg_init (&part);
assert (rc == 0);
rc = zmq_msg_recv (&part, subscriber, 0);
assert (rc != -1);
/* Determine if more message parts are to follow */
rc = zmq_getsockopt (subscriber, ZMQ_RCVMORE, &more, &more_size);
assert (rc == 0);
string = (char*) zmq_msg_data(&part);
qDebug() << QString(string) ; // << "more" << more;
zmq_msg_close (&part);
} while (more);
}
qsn->setEnabled(true);
}
And one more note: in case of Windows 64 filedescriptor fd
should be uint64_t
like in zmq_getsockopt returns EINVAL on x64 windows when sent local address of option_M ______________> option_val
source to share