Last active
May 26, 2023 08:41
-
-
Save dihjital/6ecacb3101edab2cd5144874a82e15de to your computer and use it in GitHub Desktop.
SmsClient with parallel multi channel processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
require_once "vendor/autoload.php"; | |
use App\Helper\Helpers; | |
use parallel\{Runtime, Channel}; | |
Class SmsClient { | |
protected $taskChannel; | |
protected $resultChannel; | |
protected int $maxWorkers = 5; | |
public function __construct($maxWorkers) { | |
// Define the maximum nomber of workers in our pool | |
$this->maxWorkers = $maxWorkers; | |
// Create a buffered (non-blocking) channel for sending tasks and an other one for receiving results | |
$this->taskChannel = Channel::make('taskChannel', $maxWorkers); | |
$this->resultChannel = Channel::make('resultChannel', $maxWorkers); | |
} | |
// Create a function that sends an SMS to a recipient using a URL | |
public static function sendSms($url, $recipient, $file_name): string { | |
try { | |
$recipient = str_shuffle($recipient); | |
if (Helpers::sendMessage($url, '200')) { | |
$result = 'SMS successfully sent to '.$recipient.' phone number'; | |
unlink(DIR.'/'.$file_name); | |
} else { | |
$result = 'Failed to send SMS to '.$recipient.' phone number'; | |
} | |
} catch (Exception $e) { | |
$result = 'Error: ' . $e->getMessage(); | |
} | |
sleep(array_rand(range(1, 5))); | |
return $result; | |
} | |
protected function createTasks($files): array { | |
return array_map(function ($fileName) { | |
list('url' => $url, 'recipient' => $recipient) = Helpers::processSms($fileName); | |
return [($this->sendSms(...)),[ | |
$url, | |
$recipient, | |
$fileName | |
]]; // pass the arguments as an array | |
}, $files); | |
} | |
public function createTaskProcessor(): closure { | |
return | |
function($workerId, $taskChannel, $resultChannel) { | |
list($closure, $args) = $taskChannel->recv(); | |
// Execute the task and send the result | |
$result = ($closure)(...$args); | |
$resultChannel->send("(${workerId}) ${result}"); | |
}; | |
} | |
public function run() { | |
$tasks = $this->createTasks(Helpers::scanDirectory(DIR)); | |
$futures = []; // Keep track of task executors | |
$workers = []; // Keep track of the worker pool | |
foreach ($tasks as $index => $task) { | |
$workerIndex = $index % $this->maxWorkers; // Calculate the worker index | |
$rt = !isset($workers[$workerIndex]) | |
? new Runtime('bootstrap.php') | |
: $workers[$workerIndex]; | |
$future = $rt->run($this->createTaskProcessor(), [$workerIndex, $this->taskChannel, $this->resultChannel]); | |
$futures[] = $future; | |
($index < $this->maxWorkers) && $workers[] = $rt; | |
} | |
// This is non-blocking | |
foreach($tasks as $task) { | |
$this->taskChannel->send($task); | |
} | |
$this->taskChannel->close(); | |
// This is blocking, therefore we move it from the sending part ... | |
// We have to receive a result for each message in our queue ... | |
foreach($tasks as $task) { | |
Helpers::printMessage($this->resultChannel->recv()); | |
} | |
$this->resultChannel->close(); | |
// Runtime graceful join for the worker pool | |
array_map(function ($worker) { | |
$worker->close(); | |
}, $workers); | |
} | |
} | |
// Define some constants | |
define('HOST', 'host.docker.internal'); | |
define('PORT', 8000); | |
define('SCRIPT', 'random-status'); | |
define('DIRMASK', 31); | |
define('USERNAME', 'username'); | |
define('PASSWORD', 'password'); | |
define('MAXWORKERS', 4); | |
define('DIR', 'messages'); | |
// Set the timezone | |
date_default_timezone_set('Europe/Berlin'); | |
(new SmsClient(MAXWORKERS))->run(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment