Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
use Amp\Loop;
use Amp\Mysql\ConnectionConfig;
use Amp\Mysql\Pool;
use Amp\Mysql\ResultSet;
use Amp\Mysql\TimeoutConnector;
use Amp\Socket\ClientTlsContext;
use Illuminate\Support\Collection;
/**
 * Runs batches of MySQL queries concurrently through amphp/mysql connection pools.
 *
 * Queries are executed in chunks of at most $maxConnections. Queries that fail with a retry-able
 * error (and that are flagged as retry-able themselves) are re-executed a bounded number of times;
 * any other failure aborts the batch with an AsyncMysqlException.
 */
class AsyncMySql {
    private const DEFAULT_MAX_CONNECTIONS = 20;
    /** Retry budget per chunk; it is only consumed by rounds that make no progress. */
    private const ASYNC_RETRIES = 10;
    /** Connect timeout (milliseconds) used when the db config carries no PDO::ATTR_TIMEOUT. */
    private const DEFAULT_CONNECT_TIMEOUT_MS = 30000;
    private const MS_PER_SECOND = 1000;
    /** Message fragments of MySQL errors that are worth retrying (deadlock, lock wait, lost connection). */
    private const RETRYABLE_MESSAGE_FRAGMENTS = [
        'Deadlock found when trying to get lock',
        'Lock wait timeout exceeded',
        'server has gone away',
        'Lost connection',
    ];

    /** @var Pool[] pools shared by all instances, keyed by "<maxConnections> <md5 of the db config>" */
    private static $pools = [];

    /**
     * Executes all given queries, at most $maxConnections at a time.
     *
     * @param AsyncMySqlQuery[] $queries        queries to run; an empty array is a no-op
     * @param int               $maxConnections pool size and chunk size, must be >= 1
     *
     * @throws \InvalidArgumentException when $maxConnections is smaller than 1
     * @throws AsyncMysqlException       when a query fails and cannot (or can no longer) be retried
     */
    public function perform(array $queries, int $maxConnections = self::DEFAULT_MAX_CONNECTIONS) : void {
        // array_chunk() rejects sizes < 1 (ValueError on PHP 8, warning + null on PHP 7, which would
        // silently skip every query), so fail early with a clear message instead.
        if($maxConnections < 1) {
            throw new \InvalidArgumentException("maxConnections must be at least 1, got $maxConnections.");
        }
        // Workaround for a bug in amphp/mysql: if there are more queries than connections, the pool
        // just skips count($queries) - $maxConnections queries. Hence never more than $maxConnections
        // queries are handed over at once. array_chunk() re-indexes every chunk; the keys are only used
        // internally to pair a query with its promise.
        foreach(array_chunk($queries, $maxConnections) as $chunk) {
            $this->performAsyncWithRetry($chunk, $maxConnections);
        }
    }

    /**
     * Executes one chunk and re-executes the queries that failed with a retry-able error.
     *
     * @param AsyncMySqlQuery[] $asyncMysqlQueries
     *
     * @throws AsyncMysqlException when the retry budget is used up; the first collected error is attached
     *                             as previous exception
     */
    private function performAsyncWithRetry(array $asyncMysqlQueries, int $maxConnections) : void {
        $retries = self::ASYNC_RETRIES;
        $lastErrorCount = 0;
        // Seeded with all queries so that the first round executes every query; only the keys matter.
        $keyToException = $asyncMysqlQueries;
        while(true) {
            // Only the queries that failed in the previous round are executed (again).
            $keyToQuery = array_intersect_key($asyncMysqlQueries, $keyToException);
            $keyToException = [];
            $this->performAsync($keyToQuery, $keyToException, $maxConnections);
            if(empty($keyToException)) {
                return;
            }
            if($retries <= 0) {
                // Just take the first error, the others are not reported.
                // reset() replaces the Laravel array_first() helper, which is not available everywhere.
                $e = reset($keyToException);
                throw new AsyncMysqlException('Retry able async mysql failed!', 0, $e);
            }
            $count = count($keyToException);
            if($lastErrorCount === $count) {
                // The budget is only reduced when the round made no progress (same number of failures as
                // in the previous round); otherwise there is still a chance to fulfill the requests.
                $retries--;
            }
            $lastErrorCount = $count;
            Logger::logWithNewLine("$retries retries left for $count async queries...");
            // No pause for the first retries; afterwards the upper bound handed to the sleep helper grows
            // by one for every retry that has been used up.
            $sleep = self::ASYNC_RETRIES - ($retries + 2);
            if($sleep > 0) {
                SleepUtils::sleepRandom(0, $sleep, ' -- Sleeping <sec> seconds before retry...');
            }
        }
    }

    /**
     * Starts all queries at once inside the Amp event loop, then awaits them one by one.
     *
     * For every successful query that has a callable, all rows are collected (each row cast to an
     * object) and passed to the callable together with the query's user data.
     *
     * @param AsyncMySqlQuery[] $asyncMysqlQueries
     * @param \Throwable[]      $keyToException out parameter: retry-able failures by query key
     *
     * @throws AsyncMysqlException when a failure is not retry-able
     */
    private function performAsync(array $asyncMysqlQueries, array &$keyToException, int $maxConnections) : void {
        Loop::run(function() use ($asyncMysqlQueries, &$keyToException, $maxConnections) {
            // Dispatch everything first so that the queries run concurrently ...
            $promises = [];
            /** @var AsyncMySqlQuery $asyncMysqlQuery */
            foreach($asyncMysqlQueries as $key => $asyncMysqlQuery) {
                $pool = $this->getPool($asyncMysqlQuery, $maxConnections);
                $promises[$key] = $pool->execute($asyncMysqlQuery->getQuery(), $asyncMysqlQuery->getParams());
            }
            // ... and only then wait for the individual results.
            foreach($promises as $key => $promise) {
                try {
                    /** @var ResultSet $result */
                    $result = yield $promise;
                } catch(\Throwable $e) {
                    $this->handleAsyncException($asyncMysqlQueries, $e, $key, $keyToException);
                    continue;
                }
                /** @var AsyncMySqlQuery $asyncMysqlQuery */
                $asyncMysqlQuery = $asyncMysqlQueries[$key];
                $function = $asyncMysqlQuery->getCallable();
                if($function === null) {
                    continue;
                }
                $collection = new Collection();
                while(yield $result->advance()) {
                    $collection->push((object)$result->getCurrent());
                }
                $function($collection, $asyncMysqlQuery->getUserData());
            }
        });
    }

    /**
     * Records a retry-able failure in $keyToException, or aborts for every other failure.
     *
     * @param AsyncMySqlQuery[] $asyncMysqlQueries
     * @param int|string        $key            key of the failed query
     * @param \Throwable[]      $keyToException out parameter: retry-able failures by query key
     *
     * @throws AsyncMysqlException when the query or the error is not retry-able
     */
    private function handleAsyncException(array $asyncMysqlQueries, \Throwable $e,
                                          $key, array &$keyToException) : void {
        /** @var AsyncMySqlQuery $asyncMysqlQuery */
        $asyncMysqlQuery = $asyncMysqlQueries[$key];
        if($asyncMysqlQuery->isRetryAble() && $this->isRetryThrowable($e)) {
            $keyToException[$key] = $e;
            return;
        }
        throw new AsyncMysqlException('Not retry able async mysql failed!', 0, $e);
    }

    /**
     * Tells whether the given error is transient, i.e. whether executing the query again may succeed.
     *
     * NOTE(review): this method was called but not defined in this class. The policy below is a
     * conservative assumption (connection failures, timeouts, deadlocks, lock wait timeouts) - confirm
     * it against the intended behavior.
     */
    private function isRetryThrowable(\Throwable $e) : bool {
        // instanceof never triggers autoloading and is simply false for classes that do not exist.
        if($e instanceof \Amp\TimeoutException || $e instanceof \Amp\Mysql\ConnectionException) {
            return true;
        }
        $message = $e->getMessage();
        foreach(self::RETRYABLE_MESSAGE_FRAGMENTS as $fragment) {
            if(stripos($message, $fragment) !== false) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the connection pool for the query's database config and pool size, creating it on first use.
     */
    private function getPool(AsyncMySqlQuery $asyncMysqlQuery, int $maxConnections) : Pool {
        $dbConfig = $asyncMysqlQuery->getDbConfig();
        $key = $maxConnections . ' ' . md5(\GuzzleHttp\json_encode($dbConfig));
        $pool = self::$pools[$key] ?? null;
        if($pool !== null) {
            return $pool;
        }
        $sslOption = null;
        if(isset($dbConfig['options'][\PDO::MYSQL_ATTR_SSL_CA])) {
            $sslOption = (new ClientTlsContext())
                ->withCaFile($dbConfig['options'][\PDO::MYSQL_ATTR_SSL_CA]);
        }
        $host = $dbConfig['host'];
        $port = $dbConfig['port'];
        $user = $dbConfig['username'];
        $pass = $dbConfig['password'];
        $db = $dbConfig['database'];
        // PDO::ATTR_TIMEOUT is given in seconds, whereas TimeoutConnector expects milliseconds.
        $timeoutSeconds = $dbConfig['options'][\PDO::ATTR_TIMEOUT] ?? null;
        $timeout = $timeoutSeconds === null
            ? self::DEFAULT_CONNECT_TIMEOUT_MS
            : (int)$timeoutSeconds * self::MS_PER_SECOND;
        // NOTE(review): the values are interpolated unescaped; credentials containing ';' or '='
        // presumably break the parsing of the connection string - confirm.
        $config =
            ConnectionConfig::parseConnectionString("host=$host:$port;user=$user;pass=$pass;db=$db", $sslOption)
                ->withCharset($dbConfig['charset'], $dbConfig['collation']);
        $pool = new Pool($config, $maxConnections, new TimeoutConnector($timeout));
        self::$pools[$key] = $pool;
        return $pool;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment