Skip to content

Instantly share code, notes, and snippets.

@mebjas
Created September 3, 2015 15:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mebjas/d4913309cd046899f6e1 to your computer and use it in GitHub Desktop.
Save mebjas/d4913309cd046899f6e1 to your computer and use it in GitHub Desktop.
simulation of vent sink worker model
<?php
$TMP = $argv[2];
$no_of_files = intval($argv[1]);
function logln($m) {
global $TMP;
file_put_contents($TMP .'/logs', $m .PHP_EOL, FILE_APPEND);
}
logln("[sink] started with pid = " .getmypid());
logln("[sink] will be expecting to recieve $no_of_files files");
$context = new ZMQContext();
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver->bind("ipc://$TMP/workertosink001");
$files_to_recieve = $no_of_files;
while ($files_to_recieve > 0) {
$string = $receiver->recv();
logln("[sink] recieved ack from file_id = " .$string);
if ($string != 'FAIL') {
// TODO - take some method
}
--$files_to_recieve;
}
$sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sender->connect("ipc://$TMP/sinktovent001");
$sender->send('ACK');
logln('[sink] ACK sent to vent');
exit;
<?php
$tstart = microtime(true);
$no_of_files = 43;
$taskid = 101;
$checksum = 'abcdefg';
// Create a tmp directory to hold the ipcs
$TMP = $taskid .$checksum;
if (!file_exists($TMP)) mkdir($TMP);
function logln($m) {
global $TMP;
file_put_contents($TMP .'/logs', $m .PHP_EOL, FILE_APPEND);
}
logln("-----------------------------------------------------------------");
// Get file ids in memory
$files = array();
for ($i = 1; $i <= $no_of_files; $i++) $files[] = $i;
// Start a sink with knowlege of no of files that are supposed to be todofied
// sink should listen to workers now. After getting all sink should talk back to
// this file so that init moves on
logln("[init] calling sink process to make sure it supervises processing ");
exec("php sink.php $no_of_files $TMP > $TMP/logs 2>&1 & echo $!", $op);
// Start n workers, store their process IDs.
$workers = 9;
for ($i = 0; $i < $workers; $i++) {
$op = exec("php worker.php $taskid $checksum > $TMP/logs 2>&1 & echo $!", $op);
}
// Distribute the task and wait to listen from
$context = new ZMQContext();
// Socket to send messages on
$sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sender->bind("ipc://$TMP/venttoworker001");
// // The first message is "0" and signals start of batch
// $sender->send(0);
// echo "Press Enter when the workers are ready: ";
// $fp = fopen('php://stdin', 'r');
// $line = fgets($fp, 512);
// fclose($fp);
// echo "Sending tasks to workers…", PHP_EOL;
sleep(1);
for ($i = 0; $i < $no_of_files; $i++) {
// Random workload from 1 to 100msecs
$workload = '101;abcdefg;' .$files[$i];
$sender->send($workload);
}
// the sink now. Once sink conform, move on and kill each of the worker process
for ($i = 0; $i < $workers; $i++) {
$sender->send('kill');
}
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver->bind("ipc://$TMP/sinktovent001");
$ack = $receiver->recv();
if ($ack == 'ACK') logln('[vent] ACK Recieved from sink');
$tend = microtime(true);
$total_msec = ($tend - $tstart) * 1000;
echo 'TIME TAKEN = ' .$total_msec .'ms' .PHP_EOL;
<?php
echo 'proc id = ' .getmypid() .PHP_EOL;
$taskid = $argv[1];
$checksum = $argv[2];
$TMP = $taskid .$checksum;
function logln($m) {
global $TMP;
file_put_contents($TMP .'/logs', $m .PHP_EOL, FILE_APPEND);
}
logln("[worker] started with process id " .getmypid());
$context = new ZMQContext();
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver->connect("ipc://$TMP/venttoworker001");
// $receiver->connect("tcp://127.0.0.1:5557");
logln('[worker] connected to venttoworker IPC ');
// Socket to send messages to
$sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sender->connect("ipc://$TMP/workertosink001");
logln('[worker] connected to workertosink IPC ');
// Process tasks forever
while (true) {
$string = $receiver->recv();
if ($string == 'kill') {
logln('[worker] ' .getmypid() .' killing self');
exit;
}
$data = explode(';', $string);
logln("[worker] " .getmypid() ." > TASKID = " .$data[0] ."\tCHECKSUM = " .$data[1] ."\tFILE_ID = " .$data[2]);
sleep(1);
$sender->send($data[2]);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment