ZeroMQ: socket.recv() call blocks
I'm trying to use ZeroMQ. My requirement is very simple: I want to be able to communicate between two peers on a network. I found this program among the examples in the book.
$ pub_server.py
import zmq
import random
import sys
import time

port = "5556"
if len(sys.argv) > 1:
    port = sys.argv[1]
    int(port)

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
    topic = random.randrange(9999,10005)
    messagedata = random.randrange(1,215) - 80
    print "%d %d" % (topic, messagedata)
    socket.send("%d %d" % (topic, messagedata))
    time.sleep(1)
$ sub_client.py
import sys
import zmq

port = "5556"
if len(sys.argv) > 1:
    port = sys.argv[1]
    int(port)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print "Collecting updates from weather server..."
socket.connect("tcp://localhost:%s" % port)

# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

# Process 5 updates
total_value = 0
for update_nbr in range(5):
    string = socket.recv()
    topic, messagedata = string.split()
    total_value += int(messagedata)
    print('{} {}'.format(topic, messagedata))

# update_nbr ends at 4, so divide by the number of updates received (5)
print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/(update_nbr + 1))))
The problem with this model is that
string = socket.recv()
blocks until a message is received. I don't want this to happen. I need messages to be queued on the receiving side so that I can pull them out of the queue (or something like that).
Is there some model in zero-mq that allows this?
zmq.Socket.recv will not block if you pass zmq.NOBLOCK as the flags parameter.
The docs say:
If NOBLOCK is set, this method will raise a ZMQError with EAGAIN if a message is not ready.
zmq queues the messages it receives, and each call to recv() returns one message until that queue is exhausted, at which point a ZMQError is raised.
zmq.Again, used in the examples below, is the ZMQError subclass that pyzmq raises for the EAGAIN error code.
For example:
while True:
    try:
        #check for a message, this will not block
        message = socket.recv(flags=zmq.NOBLOCK)
        #a message has been received
        print "Message received:", message
    except zmq.Again as e:
        print "No message received yet"
    # perform other important stuff
    time.sleep(10)
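To see the queueing behavior on its own, here is a minimal sketch in Python 3 with pyzmq. It is not the PUB/SUB setup from the question: it assumes a PUSH/PULL pair over the in-process transport, purely so that the demo runs deterministically in a single process. The endpoint name `inproc://drain-demo` and the helper `drain` are made up for illustration.

```python
import zmq


def drain(socket):
    """Return every message currently queued on `socket` without blocking."""
    messages = []
    while True:
        try:
            messages.append(socket.recv(flags=zmq.NOBLOCK))
        except zmq.Again:
            # The queue is empty: a blocking recv() would have waited here
            return messages


context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.bind("inproc://drain-demo")
sender = context.socket(zmq.PUSH)
sender.connect("inproc://drain-demo")

# Send three messages before the receiver looks at its queue
for i in range(3):
    sender.send(b"message %d" % i)

# Wait up to one second for the first message to become readable
receiver.poll(timeout=1000)
queued = drain(receiver)    # everything that was waiting
leftover = drain(receiver)  # the queue has already been emptied

print(queued)
print(leftover)

sender.close()
receiver.close()
context.term()
```

The first call to `drain` collects the messages that accumulated while the receiver was busy elsewhere; the second returns immediately with an empty list instead of blocking.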
An example sub_client.py can be written to use non-blocking behavior like this:
import sys, time
import zmq

port = "5556"
if len(sys.argv) > 1:
    port = sys.argv[1]
    int(port)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print "Collecting updates from weather server..."
socket.connect("tcp://localhost:%s" % port)

# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

# Process 5 updates
total_value = 0
received_value_count = 0
do_receive_loop = True
while do_receive_loop:
    try:
        #process all messages waiting on subscribe socket
        while True:
            #check for a message, this will not block
            string = socket.recv(flags=zmq.NOBLOCK)
            #message received, process it
            topic, messagedata = string.split()
            total_value += int(messagedata)
            print('{} {}'.format(topic, messagedata))
            #check if we have all the messages we want
            received_value_count += 1
            if received_value_count > 4:
                do_receive_loop = False
                break
    except zmq.Again as e:
        #No messages waiting to be processed
        pass
    #Here we can do other stuff while waiting for messages
    #contemplate answer to 'The Last Question'
    time.sleep(15)
    print "INSUFFICIENT DATA FOR MEANINGFUL ANSWER"

print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/5)))
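If a fixed sleep between checks is not what you want, one possible variation is to let the socket wait for a bounded time instead. The sketch below (Python 3, pyzmq) uses `Socket.poll`, which waits up to the given number of milliseconds and returns a non-zero event mask once a message is readable. The helper name `recv_with_timeout`, the `inproc://poll-demo` endpoint, and the PAIR sockets are assumptions chosen only to keep the demo self-contained in one process.

```python
import zmq


def recv_with_timeout(socket, timeout_ms):
    """Return the next message, or None if none arrives within timeout_ms."""
    if socket.poll(timeout=timeout_ms, flags=zmq.POLLIN):
        # poll() reported a readable message, so this recv will not block
        return socket.recv(flags=zmq.NOBLOCK)
    return None


context = zmq.Context()
left = context.socket(zmq.PAIR)
left.bind("inproc://poll-demo")
right = context.socket(zmq.PAIR)
right.connect("inproc://poll-demo")

# Nothing has been sent yet, so this gives up after 100 ms
nothing_yet = recv_with_timeout(right, 100)

left.send(b"10001 42")
first = recv_with_timeout(right, 1000)

print(nothing_yet, first)

left.close()
right.close()
context.term()
```

The trade-off compared with the sleep-based loop is that the caller wakes up as soon as a message arrives, while still regaining control after the timeout to do other work.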