Skip to content

Instantly share code, notes, and snippets.

@GromNaN
Last active December 18, 2023 16:05
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save GromNaN/a9014a9c69c9a5cc08ac8c30677408f4 to your computer and use it in GitHub Desktop.
Save GromNaN/a9014a9c69c9a5cc08ac8c30677408f4 to your computer and use it in GitHub Desktop.
Fiber-powered Symfony HttpClient with Revolt (and AMPHP)
{
"require": {
"php": ">=8.1.2",
"amphp/amp": "v3.x-dev",
"symfony/http-client": "^6.0"
}
}
<?php
namespace Demo;
use App\RevoltHttpClient;
use Symfony\Component\HttpClient\CurlHttpClient;
use function Amp\async;
use function Amp\Future\await;
require __DIR__.'/vendor/autoload.php';
require __DIR__.'/RevoltHttpClient.php';
// Decorate the cURL client so that every request() call suspends the calling fiber.
$client = new RevoltHttpClient(new CurlHttpClient());

// Builds a future that runs three requests for one Symfony version, one after
// the other, printing a progress marker around each of them. The future
// resolves to the three decoded JSON payloads.
$fetchRelease = static fn (string $version) => async(static function () use ($version, $client) {
    $endpoint = 'https://symfony.com/releases/'.$version;

    printf('0-Start %s%s', $version, PHP_EOL);
    $first = $client->request('GET', $endpoint.'.json?1');

    printf('1-Continue %s%s', $version, PHP_EOL);
    $second = $client->request('GET', $endpoint.'.json?2');

    printf('2-Continue %s%s', $version, PHP_EOL);
    $third = $client->request('GET', $endpoint.'.json?3');

    printf('3-End %s%s', $version, PHP_EOL);

    return [
        $first->toArray(),
        $second->toArray(),
        $third->toArray(),
    ];
});

// Start one future per version, in this order, then wait for all of them.
$pending = array_map($fetchRelease, ['4.4', '5.4', '6.0']);
$results = await($pending);

printf('THE END%s', PHP_EOL);
// Uncomment to inspect the decoded payloads:
//var_dump($results);
0-Start 4.4
0-Start 5.4
0-Start 6.0
1-Continue 5.4
1-Continue 4.4
1-Continue 6.0
2-Continue 5.4
2-Continue 4.4
3-End 5.4
2-Continue 6.0
3-End 4.4
3-End 6.0
THE END
========================================================================
# The 3 fibers run concurrently.
# When any request is completed, the corresponding fiber continues.
# Horizontal timeline. Each | is a new HTTP request that is sent.
4.4 |--------|-------|--------|
5.4 |-----|-------|-----|
6.0 |---------|-----------|-------|
<?php
namespace App;
use Revolt\EventLoop;
use Symfony\Contracts\HttpClient\HttpClientInterface;
/**
 * Wraps a Symfony HTTP client so that request() suspends the calling fiber
 * until the response has been fully received.
 *
 * All pending responses are monitored together through the wrapped client's
 * stream() method, from a callback deferred on the Revolt event loop. Each run
 * of that callback wakes up exactly one waiting fiber.
 */
class RevoltHttpClient
{
    /** Incrementing counter used as the key of each pending request. */
    private int $requests = 0;

    /** @var array<int, \Symfony\Contracts\HttpClient\ResponseInterface> Pending responses, keyed by request id. */
    private array $responses = [];

    /** @var array<int, EventLoop\Suspension> Suspended callers, keyed by request id. */
    private array $suspensions = [];

    /** Whether a race() callback is already queued on the event loop. */
    private bool $registered = false;

    public function __construct(
        private readonly HttpClientInterface $httpClient
    )
    {
    }

    /**
     * Sends a request and suspends the current fiber until its last chunk arrived.
     *
     * @param string $method  HTTP method, forwarded to the wrapped client.
     * @param string $url     Target URL, forwarded to the wrapped client.
     * @param array  $options Request options, forwarded to the wrapped client.
     *
     * @return \Symfony\Contracts\HttpClient\ResponseInterface The completed response.
     *
     * @throws \Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface
     *         When the transport fails or times out before the response is complete.
     */
    public function request(string $method, string $url, array $options = [])
    {
        $response = $this->httpClient->request($method, $url, $options);

        $id = $this->requests++;
        $suspension = EventLoop::getSuspension();
        $this->responses[$id] = $response;
        $this->suspensions[$id] = $suspension;
        $this->register();

        // Here is the magic: the function is interrupted and waits.
        // race() resumes it once the response is received, or throws
        // into it when the transport fails.
        $suspension->suspend();

        return $response;
    }

    /**
     * Queues race() on the event loop unless a run is already queued.
     */
    private function register(): void
    {
        if (!$this->registered) {
            EventLoop::defer($this->race(...));
            $this->registered = true;
        }
    }

    /**
     * Wakes up the caller of the first response that completes or fails.
     *
     * Only one caller is woken per run; when other responses are still
     * pending, another run is queued before the caller is woken.
     *
     * @todo Would be more efficient if HttpClient had a "race" function
     *
     * @throws \RuntimeException When the stream ends without any pending response completing.
     */
    private function race(): void
    {
        // This deferred callback is running now, so a new one has to be
        // queued for whatever is still pending, whichever way this run ends.
        $this->registered = false;

        foreach ($this->httpClient->stream($this->responses) as $response => $chunk) {
            $failure = null;

            try {
                if (!$chunk->isLast()) {
                    continue;
                }
            } catch (\Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface $e) {
                // Error and timeout chunks throw when inspected. Keep the
                // exception so it reaches the fiber that issued the request
                // instead of escaping into the event loop.
                $failure = $e;
            }

            // Strict search: responses must be matched by identity, not by
            // loose property-wise object comparison.
            $id = array_search($response, $this->responses, true);
            if (false === $id) {
                // Not a response tracked here; never let "false" be coerced to key 0.
                continue;
            }

            /** @var EventLoop\Suspension $suspension */
            $suspension = $this->suspensions[$id];
            unset($this->responses[$id], $this->suspensions[$id]);

            if ([] !== $this->responses) {
                $this->register();
            }

            if (null === $failure) {
                $suspension->resume();
            } else {
                // Stop the failed response so it is not completed again later.
                $response->cancel();
                $suspension->throw($failure);
            }

            return;
        }

        throw new \RuntimeException('A request should have ended');
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment