Last active
May 27, 2023 11:33
-
-
Save dihjital/6552914cfba7541161050a7c4f64dd3b to your computer and use it in GitHub Desktop.
Simple example of the Worker Pool model with PHP pthreads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/**
 * Exception whose message names a directory that could not be found.
 */
class DirectoryNotFoundException extends Exception
{
    /**
     * @param mixed $directory Path to report; it is appended verbatim to the message.
     */
    public function __construct($directory)
    {
        parent::__construct('Directory not found: ' . $directory);
    }
}
/**
 * Stateless helpers for logging, HTTP delivery and SMS request construction.
 *
 * processSms() and createURL() read the global constants DIR, HOST, PORT,
 * SCRIPT, USERNAME, PASSWORD and DIRMASK, which must be defined before those
 * methods are called.
 */
class Helpers
{
    /**
     * Echo a message prefixed with the current time in RFC 2822 format.
     *
     * @param string $message Text to print; a line break is appended.
     */
    public static function printMessage(string $message)
    {
        echo '[' . date(DATE_RFC2822) . '] ' . $message . PHP_EOL;
    }

    /**
     * Decide whether an HTTP status code counts as success.
     *
     * @param mixed $status   Expected status code, or null to accept any code below 400.
     * @param mixed $httpCode Status code actually received.
     * @return bool True when the received code is acceptable.
     */
    public static function checkStatusCode($status, $httpCode): bool
    {
        if ($status === null) {
            return $httpCode < 400;
        }

        // Loose comparison on purpose: the expected status may arrive as a
        // string ('200') while cURL reports the received code as an int.
        return $status == $httpCode;
    }

    /**
     * Issue a header-only HTTP request and report whether its status is acceptable.
     *
     * @param string     $url    Full URL to request.
     * @param mixed|null $status Expected status code, or null to accept any code below 400.
     * @param int        $wait   Connect timeout and total timeout, in seconds.
     * @return bool Result of checkStatusCode() for the received status.
     * @throws Exception When cURL returns no response headers.
     */
    public static function sendMessage($url, $status = null, $wait = 5)
    {
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        // Only the response headers are needed, so the body is not requested.
        curl_setopt($ch, CURLOPT_HEADER, true);
        curl_setopt($ch, CURLOPT_NOBODY, true);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $wait);
        curl_setopt($ch, CURLOPT_TIMEOUT, $wait);

        $head = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        if (!$head) {
            throw new Exception('HTTP request failed');
        }

        return self::checkStatusCode($status, $httpCode);
    }

    /**
     * Build "http://host[:port]/script"; the port is omitted when it is 80.
     *
     * @param string     $host   Host name or address.
     * @param int|string $port   TCP port; numeric strings are accepted.
     * @param string     $script Path component appended after the slash.
     * @return string The assembled URL.
     */
    public static function http_build_url($host, $port, $script): string
    {
        $url = "http://{$host}";

        // Cast so that a port given as the string '80' is also treated as the default.
        if ((int) $port !== 80) {
            $url .= ':' . $port;
        }

        return "{$url}/{$script}";
    }

    /**
     * List the entries of a directory without the "." and ".." pseudo entries.
     *
     * @param string $dir Directory to scan.
     * @return array Entry names; the keys produced by scandir() are preserved.
     * @throws DirectoryNotFoundException When the directory cannot be scanned.
     */
    public static function scanDirectory($dir): array
    {
        // The warning is suppressed because the failure is reported as an exception instead.
        $filesInDir = @scandir($dir);
        if ($filesInDir === false) {
            throw new DirectoryNotFoundException($dir);
        }

        return array_diff($filesInDir, ['..', '.']);
    }

    /**
     * Turn a message file into the request URL and recipient for one SMS.
     *
     * The recipient is the part of the file name before the first dot, with
     * every "+" replaced by "00". The file content becomes the message text.
     *
     * @param string $file_name Name of a file inside DIR.
     * @return array ['url' => string, 'recipient' => string]
     * @throws RuntimeException When the message file cannot be read.
     */
    public static function processSms($file_name): array
    {
        $path = DIR . '/' . $file_name;
        $text = file_get_contents($path);
        if ($text === false) {
            // Refuse to build a request with an empty text for an unreadable file.
            throw new RuntimeException('Unable to read message file: ' . $path);
        }

        $recipient = str_replace('+', '00', explode('.', $file_name, 2)[0]);

        return [
            'url' => self::createURL($recipient, $text),
            'recipient' => $recipient,
        ];
    }

    /**
     * Build the full request URL, including the query string, for one SMS.
     *
     * @param string $recipient Destination phone number.
     * @param string $text      Raw (unencoded) message text.
     * @return string URL with an encoded query string.
     */
    public static function createURL($recipient, $text): string
    {
        // http_build_query() already URL-encodes every value, so the text is
        // passed raw; encoding it beforehand would encode it twice.
        // NOTE(review): the key 'dir-mask' is kept as found — confirm it is the
        // parameter name the target endpoint expects.
        $query = http_build_query([
            'username' => USERNAME,
            'password' => PASSWORD,
            'to' => $recipient,
            'dir-mask' => DIRMASK,
            'text' => $text,
        ]);

        return self::http_build_url(HOST, PORT, SCRIPT) . '?' . $query;
    }
}
/**
 * Pool that counts submitted tasks and gathers one result record per task.
 */
class MyPool extends Pool
{
    /** @var object[] Result records gathered by process(). */
    public $data = [];

    /** @var int Number of tasks passed to submit() so far. */
    private $numTasks = 0;

    /**
     * Submit a task to the pool and count it.
     *
     * @param Threaded $task Task to execute.
     * @return int Value returned by the parent submit().
     */
    public function submit(Threaded $task): int
    {
        $this->numTasks++;

        return parent::submit($task);
    }

    /**
     * Wait until every submitted task has reported a result, then shut the pool down.
     *
     * @return object[] One object per task with the properties complete, result and recipient.
     */
    public function process()
    {
        while (count($this->data) < $this->numTasks) {
            $this->collect(function (SmsWorker $task) {
                // Read the flag once so the recorded result and the value
                // returned to the pool can never disagree.
                // NOTE(review): the properties read below are declared
                // private/protected on SmsWorker — confirm the threading
                // extension permits reading them from here.
                $done = $task->isDone();
                if ($done) {
                    $this->data[] = (object) [
                        'complete' => $task->complete,
                        'result' => $task->result,
                        'recipient' => $task->recipient,
                    ];
                }

                // The boolean tells the pool whether the task may be discarded.
                return $done;
            });

            // Pause briefly between polls instead of spinning on collect()
            // while the workers are still busy.
            if (count($this->data) < $this->numTasks) {
                usleep(1000);
            }
        }

        // Every task has reported, so the workers can be stopped.
        $this->shutdown();

        return $this->data;
    }
}
/**
 * Task that delivers one SMS over HTTP and removes its message file on success.
 */
class SmsWorker extends Threaded
{
    private $url;
    private $recipient;
    private $file_name;
    /** Directory that contains the message file. */
    private $dir;
    /** Human-readable outcome, set by run(). */
    protected $result;
    /** Becomes true once run() has finished, whatever the outcome. */
    protected $complete = false;

    /**
     * @param string      $url       Request URL that triggers the SMS.
     * @param string      $recipient Destination phone number, used in result messages.
     * @param string      $file_name Name of the message file to delete after a successful send.
     * @param string|null $dir       Directory holding the message file; defaults to the DIR constant.
     */
    public function __construct($url, $recipient, $file_name, $dir = null)
    {
        $this->url = $url;
        $this->recipient = $recipient;
        $this->file_name = $file_name;
        // Previously no directory was ever stored, so the unlink() in run()
        // pointed at "/<file>" and the sent message was never removed.
        $this->dir = $dir ?? DIR;
    }

    /**
     * Send the SMS, record the outcome in $result and mark the task complete.
     */
    public function run()
    {
        try {
            if (Helpers::sendMessage($this->url, '200')) {
                $this->result = 'SMS successfully sent to '.$this->recipient.' phone number';
                unlink($this->dir.'/'.$this->file_name);
            } else {
                $this->result = 'Failed to send SMS to '.$this->recipient.' phone number';
            }
        } catch (Throwable $e) {
            // Errors are recorded as well as Exceptions: anything escaping here
            // would leave $complete false, and code polling isDone() would wait forever.
            $this->result = 'Error: ' . $e->getMessage();
        }

        $this->complete = true;
    }

    /**
     * @return bool True once run() has finished.
     */
    public function isDone()
    {
        return $this->complete;
    }
}
/**
 * Sends every message file found in DIR through a pool of SMS workers and
 * prints a log line when each one starts and finishes.
 */
class SmsClient
{
    /** Pool size passed to MyPool. */
    private $maxProcess;

    /** Per recipient: the labels of its submitted tasks followed by their result texts. */
    private $results = [];

    /**
     * @param int $maxProcess Pool size to use.
     */
    public function __construct($maxProcess)
    {
        $this->maxProcess = $maxProcess;
    }

    /**
     * Submit one worker per message file, wait for all of them and log the outcomes.
     *
     * Any Exception raised along the way is printed rather than propagated.
     */
    public function run()
    {
        try {
            $fileNames = Helpers::scanDirectory(DIR);
            $pool = new MyPool($this->maxProcess);
            $sequence = 1;

            foreach ($fileNames as $fileName) {
                $sms = Helpers::processSms($fileName);
                $recipient = $sms['recipient'];

                $workerId = $pool->submit(new SmsWorker($sms['url'], $recipient, $fileName));

                // Label shown in the log: "<value returned by submit()>.<running counter>".
                $label = $workerId . '.' . $sequence;
                $sequence++;

                $this->results[$recipient][] = $label;
                Helpers::printMessage('SMS client (' . $label . ') started');
            }

            foreach ($pool->process() as $task) {
                // The first label stored for the recipient identifies the task in the log.
                $label = $this->results[$task->recipient][0];
                Helpers::printMessage('SMS client (' . $label . ') finished: ' . $task->result);
                $this->results[$task->recipient][] = $task->result;
            }
        } catch (Exception $e) {
            Helpers::printMessage('Error: ' . $e->getMessage());
        }
    }
}
// Endpoint that receives the SMS requests.
const HOST = 'host.docker.internal';
const PORT = 8000;
const SCRIPT = 'random-status';

// Query parameters sent with every request.
const DIRMASK = 31;
const USERNAME = 'username';
const PASSWORD = 'password';

// Pool size and the directory that holds the message files.
const MAXPROCESS = 2;
const DIR = 'messages';

// Timezone used for the timestamps in the log output.
date_default_timezone_set('Europe/Berlin');

// Entry point: send every message file found in DIR.
$smsClient = new SmsClient(MAXPROCESS);
$smsClient->run();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment