Patrique Ouimet
Senior Product Engineer
Sun, May 15, 2022 9:55 AM
In this article we'll explain how to setup Redis Streams as our message broker using PHP. We will configure a producer (creates messages) and a few simple consumers (reads messages).
NOTE: there is a demo code repository if you want to see it in action that can be found here: https://github.com/patoui/demo_redis_streams
Below we'll show how to setup a producer and a consumer for a bank account's transactions. First, we'll setup a producer for creating messages which will be transactions in our example. Second, we'll setup a consumer to update the accounts current balance based on the incoming messages which are the transactions.
<?php
declare(strict_types=1);
$redis = new Redis();
$redis->connect('redis');
$transactions = [
['account_no' => 'ABC321', 'value' => 1000],
['account_no' => 'ABC321', 'value' => -100],
['account_no' => 'XYZ987', 'value' => 555],
['account_no' => 'XYZ987', 'value' => -22],
];
foreach ($transactions as $transaction) {
$redis->xAdd('transactions', '*', $transaction);
}
In the above example we've added 2 transactions that have occurred against the account ABC321
and 2 transactions against the XYZ987
account.
Let's break down the individual components:
$transactions
is a list of messages to be put onto a stream (aka topic)xAdd
will add messages to the given stream name'transactions'
'*'
which tells Redis to auto-generate the ID for us. These auto generated IDs consist of Unix timestamp in milliseconds (e.g. 1526919030474
) followed by a hyphen -
an auto-incrementing integer (e.g 55
). An example looks like 1526919030474-55
To consume these messages (transactions), we need to create a consumer
<?php
declare(strict_types=1);
$redis = new Redis();
$redis->connect('redis');
$stream_name = 'transactions';
$group_name = 'balance_manager';
$consumer_name = 'consumer_1';
$redis->xGroup('CREATE', $stream_name, $group_name, '0');
$stream_messages = $redis->xReadGroup(
$group_name,
$consumer_name,
[$stream_name => '>']
);
if (!$stream_messages) {
die('No messages' . PHP_EOL);
}
$transactions = $stream_messages[$stream_name] ?? null;
if (!$transactions) {
die('No transactions' . PHP_EOL);
}
$processed_message_ids = [];
$balances = [];
foreach ($transactions as $message_id => $transaction) {
if (!isset($balances[$transaction['account_no']])) {
$balances[$transaction['account_no']] = 0;
}
$balances[$transaction['account_no']] += $transaction['value'];
$processed_message_ids[] = $message_id;
}
foreach ($balances as $account_no => $balance) {
echo "Account No {$account_no}, current balance: {$balance}" . PHP_EOL;
}
$redis->xAck($stream_name, $group_name, $processed_message_ids);
Let's breakdown the consumer:
'transactions'
stream under the group named 'balance_manager'
and the consumer named 'consumer_1'
. Once the messages have successfully been processed they will be acknowledged to notify Redis internals that the message was read and processed successfully.xGroup
is used to create the consumer group 'balance_manage'
xReadGroup
is how we'll read the messages with a consumer group.'balance_manager'
.'consumer_1'
.'transactions'
as seen in the producer. You'll notice we've set the array key 'transactions'
value to '>'
, this tells Redis to only give us the new messages.The return value of xReadGroup
is an associative array where the keys are the stream names which contain their messages. The output from our example will look as follows (keys may vary based on your systems current time):
[
'transactions' => [
'1652618603161-0' => ['account_no' => 'ABC321', 'value' => 1000],
'1652618603161-1' => ['account_no' => 'ABC321', 'value' => -100],
'1652618603161-2' => ['account_no' => 'XYZ987', 'value' => 555],
'1652618603161-3' => ['account_no' => 'XYZ987', 'value' => -22],
]
]
Given the above, we extract the stream we're interested in by accessing the key, in the example that is 'transactions'
// $stream_name = 'transactions'
$transactions = $stream_messages[$stream_name] ?? null;
Now we can loop over the messages (transactions). While looping we need to keep track of which messages we've processed successfully ($processed_message_ids
) so that we acknowledge them afterwards. This can be useful as if there's an issue (e.g. exception thrown) with one of the messages (transactions) it will remain unacknowledged so that we may retry processing the message.
$processed_message_ids = [];
$balances = [];
foreach ($transactions as $message_id => $transaction) {
if (!isset($balances[$transaction['account_no']])) {
$balances[$transaction['account_no']] = 0;
}
$balances[$transaction['account_no']] += $transaction['value'];
$processed_message_ids[] = $message_id;
}
foreach ($balances as $account_no => $balance) {
echo "Account No {$account_no}, current balance: {$balance}" . PHP_EOL;
}
If all messages are successful, the above should output
Account No ABC321, current balance: 900
Account No XYZ987, current balance: 533
To briefly explain what happened:
1000
to account ABC321
100
from account ABC321
555
from account XYZ987
22
from account XYZ987
Therefore after the above transactions have finished processing the account balances are:
ABC321
: 900
XYZ987
: 533
As mentioned before, now that the messages has been successfully processed we need to acknowledge them to let Redis know we're done with them. To do this we call xAck
// $stream_name = 'transactions'
// $group_name = 'balance_manager'
// $processed_message_ids = ['1652618603161-0', '1652618603161-1', '1652618603161-2', '1652618603161-3']
$redis->xAck($stream_name, $group_name, $processed_message_ids);
Let's break down the xAck
command quickly:
['1652618603161-0', '1652618603161-1', '1652618603161-2', '1652618603161-3']
Redis Streams provide a simple API to quickly get up and running with messaging. All that's needed is xAdd
to add messages, xGroup
to create the consumer group, xReadGroup
to read the messages, and xAck
to acknowledge the messages. Given Redis' is widely used as a caching solution this may be an easy way to get started with messaging without needing additional infrastructure. Keep in mind the example shown above is not indicative of what a production implementation should look like, there are some considerations not taken into account for brevity. If you're looking to set this up in production, I strongly suggest you read over the official documentation.
XADD
command: https://redis.io/commands/xadd/
XGROUP CREATE
command: https://redis.io/commands/xgroup-create/
XREADGROUP
command: https://redis.io/commands/xreadgroup/
XACK
command: https://redis.io/commands/xack/