Grayscale profile picture

Patrique Ouimet

Developer

Messaging with PHP and Redis Streams

Sun, May 15, 2022 9:55 AM

Introduction

In this article we'll explain how to setup Redis Streams as our message broker using PHP. We will configure a producer (creates messages) and a few simple consumers (reads messages).

NOTE: there is a demo code repository if you want to see it in action that can be found here: https://github.com/patoui/demo_redis_streams

Requirements

Setup

Below we'll show how to setup a producer and a consumer for a bank account's transactions. First, we'll setup a producer for creating messages which will be transactions in our example. Second, we'll setup a consumer to update the accounts current balance based on the incoming messages which are the transactions.

Creating a Producer

<?php

declare(strict_types=1);

$redis = new Redis();
$redis->connect('redis');

$transactions = [
    ['account_no' => 'ABC321', 'value' => 1000],
    ['account_no' => 'ABC321', 'value' => -100],
    ['account_no' => 'XYZ987', 'value' => 555],
    ['account_no' => 'XYZ987', 'value' => -22],
];

foreach ($transactions as $transaction) {
    $redis->xAdd('transactions', '*', $transaction);
}

In the above example we've added 2 transactions that have occurred against the account ABC321 and 2 transactions against the XYZ987 account.

Let's break down the individual components:

  • $transactions is a list of messages to be put onto a stream (aka topic)
  • xAdd will add messages to the given stream name
  • The first argument is the stream which in our case is 'transactions'
  • The second argument is the ID to associate the messages with, in the above scenario it's set to '*' which tells Redis to auto-generate the ID for us. These auto generated IDs consist of Unix timestamp in milliseconds (e.g. 1526919030474) followed by a hyphen - an auto-incrementing integer (e.g 55). An example looks like 1526919030474-55
  • The third argument is the message to add to the stream, which are the transactions in the above scenario

To consume these messages (transactions), we need to create a consumer

Creating a Consumer

<?php

declare(strict_types=1);

$redis = new Redis();
$redis->connect('redis');

$stream_name   = 'transactions';
$group_name    = 'balance_manager';
$consumer_name = 'consumer_1';

$redis->xGroup('CREATE', $stream_name, $group_name, '0');
$stream_messages = $redis->xReadGroup(
    $group_name,
    $consumer_name,
    [$stream_name => '>']
);

if (!$stream_messages) {
    die('No messages' . PHP_EOL);
}

$transactions = $stream_messages[$stream_name] ?? null;

if (!$transactions) {
    die('No transactions' . PHP_EOL);
}

$processed_message_ids = [];
$balances = [];
foreach ($transactions as $message_id => $transaction) {
    if (!isset($balances[$transaction['account_no']])) {
        $balances[$transaction['account_no']] = 0;
    }
    $balances[$transaction['account_no']] += $transaction['value'];
    $processed_message_ids[] = $message_id;
}

foreach ($balances as $account_no => $balance) {
    echo "Account No {$account_no}, current balance: {$balance}" . PHP_EOL;
}

$redis->xAck($stream_name, $group_name, $processed_message_ids);

Let's breakdown the consumer:

  • The consumer will read the messages from the 'transactions' stream under the group named 'balance_manager' and the consumer named 'consumer_1'. Once the messages have successfully been processed they will be acknowledged to notify Redis internals that the message was read and processed successfully.
  • xGroup is used to create the consumer group 'balance_manage'
  • xReadGroup is how we'll read the messages with a consumer group.
  • The first argument is the group name, in the above scenario that is 'balance_manager'.
  • The second argument is the consumer name, in the above scenario that is 'consumer_1'.
  • The third argument is a list of streams. In the above scenario, we'll only be consuming 1 stream named 'transactions' as seen in the producer. You'll notice we've set the array key 'transactions' value to '>', this tells Redis to only give us the new messages.

The return value of xReadGroup is an associative array where the keys are the stream names which contain their messages. The output from our example will look as follows (keys may vary based on your systems current time):

[
    'transactions' => [
        '1652618603161-0' => ['account_no' => 'ABC321', 'value' => 1000],
        '1652618603161-1' => ['account_no' => 'ABC321', 'value' => -100],
        '1652618603161-2' => ['account_no' => 'XYZ987', 'value' => 555],
        '1652618603161-3' => ['account_no' => 'XYZ987', 'value' => -22],
    ]
]

Given the above, we extract the stream we're interested in by accessing the key, in the example that is 'transactions'

// $stream_name = 'transactions'
$transactions = $stream_messages[$stream_name] ?? null;

Now we can loop over the messages (transactions). While looping we need to keep track of which messages we've processed successfully ($processed_message_ids) so that we acknowledge them afterwards. This can be useful as if there's an issue (e.g. exception thrown) with one of the messages (transactions) it will remain unacknowledged so that we may retry processing the message.

$processed_message_ids = [];
$balances = [];
foreach ($transactions as $message_id => $transaction) {
    if (!isset($balances[$transaction['account_no']])) {
        $balances[$transaction['account_no']] = 0;
    }
    $balances[$transaction['account_no']] += $transaction['value'];
    $processed_message_ids[] = $message_id;
}

foreach ($balances as $account_no => $balance) {
    echo "Account No {$account_no}, current balance: {$balance}" . PHP_EOL;
}

If all messages are successful, the above should output

Account No ABC321, current balance: 900
Account No XYZ987, current balance: 533

To briefly explain what happened:

  • Message #1 deposited 1000 to account ABC321
  • Message #2 withdrew 100 from account ABC321
  • Message #3 deposited 555 from account XYZ987
  • Message #4 withdrew 22 from account XYZ987

Therefore after the above transactions have finished processing the account balances are:

  • ABC321: 900
  • XYZ987: 533

As mentioned before, now that the messages has been successfully processed we need to acknowledge them to let Redis know we're done with them. To do this we call xAck

// $stream_name = 'transactions'
// $group_name = 'balance_manager'
// $processed_message_ids = ['1652618603161-0', '1652618603161-1', '1652618603161-2', '1652618603161-3']
$redis->xAck($stream_name, $group_name, $processed_message_ids);

Let's break down the xAck command quickly:

  • The first argument is the stream name which we want to acknowledge the messages on
  • The second argument is the group name which we want to acknowledge the messages on
  • The third argument is an array to message ids we want to acknowledge, in the above scenario it would be
    • ['1652618603161-0', '1652618603161-1', '1652618603161-2', '1652618603161-3']

Conclusion

Redis Streams provide a simple API to quickly get up and running with messaging. All that's needed is xAdd to add messages, xGroup to create the consumer group, xReadGroup to read the messages, and xAck to acknowledge the messages. Given Redis' is widely used as a caching solution this may be an easy way to get started with messaging without needing additional infrastructure. Keep in mind the example shown above is not indicative of what a production implementation should look like, there are some considerations not taken into account for brevity. If you're looking to set this up in production, I strongly suggest you read over the official documentation.

Resources

Comments

Oct 6, 2022

xGroup creates consumer, yes. But how to delete that consumer afterwards? We have \Redis::xGroup, but its signature does not allow to use $consumer_name. How do you deal with it?

Aug 30, 2023

Hi Egor, thanks for reading my article! To answer your question, you can use xGroup method to delete consumers by passing 'DELCONSUMER' as the first argument. For more information see the official extension documentation https://github.com/phpredis/phpredis#xgroup