Skip to content

Instantly share code, notes, and snippets.

@kelunik
Created March 11, 2018 13:21
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kelunik/f49bc3c8874ed1f77b9181b39f47e9f9 to your computer and use it in GitHub Desktop.
Save kelunik/f49bc3c8874ed1f77b9181b39f47e9f9 to your computer and use it in GitHub Desktop.
<?php
use Amp\Emitter;
use Amp\Iterator;
use Amp\Promise;
class CsvReader implements Iterator {
/** @var EventEmitter */
private $eventEmitter;
/** @var Emitter|null */
private $emitter;
private $iterator;
public function __construct(EventEmitter $eventEmitter) {
$this->eventEmitter = $eventEmitter;
$this->emitter = new Emitter;
$this->iterator = $emitter->iterate();
if ($stream->isReadable()) {
$this->attachHandlers();
} else {
$this->emitter->complete();
}
}
private function attachHandlers() {
$this->reactStream->on("data", function (string $chunk) {
$this->reactStream->pause();
$this->emitter->emit($chunk)->onResolve(function () {
$this->reactStream->resume();
});
});
$this->reactStream->on("end", function () {
if ($this->emitter) {
$emitter = $this->emitter;
$this->emitter = null;
$emitter->complete();
}
});
$this->reactStream->on("error", function (\Throwable $error) {
if ($this->emitter) {
$emitter = $this->emitter;
$this->emitter = null;
$emitter->fail($error);
}
});
// Catches any streams that neither emit "end" nor "error", e.g. by being explicitly closed
$this->reactStream->on("close", function () {
if ($this->emitter) {
$emitter = $this->emitter;
$this->emitter = null;
$emitter->complete();
}
});
}
/** @inheritdoc */
public function advance(): Promise {
return $this->iterator->advance();
}
/** @inheritdoc */
public function getCurrent() {
return $this->iterator->getCurrent();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment