Skip to content

Instantly share code, notes, and snippets.

@Nek-
Created December 5, 2019 22:46
Show Gist options
  • Save Nek-/ae895c30e7da22b0d8bc3e17baf25fc6 to your computer and use it in GitHub Desktop.
Save Nek-/ae895c30e7da22b0d8bc3e17baf25fc6 to your computer and use it in GitHub Desktop.
<?php
// Utilisation d'une classe Task pour clarifier mon code précédent à base de tableaux
/**
 * Wraps a generator-based coroutine together with its id and the value
 * that should be sent into it the next time it is resumed.
 */
class Task
{
    /** @var mixed Identifier given to this task at construction time. */
    protected $taskId;

    /** @var Generator The coroutine driven by this task. */
    protected $coroutine;

    /** @var mixed Value passed to the coroutine on the next resume; reset to null after each send. */
    protected $sendValue = null;

    /** @var bool True until run() has been called once. */
    protected $beforeFirstYield = true;

    /**
     * @param mixed     $taskId    Identifier to report from getTaskId().
     * @param Generator $coroutine Coroutine to drive.
     */
    public function __construct($taskId, Generator $coroutine)
    {
        $this->coroutine = $coroutine;
        $this->taskId = $taskId;
    }

    /**
     * @return mixed The identifier passed to the constructor.
     */
    public function getTaskId()
    {
        return $this->taskId;
    }

    /**
     * Stores the value that the next run() will send into the coroutine.
     *
     * @param mixed $sendValue
     */
    public function setSendValue($sendValue)
    {
        $this->sendValue = $sendValue;
    }

    /**
     * Advances the coroutine by one step.
     *
     * The first call only fetches the value of the first yield (sending a
     * value at that point would skip it); every later call resumes the
     * coroutine with the stored send value and then clears that value.
     *
     * @return mixed The value yielded by the coroutine at this step.
     */
    public function run()
    {
        if (!$this->beforeFirstYield) {
            $yielded = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;

            return $yielded;
        }

        $this->beforeFirstYield = false;

        return $this->coroutine->current();
    }

    /**
     * @return bool True once the underlying coroutine is no longer valid.
     */
    public function isFinished()
    {
        $stillRunning = $this->coroutine->valid();

        return !$stillRunning;
    }
}
// On retrouve notre scheduler qui embarque toutes les tasks
/**
 * Cooperative scheduler: runs Task objects round-robin from a FIFO queue and
 * parks tasks that are waiting for a stream to become readable or writable.
 */
class Scheduler
{
    /** @var int Last task id handed out by newTask(). */
    protected $maxTaskId = 0;

    /** @var Task[] taskId => task; entries are removed in run() once a task is finished. */
    protected $taskMap = [];

    /** @var SplQueue Tasks that are ready to run. */
    protected $taskQueue;

    /** @var array[] Keyed by the stream's integer id; each entry is [stream, Task[]] waiting for readability. */
    protected $waitingForRead = [];

    /** @var array[] Keyed by the stream's integer id; each entry is [stream, Task[]] waiting for writability. */
    protected $waitingForWrite = [];

    public function __construct()
    {
        $this->taskQueue = new SplQueue();
    }

    /**
     * Wraps a coroutine in a Task, registers it and queues it for execution.
     *
     * @param Generator $coroutine
     *
     * @return int The id assigned to the new task.
     */
    public function newTask(Generator $coroutine)
    {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);

        return $tid;
    }

    /**
     * Appends a task to the ready queue.
     *
     * @param Task $task
     */
    public function schedule(Task $task)
    {
        $this->taskQueue->enqueue($task);
    }

    /**
     * Runs queued tasks until the ready queue is empty.
     *
     * An internal polling task is added first; it is the one that calls
     * stream_select() and re-queues tasks parked on streams. That polling
     * task ends once no task is queued and no task is waiting on a stream,
     * which is what lets this method return.
     */
    public function run()
    {
        $this->newTask($this->ioPollTask());

        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $retval = $task->run();

            // A yielded SystemCall is how a coroutine talks to the scheduler
            // (spawn another task, wait on a stream, ...). The call receives
            // the task and the scheduler and is responsible for rescheduling
            // the task if it should keep running.
            if ($retval instanceof SystemCall) {
                $retval($task, $this);
                continue;
            }

            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }

    /**
     * Parks a task until the given stream is reported readable by ioPoll().
     *
     * @param resource $socket
     * @param Task     $task
     */
    public function waitForRead($socket, Task $task)
    {
        if (isset($this->waitingForRead[(int) $socket])) {
            $this->waitingForRead[(int) $socket][1][] = $task;
        } else {
            $this->waitingForRead[(int) $socket] = [$socket, [$task]];
        }
    }

    /**
     * Parks a task until the given stream is reported writable by ioPoll().
     *
     * @param resource $socket
     * @param Task     $task
     */
    public function waitForWrite($socket, Task $task)
    {
        if (isset($this->waitingForWrite[(int) $socket])) {
            $this->waitingForWrite[(int) $socket][1][] = $task;
        } else {
            $this->waitingForWrite[(int) $socket] = [$socket, [$task]];
        }
    }

    /**
     * Waits for stream activity and re-queues every task parked on a stream
     * that stream_select() reports as ready.
     *
     * @param int|null $timeout Seconds handed to stream_select(); null blocks
     *                          until a stream is ready, 0 returns immediately.
     */
    protected function ioPoll($timeout)
    {
        $rSocks = [];
        foreach ($this->waitingForRead as list($socket)) {
            $rSocks[] = $socket;
        }

        $wSocks = [];
        foreach ($this->waitingForWrite as list($socket)) {
            $wSocks[] = $socket;
        }

        // stream_select() rejects a call without any stream (warning + false
        // on PHP 7, ValueError on PHP 8, which "@" does not silence), so do
        // not call it when nothing is parked.
        if ($rSocks === [] && $wSocks === []) {
            return;
        }

        $eSocks = []; // dummy: stream_select() needs a variable for the "except" set
        if (!@stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
            return;
        }

        // stream_select() rewrote both arrays to hold only the ready streams.
        foreach ($rSocks as $socket) {
            list(, $tasks) = $this->waitingForRead[(int) $socket];
            unset($this->waitingForRead[(int) $socket]);
            foreach ($tasks as $task) {
                $this->schedule($task);
            }
        }

        foreach ($wSocks as $socket) {
            list(, $tasks) = $this->waitingForWrite[(int) $socket];
            unset($this->waitingForWrite[(int) $socket]);
            foreach ($tasks as $task) {
                $this->schedule($task);
            }
        }
    }

    /**
     * Coroutine of the internal polling task.
     *
     * When it is the only runnable task it blocks in ioPoll() until a stream
     * is ready; otherwise it polls without waiting so other tasks keep
     * running. It returns when there is neither a queued task nor a parked
     * one, because blocking then could never be woken up.
     *
     * @return Generator
     */
    protected function ioPollTask()
    {
        while (true) {
            if ($this->taskQueue->isEmpty()) {
                if ($this->waitingForRead === [] && $this->waitingForWrite === []) {
                    return;
                }
                $this->ioPoll(null);
            } else {
                $this->ioPoll(0);
            }
            yield;
        }
    }
}
// Classe proxy qui permet à la task et au scheduler de communiquer
/**
 * Callable wrapper a coroutine can yield; when invoked it forwards the
 * task and the scheduler to the callback it was built with.
 */
class SystemCall
{
    /** @var callable Callback receiving (Task, Scheduler). */
    protected $callback;

    /**
     * @param callable $callback Invoked with the task and the scheduler by __invoke().
     */
    public function __construct(callable $callback)
    {
        $this->callback = $callback;
    }

    /**
     * Calls the stored callback with the given task and scheduler.
     *
     * @param Task      $task
     * @param Scheduler $scheduler
     *
     * @return mixed Whatever the callback returns.
     */
    public function __invoke(Task $task, Scheduler $scheduler)
    {
        // Copy to a local first: "$this->callback(...)" would be parsed as a method call.
        $handler = $this->callback;

        return $handler($task, $scheduler);
    }
}
/**
 * Builds a system call that stores the calling task's own id as its next
 * send value and puts the task back on the scheduler's queue.
 *
 * @return SystemCall
 */
function getTaskId()
{
    $handler = function (Task $current, Scheduler $sched) {
        $ownId = $current->getTaskId();
        $current->setSendValue($ownId);
        $sched->schedule($current);
    };

    return new SystemCall($handler);
}
/**
 * Builds a system call that registers $coroutine as a new task, stores the
 * new task's id as the calling task's next send value, and re-queues the
 * calling task.
 *
 * @param Generator $coroutine Coroutine to run as a separate task.
 *
 * @return SystemCall
 */
function newTask(Generator $coroutine)
{
    $handler = function (Task $current, Scheduler $sched) use ($coroutine) {
        $childId = $sched->newTask($coroutine);
        $current->setSendValue($childId);
        $sched->schedule($current);
    };

    return new SystemCall($handler);
}
/**
 * Builds a system call that registers the calling task with the scheduler
 * as waiting for $socket to become readable. The task is not re-queued here.
 *
 * @param resource $socket
 *
 * @return SystemCall
 */
function waitForRead($socket)
{
    $handler = function (Task $current, Scheduler $sched) use ($socket) {
        $sched->waitForRead($socket, $current);
    };

    return new SystemCall($handler);
}
/**
 * Builds a system call that registers the calling task with the scheduler
 * as waiting for $socket to become writable. The task is not re-queued here.
 *
 * @param resource $socket
 *
 * @return SystemCall
 */
function waitForWrite($socket)
{
    $handler = function (Task $current, Scheduler $sched) use ($socket) {
        $sched->waitForWrite($socket, $current);
    };

    return new SystemCall($handler);
}
// Nous arrivons à la partie "simple" et au cœur de ce que nous voulons faire : le serveur HTTP.
// Cette fois-ci, nous utilisons un ensemble de classes relativement complexe pour arriver à nos fins,
// mais c'est plus élégant à l'utilisation. Voyez par vous-mêmes.
/**
 * Coroutine that listens on tcp://localhost:$port and spawns one
 * handleClient() task per accepted connection.
 *
 * @param int $port TCP port to listen on.
 *
 * @return Generator Yields SystemCall objects for the scheduler.
 *
 * @throws Exception When the listening socket cannot be created.
 */
function server($port)
{
    echo "Starting server at port $port...\n";

    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) {
        throw new Exception($errStr, $errNo);
    }

    stream_set_blocking($socket, false);

    while (true) {
        yield waitForRead($socket);

        $clientSocket = stream_socket_accept($socket, 0);
        if ($clientSocket === false) {
            // Accept can fail even after the listener was reported readable;
            // without this check "false" would be handed to handleClient()
            // and end up in the scheduler's wait list as if it were a stream.
            continue;
        }

        yield newTask(handleClient($clientSocket));
    }
}
/**
 * Coroutine that serves one client connection: waits for the request,
 * reads and discards up to 8192 bytes of it, replies with a fixed HTML
 * page carrying the current Unix timestamp, then closes the connection.
 *
 * @param resource $socket Accepted client stream.
 *
 * @return Generator Yields SystemCall objects for the scheduler.
 */
function handleClient($socket)
{
    yield waitForRead($socket);
    // The request content is not used; it is only drained from the socket.
    fread($socket, 8192);

    $msg = "<h1>Reponse asynchrone</h1>\n" . time();
    // The body is sent exactly as measured: appending anything after $msg
    // would make the payload longer than the announced Content-Length.
    $response = "HTTP/1.1 200 OK\r\n" .
        "Date: " . gmdate('D, d M Y H:i:s T') . "\r\n" .
        "Connection: close\r\n" .
        "Content-Type: text/html\r\n" .
        "Content-Length: " . strlen($msg) . "\r\n" .
        "\r\n" .
        $msg;

    yield waitForWrite($socket);
    fwrite($socket, $response);
    fclose($socket);
}
// Bootstrap: create the scheduler, register the HTTP server coroutine on
// port 8000 as its first task, then start the scheduling loop.
$scheduler = new Scheduler();
$scheduler->newTask(server(8000));
$scheduler->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment