@mtdowling
Last active August 29, 2015 14:06
React and Guzzle experiment
<?php
require 'vendor/autoload.php';
use GuzzleHttp\Stream\BufferStream;
use GuzzleHttp\Stream\AsyncReadStream;
use GuzzleHttp\Ring\Future;
$loop = React\EventLoop\Factory::create();
$dnsResolverFactory = new React\Dns\Resolver\Factory();
$dnsResolver = $dnsResolverFactory->createCached('8.8.8.8', $loop);
$factory = new React\HttpClient\Factory();
$client = $factory->create($loop, $dnsResolver);
function complete_response(
    $request,
    $reactRequest,
    $reactResponse,
    &$ringResponse,
    $loop
) {
    $expectedPump = 0;
    // Note: the user should be able to provide a custom buffer with the
    // save_to client option.
    $buffer = new BufferStream();
    // Get our decorated buffer (for react) and async stream (for consumer)
    list($buffer, $async) = AsyncReadStream::create([
        'buffer' => $buffer,
        'drain' => function () use ($reactRequest) {
            // Resume the request, though I'm not sure how to "pause" it.
            $reactRequest->handleDrain();
        },
        'pump' => function ($amount) use ($loop, $buffer, &$expectedPump) {
            // Block until at least the required amount of data has been
            // read.
            echo "Pumping $amount\n";
            $expectedPump = $amount;
            while ($expectedPump > 0) {
                $loop->tick();
            }
        },
        'write' => function ($buffer, $data) use (&$expectedPump, $loop) {
            echo "Writing data\n";
            // handle the pump
            if ($expectedPump) {
                $expectedPump -= strlen($data);
                $expectedPump = max(0, $expectedPump);
                if ($expectedPump == 0) {
                    echo "Amount has been pumped\n";
                }
            }
        }
    ]);
    // todo: Actually return an AsyncStream with a pump and drain function
    // that helps to regulate how fast react reads from the socket.
    $reactResponse->on('data', function ($data) use ($buffer) {
        return $buffer->write($data);
    });
    $ringResponse = [
        'headers' => $reactResponse->getHeaders(),
        'status' => $reactResponse->getCode(),
        'version' => $reactResponse->getProtocol(),
        'reason' => $reactResponse->getReasonPhrase(),
        'body' => $async
    ];
    if (isset($request['then'])) {
        $then = $request['then'];
        $ringResponse = $then($ringResponse) ?: $ringResponse;
    }
}
$adapter = function (array $request) use ($loop, $client) {
    // This is the value we will track in the callbacks and future
    $result = null;
    // todo: Implement more advanced request creation and support for all of
    // the request and request client options.
    $reactRequest = $client->request($request['http_method'], $request['url']);
    // When the response is ready, update the result by reference.
    $reactRequest->on(
        'response',
        function ($reactResponse) use ($request, $reactRequest, &$result, $loop) {
            complete_response($request, $reactRequest, $reactResponse, $result, $loop);
        }
    );
    $reactRequest->end();
    // The future is returned immediately. When accessed or wait() is called,
    // it will block until the response headers have arrived, but the body
    // data has not yet been read.
    return new Future(function () use ($reactRequest, $loop, &$result) {
        if (!$result) {
            // Block until it's done
            do {
                $loop->tick();
            } while (!$result);
        }
        return $result;
    });
};
$response = $adapter([
    'http_method' => 'GET',
    'url' => 'http://www.google.com',
    'then' => function (array $response) {
        echo "I'm done!\n";
    }
]);
echo $response['status'] . "\n";
echo $response['reason'] . "\n";
// Do some reads. Because there is a pump function, these are "blocking"
// (but still done in the event loop).
echo $response['body']->read(1024);
echo $response['body']->read(1024);
echo $response['body']->read(1024);
echo $response['body']->read(1024);
@WyriHaximus

When you create a react based future response, the response is fulfilled in the future when it is dereferenced. When the future is dereferenced, it checks to see if the react response has been completed. If it has not completed, it should run the react event loop until the response is completed or it has failed. "Completed" in this case means that the headers of the response have been received and the body has not necessarily completely been received.

This is what worries me, the rest makes sense. The following bit is completely theoretical. And I might be completely wrong. Heck I hope I'm wrong 😄 .

Running tick on the event loop is not without consequence. Normally, all the callbacks get executed and everything is just rainbows and unicorns. But since the last request starts ticking the loop until it has a response, it will block anything else doing the same thing. Let's walk through a scenario:

  • Request A is sent out and starts ticking the loop waiting for a response
  • While A is ticking, part of the application is called in a callback and request B is made
  • Request B does the same thing ticking the loop until it has a response
  • Request A has an answer and sets $running to false, but we're now in B's while loop and that will not end until it has a response
  • Request B has an answer and breaks the loop, goes through the rest of that callback and ends up in A's while loop which immediately breaks and continues the rest of the callback

Now this possible issue is manageable when not much is going on, but when you have 25 to 250 requests or other things going on, it will become a bottleneck and a memory hog because of all the things that are waiting to be handled.
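
To make the scenario above concrete, here is a minimal sketch of two nested tick loops. `ToyLoop` is a hypothetical stand-in written for this example, not React's event loop API; it only counts ticks before firing a callback. Under that assumption, A's response arrives first, but A's wait cannot return until B's wait has finished.

```php
<?php
// Hypothetical toy loop for illustration only (NOT React's API).
final class ToyLoop
{
    /** @var array[] pending callbacks with a tick countdown */
    private $queue = [];

    // Run $callback once $ticks more ticks have happened.
    public function schedule(callable $callback, int $ticks): void
    {
        $this->queue[] = ['in' => $ticks, 'fn' => $callback];
    }

    public function tick(): void
    {
        // Collect everything that is due first, then run it, so that a
        // callback may safely call tick() again (the nesting in question).
        $ready = [];
        foreach ($this->queue as $i => $item) {
            if (--$this->queue[$i]['in'] <= 0) {
                $ready[] = $item['fn'];
                unset($this->queue[$i]);
            }
        }
        foreach ($ready as $fn) {
            $fn();
        }
    }
}

$loop = new ToyLoop();
$log = [];
$a = null;
$b = null;

// Request A: its response arrives after 2 ticks.
$loop->schedule(function () use (&$a, &$log) {
    $a = 'A';
    $log[] = 'A response arrived';
}, 2);

// An application callback fires after 1 tick, starts request B,
// and blocks on it by ticking the same loop.
$loop->schedule(function () use ($loop, &$b, &$log) {
    $loop->schedule(function () use (&$b, &$log) {
        $b = 'B';
        $log[] = 'B response arrived';
    }, 4);
    while ($b === null) {
        $loop->tick();
    }
    $log[] = 'B wait returned';
}, 1);

// The original caller blocks on A.
while ($a === null) {
    $loop->tick();
}
$log[] = 'A wait returned';

echo implode("\n", $log), "\n";
```

In this sketch A's response is recorded well before "A wait returned" is logged, because the outer loop is stuck inside B's inner loop in between.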

@mtdowling
Author

But since the last request starts ticking the loop until it has a response, it will block anything doing the same thing.

Yes, the future will not return a value until it has one. If along the way, other responses were fulfilled, then those will be completed first, delaying the originally requested future from returning. This would also be the case for trying to read from an emulated blocking stream using a pump.

This is actually how the CurlMulti adapter works, except in that adapter, you can specify a maximum limit on the number of concurrent requests. When the threshold is exceeded, it will block until it is at an acceptable level. The CurlMulti adapter also writes data to PHP temp streams, which can help protect you from running out of memory by switching to on-disk storage. You can actually do the same thing with the AsyncReadStream by supplying a Stream object that wraps a php://temp stream. I also just added the ability to provide custom metadata to streams so that the temp stream could also provide a high water mark, telling writers to slow down because the stream is too full.
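
As a rough illustration of the temp-stream idea, the sketch below wraps PHP's built-in `php://temp/maxmemory:N` stream, which keeps up to N bytes in memory and then spills to a temporary file. The `TempBackedBuffer` class and its high water mark check are hypothetical names invented for this example; they are not the Guzzle stream API or its metadata mechanism.

```php
<?php
// Hypothetical buffer for illustration; not part of Guzzle.
final class TempBackedBuffer
{
    private $stream;
    private $readPos = 0;
    private $writePos = 0;
    private $highWaterMark;

    public function __construct(int $maxMemory, int $highWaterMark)
    {
        // PHP switches this stream to on-disk storage past $maxMemory bytes.
        $this->stream = fopen("php://temp/maxmemory:$maxMemory", 'w+');
        $this->highWaterMark = $highWaterMark;
    }

    // Returns false when the writer should slow down.
    public function write(string $data): bool
    {
        fseek($this->stream, $this->writePos);
        $this->writePos += fwrite($this->stream, $data);
        return $this->buffered() < $this->highWaterMark;
    }

    public function read(int $length): string
    {
        fseek($this->stream, $this->readPos);
        $data = (string) fread($this->stream, $length);
        $this->readPos += strlen($data);
        return $data;
    }

    // Bytes written but not yet read by the consumer.
    public function buffered(): int
    {
        return $this->writePos - $this->readPos;
    }
}

$buffer = new TempBackedBuffer(1024, 2048);
$ok1 = $buffer->write(str_repeat('a', 1000)); // below the mark
$ok2 = $buffer->write(str_repeat('b', 2000)); // above the mark: slow down
$head = $buffer->read(1500);                  // the consumer catches up
$ok3 = $buffer->write('c');                   // below the mark again
```

A react-side writer could, in principle, stop reading from the socket while `write()` reports `false` and resume once the consumer has drained the buffer.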

The CurlAdapter also utilizes the "then" request option, which is triggered the instant a response or error is received. The same behavior should be implemented in the react adapter: when the $req->on('response', function ($response) use (&$result, $loop) { ... }) callback is triggered, it should update the result variable by reference and trigger the "then" function. This would then allow listeners using the "then" function to be triggered immediately. The only thing that's blocked at this point is the original caller who is trying to dereference the future.

In addition to helping to manage this concern using high water marks, some of the worry about this can be allayed using Guzzle's high-level abstractions when you know you're sending a ton of requests: https://github.com/guzzle/guzzle/blob/ring/src/Client.php#L262. This function will block until the number of outstanding responses are completed. I know that this goes against a non-blocking event loop a bit, but I think it's a good tradeoff in terms of simplicity.

I think there could be a tradeoff made here that optimizes for two different cases:

  1. A simpler interface that emulates blocking by running a tick-loop and waiting until the response is read. This is a similar interface to how http-kit works when you dereference a response: http://http-kit.org/client.html#sync. This would be the approach that would allow existing high-level Guzzle abstractions to work.
  2. A more asynchronous interface that does non-blocking tick-loop emulation.
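
The two cases above can be sketched side by side. `ToyFuture` and the `$tick` closure below are hypothetical, written only for this example; they are not Guzzle's `Future` or React's loop. The assumption is that something outside the future calls `resolve()` when a response arrives.

```php
<?php
// Hypothetical future for illustration; not GuzzleHttp\Ring\Future.
final class ToyFuture
{
    private $tick;
    private $result;
    private $callbacks = [];

    public function __construct(callable $tick)
    {
        $this->tick = $tick;
    }

    // Case 2: register a callback and return immediately.
    public function then(callable $callback): void
    {
        if ($this->result !== null) {
            $callback($this->result);
            return;
        }
        $this->callbacks[] = $callback;
    }

    // Called by the adapter when the response is available.
    public function resolve(array $response): void
    {
        $this->result = $response;
        foreach ($this->callbacks as $callback) {
            $callback($response);
        }
        $this->callbacks = [];
    }

    // Case 1: emulate blocking by ticking until resolved.
    // Note: this spins forever if nothing ever resolves the future.
    public function wait(): array
    {
        while ($this->result === null) {
            ($this->tick)();
        }
        return $this->result;
    }
}

// A toy "loop": each tick runs one pending job, if any.
$pending = [];
$tick = function () use (&$pending) {
    $job = array_shift($pending);
    if ($job) {
        $job();
    }
};

// Non-blocking style: the caller never ticks on behalf of the future.
$seen = [];
$f1 = new ToyFuture($tick);
$f1->then(function (array $r) use (&$seen) {
    $seen[] = $r['status'];
});
$pending[] = function () use ($f1) {
    $f1->resolve(['status' => 200]);
};
$tick(); // the application's own loop iteration delivers the response

// Blocking style: dereferencing drives the loop until the response is in.
$f2 = new ToyFuture($tick);
$pending[] = function () use ($f2) {
    $f2->resolve(['status' => 404]);
};
$response = $f2->wait();
```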

@mtdowling
Author

I've just updated the gist to move the response completion code into a function and to trigger the "then" function when the response completes.

@WyriHaximus

Been thinking about how to make this work in an async environment, but the then callback already makes it possible. So as long as the user doesn't try to dereference the future, $loop->tick() isn't forced and requests aren't waiting on each other 👍 . This contract would work IMHO for both sync and async environments.

This function will block until the number of outstanding responses are completed.

Blocking, or better, putting requests on a queue isn't a bad thing, as long as you don't block the loop 😄 . I did an upload-to-S3 thing with react a while back, and one of the key things in there was to limit the max number of ongoing requests to S3. (Heck, I have no idea what the max is, so I put it at 25.)
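
A queue like that could look roughly like the following. `RequestQueue` is a hypothetical helper made up for this sketch (it is not the code from the S3 upload project, nor a React or Guzzle class). It assumes each request calls `done()` from its completion callback, so nothing here ticks or blocks the loop.

```php
<?php
// Hypothetical concurrency limiter for illustration.
final class RequestQueue
{
    private $limit;
    private $active = 0;
    private $waiting = [];
    private $peak = 0;

    public function __construct(int $limit)
    {
        $this->limit = $limit;
    }

    // Queue a callable that starts a request; it runs once a slot is free.
    public function add(callable $start): void
    {
        $this->waiting[] = $start;
        $this->fill();
    }

    // Call this from the request's completion callback.
    public function done(): void
    {
        $this->active--;
        $this->fill();
    }

    public function peak(): int
    {
        return $this->peak;
    }

    private function fill(): void
    {
        while ($this->active < $this->limit && $this->waiting) {
            $start = array_shift($this->waiting);
            $this->active++;
            $this->peak = max($this->peak, $this->active);
            $start();
        }
    }
}

$queue = new RequestQueue(2); // the comment above used 25 for S3
$inFlight = [];
$started = [];

foreach (['a', 'b', 'c', 'd', 'e'] as $name) {
    $queue->add(function () use ($name, &$inFlight, &$started) {
        $inFlight[] = $name; // stands in for sending the request
        $started[] = $name;
    });
}

// Simulate responses arriving one at a time.
while ($inFlight) {
    array_shift($inFlight);
    $queue->done();
}
```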

As for the response completion closure, was going to suggest something similar 👍 .
