
@mmoreram
Last active April 27, 2023 16:07
A first look at a local-storage middleware for ReactPHP streamed downloads

Motivation

Working with large streamed files can consume a lot of memory. If you download faster than you process, everything that has been downloaded but not yet processed has to wait in memory, so in the worst case you need as much memory as the total size of the downloaded file. This can become a very expensive process.
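As a rough check, assume constant download and processing rates and an unbounded in-memory buffer (no backpressure). The backlog peaks when the download finishes, at S * (1 - rp / rd) bytes for a file of S bytes downloaded at rd and processed at rp bytes per second, which approaches the whole file size as processing gets slower. A sketch with a hypothetical peakBacklogBytes() helper, not part of the gist:

```php
<?php
// Hypothetical helper: peak in-memory backlog for a download of $fileSize bytes
// arriving at $downloadRate bytes/s but processed at only $processRate bytes/s.
// Assumes constant rates and an unbounded in-memory buffer (no backpressure).
function peakBacklogBytes(int $fileSize, int $downloadRate, int $processRate): int
{
    if ($processRate >= $downloadRate) {
        // The consumer keeps up, so nothing piles up.
        return 0;
    }
    // The download ends after $fileSize / $downloadRate seconds; by then only
    // $processRate * that time has been consumed, the rest sits in memory.
    return intdiv($fileSize * ($downloadRate - $processRate), $downloadRate);
}

// 1 GiB downloaded at 100 MiB/s but processed at 10 MiB/s: prints "921 MiB".
$mib = 1024 * 1024;
echo intdiv(peakBacklogBytes(1024 * $mib, 100 * $mib, 10 * $mib), $mib), " MiB\n";
```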

What if, instead of storing the whole "waiting list" in memory, we store it on our local SSD?

  • You open a streamed download of the feed and save it into a local "socket" file (in practice, a regular temporary file).
  • When the stream finishes, you append an END_TOKEN value to the file.
  • At the same time, you open the same local file and read from it as soon as there is something to read.
  • When what you have read ends with END_TOKEN, you're done.
  • You remove the socket file.
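The steps above can be sketched with plain, blocking PHP file functions instead of ReactPHP. This is a simplified model under stated assumptions: an array of strings stands in for the HTTP stream, the writer and the reader take turns in one loop instead of running concurrently on the event loop, and readAvailable() and runProtocol() are hypothetical names. The received data is kept in memory only so the example can return it.

```php
<?php
// Hypothetical end marker, same value as StreamedFile::END_STREAM_TOKEN in the gist.
const END_TOKEN = '---END---';

// Reader step: returns up to $chunkSize new bytes after $offset ('' if nothing new yet).
function readAvailable(string $path, int &$offset, int $chunkSize): string
{
    clearstatcache(true, $path); // filesize() results are cached otherwise
    if (filesize($path) <= $offset) {
        return '';
    }
    $contents = file_get_contents($path, false, null, $offset, $chunkSize);
    $offset += strlen($contents);
    return $contents;
}

function runProtocol(array $chunks, int $chunkSize = 4): string
{
    // Create the local buffer file with a unique name.
    $path = tempnam(sys_get_temp_dir(), 'file-middleware-');
    $offset = 0;
    $received = ''; // kept in memory only so this example can return it

    // Writer appends each downloaded chunk, then the END token last.
    $chunks[] = END_TOKEN;
    foreach ($chunks as $chunk) {
        file_put_contents($path, $chunk, FILE_APPEND);
        // Reader consumes whatever is available right now (it may lag behind).
        $received .= readAvailable($path, $offset, $chunkSize);
    }

    // Keep reading until what was read ends with the END token.
    while (!str_ends_with($received, END_TOKEN)) {
        $received .= readAvailable($path, $offset, $chunkSize);
    }

    // Remove the buffer file and strip the token before returning the data.
    unlink($path);
    return substr($received, 0, -strlen(END_TOKEN));
}

echo runProtocol(['{"id":1}', "\n", '{"id":2}', "\n"]);
```

Checking the accumulated data, rather than each chunk on its own, means the token is still found when a read boundary falls in the middle of it.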

This is only for testing purposes. It has not been tested in production.

Usage

  • Set your own feed URL in $fileUrl (any size; the bigger, the better)
  • Run php index.php

Every second, the console prints the memory consumed by the PHP process (both regular and real usage) in MB. You will also see an x character each time a chunk is read from the stream, and a 'LAST' token when the whole file has been read.
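Converting the byte counts returned by memory_get_usage() and memory_get_usage(true) to MB means dividing by 1024 twice. A small sketch of that conversion, using hypothetical toMegabytes() and memoryReport() helpers that are not part of the gist:

```php
<?php
// Hypothetical helper: whole megabytes, where 1 MB = 1024 * 1024 bytes.
function toMegabytes(int $bytes): int
{
    return intdiv($bytes, 1024 * 1024);
}

// Hypothetical helper producing a "[Mem] <regular> - <real>" line.
function memoryReport(): string
{
    // memory_get_usage() is the memory currently allocated to the script;
    // memory_get_usage(true) is the total PHP has reserved from the system.
    return '[Mem] ' . toMegabytes(memory_get_usage())
        . ' - ' . toMegabytes(memory_get_usage(true));
}

echo memoryReport(), PHP_EOL;
```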

index.php

<?php
namespace App;

use React\EventLoop\Loop;
use React\Filesystem\Factory;
use React\Http\Browser;

require "vendor/autoload.php";
require "StreamedFile.php";

/*
 * Put your feed URL here
 */
$fileUrl = 'https://your-file.jsonl';

$filesystem = Factory::create();
$browser = new Browser();
$streamedFile = new StreamedFile($filesystem, $browser);
$bytes = 0;

// Every second, print regular and real memory usage in MB (1 MB = 1024 * 1024 bytes).
$memoryTimer = Loop::addPeriodicTimer(1, function () {
    echo '[Mem] ' . intdiv(memory_get_usage(), 1024 * 1024) . ' - ' . intdiv(memory_get_usage(true), 1024 * 1024) . PHP_EOL;
});

$stream = $streamedFile->get($fileUrl);
$stream->on('data', function (string $data) use (&$bytes) {
    echo 'x';
    // Emulation of a heavy blocking process here (50 ms per chunk).
    usleep(50000);
    $bytes += strlen($data);
});
$stream->on('close', function () use (&$bytes, $memoryTimer) {
    echo PHP_EOL . "Read $bytes bytes from source" . PHP_EOL;
    // Stop the memory report so the loop can exit once the download is consumed.
    Loop::cancelTimer($memoryTimer);
});

Loop::run();
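StreamedFile.php

<?php
namespace App;

use Psr\Http\Message\ResponseInterface;
use React\EventLoop\Loop;
use React\EventLoop\TimerInterface;
use React\Filesystem\AdapterInterface;
use React\Http\Browser;
use React\Stream\ReadableStreamInterface;
use React\Stream\ThroughStream;
use function React\Promise\resolve;

class StreamedFile
{
    const END_STREAM_TOKEN = '---END---';
    const READ_CHUNK_SIZE = 8192;

    private AdapterInterface $filesystem;
    private Browser $browser;

    public function __construct(
        AdapterInterface $filesystem,
        Browser $browser
    ) {
        $this->filesystem = $filesystem;
        $this->browser = $browser;
    }

    public function get(string $url): ReadableStreamInterface
    {
        // tempnam() creates an empty file with a unique name.
        $socketFileName = tempnam(sys_get_temp_dir(), 'file-middleware-');
        $fileForWriting = $this->filesystem->file($socketFileName);
        $fileForReading = $this->filesystem->file($socketFileName);
        $stream = new ThroughStream();

        // Bytes already written to the local file but not read back yet.
        $contentToRead = 0;
        // Appends are chained so chunks (and the END token) reach the file in order.
        $writes = resolve(null);
        $append = function (string $data) use ($fileForWriting, &$writes, &$contentToRead): void {
            $writes = $writes
                ->then(function () use ($fileForWriting, $data) {
                    return $fileForWriting->putContents($data, \FILE_APPEND);
                })
                ->then(function () use ($data, &$contentToRead) {
                    // Only count bytes once they are actually in the file.
                    $contentToRead += strlen($data);
                });
        };

        $this
            ->browser
            ->requestStreaming('GET', $url)
            ->then(function (ResponseInterface $response) use ($append) {
                $body = $response->getBody();
                $body->on('data', $append);
                $body->on('close', function () use ($append) {
                    $append(self::END_STREAM_TOKEN);
                });
            }, function (\Throwable $e) use ($stream) {
                // Forward request errors to the consumer instead of dropping them.
                $stream->emit('error', [$e]);
                $stream->close();
            });

        $offset = 0;
        $isReading = false;
        // Data read back but not forwarded yet, because it may be the start of the END token.
        $pending = '';
        $tokenLength = strlen(self::END_STREAM_TOKEN);

        Loop::addPeriodicTimer(.1, function (TimerInterface $timer) use ($fileForReading, &$offset, &$contentToRead, &$isReading, &$pending, $tokenLength, $stream, $socketFileName): void {
            if (!$stream->isReadable()) {
                // The stream was closed (e.g. the request failed): stop polling and clean up.
                Loop::cancelTimer($timer);
                if (file_exists($socketFileName)) {
                    unlink($socketFileName);
                }
                return;
            }
            // Nothing new in the file yet, or the previous read has not finished.
            if ($contentToRead === 0 || $isReading) {
                return;
            }
            $isReading = true;
            $fileForReading->getContents($offset, self::READ_CHUNK_SIZE)->then(function (string $contents) use (&$offset, $timer, &$contentToRead, &$isReading, &$pending, $tokenLength, $stream, $socketFileName): void {
                $isReading = false;
                $contentToRead -= strlen($contents);
                $offset += strlen($contents);
                $pending .= $contents;

                // Check the accumulated tail, so a token split across two reads is still detected.
                if (str_ends_with($pending, self::END_STREAM_TOKEN)) {
                    var_dump('LAST');
                    Loop::cancelTimer($timer);
                    // Forward the remaining data without the token; end() also closes the stream.
                    $rest = substr($pending, 0, -$tokenLength);
                    $stream->end($rest === '' ? null : $rest);
                    unlink($socketFileName);
                    return;
                }

                // Forward everything except the last (token length - 1) bytes, which could be a token prefix.
                $flushLength = strlen($pending) - ($tokenLength - 1);
                if ($flushLength > 0) {
                    $stream->write(substr($pending, 0, $flushLength));
                    $pending = substr($pending, $flushLength);
                }
            }, function () use (&$isReading) {
                // Let the next tick retry the read.
                $isReading = false;
            });
        });

        return $stream;
    }
}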
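Because reads from the local file are capped at 8192 bytes, END_STREAM_TOKEN can straddle two reads, so checking each chunk on its own for the token is not enough, and the token should not be forwarded to the consumer as data. One way to handle both, sketched here with plain PHP and a hypothetical EndTokenDetector class (not part of the gist or of ReactPHP), is to hold back the last (token length - 1) bytes until the next read shows whether they start the token.

```php
<?php
/**
 * Hypothetical helper: returns the bytes that are safe to forward for each
 * chunk and detects the end token even when a read boundary splits it.
 */
final class EndTokenDetector
{
    private string $pending = '';
    private bool $finished = false;

    public function __construct(private string $token) {}

    public function push(string $chunk): string
    {
        $this->pending .= $chunk;
        if (str_ends_with($this->pending, $this->token)) {
            // Token complete: release everything before it.
            $this->finished = true;
            $out = substr($this->pending, 0, -strlen($this->token));
            $this->pending = '';
            return $out;
        }
        // Keep the last (token length - 1) bytes: they might be a token prefix.
        $flush = strlen($this->pending) - (strlen($this->token) - 1);
        if ($flush <= 0) {
            return '';
        }
        $out = substr($this->pending, 0, $flush);
        $this->pending = substr($this->pending, $flush);
        return $out;
    }

    public function isFinished(): bool
    {
        return $this->finished;
    }
}

// The token arrives split across two reads: 'hello---EN' and 'D---'.
$detector = new EndTokenDetector('---END---');
$forwarded = $detector->push('hello---EN') . $detector->push('D---');
var_dump($forwarded, $detector->isFinished()); // string(5) "hello", bool(true)
```

Holding back only token length - 1 bytes keeps the extra buffering tiny, so it does not undo the point of moving the backlog out of memory.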