Why can't asyncio.Queue work as expected?

I am writing a simple producer / consumer program.

import zmq

@asyncio.coroutine
def receive_data(future,s):
        print("begin to recv sth from.....socket"
        my_data = s.recv()
        future.set_result(my_data)

@asyncio.coroutine
def producer(loop,q,s):
        while True:
                future = asyncio.Future()
                yield from receive_data(future,s)
                data = str(future.result())
                yield from q.put(data)
@asyncio.coroutine
def consumer(loop,q):
       while True:
          a = yield from q.get()
          print("i am get..."+str(a)+"..."+str(type(a)))  
loop = asyncio.get_event_loop()

c = zmq.Context()
s = c.socket(zmq.REP)
s.bind('tcp://127.0.0.1:5515')

q = asyncio.Queue()
tasks=[asyncio.Task(producer(loop,q,s)),asyncio.Task(comsumer(loop,q))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
s.close()

      

It looks like there is no way for the consumer to fulfill.

Sockets receive data every 500ms, so when the function yield from

in the receive_data function suspends the accompanying producer copy, the consumer coroutine will print the information.

What can explain this?

+3


source to share


3 answers


s.recv()

blocks the call, so receive_data

hangs until a new ZMQ message appears.

This blocks the event loop and the consumer has no way of doing it himself.



You can pass a flag zmq.NOBLOCK

in .recv

and call asyncio.sleep(0)

if no data is available to give eventloop the ability to iterate over other ready tasks.

Or just use the aiozmq library :)

+3


source


You are mixing synchronous and asynchronous calls, the results will be synchronous. If you want to continue using asyncio, you must define an asynchronous context c = zmq.asyncio.context()

and use ROUTER socket s = c.socket(zmq.ROUTER)

. Then, following the asyncio syntax, you should get from recv_multipart()

in order for your to my_data = s.recv()

become my_data = yield from s.recv_multipart()

.



+1


source


Here's a guide to what should happen:

  • you must use Context

    and ZMQEventLoop

    fromzmq.asyncio

  • you have to make asyncio

    use of the zmq loop by callingset_event_loop()

  • just for testing, I switched to ROUTER_RAW connector

Working example:

import asyncio
import zmq
from zmq.asyncio import Context, ZMQEventLoop

async def receive_data(s):
        data = await s.recv()
        print('receive_data', data)
        return data

async def producer(q, s):
        while True:
                data = await receive_data(s)
                await q.put(data)

async def consumer(q):
       while True:
          a = await q.get()
          print('i got... {} ... {}'.format(a, type(a)))  

loop = ZMQEventLoop()
asyncio.set_event_loop(loop)

c = Context()
s = c.socket(zmq.ROUTER)
s.setsockopt(zmq.ROUTER_RAW, 1)
s.bind('tcp://127.0.0.1:5515')

q = asyncio.Queue()
tasks=[producer(q, s), consumer(q)]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
s.close()

      

You can check it with ROUTER_RAW using telnet:

$ telnet localhost 5515
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
test
abcd1234
^]
telnet> Connection closed.
$ 

      

The response from this app would be:

receive_data b'\x00\xa3\x8e\x1f)'
receive_data b''
i got... b'\x00\xa3\x8e\x1f)' ... <class 'bytes'>
i got... b'' ... <class 'bytes'>
receive_data b'\x00\xa3\x8e\x1f)'
receive_data b'test\r\n'
i got... b'\x00\xa3\x8e\x1f)' ... <class 'bytes'>
i got... b'test\r\n' ... <class 'bytes'>
receive_data b'\x00\xa3\x8e\x1f)'
receive_data b'abcd1234\r\n'
i got... b'\x00\xa3\x8e\x1f)' ... <class 'bytes'>
i got... b'abcd1234\r\n' ... <class 'bytes'>
receive_data b'\x00\xa3\x8e\x1f)'
receive_data b''
i got... b'\x00\xa3\x8e\x1f)' ... <class 'bytes'>
i got... b'' ... <class 'bytes'>

      

0


source







All Articles