yeti logo icon
Close Icon
contact us
We'll reply within 24 hours.
Thank you! Your message has been received!
A yeti hand giving a thumb's up
Oops! Something went wrong while submitting the form.

Establishing a Websocket PUBSUB server with Redis and Asyncio

By
-
April 17, 2017

If you've been following my never-ending quest for a light-sensor setup as detailed first in this post (set up a Raspberry Pi with Python 3.5) and then in this other post (make a light sensor with an Arduino and Raspberry Pi), you would know that I've only set up the data-collection half of this project.

The other half will involve getting the data to make hardware do stuff in the real world. This post in particular covers how to leverage a PUBSUB (publish-subscribe) Redis server and websockets to this purpose.

What is PUBSUB?

Publish Subscribe idea

The idea behind publish/subscribe is simple. There are subscribers, which are clients that listen for changes on a set of data. Then there are publishers, which are clients that edit the set of data. Whenever the data is edited by a publisher, all the subscribers that are subscribed to that publisher hear it and act on it! Redis is a data store that supports this behavior out of the box, so we'll be using it as our data store. For this light sensor project, we'll have one publisher (the light sensor) and potentially many subscribers (such as this browser page that is tracking the light levels in the Yeti office currently, a client program writing to a database, or even a smart mechanical shutter).

The High Level

We'll be running a Redis server and two Python websocket servers (one for accepting PUBLISH signals and another for accepting SUBSCRIBE signals) on a Debian VPS (i.e. an AWS EC2 instance, Digital Ocean droplet, etc.), exposing only the websocket server processes with Nginx. The websocket servers will access and edit the Redis server. I opted to use websockets rather than expose Redis directly simply because exposing Redis would potentially involve editing my iptables, which I am reluctant to do.

High level websocket/redis server architecture

Procedure

All the steps here will be done SSH'd in on the VPS.

Installing Redis Server

Redis is a server that is useful for storing key-value pairs in memory. For web devs, it tends to be used as a backend cache to speed up DB queries. By default it listens to signals on port 6379. To install this wonderful piece of software, the Redis folks have a pretty sweet set of documentation here. I especially like being able to interact with Redis directly through the redis-cli tool (which is handy for debugging). You can try it out after installation by typing redis-cli in the terminal.

Test out Redis Server

We can try out some Redis functionality ad-hoc with the redis-cli tool.

You can set values to arbitrary keys.

127.0.0.1:6379> SET test meOK

You can get values by key

127.0.0.1:6379> GET test"me"

Deleting key-value pairs is easy too.

127.0.0.1:6379> DEL test(integer) 1127.0.0.1:6379> GET test(nil)

Now let's try some PUBSUB behavior. I tend to run multiple terminals at a time to test multiple processes like this out. With two windows SSH'd into the VPS, (and redis-cli running on both), try subscribing to a channel on one...

127.0.0.1:6379> SUBSCRIBE test_channelReading messages... (press Ctrl-C to quit)1) "subscribe"2) "test_channel"3) (integer) 1

... while publishing on another...

127.0.0.1:6379> PUBLISH test_channel "Hello World"(integer) 1

... and then looking at the "subcribe" terminal to see the result.

127.0.0.1:6379> SUBSCRIBE test_channelReading messages... (press Ctrl-C to quit)1) "subscribe"2) "test_channel"3) (integer) 11) "message"2) "test_channel"3) "Hello World"  # <- There is the published message

So very cool.

More Setup

An approach then that makes sense would be to use the aioredis library in conjunction with the websockets library. At this point, you'll want to install Python 3.5 on this VPS, as well as set up a virtualenv for the Websocket servers. In the directory where all this code will live, you can create a requirements.txt file with the following dependencies...

aioredis==0.3.0appdirs==1.4.0hiredis==0.2.0websockets==3.2

... and once you've activated your Python virtual environment, run pip install -r path/to/requirements.txt

A Consumer Websocket Server

The websockets Python library is pretty well documented and lays out basic usage with two common patterns. One type of websocket server pattern is called a consumer, in which the websocket listens to incoming traffic and then pipes in the processed data to a "consumer" co-routine, in this example a call to Redis's PUBLISH function. Here is an example of this below:

# consumer.pyimport asynciofrom aioredis import create_connection, Channelimport websocketsasync def publish_to_redis(msg, path):    # Connect to Redis    conn = await create_connection(('localhost', 6379))    # Publish to channel "lightlevel{path}"    await conn.execute('publish', 'lightlevel{}'.format(path), msg)async def server(websocket, path):    try:        while True:            # Receive data from "the outside world"            message = await websocket.recv()            # Feed this data to the PUBLISH co-routine            await publish_to_redis(message, path)             await asyncio.sleep(1)    except websockets.exceptions.ConnectionClosed:        print('Connection Closed!')if __name__ == '__main__':    # Boiler-plate for the websocket server, running on localhost, port 8765    loop = asyncio.get_event_loop()    loop.set_debug(True)    ws_server = websockets.serve(server, 'localhost', 8765)    loop.run_until_complete(ws_server)    loop.run_forever()

There's a few things to note here:

A Producer Websocket Server

The other pattern is called a producer websocket server. This takes data from a producer coroutine and pushes it to the websocket client. In this case, the producer is the Redis SUBSCRIBE call, which produces data whenever data is published to the appropriate channel.

# producer.pyimport asynciofrom aioredis import create_connection, Channelimport websocketsasync def subscribe_to_redis(path):    conn = await create_connection(('localhost', 6379))    # Set up a subscribe channel    channel = Channel('lightlevel{}'.format(path), is_pattern=False)    await conn.execute_pubsub('subscribe', channel)    return channel, connasync def browser_server(websocket, path):    channel, conn = await subscribe_to_redis(path)    try:        while True:            # Wait until data is published to this channel            message = await channel.get()            # Send unicode decoded data over to the websocket client            await websocket.send(message.decode('utf-8'))    except websockets.exceptions.ConnectionClosed:        # Free up channel if websocket goes down        await conn.execute_pubsub('unsubscribe', channel)        conn.close()if __name__ == '__main__':    # Runs a server process on 8767. Just do 'python producer.py'    loop = asyncio.get_event_loop()    loop.set_debug(True)    ws_server = websockets.serve(browser_server, 'localhost', 8767)    loop.run_until_complete(ws_server)    loop.run_forever()

A quick NGINX configuration

I would create another file in /etc/nginx/conf.d with the below content, and then do service nginx restart to activate these changes. Because of port conflicts, the Nginx server will listen to 8766 and 8768 for the publisher and subscriber sockets respectively.

map $http_upgrade $connection_upgrade {    default upgrade;    '' close;}upstream publisherWebsocket {    server 127.0.0.1:8765;}upstream subscriberSocket {    server 127.0.0.1:8767;}server {    listen  8766;    location / {        proxy_pass http://publisherWebsocket;        proxy_http_version 1.1;        proxy_set_header Upgrade $http_upgrade;        proxy_set_header Connection $connection_upgrade;    }}server {    listen  8768;    location / {        proxy_pass http://subscriberSocket;        proxy_http_version 1.1;        proxy_set_header Upgrade $http_upgrade;        proxy_set_header Connection $connection_upgrade;    }}

Debugging

We've written all these code snippets, and at this point are not experiencing any glaring runtime errors (hopefully). How can we tell if everything is connected up correctly? You can try out the light sensor facing part of the setup by setting the PUBLISH_SOCKET_LINK environment variable on the Pi to the VPS's IP address or DNS at port 8766 with a specific path (i.e. ws://example.com:8766/<path>), running the consumer server, and then using redis-cli to SUBSCRIBE to the channel lightsensor<path>. If you see a stream of numbers corresponding to the light levels, congratulations! You can then try starting the producer server to expose the data to potential subscribers (which will listen on ws://example.com:8768/<path>)!

What did we accomplish?

So, provided that everything works you should have a server setup that:

The <channelPath> refers to an arbitrary string that when set tells Redis which "channel" to subscribe or publish to.

Next Steps

Some ideas for firming up this server include:

As for the front-end side of things, it seems that all there is left to do is write some subscriber clients to start doing things with that sweet, sweet data. Happy building!

You Might also like...

Busting 5 App Development Myths

The following are five pervasive myths about why app development should be a simple process — and the real reasons that a good product takes time.

Token-based Header Authentication for WebSockets behind Node.js

Using Node.JS to proxy requests to mutate them under the hood can be beneficial. In cases like these, it can also make your product more secure.

VIDEO: Building API's with Django and GraphQL

At our last Django Meetup Group event, Jayden Windle, the lead engineer at Jetpack, an on demand delivery company, talks building APIs with Django and GraphQL. Watch the video to learn more.

Browse all Blog Articles

Ready for your new product adventure?

Let's Get Started