Skip to content

Instantly share code, notes, and snippets.

@dihjital
Last active May 26, 2023 08:41
Show Gist options
  • Save dihjital/6ecacb3101edab2cd5144874a82e15de to your computer and use it in GitHub Desktop.
Save dihjital/6ecacb3101edab2cd5144874a82e15de to your computer and use it in GitHub Desktop.
SmsClient with parallel multi-channel processing
<?php
require_once "vendor/autoload.php";
use App\Helper\Helpers;
use parallel\{Runtime, Channel};
/**
 * Sends the SMS messages found in the DIR directory using a pool of
 * parallel\Runtime workers that communicate over two named channels:
 * one carrying tasks to the workers and one carrying result strings back.
 */
class SmsClient
{
    /** Channel over which the main thread hands tasks to the workers. */
    protected Channel $taskChannel;

    /** Channel over which the workers report one result string per task. */
    protected Channel $resultChannel;

    /** Upper bound on the number of runtimes kept in the worker pool. */
    protected int $maxWorkers = 5;

    /**
     * @param int $maxWorkers Size of the worker pool; also used as the buffer
     *                        capacity of both channels. Must be at least 1.
     *
     * @throws \InvalidArgumentException When $maxWorkers is smaller than 1.
     */
    public function __construct(int $maxWorkers)
    {
        // run() uses this value as a modulo divisor, so zero or negative
        // values can never produce a working pool; fail early and clearly.
        if ($maxWorkers < 1) {
            throw new \InvalidArgumentException(
                'maxWorkers must be at least 1, got ' . $maxWorkers
            );
        }

        // Define the maximum number of workers in our pool
        $this->maxWorkers = $maxWorkers;

        // Create a buffered channel for sending tasks and another one for
        // receiving results; each can hold up to $maxWorkers items.
        $this->taskChannel = Channel::make('taskChannel', $maxWorkers);
        $this->resultChannel = Channel::make('resultChannel', $maxWorkers);
    }

    /**
     * Sends one SMS through the given URL and removes its message file on success.
     *
     * @param string $url       URL handed to Helpers::sendMessage().
     * @param string $recipient Recipient phone number (shuffled before it is reported).
     * @param string $file_name Name of the message file inside DIR to delete on success.
     *
     * @return string Human-readable outcome of the attempt.
     */
    public static function sendSms($url, $recipient, $file_name): string
    {
        try {
            // NOTE(review): the number is shuffled before it appears in the
            // result text - presumably to mask it in the output; confirm.
            $recipient = str_shuffle($recipient);

            // NOTE(review): '200' looks like the expected HTTP status - confirm
            // against Helpers::sendMessage().
            if (Helpers::sendMessage($url, '200')) {
                $result = 'SMS successfully sent to ' . $recipient . ' phone number';
                unlink(DIR . '/' . $file_name);
            } else {
                $result = 'Failed to send SMS to ' . $recipient . ' phone number';
            }
        } catch (\Exception $e) {
            $result = 'Error: ' . $e->getMessage();
        }

        // Pause for 1-5 seconds. The previous array_rand(range(1, 5)) returned
        // a random *key* (0-4) rather than a value from the range.
        sleep(random_int(1, 5));

        return $result;
    }

    /**
     * Builds one task per message file.
     *
     * @param array $files File names as returned by Helpers::scanDirectory().
     *
     * @return array One [callable, [url, recipient, fileName]] pair per file,
     *               with the keys of $files preserved.
     */
    protected function createTasks(array $files): array
    {
        return array_map(function ($fileName) {
            ['url' => $url, 'recipient' => $recipient] = Helpers::processSms($fileName);

            // sendSms() is static, so reference it statically; static:: keeps
            // late static binding for subclasses that override it.
            return [static::sendSms(...), [$url, $recipient, $fileName]];
        }, $files);
    }

    /**
     * Creates the closure every worker runs: it takes exactly one task from the
     * task channel, executes it and sends exactly one result string back.
     *
     * @return \Closure Closure with the signature ($workerId, $taskChannel, $resultChannel).
     */
    public function createTaskProcessor(): \Closure
    {
        return function ($workerId, $taskChannel, $resultChannel) {
            try {
                [$closure, $args] = $taskChannel->recv();
                // Execute the task
                $result = ($closure)(...$args);
            } catch (\Throwable $e) {
                // The main thread waits for one result per task; report the
                // failure instead of leaving it blocked on recv() forever.
                $result = 'Error: ' . $e->getMessage();
            }

            $resultChannel->send("({$workerId}) {$result}");
        };
    }

    /**
     * Distributes all pending messages over the worker pool, prints every
     * result as it arrives and shuts the pool down afterwards.
     *
     * @return void
     */
    public function run()
    {
        // Re-index so the round-robin arithmetic below works on 0..n-1 even
        // when the directory scan returns non-sequential keys.
        $tasks = array_values($this->createTasks(Helpers::scanDirectory(DIR)));
        $processor = $this->createTaskProcessor();
        $workers = []; // Worker pool, keyed by worker index

        // Schedule one processor invocation per task, round-robin over the pool.
        foreach (array_keys($tasks) as $index) {
            $workerIndex = $index % $this->maxWorkers;
            $workers[$workerIndex] ??= new Runtime('bootstrap.php');
            $workers[$workerIndex]->run(
                $processor,
                [$workerIndex, $this->taskChannel, $this->resultChannel]
            );
        }

        // Both channels buffer at most $maxWorkers items. Sending every task
        // before reading any result can leave the workers blocked on a full
        // result channel while this thread is blocked on a full task channel.
        // Keeping at most $maxWorkers tasks in flight avoids that deadlock.
        $inFlight = 0;
        foreach ($tasks as $task) {
            if ($inFlight >= $this->maxWorkers) {
                Helpers::printMessage($this->resultChannel->recv());
                $inFlight--;
            }
            $this->taskChannel->send($task);
            $inFlight++;
        }

        // Collect the results that are still outstanding.
        for (; $inFlight > 0; $inFlight--) {
            Helpers::printMessage($this->resultChannel->recv());
        }

        // Close the channels only once every task has been answered, so no
        // worker can find the task channel closed while it still has work.
        $this->taskChannel->close();
        $this->resultChannel->close();

        // Runtime graceful join for the worker pool
        foreach ($workers as $worker) {
            $worker->close();
        }
    }
}
// Script-wide configuration constants.
// NOTE(review): HOST, PORT, SCRIPT, DIRMASK, USERNAME and PASSWORD are not
// referenced in this file; presumably Helpers or bootstrap.php read them - confirm.
const HOST = 'host.docker.internal';
const PORT = 8000;
const SCRIPT = 'random-status';
const DIRMASK = 31;
const USERNAME = 'username';
const PASSWORD = 'password';
// Pool size handed to the SmsClient constructor below.
const MAXWORKERS = 4;
// Directory SmsClient scans for message files and deletes sent ones from.
const DIR = 'messages';

// All date/time functions in this script use Berlin local time.
date_default_timezone_set('Europe/Berlin');

// Entry point: build the client and process every pending message.
(new SmsClient(MAXWORKERS))->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment