
Pubsub with Redis & Redis-py


Pre-requisites

Redis Server

Redis can be installed directly on the machine by following these
installation guidelines by DigitalOcean.

However, for the examples shown here, I run the Redis server in a Docker container.

If a system-wide redis-server instance is already running, shut it down first to avoid port conflicts:

$ /etc/init.d/redis-server stop

The container image is built from the following Dockerfile and redis.conf:

FROM redis:5.0.9-alpine

COPY redis.conf /etc/redis.conf

RUN chmod 777 /etc/redis.conf

CMD [ "redis-server", "/etc/redis.conf" ]

# redis.conf
requirepass my-secret-password

To build an image called “redis” run one of the following docker image build commands:

$ docker image build --rm -t redis -f redis.Dockerfile .  # if the file is called redis.Dockerfile
$ # OR
$ docker image build --rm -t redis .

To start a container from the created “redis” image, run one of the following commands:

$ docker container run --rm -p 6379:6379 --name redis redis
$ # OR
$ docker container run --rm -d -p 6379:6379 --name redis redis  # daemonized (detached)
$ # OR
$ docker container run --rm [-it] -p 6379:6379 --name redis redis  # interactive (see below for CLI usage)

To test and interact with the running redis container, start it up and connect to it with redis-cli:

$ docker exec -it redis redis-cli -a my-secret-password [-h 127.0.0.1 -p 6379]
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> set hello world
OK
127.0.0.1:6379> get hello
"world"

Press ctrl+c to close redis-cli.

Redis-Py

Python can interact with the Redis server using the package redis-py.

Install it in your environment:

(venv)$ pip install redis

Publish/Subscribe

Publisher

The publisher creates and publishes the message by providing the channel name and the message:

from redis import Redis

r = Redis.from_url('redis://:my-secret-password@127.0.0.1:6379/0')
r.publish('my-channel', 'a published message')
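
The connection URL above embeds the password directly. A minimal sketch of assembling such a URL from its parts follows; build_redis_url is a hypothetical helper, not part of redis-py, and it percent-encodes the password on the assumption that the client decodes it again when parsing the URL:

```python
from urllib.parse import quote


def build_redis_url(password, host="127.0.0.1", port=6379, db=0):
    """Assemble a redis:// URL (hypothetical helper); the password is percent-encoded."""
    # safe="" makes quote() escape every reserved character, including "/" and "@".
    return f"redis://:{quote(password, safe='')}@{host}:{port}/{db}"


url = build_redis_url("my-secret-password")
print(url)  # redis://:my-secret-password@127.0.0.1:6379/0
# The result can be passed to Redis.from_url(url) as in the example above.
```

A password without reserved characters, like the one used here, passes through unchanged.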

Python objects can also be published, but must be serialized first (with pickle or json):

import pickle
from redis import Redis

r = Redis.from_url('redis://:my-secret-password@127.0.0.1:6379/0')

d = {'job_id': '123-456-789', 'status': 'done'}
r.publish('my-channel', pickle.dumps(d))
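
For the json variant mentioned above, a minimal sketch could look like this. The helper names encode_payload and decode_payload are made up for illustration. JSON only covers basic types (dicts, lists, strings, numbers, booleans, None), but unlike pickle it does not execute code when loading data from an untrusted publisher:

```python
import json


def encode_payload(obj):
    """Serialize a JSON-compatible object to UTF-8 bytes (hypothetical helper)."""
    return json.dumps(obj).encode("utf-8")


def decode_payload(data):
    """Turn the bytes found under message['data'] back into a Python object."""
    return json.loads(data.decode("utf-8"))


d = {"job_id": "123-456-789", "status": "done"}
payload = encode_payload(d)
print(payload)  # b'{"job_id": "123-456-789", "status": "done"}'
# payload could be sent with r.publish('my-channel', payload)
```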

Subscriber

The subscriber subscribes to a channel and retrieves messages from it:

from redis import Redis

r = Redis.from_url('redis://:my-secret-password@127.0.0.1:6379/0')

p = r.pubsub()
p.subscribe('my-channel')  # multiple args possible to subscribe to more channels

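complete_message = p.get_message()
print(complete_message)
# once the publisher has sent a message, this prints:
# {'pattern': None, 'type': 'message', 'channel': b'my-channel', 'data': b'a published message'}
# (the first call typically returns the subscribe confirmation, with 'type': 'subscribe')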

complete_message = p.get_message()  
print(complete_message) 
# if no new messages have been published this prints:
# None

The published payload is stored under the 'data' key of the returned dictionary, i.e. complete_message['data'].
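
Besides published messages, get_message() also returns subscribe and unsubscribe confirmations, whose 'data' field holds a subscription count rather than a payload. The sketch below filters on the 'type' key; extract_payload is a hypothetical helper, and the dictionaries are hand-written stand-ins for real return values:

```python
def extract_payload(message):
    """Return the published bytes, or None for anything that is not a published message."""
    if message is None:
        return None
    if message.get("type") not in ("message", "pmessage"):
        return None
    return message["data"]


# Dictionaries shaped like the ones returned by p.get_message()
confirmation = {"type": "subscribe", "pattern": None, "channel": b"my-channel", "data": 1}
published = {"type": "message", "pattern": None, "channel": b"my-channel", "data": b"a published message"}

print(extract_payload(None))          # None
print(extract_payload(confirmation))  # None
print(extract_payload(published))     # b'a published message'
```

Creating the subscriber with r.pubsub(ignore_subscribe_messages=True) should achieve a similar effect on the library side.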

If the messages were serialized, simply deserialize them:

import pickle
from redis import Redis

r = Redis.from_url('redis://:my-secret-password@127.0.0.1:6379/0')

p = r.pubsub()
p.subscribe('my-channel')  # multiple args possible to subscribe to more channels

complete_message = p.get_message()
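# skip subscribe confirmations, whose 'data' is a number and not pickled bytes
if complete_message and complete_message['type'] == 'message':
    message = pickle.loads(complete_message['data'])
    print(message)
    # this prints:
    # {'job_id': '123-456-789', 'status': 'done'}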

Callback Message Handler

The subscriber can register a function to handle published messages:

import pickle
from redis import Redis

r = Redis.from_url('redis://:my-secret-password@127.0.0.1:6379/0')

p = r.pubsub()

def my_handler(message):
    m = pickle.loads(message['data'])
    print(f"Handler received this object: {m}")

p.subscribe(**{'my-channel': my_handler})

message = p.get_message()
# this will print
# Handler received this object: {'job_id': '123-456-789', 'status': 'done'}
print(message)
# this will print
# None
# because message has been consumed by the handler function.
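
Because a handled message is not returned to the caller, the handler itself has to pass the result on. One possible pattern, sketched here with a hypothetical make_queue_handler factory and a hand-built message dictionary instead of a live connection, is to push the unpickled object onto a queue:

```python
import pickle
from queue import Queue


def make_queue_handler(queue):
    """Build a handler that unpickles message['data'] and stores it in queue."""
    def handler(message):
        queue.put(pickle.loads(message["data"]))
    return handler


results = Queue()
handler = make_queue_handler(results)

# Simulate what redis-py would pass to the handler for a published message.
fake_message = {
    "type": "message",
    "pattern": None,
    "channel": b"my-channel",
    "data": pickle.dumps({"job_id": "123-456-789", "status": "done"}),
}
handler(fake_message)
print(results.get_nowait())  # {'job_id': '123-456-789', 'status': 'done'}
# With a real connection: p.subscribe(**{'my-channel': handler})
```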

Continuous Message Reading

Blocking

for message in p.listen():
    # do something with message['data']
    # eventually break out of the loop?
    print(message['data'])

p.listen() is blocking. The process blocks and waits until a message is published.
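
One way to leave such a blocking loop is to agree on a sentinel payload with the publisher. The sketch below assumes a made-up b"STOP" convention and uses a plain list in place of p.listen(), so it runs without a server:

```python
def consume_until_stop(messages, stop_token=b"STOP"):
    """Collect payloads from an iterable of message dicts until stop_token arrives."""
    payloads = []
    for message in messages:
        if message["type"] != "message":
            continue  # skip subscribe/unsubscribe confirmations
        if message["data"] == stop_token:
            break  # hypothetical convention: the publisher sends b"STOP" when done
        payloads.append(message["data"])
    return payloads


# A plain list stands in for p.listen() here.
fake_stream = [
    {"type": "subscribe", "data": 1},
    {"type": "message", "data": b"first"},
    {"type": "message", "data": b"second"},
    {"type": "message", "data": b"STOP"},
    {"type": "message", "data": b"never reached"},
]
print(consume_until_stop(fake_stream))  # [b'first', b'second']
```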

Non-Blocking

import time

while True:
    message = p.get_message()
    if message:
        # do something with message['data']
        # eventually break out of the loop?
        print(message['data'])
    time.sleep(1)  # wait a second before polling again

p.get_message() is non-blocking. If no new message has been published, it simply returns None.
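
A polling loop usually needs an exit condition. The following sketch bounds the loop with a deadline; poll_messages is a hypothetical helper that accepts any callable behaving like p.get_message, and a list-backed lambda stands in for the real subscriber:

```python
import time


def poll_messages(get_message, handle, timeout=5.0, interval=0.1):
    """Call get_message() repeatedly until timeout seconds pass; return the number handled."""
    handled = 0
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        message = get_message()
        if message is None:
            time.sleep(interval)  # nothing new, wait before polling again
            continue
        handle(message)
        handled += 1
    return handled


# A list-backed stand-in for p.get_message: returns None once it is empty.
pending = [{"type": "message", "data": b"a"}, {"type": "message", "data": b"b"}]
received = []
count = poll_messages(
    lambda: pending.pop(0) if pending else None,
    received.append,
    timeout=0.3,
    interval=0.05,
)
print(count, [m["data"] for m in received])  # 2 [b'a', b'b']
```

With a live subscriber, the first argument would be p.get_message.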

Alternatively, an event loop may be started in a separate thread:

p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=1)

Messages must be processed by a registered handler function when p.run_in_thread() is used!

The thread can be stopped with:

thread.stop()
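
To make sure stop() is called even when the surrounding code raises, the call can be wrapped in a context manager. This is only a sketch: stopping is a hypothetical helper, and FakeWorker stands in for the object returned by p.run_in_thread():

```python
from contextlib import contextmanager


@contextmanager
def stopping(worker):
    """Yield worker and call worker.stop() on exit, even if the body raises."""
    try:
        yield worker
    finally:
        worker.stop()


class FakeWorker:
    """Stand-in for the object returned by p.run_in_thread()."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


worker = FakeWorker()
with stopping(worker):
    pass  # do other work while the handler processes messages
print(worker.stopped)  # True
```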

Written by Philipp Westphal
Software Developer & Structural Engineer