Skip to content

Instantly share code, notes, and snippets.

@dihjital
Last active May 27, 2023 11:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dihjital/6552914cfba7541161050a7c4f64dd3b to your computer and use it in GitHub Desktop.
Save dihjital/6552914cfba7541161050a7c4f64dd3b to your computer and use it in GitHub Desktop.
Simple example of Worker Pool model with PHP pthreads
<?php
/**
 * Exception carrying the path of a directory that could not be found.
 *
 * The message always has the form "Directory not found: <path>".
 */
class DirectoryNotFoundException extends Exception {
    /**
     * @param string $directory Path that could not be found; embedded in the message.
     */
    public function __construct($directory) {
        parent::__construct(sprintf('Directory not found: %s', $directory));
    }
}
/**
 * Stateless helpers for logging, HTTP delivery and turning SMS files into
 * gateway requests.
 *
 * Several methods read the global constants HOST, PORT, SCRIPT, USERNAME,
 * PASSWORD, DIRMASK and DIR, which must be defined before they are called.
 */
class Helpers {
    /**
     * Print a message prefixed with the current RFC 2822 timestamp.
     *
     * @param string $message Text to print; a line break is appended.
     */
    public static function printMessage(string $message): void {
        echo '[' . date(DATE_RFC2822) . '] ' . $message . PHP_EOL;
    }

    /**
     * Decide whether an HTTP status code counts as success.
     *
     * @param int|string|null $status   Expected code, or null to accept any code below 400.
     * @param int|string      $httpCode Code actually returned by the server.
     * @return bool True when the code is acceptable.
     */
    public static function checkStatusCode($status, $httpCode): bool {
        if ($status === null) {
            return $httpCode < 400;
        }
        // Callers pass the expected code as a string ('200'), so compare numerically.
        return (int) $status === (int) $httpCode;
    }

    /**
     * Issue a header-only HTTP request (CURLOPT_NOBODY) and check its status code.
     *
     * @param string          $url    Full request URL.
     * @param int|string|null $status Expected status code; null accepts any code below 400.
     * @param int             $wait   Connect and total timeout in seconds.
     * @return bool Result of checkStatusCode() for the received status code.
     * @throws Exception When cURL cannot be initialised or the request yields no response.
     */
    public static function sendMessage($url, $status = null, $wait = 5): bool {
        $ch = curl_init();
        if ($ch === false) {
            throw new Exception('HTTP request failed: unable to initialise cURL');
        }
        try {
            curl_setopt($ch, CURLOPT_URL, $url);
            curl_setopt($ch, CURLOPT_HEADER, true);
            curl_setopt($ch, CURLOPT_NOBODY, true);
            curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
            curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $wait);
            curl_setopt($ch, CURLOPT_TIMEOUT, $wait);
            $head = curl_exec($ch);
            $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
            // Must be read before the handle is closed.
            $error = curl_error($ch);
        } finally {
            // Release the handle even if one of the calls above throws.
            curl_close($ch);
        }
        if (!$head) {
            throw new Exception(
                $error !== '' ? 'HTTP request failed: ' . $error : 'HTTP request failed'
            );
        }
        return self::checkStatusCode($status, $httpCode);
    }

    /**
     * Build "http://host[:port]/script"; the port is omitted when it is 80.
     *
     * @param string     $host   Host name or address.
     * @param int|string $port   TCP port; numeric strings are accepted.
     * @param string     $script Path appended after the slash.
     * @return string The assembled URL.
     */
    public static function http_build_url($host, $port, $script): string {
        $url = "http://{$host}";
        // Cast so that a port given as the string '80' is also treated as the default.
        if ((int) $port !== 80) {
            $url .= ':' . $port;
        }
        return "{$url}/{$script}";
    }

    /**
     * List the entries of a directory without the "." and ".." pseudo entries.
     *
     * @param string $dir Directory to list.
     * @return string[] Entry names as a zero-based list.
     * @throws DirectoryNotFoundException When scandir() fails for $dir.
     */
    public static function scanDirectory($dir): array {
        // The warning is suppressed on purpose: failure is reported via the exception below.
        $filesInDir = @scandir($dir);
        if ($filesInDir === false) {
            throw new DirectoryNotFoundException($dir);
        }
        return array_values(array_diff($filesInDir, ['..', '.']));
    }

    /**
     * Turn one SMS file from DIR into a gateway URL.
     *
     * The recipient is the part of the file name before the first dot, with
     * every "+" replaced by "00". The file content becomes the message text.
     *
     * @param string $file_name File name relative to DIR.
     * @return array{url: string, recipient: string}
     * @throws RuntimeException When the file cannot be read.
     */
    public static function processSms($file_name): array {
        $path = DIR . '/' . $file_name;
        $text = file_get_contents($path);
        if ($text === false) {
            // Without this check an unreadable file would be sent as an empty SMS.
            throw new RuntimeException('Unable to read SMS file: ' . $path);
        }
        $recipient = str_replace('+', '00', explode('.', $file_name)[0]);
        return [
            'url' => self::createURL($recipient, $text),
            'recipient' => $recipient,
        ];
    }

    /**
     * Build the gateway request URL for one message.
     *
     * NOTE(review): the credentials are sent in the query string of a plain
     * http:// URL — confirm this is acceptable for the target gateway.
     *
     * @param string $recipient Destination number, sent as "to".
     * @param string $text      Raw message text.
     * @return string URL including the encoded query string.
     */
    public static function createURL($recipient, $text): string {
        // http_build_query() URL-encodes every value itself; encoding $text
        // beforehand would deliver it double-encoded.
        $query = http_build_query([
            'username' => USERNAME,
            'password' => PASSWORD,
            'to' => $recipient,
            'dir-mask' => DIRMASK,
            'text' => $text,
        ]);
        return self::http_build_url(HOST, PORT, SCRIPT) . '?' . $query;
    }
}
/**
 * Pool that counts the tasks submitted to it and gathers one result record
 * per task once the task reports completion.
 */
class MyPool extends Pool {
    /** @var object[] One record per finished task with complete, result and recipient. */
    public $data = [];

    /** @var int Number of tasks passed to submit(). */
    private $numTasks = 0;

    /**
     * Submit a task and note that one more result is expected.
     *
     * @param Threaded $task Task to hand to the parent pool.
     * @return int Value returned by the parent pool's submit().
     */
    public function submit(Threaded $task): int {
        $this->numTasks++;
        return parent::submit($task);
    }

    /**
     * Wait until every submitted task has produced a result, then shut the pool down.
     *
     * @return object[] The collected result records (also kept in $data).
     */
    public function process() {
        while (count($this->data) < $this->numTasks) {
            $this->collect(function (SmsWorker $task) {
                // Read the flag once: if it flipped between two reads the task
                // could be released without its result ever being recorded,
                // and the outer loop would never terminate.
                $done = $task->isDone();
                if ($done) {
                    $this->data[] = (object) [
                        'complete' => $task->complete,
                        'result' => $task->result,
                        'recipient' => $task->recipient,
                    ];
                }
                // The collector's return value tells the pool whether the task
                // may be released (per the pthreads Pool::collect contract).
                return $done;
            });
            if (count($this->data) < $this->numTasks) {
                // Yield briefly instead of spinning at full CPU while workers are busy.
                usleep(10000);
            }
        }
        // Every task has been accounted for, so the workers can be stopped.
        $this->shutdown();
        return $this->data;
    }
}
/**
 * Pool task that delivers one SMS through Helpers::sendMessage() and removes
 * the source file from DIR after a successful delivery.
 */
class SmsWorker extends Threaded {
    private $url;
    private $recipient;
    private $file_name;
    // Human-readable outcome, set by run().
    protected $result;
    // Becomes true once run() has finished, whatever the outcome.
    protected $complete = false;

    /**
     * @param string $url       Gateway URL to request.
     * @param string $recipient Destination number, used in the result text.
     * @param string $file_name Name of the message file inside DIR.
     */
    public function __construct($url, $recipient, $file_name) {
        $this->url = $url;
        $this->recipient = $recipient;
        $this->file_name = $file_name;
    }

    /**
     * Send the message, record the outcome in $result and mark the task done.
     */
    public function run() {
        try {
            if (Helpers::sendMessage($this->url, '200')) {
                $result = 'SMS successfully sent to '.$this->recipient.' phone number';
                // The files are read from DIR (see Helpers::processSms), so that is
                // where they must be deleted; this class has no "dir" property.
                // NOTE(review): assumes constants defined before the pool starts are
                // visible inside worker threads — confirm for the pthreads setup in use.
                $path = DIR.'/'.$this->file_name;
                if (!unlink($path)) {
                    // A file left behind would be sent again on the next run.
                    $result .= ' (could not delete '.$path.')';
                }
            } else {
                $result = 'Failed to send SMS to '.$this->recipient.' phone number';
            }
            $this->result = $result;
        } catch (Exception $e) {
            $this->result = 'Error: ' . $e->getMessage();
        } finally {
            // Always flag completion so a waiting pool cannot hang on this task.
            $this->complete = true;
        }
    }

    /**
     * @return bool True once run() has finished.
     */
    public function isDone() {
        return $this->complete;
    }
}
/**
 * Reads every message file in DIR, sends each one through a MyPool of
 * SmsWorker tasks and logs the start and outcome of every task.
 */
class SmsClient {
    // Pool size handed to MyPool.
    private $maxProcess;
    // Per recipient: the worker labels first, followed by the result texts.
    private $results = [];

    /**
     * @param int $maxProcess Size passed to the MyPool constructor.
     */
    public function __construct($maxProcess) {
        $this->maxProcess = $maxProcess;
    }

    /**
     * Submit one worker per file, wait for all of them and print the results.
     * Any Exception raised on the way is printed instead of propagated.
     */
    public function run() {
        try {
            $files = Helpers::scanDirectory(DIR);
            $pool = new MyPool($this->maxProcess);

            $sequence = 1;
            foreach ($files as $fileName) {
                $sms = Helpers::processSms($fileName);
                $worker = new SmsWorker($sms['url'], $sms['recipient'], $fileName);

                // Label is "<value returned by submit>.<running number>".
                $label = $pool->submit($worker) . '.' . $sequence;
                $sequence++;

                $this->results[$sms['recipient']][] = $label;
                Helpers::printMessage("SMS client ({$label}) started");
            }

            foreach ($pool->process() as $finished) {
                $recipient = $finished->recipient;
                // The first entry stored for a recipient is its worker label.
                $label = $this->results[$recipient][0];
                Helpers::printMessage('SMS client (' . $label . ') finished: ' . $finished->result);
                $this->results[$recipient][] = $finished->result;
            }
        } catch (Exception $e) {
            Helpers::printMessage('Error: ' . $e->getMessage());
        }
    }
}
// Timestamps printed by Helpers::printMessage() use this timezone.
date_default_timezone_set('Europe/Berlin');

// Gateway endpoint: http://HOST[:PORT]/SCRIPT
define('SCRIPT', 'random-status');
define('PORT', 8000);
define('HOST', 'host.docker.internal');

// Values sent as query parameters with every request.
define('PASSWORD', 'password');
define('USERNAME', 'username');
define('DIRMASK', 31);

// Directory holding the message files, and the worker pool size.
define('DIR', 'messages');
define('MAXPROCESS', 2);

// Process every message file currently in DIR.
(new SmsClient(MAXPROCESS))->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment