Dealer ZeroMQ - high latency compared to winsock

My company is evaluating ZeroMQ as a transport mechanism. As a first step, I benchmarked it to see what I am working with.

So I wrote an application that compares a ZeroMQ dealer-to-dealer setup against a plain Winsock setup. I measured the round-trip time of messages sent synchronously from the client to the server and back, and then calculated the average.

Here's a winsock server running:

/// Runs a blocking TCP echo server on the given port using Winsock.
///
/// Connections are served one at a time: each accepted client has every
/// chunk it sends echoed straight back until it closes its side, after which
/// the server waits for the next client.
///
/// @param dwPort TCP port to listen on (all local IPv4 addresses).
/// @return The function only returns on failure: the WSAStartup/getaddrinfo
///         error code, the Winsock error code of the failing socket call, or
///         1 when recv() fails on a client connection.
DWORD RunServerWINSOCKTest(DWORD dwPort)
{
    // Function-local scope guards so that every exit path releases Winsock and
    // the sockets. Because a return expression is evaluated before locals are
    // destroyed, `return WSAGetLastError();` still reports the error of the
    // call that failed rather than a value overwritten by the cleanup calls.
    struct WinsockGuard
    {
        ~WinsockGuard() { WSACleanup(); }
    };
    struct SocketGuard
    {
        SOCKET handle;
        explicit SocketGuard(SOCKET s) : handle(s) {}
        ~SocketGuard()
        {
            if (handle != INVALID_SOCKET)
                closesocket(handle);
        }
        SocketGuard(const SocketGuard&) = delete;
        SocketGuard& operator=(const SocketGuard&) = delete;
    };

    WSADATA wsaData;
    int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (iRet != NO_ERROR)
    {
        printf("WSAStartup failed with error: %d\n", iRet);
        return iRet;
    }
    WinsockGuard winsockGuard;

    struct addrinfo hints;
    ZeroMemory(&hints, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    hints.ai_flags = AI_PASSIVE;

    struct addrinfo *result = NULL;
    iRet = getaddrinfo(NULL, std::to_string(dwPort).c_str(), &hints, &result);
    if (iRet != 0)
    {
        return iRet;
    }

    SocketGuard listener(socket(result->ai_family, result->ai_socktype, result->ai_protocol));
    if (listener.handle == INVALID_SOCKET)
    {
        // Capture the error before freeaddrinfo() gets a chance to change it.
        const int iError = WSAGetLastError();
        freeaddrinfo(result);
        return iError;
    }

    iRet = bind(listener.handle, result->ai_addr, (int)result->ai_addrlen);
    if (iRet == SOCKET_ERROR)
    {
        const int iError = WSAGetLastError();
        freeaddrinfo(result);
        return iError;
    }
    freeaddrinfo(result);

    if (listen(listener.handle, SOMAXCONN) == SOCKET_ERROR)
    {
        return WSAGetLastError();
    }

    while (true)
    {
        SocketGuard client(accept(listener.handle, NULL, NULL));
        if (client.handle == INVALID_SOCKET)
        {
            return WSAGetLastError();
        }

        // Disable Nagle's algorithm so small echo replies are sent immediately.
        // TCP_NODELAY is a BOOL option that is off by default, so it has to be
        // set to TRUE to have any effect. Failure is not fatal for the test.
        BOOL bNoDelay = TRUE;
        if (setsockopt(client.handle, IPPROTO_TCP, TCP_NODELAY,
                       reinterpret_cast<const char*>(&bNoDelay), sizeof(bNoDelay)) == SOCKET_ERROR)
        {
            printf("setsockopt(TCP_NODELAY) failed with error: %d\n", WSAGetLastError());
        }

        char recvbuf[DEFAULT_BUFLEN];
        const int recvbuflen = DEFAULT_BUFLEN;
        do {
            iRet = recv(client.handle, recvbuf, recvbuflen, 0);
            if (iRet > 0) {
                // Echo the buffer back to the sender
                if (send(client.handle, recvbuf, iRet, 0) == SOCKET_ERROR)
                {
                    return WSAGetLastError();
                }
            }
            else if (iRet == 0)
                printf("Connection closing...\n");
            else {
                return 1;
            }
        } while (iRet > 0);

        if (shutdown(client.handle, SD_SEND) == SOCKET_ERROR)
        {
            return WSAGetLastError();
        }
        // `client` goes out of scope here and closes the connection.
    }
}

      

Here is a client running winsock:

/// Winsock round-trip benchmark client.
///
/// Connects to strAddress:dwPort, then 1,000,000 times sends a message of
/// dwMessageSize 'X' characters, waits for the complete echo, and accumulates
/// the time spent waiting. The totals are passed to PrintResult().
///
/// @param strAddress    Host name or address of the echo server.
/// @param dwPort        TCP port of the echo server.
/// @param dwMessageSize Payload size in bytes of every message.
/// @return The WSAStartup error code if start-up fails; 1 on any other failure
///         (resolve, connect, send, recv, echo mismatch, shutdown); otherwise
///         the result of the final WSACleanup().
DWORD RunClientWINSOCKTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
    // Function-local scope guards so that every early return releases the
    // socket and Winsock. The Winsock guard can be disarmed so the success
    // path can still return WSACleanup()'s own result.
    struct WinsockGuard
    {
        bool armed;
        WinsockGuard() : armed(true) {}
        ~WinsockGuard()
        {
            if (armed)
                WSACleanup();
        }
    };
    struct SocketGuard
    {
        SOCKET handle;
        explicit SocketGuard(SOCKET s) : handle(s) {}
        ~SocketGuard() { Close(); }
        void Close()
        {
            if (handle != INVALID_SOCKET)
            {
                closesocket(handle);
                handle = INVALID_SOCKET;
            }
        }
        SocketGuard(const SocketGuard&) = delete;
        SocketGuard& operator=(const SocketGuard&) = delete;
    };

    WSADATA wsaData;
    int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (iRet != NO_ERROR)
    {
        return iRet;
    }
    WinsockGuard winsockGuard;

    struct addrinfo hints;
    ZeroMemory(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    struct addrinfo *result = NULL;
    if (getaddrinfo(strAddress.c_str(), std::to_string(dwPort).c_str(), &hints, &result) != 0) {
        return 1;
    }

    // Try each resolved address until one connects.
    SocketGuard connection(INVALID_SOCKET);
    for (struct addrinfo *ptr = result; ptr != NULL; ptr = ptr->ai_next) {
        connection.handle = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
        if (connection.handle == INVALID_SOCKET) {
            freeaddrinfo(result);
            return 1;
        }

        if (connect(connection.handle, ptr->ai_addr, (int)ptr->ai_addrlen) == SOCKET_ERROR) {
            connection.Close();
            continue;
        }
        break;
    }
    freeaddrinfo(result);

    if (connection.handle == INVALID_SOCKET) {
        return 1;
    }

    // Statistics
    UINT64 uint64BytesTransmitted = 0;
    UINT64 uint64StartTime = s_TimeStampGenerator.GetHighResolutionTimeStamp();
    UINT64 uint64WaitForResponse = 0;

    DWORD dwMessageCount = 1000000;

    const std::string strSendMsg(dwMessageSize, 'X');
    std::string strRecvMsg(strSendMsg.size(), '\0');

    for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
    {
        if (send(connection.handle, strSendMsg.data(), static_cast<int>(strSendMsg.size()), 0) == SOCKET_ERROR) {
            return 1;
        }
        uint64BytesTransmitted += strSendMsg.size();

        UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();

        // TCP is a byte stream: one recv() may deliver only part of the echo,
        // so keep reading until the whole message has arrived.
        size_t cbReceived = 0;
        while (cbReceived < strRecvMsg.size())
        {
            const int iRecv = recv(connection.handle, &strRecvMsg[cbReceived],
                                   static_cast<int>(strRecvMsg.size() - cbReceived), 0);
            if (iRecv < 1)
            {
                return 1;
            }
            cbReceived += static_cast<size_t>(iRecv);
        }

        if (strRecvMsg != strSendMsg)
        {
            // A corrupted echo invalidates the measurement; report it as a
            // failure like the other error paths instead of silently succeeding.
            return 1;
        }
        uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
    }

    UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
    PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);

    if (shutdown(connection.handle, SD_SEND) == SOCKET_ERROR) {
        return 1;
    }
    connection.Close();

    // Success path: hand WSACleanup()'s result to the caller ourselves.
    winsockGuard.armed = false;
    return WSACleanup();
}

      

Here is the server running ZMQ (dealer)

/// ZeroMQ echo server built on a single DEALER socket.
///
/// Binds to tcp://*:<dwPort> and sends every message it receives straight
/// back until a send reports failure.
///
/// @param dwPort TCP port to bind to.
/// @return The zmq error number if a zmq::error_t is thrown, otherwise NO_ERROR.
DWORD RunServerZMQTest(DWORD dwPort)
{
    try
    {
        zmq::context_t ioContext(1);
        zmq::socket_t echoSocket(ioContext, ZMQ_DEALER);

        // Set options here
        const std::string identity = s_set_id(echoSocket);
        printf("Created server connection with ID: %s\n", identity.c_str());

        const std::string endpoint = "tcp://*:" + std::to_string(dwPort);
        echoSocket.bind(endpoint.c_str());

        // Echo loop: leave it as soon as a reply cannot be sent.
        for (;;)
        {
            const std::string payload = s_recv(echoSocket);
            if (!s_send(echoSocket, payload))
            {
                break;
            }
        }
    }
    catch (const zmq::error_t& zmqError)
    {
        return static_cast<DWORD>(zmqError.num());
    }

    return NO_ERROR;
}

Here is the client running ZMQ (dealer):

/// ZeroMQ round-trip benchmark client built on a single DEALER socket.
///
/// Connects to tcp://<strAddress>:<dwPort>, performs an "INIT" handshake that
/// the server is expected to echo, then 1,000,000 times sends a message of
/// dwMessageSize 'X' characters, waits for the echo, and accumulates the time
/// spent waiting. The totals are passed to PrintResult().
///
/// @param strAddress    Host name or address of the echo server.
/// @param dwPort        TCP port of the echo server.
/// @param dwMessageSize Payload size in bytes of every message.
/// @return The zmq error number if a zmq::error_t is thrown, otherwise
///         NO_ERROR. Note that a failed send or a mismatching echo also ends
///         the run early with NO_ERROR and without printing results.
DWORD RunClientZMQTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
    try
    {
        zmq::context_t ctx(1);
        zmq::socket_t client(ctx, ZMQ_DEALER); // ZMQ_REQ

        // Set options here
        const std::string strIdentity = s_set_id(client);

        const std::string strConnect = "tcp://" + strAddress + ":" + std::to_string(dwPort);
        client.connect(strConnect.c_str());

        // Handshake: the server must echo "INIT" before measuring starts.
        if (!s_send(client, "INIT"))
        {
            return NO_ERROR;
        }
        if (s_recv(client) != "INIT")
        {
            return NO_ERROR;
        }
        printf("Client[%s] connected to: %s\n", strIdentity.c_str(), strConnect.c_str());

        // Statistics
        UINT64 uint64BytesTransmitted   = 0;
        UINT64 uint64StartTime          = s_TimeStampGenerator.GetHighResolutionTimeStamp();
        UINT64 uint64WaitForResponse    = 0;

        // Same message count as the Winsock client so that both runs (and the
        // published results) are directly comparable.
        DWORD dwMessageCount = 1000000;

        const std::string strSendMsg(dwMessageSize, 'X');
        for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
        {
            if (!s_send(client, strSendMsg))
            {
                return NO_ERROR;
            }
            uint64BytesTransmitted += strSendMsg.size();

            UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();
            const std::string strRecvMsg = s_recv(client);
            if (strRecvMsg != strSendMsg)
            {
                return NO_ERROR;
            }
            uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
        }

        UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
        PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);
    }
    catch (const zmq::error_t& e)
    {
        return static_cast<DWORD>(e.num());
    }

    return NO_ERROR;
}

      

I run the test locally with a message size of 5 bytes and I get the following result:

WINSOCK

Messages sent:                 1 000 000
Time elapsed (us):            48 019 415
Time elapsed (s):                     48.019 415
Message size (bytes):                  5
Msg/s:                            20 825
Bytes/s:                         104 125
Mb/s:                                  0.099
Total   response time (us):   24 537 376
Average repsonse time (us):           24.0

      

and

ZeroMQ

Messages sent:                 1 000 000
Time elapsed (us):           158 290 708
Time elapsed (s):                    158.290 708    
Message size (bytes):                  5
Msg/s:                             6 317
Bytes/s:                          31 587
Mb/s:                                  0.030
Total   response time (us):  125 524 178    
Average response time (us):          125.0

      

Can anyone explain why the average response time is much higher when using the ZMQ?

The goal is to find a setup where I can send and receive messages asynchronously without requiring a response. If this can be achieved with a different setup than dealer-dealer, please let me know!

+3


source to share


3 answers


This is just an answer to a small part of your question, but here goes -

Why do you need dealer/dealer? I'm guessing it is because communication can start from either side? You are not tied to dealer/dealer; in particular, it restricts you to only two endpoints. If you ever add another endpoint on either side of the conversation, say a second client, then each client will only receive half of the messages, because a dealer is strictly round-robin.

What you need for asynchronous communication is some combination of dealer and router sockets. Neither requires a response; the main difference is in how they choose which connected peer to send a message to:

  • A dealer is strictly round-robin: it sends to each connected peer in turn.
  • A router is strictly addressed: you must know the "name" of the peer you want to send to in order for the message to get there.

The two socket types work together because dealer sockets (and request sockets; a dealer is a "request-type" socket) send their "name" as part of the message, which a router socket can use to send data back. This is a request/response paradigm, and you will see this paradigm applied in all the examples in the guide, but you can bend that paradigm to what you are looking for; in particular, neither the dealer nor the router requires a response.



Without knowing your full requirements, I cannot tell you which ZMQ architecture I would choose, but in general I prefer the extensibility of router sockets: it is easier to handle the appropriate addressing than it is to shoehorn everything into a single peer. You will see warnings against doing router/router, and I agree with them insofar as you should understand what you are doing before you try it, but once you understand what you are doing, the implementation is not that difficult.


You also have the option, if it suits your requirements, of setting up each end with a pub socket and a sub socket, if there are literally never any responses. If it is strictly a data feed from source to target, and neither peer needs any feedback on what it is sending, then this is probably the best choice, although it means you are dealing with two sockets per end rather than one.


None of this directly addresses performance, but it is important to understand that zmq sockets are optimized for specific use cases, and, as pointed out in John Jeffery's answer, you are breaking that use case for your dealer socket by making the messaging in your test strictly synchronous. The first thing to do is to finalize your ZMQ architecture and then simulate the actual message flow, specifically without adding arbitrary waits and synchronicity, which will inevitably change how throughput looks as you test it, pretty much by definition.

+4


source


You say you want to send and receive messages asynchronously without requiring a response. However, the tests performed so far are completely synchronous, essentially request-response, but on a dealer-dealer socket pair. Something doesn't add up there. Why not run tests that more closely mimic the design you are targeting?

ZeroMQ gets a fair amount of its "faster than TCP" performance by batching queued messages into a single message. Obviously, this mechanism cannot kick in with a purely synchronous design that has only one message in flight at a time.



As to why this particular test, with very small messages sent and received strictly synchronously, is relatively slow, I cannot say. Have you done any profiling? I'll say again that running this test and basing decisions on it doesn't make sense if it doesn't closely resemble your final design.

One thing that looks strange is the try / catch block in the ZeroMQ code. This doesn't seem fair because the winsock test was not written that way. There is a fair amount of overhead in try / catch.

+3


source


The OP's problem is a matter of throughput, not latency, and is most likely a matter of the pattern used in the examples provided. However, you will probably always find that ZeroMQ has a higher latency, which I will explain, although in this situation it might not be applicable to the OP.

ZeroMQ works by buffering messages. Imagine (as a basic illustration) creating a std::string and appending many small strings to it (thousands, each of which carries a small header recording the size of that small segment) and then sending the big string at intervals of 100us, 1000us, 10ms, or whatever. On the receiving end, the large string is received, and each smaller message is removed one at a time based on the size header that was sent along with it. This allows you to potentially send millions of messages in batches (although std::string is obviously a bad choice) without the overhead of sending those millions of very small messages one at a time. As a result, you make full use of your network resources and increase your throughput, and you also get basic FIFO behavior. However, you are also creating a delay to allow the buffer to fill, which means an increase in latency.

Imagine (again, as a basic illustration): if you spend half a second (including string operations, etc.) buffering a million messages, the string will grow to several megabytes. Modern networks can easily send this large string in the remaining half second. 1000000us (1 second) / 1,000,000 messages will be 1us per message, right? False - all messages were delayed by half a second to allow the queue to fill, resulting in a half-second latency for all messages. ZeroMQ sends batches much faster than every 500ms, but the latency increase that this illustrates still occurs in ZeroMQ, although it is usually on the order of a few ms.

+2


source







All Articles