```php
<?php
require 'vendor/autoload.php';

use GuzzleHttp\Stream\BufferStream;
use GuzzleHttp\Stream\AsyncReadStream;
use GuzzleHttp\Ring\Future;

$loop = React\EventLoop\Factory::create();
$dnsResolverFactory = new React\Dns\Resolver\Factory();
$dnsResolver = $dnsResolverFactory->createCached('8.8.8.8', $loop);
$factory = new React\HttpClient\Factory();
$client = $factory->create($loop, $dnsResolver);

function complete_response(
    $request,
    $reactRequest,
    $reactResponse,
    &$ringResponse,
    $loop
) {
    $expectedPump = 0;
    // Note: the user should be able to provide a custom buffer with the
    // save_to client option.
    $buffer = new BufferStream();
    // Get our decorated buffer (for React) and async stream (for the consumer).
    list($buffer, $async) = AsyncReadStream::create([
        'buffer' => $buffer,
        'drain' => function () use ($reactRequest) {
            // Resume the request, though I'm not sure how to "pause" it.
            $reactRequest->handleDrain();
        },
        'pump' => function ($amount) use ($loop, $buffer, &$expectedPump) {
            // Block until at least the required amount of data has been read.
            echo "Pumping $amount\n";
            $expectedPump = $amount;
            while ($expectedPump > 0) {
                $loop->tick();
            }
        },
        'write' => function ($buffer, $data) use (&$expectedPump, $loop) {
            echo "Writing data\n";
            // Handle the pump: track how much of the requested amount remains.
            if ($expectedPump) {
                $expectedPump = max(0, $expectedPump - strlen($data));
                if ($expectedPump == 0) {
                    echo "Amount has been pumped\n";
                }
            }
        }
    ]);
    // todo: Actually return an AsyncStream with pump and drain functions
    // that help regulate how fast React reads from the socket.
    $reactResponse->on('data', function ($data) use ($buffer) {
        return $buffer->write($data);
    });
    $ringResponse = [
        'headers' => $reactResponse->getHeaders(),
        'status'  => $reactResponse->getCode(),
        'version' => $reactResponse->getProtocol(),
        'reason'  => $reactResponse->getReasonPhrase(),
        'body'    => $async
    ];
    if (isset($request['then'])) {
        $then = $request['then'];
        $ringResponse = $then($ringResponse) ?: $ringResponse;
    }
}

$adapter = function (array $request) use ($loop, $client) {
    // This is the value we will track in the callbacks and future.
    $result = null;
    // todo: Implement more advanced request creation and support for all of
    // the request and request client options.
    $reactRequest = $client->request($request['http_method'], $request['url']);
    // When the response is ready, update the result by reference.
    $reactRequest->on(
        'response',
        function ($reactResponse) use ($request, $reactRequest, &$result, $loop) {
            complete_response($request, $reactRequest, $reactResponse, $result, $loop);
        }
    );
    $reactRequest->end();
    // The future is returned immediately. When accessed or wait() is called,
    // it will block until the response has finished, but the body data has
    // not yet been read.
    return new Future(function () use ($reactRequest, $loop, &$result) {
        if (!$result) {
            // Block until it's done.
            do {
                $loop->tick();
            } while (!$result);
        }
        return $result;
    });
};

$response = $adapter([
    'http_method' => 'GET',
    'url'  => 'http://www.google.com',
    'then' => function (array $response) {
        echo "I'm done!\n";
    }
]);

echo $response['status'] . "\n";
echo $response['reason'] . "\n";
// Do some reads. Because there is a pump function, these are "blocking"
// (but still done in the event loop).
echo $response['body']->read(1024);
echo $response['body']->read(1024);
echo $response['body']->read(1024);
echo $response['body']->read(1024);
```
> When you create a react based future response, the response is fulfilled in the future when it is dereferenced. When the future is dereferenced, it checks to see if the react response has been completed. If it has not completed, it should run the react event loop until the response is completed or it has failed. "Completed" in this case means that the headers of the response have been received and the body has not necessarily completely been received.
This is what worries me; the rest makes sense. The following bit is completely theoretical, and I might be completely wrong. Heck, I hope I'm wrong 😄.
Running `tick()` on the event loop is not without consequence. Normally it means all the callbacks get executed and everything is just rainbows and unicorns. But since the last request starts ticking the loop until it has a response, it will block anything else doing the same thing. Let's walk through a scenario:
- Request A is sent out and starts ticking the loop, waiting for a response
- While A is ticking, part of the application is called in a callback and request B is made
- Request B does the same thing, ticking the loop until it has a response
- Request A gets an answer and sets `$running` to false, but we're now in B's while loop, which will not end until B has a response
- Request B gets an answer and breaks its loop, goes through the rest of that callback, and ends up back in A's while loop, which immediately breaks and continues the rest of A's callback
Now, this possible issue is manageable when not much is going on, but when you have 25 to 250 requests, or other things happening, it will become a bottleneck and a memory hog because of everything waiting to be handled.
> But since the last request starts ticking the loop until it has a response it will block any thing doing the same thing.
Yes, the future will not return a value until it has one. If, along the way, other responses were fulfilled, those will be completed first, delaying the originally requested future from returning. The same would be true when trying to read from an emulated blocking stream using a pump.
This is actually how the CurlMulti adapter works, except that in that adapter you can specify a maximum limit on the number of concurrent requests. When the threshold is exceeded, it will block until the count is back at an acceptable level. The CurlMulti adapter also writes data to PHP temp streams, which can help protect you from running out of memory by switching to on-disk storage. You can do the same thing with the `AsyncReadStream` by supplying a `Stream` object that wraps a `php://temp` stream. I also just added the ability to provide custom metadata to streams, so that the temp stream could also provide a high water mark, telling writers to slow down because the stream is too full.
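The `php://temp` spill-over behavior mentioned above can be seen with plain PHP streams, independent of Guzzle: `php://temp/maxmemory:N` keeps data in memory up to roughly N bytes and then transparently moves it to a temporary file. A quick sketch:

```php
<?php
// php://temp keeps data in memory until a threshold, then transparently
// switches to a temporary file on disk. Here the threshold is 1 KiB.
$stream = fopen('php://temp/maxmemory:1024', 'r+');

// Write 4 KiB: well past the in-memory threshold, so the data spills to disk.
fwrite($stream, str_repeat('x', 4096));

// The data is still fully readable; the disk spill is invisible to the reader.
rewind($stream);
$data = stream_get_contents($stream);
var_dump(strlen($data)); // int(4096)

fclose($stream);
```

Wrapping such a stream in a `Stream` object is what would give the adapter its on-disk overflow protection.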
The CurlAdapter also utilizes the "then" request option, which is triggered the instant a response or error is received. The same behavior should be implemented in the React adapter so that when the `$req->on('response', ...)` callback is triggered, it actually updates the result variable by reference and triggers the "then" function. This would allow listeners using the "then" function to be notified immediately. The only thing that's blocked at that point is the original caller who is trying to dereference the future.
In addition to managing this concern with high water marks, some of the worry can be allayed by using Guzzle's high-level abstractions when you know you're sending a ton of requests: https://github.com/guzzle/guzzle/blob/ring/src/Client.php#L262. This function will block until the outstanding responses are completed. I know that this goes against a non-blocking event loop a bit, but I think it's a good tradeoff in terms of simplicity.
I think there could be a tradeoff made here that optimizes for two different cases:
- A simpler interface that emulates blocking by running a tick-loop and waiting until the response is read. This is a similar interface to how http-kit works when you dereference a response: http://http-kit.org/client.html#sync. This would be the approach that would allow existing high-level Guzzle abstractions to work.
- A more asynchronous interface that does non-blocking tick-loop emulation.
I've just updated the gist to move the response completion code into a function and to trigger the "then" function when the response completes.
I've been thinking about how to make this work in an async environment, but the `then` callback already makes it possible. As long as the user doesn't try to dereference the future, `$loop->tick()` isn't forced and requests aren't waiting on each other 👍. This contract would work, IMHO, for both sync and async environments.
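That contract can be sketched as a tiny future (toy code, not the actual `GuzzleHttp\Ring\Future` API): register a `then` callback and never dereference, and no tick-loop is ever forced.

```php
<?php
// Toy future: resolving it fires any registered then-callbacks immediately,
// so an async consumer never needs to dereference (and tick the loop).
class ToyFuture
{
    private $result;
    private $resolved = false;
    private $callbacks = [];

    public function then(callable $cb)
    {
        if ($this->resolved) {
            $cb($this->result); // already done: fire right away
        } else {
            $this->callbacks[] = $cb;
        }
        return $this;
    }

    public function resolve($value)
    {
        $this->resolved = true;
        $this->result = $value;
        foreach ($this->callbacks as $cb) {
            $cb($value);
        }
        $this->callbacks = [];
    }
}

$seen = null;
$future = new ToyFuture();
$future->then(function ($response) use (&$seen) {
    $seen = $response; // invoked by the event loop; no blocking wait() needed
});
$future->resolve(['status' => 200]); // in the adapter, the 'response' event does this
```

Only a caller who insists on a synchronous value would ever need the tick-until-done fallback.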
> This function will block until the outstanding responses are completed.
Blocking, or better, putting requests on a queue, isn't a bad thing, as long as you don't block the loop 😄. I did an upload-to-S3 thing with React a while back, and one of the key things in there was limiting the max number of ongoing requests to S3. (Heck, I have no idea what the max is, so I put it at 25.)
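That capped-concurrency pattern is just a small queue. A hypothetical sketch (the names are mine, not from the S3 code): each job receives a `$done` callback to invoke when it finishes, and queued jobs only start once a slot frees up.

```php
<?php
// Minimal sketch of capping in-flight work at $max jobs (hypothetical helper,
// not taken from any library). Queued jobs start only when a slot frees.
class RequestQueue
{
    private $max;
    private $running = 0;
    private $queue = [];

    public function __construct($max)
    {
        $this->max = $max;
    }

    public function push(callable $start)
    {
        $this->queue[] = $start;
        $this->drain();
    }

    private function drain()
    {
        while ($this->running < $this->max && $this->queue) {
            $start = array_shift($this->queue);
            $this->running++;
            // The job calls this callback when it completes, freeing a slot.
            $start(function () {
                $this->running--;
                $this->drain();
            });
        }
    }
}

$order = [];
$pendingDone = [];
$queue = new RequestQueue(2);
for ($i = 1; $i <= 4; $i++) {
    $queue->push(function ($done) use ($i, &$order, &$pendingDone) {
        $order[] = "start $i";
        $pendingDone[] = $done; // completion is deferred, as with a real request
    });
}
// Only two jobs have started; the other two are waiting for a free slot.
($pendingDone[0])(); // "finishing" job 1 lets job 3 start
```

In the React adapter, `$done` would be invoked from the response-completed (or error) event rather than by hand.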
As for the response completion closure, I was going to suggest something similar 👍.
Here are some thoughts on this approach and more rationale. This is a follow up to the twitter discussion here: https://twitter.com/mtdowling/status/508369541315842048
When you create a react based future response, the response is fulfilled in the future when it is dereferenced. When the future is dereferenced, it checks to see if the react response has been completed. If it has not completed, it should run the react event loop until the response is completed or it has failed. "Completed" in this case means that the headers of the response have been received and the body has not necessarily completely been received.
The response contains a completely asynchronous body, or a body that emulates synchronous blocking until the requested amount of data read from the socket is received (or "pumped"). The asynchronous body is controlled via the `GuzzleHttp\Stream\AsyncReadStream`. When not provided a "pump" in the constructor, this stream is non-blocking and returns only the data that has been buffered. When provided a pump, it will run the event loop until the desired amount of data has been received, the stream has been closed, or an error occurs. Once the data has been collected into the buffer (`GuzzleHttp\Stream\BufferStream`) of the AsyncReadStream, it is returned. The Guzzle-React bindings should have some option for users to turn the pump behavior on or off. I'd prefer that it default to "on" so that React behaves the same way as the other adapters, and users need to opt in to a completely asynchronous adapter. This provides a consistent interface to up-stream consumers and does not expose the async goop.

During any of the "blocking" event loop runs, data will likely be received and written to the buffers of the responses associated with other requests that are being sent concurrently. These buffers will continue to buffer the response data, but once their "high water mark" is hit (a threshold at which the buffer begins warning that it is too full), the buffer will begin to return false when written to. At this point, the Guzzle-React bindings should tell React to cease writing data to that particular buffer and to no longer read from the remote stream, to apply backpressure up-stream at the TCP level (assuming I know what the word means). When the buffer is finally read from and the size of the buffer goes back to a reasonable size below the high water mark, the buffer's provided "drain" function is invoked, which tells React to once again begin reading from the response stream and writing data to the buffer.