Skip to content

Instantly share code, notes, and snippets.

@nrk
Last active September 22, 2020 21:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nrk/90f0eeedb368e384867a679656f89261 to your computer and use it in GitHub Desktop.
Save nrk/90f0eeedb368e384867a679656f89261 to your computer and use it in GitHub Desktop.
Proof of concept for deferred command pipelines in Predis v2.0 (dev).
/vendor
.php-version
.php_cs.cache
composer.lock

Deferred command pipelines in Predis v2.0 (dev)

NOTE: everything on this gist should not be considered stable in terms of correctness and API design. Basically what you see here is far from being definitive and mostly an experiment, that's the reason why I chose a gist instead of a full fledged repository.

This is a proof of concept for deferred command pipelines in Predis v2.0 built on top of the changes implemented in the v2.0-pipeline-rework development branch (see PR #663 for details) and inspired by Jim Nelson's Deferred with some differences (this is also why I purposely tried to avoid the usual Promises / Futures lingo when naming stuff).

Compared to a normal command pipeline, invoking a method that maps to a Redis command on a deferred pipeline returns a future response object. Future responses are just like placeholders for actual Redis responses: they are populated with values only when DeferredPipeline::execute() is invoked to flush any command in the queue and responses are read back from Redis. DeferredPipeline::execute() does not return an array of responses like normal pipelines, values returned from Redis are accessible only through future response instances.

Here is the most basic example of how using a deferred pipeline looks like in practice:

$pipeline = new DeferredPipeline($client);

$foo = $pipeline->get('foo');
$hoge = $pipeline->get('hoge');

$pipeline->execute();

$myStructuredObject = (object) [
    'foo' => $foo->value(),
    'hoge' => $hoge->value(),
];

More examples can be found in files prefixed by example_ on this gist.

The same approach should be implemented for transactions after re-implementing Predis\Transaction\MultiExec on top of command queues so we will have a DeferredMultiExec returning future responses just like DeferredPipeline.

This code is only available as a gist for now but it would be great to integrate this concept in Predis so ideally we will have a proper pull request against predis/predis as soon as PR #663 gets merged and after polishing how deferred pipelines are designed and implemented. In the meanwhile you can just fork this gist an play with the code.

This means that you feedback is welcome and appreciated!

<?php
require_once __DIR__ . '/vendor/autoload.php';
use Predis\ClientInterface;
use Predis\Pipeline\Pipeline;
function populate_redis(ClientInterface $client)
{
$client->pipeline(function (Pipeline $pipeline) {
$pipeline->flushall();
$pipeline->lpush("awesomelist", array_map(function ($item) {
return "item:$item";
}, range(1, 10)));
$pipeline->hmset("awesomehash", [
'name' => 'John',
'surname' => 'Doe',
'age' => 40,
'other' => null
]);
});
}
{
"type": "library",
"description": "Proof of concept for deferred pipeline results with Predis",
"license": "MIT",
"authors": [
{
"name": "Daniele Alessandri",
"email": "suppakilla@gmail.com"
}
],
"autoload": {
"psr-4": {
"Nrk\\Predis\\Experiments\\": "./"
}
},
"require": {
"php": "^7.2 || ^8.0",
"predis/predis": "dev-v2.0-pipeline-rework"
}
}
<?php
namespace Nrk\Predis\Experiments;
use EmptyIterator;
use InvalidArgumentException;
use SplObjectStorage;
use Traversable;
use Predis\ClientInterface;
use Predis\Command\CommandInterface;
use Predis\Connection\ConnectionInterface;
use Predis\Pipeline\Pipeline;
use Predis\Pipeline\Queue\Basic as BasicQueue;
use Predis\Pipeline\Queue\CommandQueueInterface;
use Predis\Pipeline\Queue\FireAndForget as FireAndForgetQueue;
use Predis\Response\ErrorInterface;
/**
* Specialized pipeline implementation returning future responses.
*/
class DeferredPipeline extends Pipeline
{
/** @var SplObjectStorage */
protected $futures;
/**
* @inheritdoc
*/
public function __construct(ClientInterface $client, CommandQueueInterface $queue = null, bool $traversable = false)
{
$queue = $queue ?? new BasicQueue();
static::checkQueueCompatibility($queue);
parent::__construct($client, $queue, $traversable);
$this->futures = new SplObjectStorage();
}
/**
* Checks if the command queue is supported by the pipeline abstraction.
*
* @param CommandQueueInterface $queue Command queue
*
* @throws InvalidArgumentException
*/
protected static function checkQueueCompatibility(CommandQueueInterface $queue)
{
if ($queue instanceof FireAndForgetQueue) {
throw new InvalidArgumentException(sprintf(
'Unsupported command queue: %s',
get_class($queue)
));
}
}
/**
* Creates a new future response instance.
*
* @return FutureResponse
*/
protected function createFutureResponse(): FutureResponse
{
return new FutureResponse();
}
/**
* @inheritdoc
*/
protected function recordCommand(CommandInterface $command): object
{
parent::recordCommand($command);
$this->futures->attach($command, $future = $this->createFutureResponse());
return $future;
}
/**
* @inheritdoc
*/
protected function executePipeline(ConnectionInterface $connection): Traversable
{
$responses = parent::executePipeline($connection);
foreach ($responses as $command => $response) {
$this->futures[$command]($command, $response);
$this->futures->detach($command);
}
yield;
}
/**
* @inheritdoc
*/
protected function onResponseError(ConnectionInterface $connection, CommandInterface $command, ErrorInterface $response)
{
return $response;
}
/**
* @inheritdoc
*/
public function execute(callable $callable = null): ?iterable
{
/** @var \Iterator|array */
$iterable = parent::execute($callable);
if ($iterable instanceof Traversable) {
// NOTE: when the parents returns a traversable we must trigger
// the iteration so that responses are fetched from the buffer.
$iterable->valid();
}
return null;
}
/**
* @inheritdoc
*/
public function getIterator(): Traversable
{
// NOTE: deferred pipelines do not return responses from execute() so we
// just return an empty iterator.
return new EmptyIterator();
}
}
<?php
require_once __DIR__ . '/bootstrap.php';
use Nrk\Predis\Experiments\DeferredPipeline;
// WARNING: THIS SCRIPT DESTROYS EVERYTHING IN YOUR DATABASE ON 127.0.0.1:6379!
$client = new Predis\Client();
populate_redis($client);
/**
* Just an example, it's not like I'm encouraging such an approach.
*/
class MyAwesomeObject
{
protected $string;
protected $list;
protected $hash;
public static function load(DeferredPipeline $pipeline): self
{
$instance = new static();
$pipeline->execute($instance);
return $instance;
}
public function __invoke(DeferredPipeline $pipeline)
{
$pipeline
->echo("deferred is good!")
->validate('is_string')
->transform('strtoupper')
->bind($this->string);
$pipeline
->lrange("awesomelist", 0, -1)
->bind($this->list);
$pipeline
->hgetall("awesomehash")
->transform(function (iterable $response) {
foreach ($response as $key => $value) {
yield "KEY: $key" => $value;
}
})
->transform(function (iterable $response) {
foreach ($response as $key => $value) {
yield $key => "VALUE: $value";
}
})
->asArray()
->bind($this->hash);
}
public function getString(): string
{
return $this->string;
}
public function getList(): array
{
return $this->list;
}
public function getHash(): array
{
return $this->hash;
}
}
// Just pass Predis\Pipeline\Queue\Atomic as the second argument for a pipeline
// wrapped in a MULTI ... EXEC block.
$pipeline = new DeferredPipeline($client);
$awesome = MyAwesomeObject::load($pipeline);
echo "== MyAwesomeObject::getString() =======", PHP_EOL;
var_dump($awesome->getString());
echo PHP_EOL;
echo "== MyAwesomeObject::getList() =======", PHP_EOL;
var_dump($awesome->getList());
echo PHP_EOL;
echo "== MyAwesomeObject::getHash() =======", PHP_EOL;
var_dump($awesome->getHash());
echo PHP_EOL;
<?php
require_once __DIR__ . '/bootstrap.php';
use Predis\Command\CommandInterface;
use Nrk\Predis\Experiments\DeferredPipeline;
use Nrk\Predis\Experiments\FutureResponse;
// WARNING: THIS SCRIPT DESTROYS EVERYTHING IN YOUR DATABASE ON 127.0.0.1:6379!
$client = new Predis\Client();
populate_redis($client);
$mylist = null;
$myhash = null;
// Just pass Predis\Pipeline\Queue\Atomic as the second argument for a pipeline
// wrapped in a MULTI ... EXEC block.
$pipeline = new DeferredPipeline($client);
/** @var FutureResponse */
$echoResponse = $pipeline
->echo("deferred is good!")
->transform('strtoupper');
/** @var FutureResponse */
$pipeline
->lrange("awesomelist", 0, -1)
->validate('is_array')
->bind($mylist);
$pipeline
->hgetall("awesomehash")
->validate('is_array')
->transform(function (iterable $response) {
foreach ($response as $key => $value) {
yield "KEY: $key" => $value;
}
})
->transform(function (iterable $response) {
foreach ($response as $key => $value) {
yield $key => "VALUE: $value";
}
})
->asArray()
->ready(function (CommandInterface $command, $response) {
[$cmdID, $cmdKey, $responseType] = [$command->getId(), $command->getArgument(0), gettype($response)];
echo "** Command `$cmdID` on key `$cmdKey` returned `$responseType` response **", PHP_EOL, PHP_EOL;
})
->bind($myhash);
$pipeline->execute();
echo "== Value from future response ============", PHP_EOL;
var_dump($echoResponse->value());
echo PHP_EOL;
echo "== Value from bound variable \$mylist ====", PHP_EOL;
var_dump($mylist);
echo PHP_EOL;
echo "== Value from bound variable \$myhash =====", PHP_EOL;
var_dump($myhash);
echo PHP_EOL;
<?php
namespace Nrk\Predis\Experiments;
use Exception;
use Generator;
use InvalidArgumentException;
use ReflectionFunction;
use ReflectionMethod;
use RuntimeException;
use Traversable;
use UnexpectedValueException;
use Predis\Command\CommandInterface;
/**
* Future response class.
*/
class FutureResponse
{
protected $ready = false;
protected $throw = true;
protected $value = null;
protected $binding = null;
protected $validator = null;
protected $readyListeners = [];
protected $errorListeners = [];
protected $transformers = [];
protected $transformToArray = false;
protected $transformToGenerator = false;
/**
* Returns whether the future response holds a finalized value.
*
* @return bool
*/
public function isReady(): bool
{
return $this->ready;
}
/**
* Returns the finalized value of the future response.
*
* @throws RuntimeException when the future response has not been finalized yet
*
* @return mixed
*/
public function value()
{
if (!$this->isReady()) {
throw new RuntimeException('Cannot retrieve a value from a non-finalized future response');
}
return $this->value;
}
/**
* Binds the future response value to an external variable.
*
* @param mixed $value
*
* @throws RuntimeException when the future response has been already finalized
*
* @return self
*/
public function bind(&$value): self
{
if ($this->isReady()) {
throw new RuntimeException(
'Cannot bind an external variable when the future response has been already finalized'
);
}
$this->binding = &$value;
return $this;
}
/**
* Makes sure that the user-supplied callable is valid.
*
* The user-supplied callable must define at least two required parameters:
*
* - the first one of type Predis\Command\CommandInterface for the command.
* - the second one of mixed type for the response.
*
* When the user-supplied callable defines just one required parameter it is
* wrapped by another callable defining the signature as described above: in
* this case the original callable receives just the response payload, this
* is to make it easier to directly use PHP functions such as `is_array` for
* validation or `iterator_to_array` for transformation, or any other single
* parameter method that receives just one value.
*
* @param callable $callable User-supplied callable
*
* @return callable
*/
protected static function ensureValidCallable(callable $callable): callable
{
if (is_object($callable)) {
$reflection = new ReflectionMethod($callable, '__invoke');
} elseif (is_array($callable)) {
$reflection = new ReflectionMethod($callable[0], $callable[1]);
} else {
$reflection = new ReflectionFunction($callable);
}
$parameters = $reflection->getNumberOfRequiredParameters();
if ($parameters === 0) {
throw new InvalidArgumentException('User-supplied callable must define at least one parameter');
}
if ($parameters > 1 && $reflection->getParameters()[0]->getType()->getName() !== CommandInterface::class) {
throw new InvalidArgumentException(sprintf(
'User-supplied callable must define at least two parameters with the first one being a %s',
CommandInterface::class
));
}
if ($parameters === 1) {
$callable = function (CommandInterface $command, $response) use ($callable) {
return $callable($response);
};
}
return $callable;
}
/**
* Appends a callable invoked for response transformation.
*
* The user-supplied callable receives the input value as the only
* argument and must always return the resulting value.
*
* Transformations are chained so the next transformer in the chain receives
* the result of the previous transformation.
*
* @param callable $transformer Callable for value transformation
*
* @return self
*/
public function transform(callable $transformer): self
{
$this->transformers[] = static::ensureValidCallable($transformer);
return $this;
}
/**
* Returns the response as an array.
*
* @return self
*/
public function asArray(bool $value = true): self
{
$this->transformToArray = $value;
return $this;
}
/**
* Transforms the response to an array.
*
* Non iterable values are returned as single-element arrays.
*
* @param mixed $response
*
* @return array
*/
protected static function toArray($response): array
{
if (is_array($response)) {
return $response;
}
if ($response instanceof Traversable) {
return iterator_to_array($response);
}
return (array) $response;
}
/**
* Returns the response as a generator.
*
* @return self
*/
public function asGenerator(bool $value = true): self
{
$this->transformToGenerator = $value;
return $this;
}
/**
* Transforms the response to a generator.
*
* Non iterable values are returned as single-element generator.
*
* @param mixed $response
*
* @return Generator
*/
protected static function toGenerator($response): Generator
{
if ($response instanceof Generator) {
return $response;
}
if (!is_iterable($response)) {
$response = (array) $response;
}
foreach ($response as $key => $value) {
yield $key => $value;
}
}
/**
* Attaches a callable acting as a validator for the response.
*
* The user-supplied callable is invoked when the future response is being
* finalized, right before proceeding with transformations, to verify that
* the raw response returned from Redis can be considered valid.
*
* The validator receives three arguments:
*
* - the command associated to the response returned from Redis;
* - the response payload;
* - an optional string by reference to customize the exception message;
*
* The validator must return a boolean to indicate whether the response can
* be considered valid before proceeding.
*
* @param callable $listener Callable for validation
*
* @return self
*/
public function validate(callable $validator): self
{
$this->validator = static::ensureValidCallable($validator);
return $this;
}
/**
* Appends a callable invoked when the future response has been finalized.
*
* The user-supplied callable receives two arguments:
*
* - the command associated to the response returned from Redis;
* - the transformed response payload;
*
* Any value returned by the user-supplied callable is ignored.
*
* @param callable $listener Callable for notification
*
* @return self
*/
public function ready(callable $listener): self
{
$this->readyListeners[] = static::ensureValidCallable($listener);
return $this;
}
/**
* Appends a callable invoked when the future response generates an error.
*
* The user-supplied callable receives three arguments:
*
* - the command associated to the response returned from Redis
* - the transformed response
* - the underlying exception instance
*
* Any value returned by the user-supplied callable is ignored.
*
* @param callable $listener Callable for notification
*
* @return self
*/
public function error(callable $listener): self
{
$this->errorListeners[] = static::ensureValidCallable($listener);
return $this;
}
/**
* Appends internal trasformers to the user-supplied list of transformers.
*/
protected function finalizeTransformers(): void
{
if ($this->transformToArray) {
$this->transformers[] = static::ensureValidCallable([$this, 'toArray']);
}
if ($this->transformToGenerator) {
$this->transformers[] = static::ensureValidCallable([$this, 'toGenerator']);
}
}
/**
* Validates the response returned from Redis.
*
* @param CommandInterface $command Command associated to the response
* @param mixed $response Response returned from Redis
*
* @return mixed
*/
protected function validateResponse(CommandInterface $command, $response)
{
if ($this->validator) {
$message = null;
if (false === call_user_func_array($this->validator, [$command, $response, &$message])) {
throw new UnexpectedValueException(
$message ?? "Failed validating response to {$command->getId()}"
);
}
}
}
/**
* Transforms the response returned from Redis.
*
* @param CommandInterface $command Command associated to the response
* @param mixed $response Response returned from Redis
*
* @return mixed
*/
protected function transformResponse(CommandInterface $command, $response)
{
$this->finalizeTransformers();
foreach ($this->transformers as $transformer) {
$response = $transformer($command, $response);
}
return $response;
}
/**
* Notify ready listeners when the future response has been finalized.
*
* @param CommandInterface $command Command associated to the response
* @param mixed $response Response returned from Redis
*/
protected function notifyReadyListeners(CommandInterface $command, $response): void
{
foreach ($this->readyListeners as $listener) {
$listener($command, $response);
}
}
/**
* Notify error listeners of errors during the future response finalization.
*
* @param CommandInterface $command Command associated to the response
* @param mixed $response Response returned from Redis
* @param Exception $exception Underlying exception
*/
protected function notifyErrorListeners(CommandInterface $command, $response, Exception $exception): void
{
foreach ($this->errorListeners as $listener) {
$listener($command, $response, $exception);
}
}
/**
* Finalizes the future response with the value returned from Redis.
*
* @param CommandInterface $command Command associated to the response
* @param mixed $response Response returned from Redis
*
* @throws RuntimeException when the future response has been already finalized
*/
public function __invoke(CommandInterface $command, $response)
{
if ($this->isReady()) {
throw new RuntimeException('Cannot finalize a future response that has been already finalized');
}
try {
$this->validateResponse($command, $response);
$this->value = $this->transformResponse($command, $response);
} catch (Exception $exception) {
$this->notifyErrorListeners($command, $response, $exception);
// TODO: admittedly I'm still not sure how to handle exceptions from
// validator and transformers callbacks, I have a flag here just to
// quickly switch back and forth while experimenting with stuff.
if ($this->throw) {
throw $exception;
}
return;
}
$this->binding = $this->value;
$this->ready = true;
$this->notifyReadyListeners($command, $this->value);
}
}
Copyright (c) 2020 Daniele Alessandri
Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
files (the "Software"), to deal in the Software without
restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment