Created
September 3, 2015 15:57
-
-
Save mebjas/d4913309cd046899f6e1 to your computer and use it in GitHub Desktop.
simulation of vent sink worker model
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
$TMP = $argv[2]; | |
$no_of_files = intval($argv[1]); | |
function logln($m) { | |
global $TMP; | |
file_put_contents($TMP .'/logs', $m .PHP_EOL, FILE_APPEND); | |
} | |
logln("[sink] started with pid = " .getmypid()); | |
logln("[sink] will be expecting to recieve $no_of_files files"); | |
$context = new ZMQContext(); | |
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL); | |
$receiver->bind("ipc://$TMP/workertosink001"); | |
$files_to_recieve = $no_of_files; | |
while ($files_to_recieve > 0) { | |
$string = $receiver->recv(); | |
logln("[sink] recieved ack from file_id = " .$string); | |
if ($string != 'FAIL') { | |
// TODO - take some method | |
} | |
--$files_to_recieve; | |
} | |
$sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH); | |
$sender->connect("ipc://$TMP/sinktovent001"); | |
$sender->send('ACK'); | |
logln('[sink] ACK sent to vent'); | |
exit; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
$tstart = microtime(true); | |
$no_of_files = 43; | |
$taskid = 101; | |
$checksum = 'abcdefg'; | |
// Create a tmp directory to hold the ipcs | |
$TMP = $taskid .$checksum; | |
if (!file_exists($TMP)) mkdir($TMP); | |
function logln($m) { | |
global $TMP; | |
file_put_contents($TMP .'/logs', $m .PHP_EOL, FILE_APPEND); | |
} | |
logln("-----------------------------------------------------------------"); | |
// Get file ids in memory | |
$files = array(); | |
for ($i = 1; $i <= $no_of_files; $i++) $files[] = $i; | |
// Start a sink with knowlege of no of files that are supposed to be todofied | |
// sink should listen to workers now. After getting all sink should talk back to | |
// this file so that init moves on | |
logln("[init] calling sink process to make sure it supervises processing "); | |
exec("php sink.php $no_of_files $TMP > $TMP/logs 2>&1 & echo $!", $op); | |
// Start n workers, store their process IDs. | |
$workers = 9; | |
for ($i = 0; $i < $workers; $i++) { | |
$op = exec("php worker.php $taskid $checksum > $TMP/logs 2>&1 & echo $!", $op); | |
} | |
// Distribute the task and wait to listen from | |
$context = new ZMQContext(); | |
// Socket to send messages on | |
$sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH); | |
$sender->bind("ipc://$TMP/venttoworker001"); | |
// // The first message is "0" and signals start of batch | |
// $sender->send(0); | |
// echo "Press Enter when the workers are ready: "; | |
// $fp = fopen('php://stdin', 'r'); | |
// $line = fgets($fp, 512); | |
// fclose($fp); | |
// echo "Sending tasks to workers…", PHP_EOL; | |
sleep(1); | |
for ($i = 0; $i < $no_of_files; $i++) { | |
// Random workload from 1 to 100msecs | |
$workload = '101;abcdefg;' .$files[$i]; | |
$sender->send($workload); | |
} | |
// the sink now. Once sink conform, move on and kill each of the worker process | |
for ($i = 0; $i < $workers; $i++) { | |
$sender->send('kill'); | |
} | |
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL); | |
$receiver->bind("ipc://$TMP/sinktovent001"); | |
$ack = $receiver->recv(); | |
if ($ack == 'ACK') logln('[vent] ACK Recieved from sink'); | |
$tend = microtime(true); | |
$total_msec = ($tend - $tstart) * 1000; | |
echo 'TIME TAKEN = ' .$total_msec .'ms' .PHP_EOL; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
echo 'proc id = ' .getmypid() .PHP_EOL; | |
$taskid = $argv[1]; | |
$checksum = $argv[2]; | |
$TMP = $taskid .$checksum; | |
function logln($m) { | |
global $TMP; | |
file_put_contents($TMP .'/logs', $m .PHP_EOL, FILE_APPEND); | |
} | |
logln("[worker] started with process id " .getmypid()); | |
$context = new ZMQContext(); | |
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL); | |
$receiver->connect("ipc://$TMP/venttoworker001"); | |
// $receiver->connect("tcp://127.0.0.1:5557"); | |
logln('[worker] connected to venttoworker IPC '); | |
// Socket to send messages to | |
$sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH); | |
$sender->connect("ipc://$TMP/workertosink001"); | |
logln('[worker] connected to workertosink IPC '); | |
// Process tasks forever | |
while (true) { | |
$string = $receiver->recv(); | |
if ($string == 'kill') { | |
logln('[worker] ' .getmypid() .' killing self'); | |
exit; | |
} | |
$data = explode(';', $string); | |
logln("[worker] " .getmypid() ." > TASKID = " .$data[0] ."\tCHECKSUM = " .$data[1] ."\tFILE_ID = " .$data[2]); | |
sleep(1); | |
$sender->send($data[2]); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment