Pre-requisites
Redis Server
Redis can be installed directly on the machine by following this
installation guidelines by DigitalOcean.
However, for the examples shown here, I run the Redis server in a Docker container.
In case a system wide instance redis-server is running already shut it down first to avoid port conflicts:
1
|
$ /etc/init.d/redis-server stop
|
1
2
3
4
5
6
7
|
FROM redis:5.0.9-alpine
COPY redis.conf /etc/redis.conf
RUN chmod 777 /etc/redis.conf
CMD [ "redis-server", "/etc/redis.conf" ]
|
# redis.conf
requirepass my-secret-password
To build an image called “redis” run one of the following docker image build
commands:
1
2
3
|
$ docker image build --rm -t redis -f redis.Dockerfile . # if the file is called redis.Dockerfile
$ # OR
$ docker image build --rm -t redis .
|
To start the container from the created “redis” image
1
2
3
4
5
|
$ docker container run --rm -p 6379:6379 --name redis redis
$ # OR
$ docker container run --rm -d -p 6379:6379 --name redis redis # deamonized
$ # OR
$ docker container run --rm [-it] -p 6379:6379 --name redis redis # interactive (see below for CLI usage)
|
To test and interact with the running redis container, start it up and connect to it with redis-cli:
1
2
3
4
5
6
7
|
$ docker exec -it redis redis-cli -a my-secret-password [-h 127.0.0.1 -p 6379]
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> set hello world
OK
127.0.0.1:6379> get hello
"world"
|
Press ctrl
+c
to close redis-cli.
Redis-Py
Python can interact with the Redis server using the package redis-py.
Install it in your environment:
1
|
(venv)$ pip install redis
|
Publish/Subscribe
Publisher
The publisher creates and publishes the message by providing the channel name and the message:
1
2
3
4
|
from redis import Redis
r = Redis.from_url('redis://:my-secret-password@127.0.0.1:6379/0')
r.publish('my-channel', 'a published message')
|
Python objects can also be published, but must be serialized first (with pickle
or json
):
1
2
3
4
5
6
7
8
|
import pickle
from redis import Redis
r = Redis.from_url('redis://:my-secret-password@127.0.0.1:6379/0')
d = {'job_id': '123-456-789', 'status' = 'done'}
r.publish('my-channel', pickle.dumps(d))
|
Subscriber
The subscriber can subscribe to a channel channel and retrieve the message from it:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
from redis import Redis
r = Redis.from_url('redis://:my-secret-password@127.0.0.1:6379/0')
p = r.pubsub()
p.subscribe('my-channel') # multiple args possible to subscribe to more channels
complete_message = p.get_message()
print(complete_message)
# this prints:
# {'pattern': None, 'type': 'message', 'channel': b'my_channel', 'data': b'a published message'}
complete_message = p.get_message()
print(complete_message)
# if no new messages have been published this prints:
# None
|
The published message can be found under p.get_message()['data']
.
In case of messages being seriealized, just de-serialize them:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import pickle
from redis import Redis
r = Redis.from_url('redis://:my-secret-password@127.0.0.1:6379/0')
p = r.pubsub()
p.subscribe('my-channel') # multiple args possible to subscribe to more channels
complete_message = p.get_message()
if complete_message:
message = pickle.loads(complete_message['data'])
print(message)
# this prints:
# {'job_id': '123-456-789', 'status' = 'done'}
|
Callback Message Handler
The subscriber can register a function to handle published messages:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import pickle
from redis import Redis
r = Redis.from_url('redis://:my-secret-password@127.0.0.1:6379/0')
p = r.pubsub()
def my_handler(message):
m = pickle.loads(message['data'])
print(f"Handler received this object: {m}")
p.subscribe(**{'my-channel': my_handler})
message = p.get_message()
# this will print
# Handler received this object: {'job_id': '123-456-789', 'status': 'done'}
print(message)
# this will print
# None
# because message has been consumed by the handler function.
|
Continuous Message Reading
Blocking
1
2
3
|
for message in p.listen():
# do something with message['data']
# eventually break out of the loop?
|
p.listen()
is blocking. The process blocks and waits until a message is published.
Not-Blocking
1
2
3
4
5
6
|
while True:
message = p.get_message()
if message:
# do something with message['data']
# eventually break out of the loop?
time.sleep(1)
|
p.get_message()
is not-blocking. If no new message has been published it simply returns None
.
Alternatively, an event loop may be started in a separate thread:
1
2
|
p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=1)
|
Messages must be processed by registered handler function when p.run_in_thread()
is used!
The thread can be stopped with