ZeroMQ: is there a limit on the number of topics a SUB can subscribe to?

I am using the PUB/SUB socket pattern in ZeroMQ. The PUB produces financial data on stocks and publishes it. The topic of each message is set to the stock code. On the SUB side, clients can subscribe to the data they want based on the stock code. The PUB is written in C and the SUB is written in Python.

However, something went wrong during testing. If only one stock code is set as the message filter on the SUB socket, everything works well. But with a large number of stock codes, the program crashes after a short time, reporting a "Segmentation fault (core dumped)" error (see below for more details).

Here is the PUB (C) code :

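while (1) {
    int rc = 0;
    // send the topic frame; the size argument must be the topic length, not rc
    // (assumes topic is a NUL-terminated char* and <string.h> is included)
    rc = zmq_send(pub_socket, topic, strlen(topic), ZMQ_SNDMORE);
    if (rc == -1) {
        // error handling
    }
    // send the stock data frame; data_len is assumed to hold the payload size in bytes
    rc = zmq_send(pub_socket, data, data_len, 0);
    if (rc == -1) {
        // error handling
    }
}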

      

Here is the SUB (Python) code :

import zmq

# initialize a SUB socket
ctx = zmq.Context.instance()
socket = ctx.socket(zmq.SUB)

# set socket options to filter message
for code in code_list:
    socket.setsockopt_string(zmq.SUBSCRIBE, code)

socket.connect(PUB_ADDR)
# recv data from PUB
while True:
    data = socket.recv()
    print(data)

      

I also used gdb to debug the program. The result looks like this: [image: gdb debug result]

Does anyone know why the program is crashing? Any help is appreciated, thanks.


Update

If I replace the setsockopt_string part with the following code, the Python script works well. Strange ... I need to dig deeper into the setsockopt_string function.

New code in Python :

socket.setsockopt_string( zmq.SUBSCRIBE, "" )

      


Last update :

I ran the script provided by @user3666197 and got a debug log. I only show a few parts of the log because it is very long.

[images of the debug log: socket initialization completed; setsockopt_string finished; recv one msg and exited]


1 answer


Introduction:

The PUB-side uses ZeroMQ v4.1.5; the SUB-side uses the ZeroMQ Python wrapper 16.0.2.

Implicitly this makes the PUB/SUB pattern, unlike in previous API generations back to v2.0, rely on PUB-side filtering, while the reported problem is a SIGSEGV on the SUB-side.

Regarding the stated hypothesis that filtering is the root cause: I recall some technical debate about filtering problems with large tries. One minor surprise from it was that, according to some of the posts about trie search, an added "" leaf node also did the trick. Please try to find that discussion again if it helps.

Early remarks from Martin Sustrik indicate that about 10,000 subscriptions in the ZeroMQ filter should not cause problems (with somewhat higher numbers mentioned in later design discussions):

Efficient Subscription Matching

ZeroMQ uses simple tries to store and match PUB/SUB subscriptions. The subscription mechanism was intended for up to 10,000 subscriptions, where a simple trie works well. However, there are users who use as many as 150,000,000 subscriptions. In such cases, a more efficient data structure is needed. Thus, nanomsg uses a memory-efficient version of a Patricia trie instead of a simple trie.

See this article for details.
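
To make the quoted data structure a bit more tangible, here is a minimal sketch of a simple (uncompressed) trie that stores subscription prefixes and checks incoming topics against them. It only illustrates the idea and is not libzmq's actual implementation; the names SubscriptionTrie, subscribe and matches, as well as the stock codes, are made up for this example.

```python
class SubscriptionTrie:
    """Toy byte-wise trie holding subscription prefixes (illustration only)."""

    def __init__(self):
        self.children = {}      # next byte value -> child node
        self.terminal = False   # True if a subscription ends at this node

    def subscribe(self, prefix: bytes) -> None:
        """Store one subscription prefix, one node per byte."""
        node = self
        for byte in prefix:
            node = node.children.setdefault(byte, SubscriptionTrie())
        node.terminal = True

    def matches(self, topic: bytes) -> bool:
        """Return True if any stored prefix is a prefix of `topic`."""
        node = self
        if node.terminal:        # the empty subscription b"" matches everything
            return True
        for byte in topic:
            node = node.children.get(byte)
            if node is None:     # no stored prefix continues with this byte
                return False
            if node.terminal:    # a complete stored prefix has been consumed
                return True
        return False


trie = SubscriptionTrie()
for code in (b"AAPL", b"MSFT", b"GOOG"):   # hypothetical stock codes
    trie.subscribe(code)

print(trie.matches(b"AAPL 101.5"))   # True
print(trie.matches(b"AAP"))          # False
print(trie.matches(b"TSLA 250.0"))   # False
```

Each lookup walks at most one node per byte of the topic, no matter how many subscriptions are stored; the cost of a simple trie lies in memory, since every byte of every prefix gets its own node.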


Diagnose the root cause, always using a step-by-step approach:

One small modification of the test will bring you closer to isolating the problem:

import zmq
pass;                                           print( "DEBUG: Ok, zmq imported. [ver:{0:}]".format( zmq.pyzmq_version() ) )
#_______________________________________________# SETUP ZMQ: ( PUB_ADDR and code_list as in the question )
ctx    = zmq.Context( 2 )                       # Context( nIOthreads )
pass;                                           print( "DEBUG: Ok, zmq.Context() instantiated." )
socket = ctx.socket(  zmq.SUB )                 # Socket(  .SUB )
pass;                                           print( "DEBUG: Ok, Socket instantiated." )
socket.connect(           PUB_ADDR )            # .connect()
pass;                                           print( "DEBUG: Ok, .connect() completed." )
socket.setsockopt(   zmq.LINGER, 0 )            # explicit LINGER
pass;                                           print( "DEBUG: Ok, .setsockopt( LINGER, 0 ) completed." )
#_______________________________________________# SET FILTER:
for code in code_list:
    pass;                                       print( "DEBUG: Going to set SUB side n-th filter: {0: > 1000d}. == [{1:}]".format( code_list.index( code ), repr( code ) ), end = " " )
    socket.setsockopt_string( zmq.SUBSCRIBE, code )
    pass;                                       print( "DEBUG: Ok, this one was done." )
pass;                                           print( "DEBUG: Ok, all items from <code_list> exhausted." )
#_______________________________________________# LOOP:
# NOTE: the finally: section runs ( and breaks ) after the first pass,
#       so this test receives at most one message and then exits cleanly.
while True:
    try:
          print( "LOOP: .recv() call." )
          data = socket.recv()
          print( "LOOP: .recv()-ed {0:}[B] repr()-ed as [{1:}]".format( len( data ), repr( data ) ) )

    except KeyboardInterrupt:                   # the exception class itself, not an instance
          print( "EXC: Ctrl-C will terminate." )

    except Exception:
          print( "EXC: will terminate." )

    finally:
          pass;                                 print( "DEBUG: Ok, finally: section entered:" )
          socket.close()
          pass;                                 print( "DEBUG: Ok, Socket instance .close() call returned" )
          ctx.term()
          pass;                                 print( "DEBUG: Ok, .Context() instance term()-ed" )
          break

      

Given that the test case is described as just one PUB and one SUB, other performance-scaling and fine-grained buffer-management options should not be a factor at this point. Check the results after running the modified test and post the resulting DEBUG: log.

Sending only about 3K messages per second won't be a problem either.




UPDATE: Missing Points - (1) Unicode Handling + (2) Topic Filter

(1) As the DEBUG: log shows, you are mixing Unicode strings and plain C byte arrays. These representations MUST match, system-wide (from .send_string(), through .setsockopt_string(), to .recv_string()):

data = socket.recv_string()           # AS YOUR DEBUG:log shows the b'mkt_bar...'

      

(2) Topic filters MUST match, otherwise the message is discarded as not subscribed to. So a u'abc...' filter matches messages that begin with u'abc...'. Not otherwise:

setsockopt_string( option, optval, encoding='utf-8' )



An empty optval of length zero shall subscribe to all incoming messages. A non-empty optval shall subscribe to all messages beginning with the specified prefix. Multiple filters may be attached to a single ZMQ_SUB socket, in which case a message shall be accepted if it matches at least one filter.
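
The rule quoted above can be mimicked in a few lines of plain Python. This is only a sketch of the documented semantics, not ZeroMQ code; the helper name accepted and the sample frames are hypothetical.

```python
def accepted(message: bytes, filters: list) -> bool:
    """Accept the message if at least one filter is a prefix of it."""
    return any(message.startswith(f) for f in filters)


filters = [b"600000", b"000001"]                 # hypothetical stock codes

print(accepted(b"600000 12.34", filters))        # True:  begins with b"600000"
print(accepted(b"600519 99.00", filters))        # False: no filter is a prefix
print(accepted(b"600519 99.00", [b""]))          # True:  the empty filter matches all
print(accepted(b"600519 99.00", []))             # False: no filters, nothing accepted
```

Note that the comparison is made on raw bytes, which is why the encoding of the filter has to agree with what the publisher actually puts on the wire.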

The DEBUG: log snippets above (well, screenshots; next time please copy and paste the terminal text rather than posting pictures, unless some GUI functionality is being shown, right? Thanks) show that your topic filters never match in this sense. Fix it. System-wide.

ZeroMQ is not to blame for this. Unicode strings and C byte arrays simply cannot work together, and they will clash if mixed or passed through the wrong calling interfaces.
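
A small pure-Python illustration of why the two representations have to agree. It assumes, as the pyzmq documentation describes, that setsockopt_string() encodes the text with the given encoding before handing the bytes to the socket; the frame content below is a hypothetical example built on the b'mkt_bar...' prefix seen in the log.

```python
# What a C publisher might put on the wire: plain bytes (hypothetical payload).
wire_frame = b"mkt_bar.600000"

# The same topic text, turned into bytes with two different encodings.
utf8_filter = "mkt_bar".encode("utf-8")     # b'mkt_bar'
utf16_filter = "mkt_bar".encode("utf-16")   # BOM + two bytes per character

print(wire_frame.startswith(utf8_filter))   # True:  byte-for-byte prefix
print(wire_frame.startswith(utf16_filter))  # False: different bytes, never matches

# A text string and a bytes object are never equal in Python 3,
# even if they "look" the same when printed.
print("mkt_bar" == b"mkt_bar")              # False

# Decoding explicitly gives text again, which is what .recv_string() is for.
text = wire_frame.decode("utf-8")
print(type(wire_frame).__name__, type(text).__name__)   # bytes str
```

So if the publisher sends plain ASCII/UTF-8 bytes, the subscriber's filters must be encoded the same way, and the received frames decoded with the same encoding.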


Epilogue:

If you still blame ZeroMQ's topic-filtering capability, the simplest A/B test to (dis)prove the null hypothesis is to run the VERY SAME test, but with only 5 topic filters. If that fails as well, your hypothesis about a capacity limit was wrong.

Keep walking!
