Skip to content

Instantly share code, notes, and snippets.

@jsor
Last active April 27, 2018 07:12
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save jsor/4135177 to your computer and use it in GitHub Desktop.
Save jsor/4135177 to your computer and use it in GitHub Desktop.
Async MySQL Client
<?php
namespace Jsor\MysqlAsync;
use React\EventLoop\LoopInterface;
use React\Promise\Deferred;
/**
 * Minimal asynchronous MySQL client built on mysqli's MYSQLI_ASYNC mode and a
 * React event loop.
 *
 * Jobs (queries and a final "close") are stored in a FIFO queue and executed
 * one at a time. Every job carries its own Deferred (`$job->resolver`), whose
 * promise is handed back to the caller. Failures are reported by rejecting
 * that promise with the error message string.
 */
class Connection
{
    /** @var LoopInterface Event loop used to schedule re-polling. */
    private $loop;

    /** @var array Connection parameters: host, username, password, dbname, port, socket. */
    private $parameters;

    /** @var \mysqli Underlying mysqli handle. */
    private $mysqli;

    /** @var \SplQueue Pending jobs; the job at the bottom is the one in flight. */
    private $jobs;

    /** @var Deferred|null Set on the first connect() call and reused afterwards. */
    private $connectDeferred;

    /**
     * @param LoopInterface $loop       Event loop used for the polling timer.
     * @param array         $parameters Optional keys: host, username, password, dbname, port, socket.
     * @param array         $options    mysqli options (option constant => value) applied before connecting.
     */
    public function __construct(LoopInterface $loop, array $parameters = array(), array $options = array())
    {
        $this->loop = $loop;
        $this->parameters = $parameters;
        $this->jobs = new \SplQueue();
        $this->mysqli = mysqli_init();

        foreach ($options as $name => $option) {
            $this->mysqli->options($name, $option);
        }
    }

    /**
     * Connects to the server on first use; later calls return the same promise.
     *
     * Note that mysqli::real_connect() itself is a blocking call.
     *
     * @return mixed Promise resolved with this Connection, or rejected with
     *               the connection error message.
     */
    public function connect()
    {
        if ($this->connectDeferred) {
            return $this->connectDeferred->promise();
        }

        $this->connectDeferred = new Deferred();

        try {
            // "@" keeps the connection warning quiet: the failure is reported
            // through the promise instead.
            $connected = @$this->mysqli->real_connect(
                $this->getParameter('host'),
                $this->getParameter('username'),
                $this->getParameter('password'),
                $this->getParameter('dbname'),
                $this->getParameter('port'),
                $this->getParameter('socket')
            );
        } catch (\mysqli_sql_exception $e) {
            // mysqli throws instead of returning false when exception
            // reporting is enabled (the default since PHP 8.1).
            $this->connectDeferred->reject($e->getMessage());

            return $this->connectDeferred->promise();
        }

        if ($this->mysqli->connect_error) {
            $this->connectDeferred->reject($this->mysqli->connect_error);
        } elseif (false === $connected) {
            $this->connectDeferred->reject('Unable to connect to the MySQL server');
        } else {
            $this->connectDeferred->resolve($this);
        }

        return $this->connectDeferred->promise();
    }

    /**
     * Queues an SQL statement for asynchronous execution.
     *
     * @param string $sql SQL text; it is sent to the server verbatim, so the
     *                    caller is responsible for escaping any untrusted input.
     *
     * @return mixed Promise resolved with the value returned by
     *               mysqli::reap_async_query(), or rejected with the error message.
     */
    public function query($sql)
    {
        $job = (object) array(
            'type' => 'query',
            'sql' => $sql
        );

        return $this
            ->connect()
            ->then(function ($connection) use ($job) {
                return $connection->enqueue($job);
            });
    }

    /**
     * Queues the closing of the connection behind all previously queued jobs.
     *
     * @return mixed Promise resolved once the link is closed, or rejected with
     *               the error message.
     */
    public function end()
    {
        $job = (object) array(
            'type' => 'close'
        );

        return $this
            ->connect()
            ->then(function ($connection) use ($job) {
                return $connection->enqueue($job);
            });
    }

    /**
     * Appends a job to the queue and starts it right away if the queue was idle.
     *
     * Public only because it is invoked from the closures in query() and end().
     *
     * @param object $job Object with a `type` property ('query' or 'close')
     *                    and, for queries, an `sql` property.
     *
     * @return mixed Promise settled when the job has finished.
     */
    public function enqueue($job)
    {
        $deferred = new Deferred();
        $job->resolver = $deferred;

        $this->jobs->enqueue($job);

        // Exactly one job in the queue means nothing was running: kick it off.
        if (1 === count($this->jobs)) {
            $this->start();
        }

        return $deferred->promise();
    }

    /**
     * Executes the job at the head of the queue, if there is one.
     */
    protected function start()
    {
        if ($this->jobs->isEmpty()) {
            return;
        }

        $job = $this->jobs->bottom();

        switch ($job->type) {
            case 'query':
                $error = null;

                try {
                    $sent = $this->mysqli->query($job->sql, \MYSQLI_ASYNC);
                } catch (\mysqli_sql_exception $e) {
                    $sent = false;
                    $error = $e->getMessage();
                }

                if (false === $sent) {
                    // The query never reached the server, so there is nothing
                    // to poll for: fail this job and move on to the next one.
                    $job->resolver->reject(null !== $error ? $error : mysqli_error($this->mysqli));
                    $this->dequeue();
                    break;
                }

                $this->poll();
                break;

            case 'close':
                if ($this->mysqli->close()) {
                    $job->resolver->resolve();
                } else {
                    $job->resolver->reject(mysqli_error($this->mysqli));
                }

                $this->dequeue();
                break;

            default:
                // Without this branch an unknown job would sit at the head of
                // the queue forever and block every job behind it.
                $job->resolver->reject('Unknown job type');
                $this->dequeue();
                break;
        }
    }

    /**
     * Checks whether the in-flight query has a result and settles its job.
     *
     * While the result is not ready yet, the check is re-scheduled on the
     * event loop. Public because it is used as the timer callback.
     */
    public function poll()
    {
        if ($this->jobs->isEmpty()) {
            return;
        }

        $job = $this->jobs->bottom();
        $links = $errors = $reject = array($this->mysqli);

        // Zero timeout: return immediately instead of blocking the event
        // loop; waiting is done by the timer below.
        $ready = mysqli_poll($links, $errors, $reject, 0);

        if (0 === $ready) {
            $this->loop->addTimer(0.001, array($this, 'poll'));

            return;
        }

        $result = false;
        $error = null;

        if (false !== $ready) {
            try {
                // Reap from our own handle: mysqli_poll() rewrites $links, so
                // index 0 is not guaranteed to exist afterwards.
                $result = $this->mysqli->reap_async_query();
            } catch (\mysqli_sql_exception $e) {
                $error = $e->getMessage();
            }
        }

        if ($result) {
            $job->resolver->resolve($result);
        } else {
            $job->resolver->reject(null !== $error ? $error : mysqli_error($this->mysqli));
        }

        $this->dequeue();
    }

    /**
     * Removes the finished job from the queue and starts the next one, if any.
     */
    protected function dequeue()
    {
        $this->jobs->dequeue();

        // Nothing is signalled when the queue drains; the original only
        // sketched a "drain" event here.
        if (!$this->jobs->isEmpty()) {
            $this->start();
        }
    }

    /**
     * Returns a connection parameter, or null when it was not supplied.
     *
     * @param string $name Parameter key.
     *
     * @return mixed
     */
    private function getParameter($name)
    {
        return isset($this->parameters[$name]) ? $this->parameters[$name] : null;
    }
}
<?php
include 'vendor/autoload.php';
// Usage example: two queued queries followed by a graceful shutdown.
$loop = React\EventLoop\Factory::create();

$settings = array(
    'host' => 'localhost',
    'username' => 'root',
    'password' => '',
    'dbname' => 'test'
);
$connection = new Jsor\MysqlAsync\Connection($loop, $settings);

// Shared rejection handler: dumps whatever the promise was rejected with.
$reportError = function ($reason) {
    echo "ERROR\n";
    print_r($reason);
    echo "\n";
};

$connected = $connection->connect();
$connected->then(
    function ($conn) {
        echo "CONNECT\n";
    },
    $reportError
);

// First query; the second one is queued behind it.
$first = $connection->query("select 'query1', sleep(5)");
$first->then(
    function ($rows) {
        echo "RESULT1\n";
        print_r($rows->fetch_row());
    },
    $reportError
);

echo "STEP1\n";

$second = $connection->query("SELECT 'query2'");
$second->then(
    function ($rows) {
        echo "RESULT2\n";
        print_r($rows->fetch_row());
    },
    $reportError
);

echo "STEP2\n";

// Close the connection once everything queued before it has finished.
$closed = $connection->end();
$closed->then(
    function () {
        echo "END\n";
    },
    $reportError
);

$loop->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment