Communication between asyncio protocol / server
I'm trying to create a Server Side Events server that I can telnet with and the telnet content will be pushed to the browser. The idea behind using Python and asyncio is to use as little CPU as possible, as this will work on a Raspberry Pi.
So far, I have the following which uses the library found here: https://pypi.python.org/pypi/asyncio-sse/0.1 which uses asyncio.
And I also copied the telnet server which uses asyncio as well.
Both work separately, but I have no idea how to link them together. As I understand it, I need to call send()
in the class SSEHandler
from within Telnet.data_received
, but I don't know how to access it. Both of these "servers" must be started in a loop to accept new connections or to move data.
Can anyone help or point me in a different direction?
import asyncio
import sse
# Get an instance of the asyncio event loop
loop = asyncio.get_event_loop()
# Setup SSE address and port
sse_host, sse_port = '192.168.2.25', 8888
class Telnet(asyncio.Protocol):
def connection_made(self, transport):
print("Connection received!");
self.transport = transport
def data_received(self, data):
print(data)
self.transport.write(b'echo:')
self.transport.write(data)
# This is where I want to send data via SSE
# SSEHandler.send(data)
# Things I've tried :(
#loop.call_soon_threadsafe(SSEHandler.handle_request());
#loop.call_soon_threadsafe(sse_server.send("PAH!"));
def connection_lost(self, esc):
print("Connection lost!")
telnet_server.close()
class SSEHandler(sse.Handler):
@asyncio.coroutine
def handle_request(self):
self.send('Working')
# SSE server
sse_server = sse.serve(SSEHandler, sse_host, sse_port)
# Telnet server
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '192.168.2.25', 7777))
#telnet_server.something = sse_server;
loop.run_until_complete(sse_server)
loop.run_until_complete(telnet_server.wait_closed())
source to share
Server side events are a kind of HTTP protocol; and you can have any number of concurrent HTTP requests in flight at any given moment, you can have zero if no one is connected, or dozens. This nuance is completed in two designs sse.serve
and sse.Handler
; the former represents a single listening port that sends each individual client request to the latter.
In addition, it sse.Handler.handle_request()
is called once for each client, and the client disconnects after this joint procedure completes. In your code, this coroutine is terminated immediately, and therefore the client sees one "work" event. So we have to wait, more or less forever. We can do this with yield from
ing asyncio.Future()
.
The second problem is that we need to somehow hold on to all the individual instances SSEHandler()
and use a method send()
on each of them. Well, we can have each independent accounting in our methods handle_request()
; by adding each of these to a dict that maps instances of the individual handlers to the future they are waiting for.
class SSEHandler(sse.Handler):
_instances = {}
@asyncio.coroutine
def handle_request(self):
self.send('Working')
my_future = asyncio.Future()
SSEHandler._instances[self] = my_future
yield from my_future
Now, to dispatch an event for each listen, we simply visit all the SSEHandler instances registered in the created dict and use it send()
for each one.
class SSEHandler(sse.Handler):
#...
@classmethod
def broadcast(cls, message):
for instance, future in cls._instances.items():
instance.send(message)
class Telnet(asyncio.Protocol):
#...
def data_received(self, data):
#...
SSEHandler.broadcast(data.decode('ascii'))
Finally, your code exits when the telnet connection is closed. that's fine, but we have to clean up at that time too. Luckily, it's just a matter of determining the outcome for all futures for all handlers
class SSEHandler(sse.Handler):
#...
@classmethod
def abort(cls):
for instance, future in cls._instances.items():
future.set_result(None)
cls._instances = {}
class Telnet(asyncio.Protocol):
#...
def connection_lost(self, esc):
print("Connection lost!")
SSEHandler.abort()
telnet_server.close()
here's a full working reset if my illustration isn't obvious.
import asyncio
import sse
loop = asyncio.get_event_loop()
sse_host, sse_port = '0.0.0.0', 8888
class Telnet(asyncio.Protocol):
def connection_made(self, transport):
print("Connection received!");
self.transport = transport
def data_received(self, data):
SSEHandler.broadcast(data.decode('ascii'))
def connection_lost(self, esc):
print("Connection lost!")
SSEHandler.abort()
telnet_server.close()
class SSEHandler(sse.Handler):
_instances = {}
@classmethod
def broadcast(cls, message):
for instance, future in cls._instances.items():
instance.send(message)
@classmethod
def abort(cls):
for instance, future in cls._instances.items():
future.set_result(None)
cls._instances = {}
@asyncio.coroutine
def handle_request(self):
self.send('Working')
my_future = asyncio.Future()
SSEHandler._instances[self] = my_future
yield from my_future
sse_server = sse.serve(SSEHandler, sse_host, sse_port)
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '0.0.0.0', 7777))
loop.run_until_complete(sse_server)
loop.run_until_complete(telnet_server.wait_closed())
source to share