Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Integrating Guzzle 6 Asynchronous Requests with ReactPHP
<?php
/**
* Credits to :
* @see https://gist.github.com/psampaz/7f2aad5d1d54eeeec8ae
*/
use GuzzleHttp\Handler\CurlMultiHandler;
use GuzzleHttp\HandlerStack;
use GuzzleHttp\Middleware;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\EventLoop\LoopInterface;
use function GuzzleHttp\Promise\queue;
use function GuzzleHttp\choose_handler;
use React\EventLoop\StreamSelectLoop;
use React\EventLoop\TimerInterface;
use React\Promise\Deferred;
use GuzzleHttp\Promise\PromiseInterface as GuzzlePromise;
/**
 * Guzzle handler stack whose cURL multi handler is driven by a ReactPHP event loop.
 *
 * Build an instance with {@see ReactHandlerStack::bindReactPhpPromiseTicker()}: it creates a
 * stack on top of a CurlMultiHandler and installs a middleware that schedules a loop timer
 * whenever a request is sent while no timer is pending. When the timer fires, the handler's
 * execute() method is called and the timer is cancelled again.
 */
class ReactHandlerStack extends HandlerStack
{
    /**
     * Timer currently scheduled on the loop, or null when none is pending.
     *
     * @var TimerInterface|null
     */
    protected $timer;

    /**
     * The handler passed to the constructor; the timer callback calls execute() on it.
     *
     * @var callable|null
     */
    protected $handlerRef;

    /**
     * Event loop used to schedule and cancel the timer; null until setLoop() is called.
     *
     * @var LoopInterface|null
     */
    protected $loop;

    /**
     * @param callable|null $handler Underlying HTTP handler; a reference is kept in
     *                               $handlerRef in addition to passing it to the parent.
     */
    public function __construct(callable $handler = null)
    {
        $this->handlerRef = $handler;
        parent::__construct($handler);
    }

    /**
     * Creates a stack with the http_errors, allow_redirects, cookies and prepare_body
     * middlewares pushed in that order.
     *
     * @param callable|null $handler Handler to use; when empty, choose_handler() picks one.
     * @return HandlerStack|ReactHandlerStack
     */
    public static function create(callable $handler = null)
    {
        $stack = new static($handler ?: choose_handler());
        $stack->push(Middleware::httpErrors(), 'http_errors');
        $stack->push(Middleware::redirect(), 'allow_redirects');
        $stack->push(Middleware::cookies(), 'cookies');
        $stack->push(Middleware::prepareBody(), 'prepare_body');

        return $stack;
    }

    /**
     * Creates a stack backed by a new CurlMultiHandler and wires it to the given loop.
     *
     * @param LoopInterface $loop Event loop that will run the timer.
     * @return ReactHandlerStack
     */
    public static function bindReactPhpPromiseTicker(LoopInterface $loop)
    {
        return (self::create(new CurlMultiHandler()))->enableReactPhpPromiseTicker($loop);
    }

    /**
     * Makes this handler stack work with ReactPHP.
     *
     * Pushes the 'reactphp_promise_ticker' middleware, which starts a timer whenever a
     * request is sent and no timer is pending, and stores the loop on the stack.
     *
     * @param LoopInterface $loop
     * @return HandlerStack
     * @throws \LogicException When the stack's handler has no callable execute() method.
     */
    protected function enableReactPhpPromiseTicker(LoopInterface $loop)
    {
        $curlHandler = $this->handlerRef;

        // Fail here rather than inside a timer callback, where the error would surface
        // far away from its cause.
        if (!\is_object($curlHandler) || !\is_callable([$curlHandler, 'execute'])) {
            throw new \LogicException(
                'The ReactPHP promise ticker requires a handler exposing execute(), such as CurlMultiHandler.'
            );
        }

        $this->setLoop($loop);

        $this->push(function (callable $handler) use ($loop, $curlHandler) {
            return function (RequestInterface $request, array $options) use ($handler, $loop, $curlHandler) {
                // Only one timer is kept pending at a time; it is re-armed by the next
                // request sent after it has been cancelled.
                if (!$this->timer) {
                    // StreamSelectLoop gets a zero interval, every other loop 1 ms.
                    $interval = $loop instanceof StreamSelectLoop ? 0 : 0.001;

                    $this->timer = $loop->addPeriodicTimer($interval, function () use ($curlHandler) {
                        try {
                            $curlHandler->execute();
                        } finally {
                            // Cancel even when execute() throws, otherwise the periodic
                            // timer would keep firing and no new one could be armed.
                            $this->cancelTimer();
                        }
                    });
                }

                return $handler($request, $options);
            };
        }, 'reactphp_promise_ticker');

        return $this;
    }

    /**
     * @return LoopInterface
     */
    public function getLoop(): LoopInterface
    {
        return $this->loop;
    }

    /**
     * @param LoopInterface $loop
     * @return ReactHandlerStack
     */
    public function setLoop(LoopInterface $loop): ReactHandlerStack
    {
        $this->loop = $loop;

        return $this;
    }

    /**
     * Cancels the pending timer, if any, and clears it so the next request arms a new one.
     *
     * Does nothing when no timer is pending.
     */
    public function cancelTimer()
    {
        if ($this->timer === null) {
            return;
        }

        $pending = $this->timer;
        $this->timer = null;

        if ($this->loop !== null) {
            $this->loop->cancelTimer($pending);
        }
    }

    /**
     * Never mix Guzzle promises with ReactPHP promises; wrap them with this method instead.
     *
     * The returned promise is resolved with the Guzzle promise's value, or rejected with
     * its rejection reason.
     *
     * @param GuzzlePromise $promise
     * @return \React\Promise\PromiseInterface
     */
    public static function guzzleToReactPromise(GuzzlePromise $promise)
    {
        $deferred = new Deferred();

        $promise->then(
            function ($value) use ($deferred) {
                $deferred->resolve($value);
            },
            function ($error) use ($deferred) {
                $deferred->reject($error);
            }
        );

        return $deferred->promise();
    }
}
@naive17
Copy link

naive17 commented Sep 16, 2019

wow bellissimo.

@MatteoOreficeIT
Copy link
Author

I simplified the code: the timer is now canceled as soon as the execute() method returns, because the promise queue is empty at that point.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment