Skip to content

Instantly share code, notes, and snippets.

@anovanmaximuz
Created September 3, 2015 14:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anovanmaximuz/6525750434ab9ea12b76 to your computer and use it in GitHub Desktop.
Save anovanmaximuz/6525750434ab9ea12b76 to your computer and use it in GitHub Desktop.
Notification
<?php
require_once dirname(__FILE__) . '/models/workers.php';
require_once dirname(__FILE__) . '/models/notification.php';
/**
 * Queue worker that drains a beanstalkd tube and forwards each queued
 * notification to the Android or iOS push backend.
 *
 * Data access notes (from the original author):
 *  - Models_Workers is used for the MYXL database.
 *  - Models_Notification is used for the MYXL_NOTIF database.
 *
 * Both public entry points terminate the PHP process with exit(); they are
 * written to be run as stand-alone worker / checker processes.
 */
class Modules_Plugin_Flusher
{
    /** Page id sent when the category has no entry in the `page_id` config map. */
    const DEFAULT_PAGE_ID = '2';

    protected $config;
    protected $logger;
    protected $activity;

    public function __construct()
    {
        $this->config   = Config_Loader::getInstance();
        $this->logger   = Base_Logging::getInstance();
        $this->activity = Base_Logging_Activity::getInstance();
    }

    /**
     * Worker loop: reserve jobs from "<tube>_<suffix>" and push them out.
     *
     * The loop runs until $msg_num iterations have been attempted, then calls
     * exit(0). A thread id of "default" disables that limit, so the loop never
     * ends on its own. An empty thread id is logged and the process exits
     * immediately.
     *
     * @param string $suffix_tmp  Tube suffix; empty falls back to config `default_suffix`.
     * @param string $tube_tmp    Tube base name; empty falls back to config `default_tube`.
     * @param mixed  $msg_num_tmp Max loop iterations; empty falls back to config `default_msg_num`.
     * @param string $tid_tmp     Thread id used in log lines; must not be empty.
     * @return void  Never returns normally; the process exits.
     */
    public function jobsproc($suffix_tmp, $tube_tmp, $msg_num_tmp, $tid_tmp)
    {
        require_once($this->config->app('phean_init'));

        $host    = $this->config->app('beanstalkd_ip');
        $port    = $this->config->app('beanstalkd_port');
        $timeout = $this->config->app('beanstalkd_timeout');

        $suffix  = $this->param_or_default($suffix_tmp, 'default_suffix');
        $tube    = $this->param_or_default($tube_tmp, 'default_tube');
        // Cast so the loop-limit comparison below is always numeric.
        $msg_num = (int) $this->param_or_default($msg_num_tmp, 'default_msg_num');

        if (empty($tid_tmp)) {
            $this->logger->write('debug', 'QUEUE-WATCH : EMPTY TID THREAD_ID');
            exit();
        }
        $tid = trim($tid_tmp);

        $tube_name = $tube . "_" . $suffix;
        $this->logger->write(
            'debug',
            'QUEUE-WATCH STARTING UP: IP: ' . $host . ' PORT: ' . $port . ' TIMEOUT: ' . $timeout
            . ' TUBE: ' . $tube_name . ' MSG_NUM: ' . $msg_num . ' TID: ' . $tid
        );

        $queue    = new Pheanstalk_Pheanstalk($host, $port, $timeout);
        $attempts = 0;

        while (true) {
            if ($attempts >= $msg_num && $tid !== 'default') {
                $this->logger->write('debug', 'QUEUE-WATCH DONE: TUBE: ' . $tube_name . ' INC: ' . $attempts . ' THREAD_ID: ' . $tid);
                exit(0);
            }
            $this->logger->write('debug', 'QUEUE-WATCH : TUBE: ' . $tube_name . ' INC: ' . $attempts . ' THREAD_ID: ' . $tid);

            // Counted before reserving, so failed/empty reserves also consume the budget.
            $attempts++;

            try {
                $job = $queue->watch($tube_name)->ignore('default')->reserve(1);
            } catch (Exception $e) {
                $this->logger->write('debug', 'QUEUE-WATCH ERROR: ' . $e->getTraceAsString());
                continue;
            }

            if (!$job instanceof Pheanstalk_Job) {
                // Nothing was reserved on this attempt.
                continue;
            }

            $this->process_job($queue, $job);
        }
    }

    /**
     * Handle one reserved job: look up the device OS, push, then delete or release.
     *
     * On a successful push the job is deleted and a notification row is inserted.
     * On a failed push the job is released back to the tube. A job whose body
     * does not decode to a JSON object/array is buried, because releasing it
     * would only make the worker reserve the same unusable job again.
     *
     * @param object $queue Pheanstalk connection the job was reserved from.
     * @param object $job   The reserved Pheanstalk_Job.
     * @return void
     */
    private function process_job($queue, $job)
    {
        $job_id = $job->getId();
        $raw    = $job->getData();
        $data   = json_decode($raw, true);
        $this->logger->write('debug', 'QUEUE-WATCH QUEUE_DATA: ' . $raw);

        if (!is_array($data)) {
            $queue->bury($job);
            $this->logger->write('debug', 'QUEUE-BURY JOB: ID: ' . $job_id . ' REASON: INVALID JSON PAYLOAD');
            return;
        }

        $msisdn        = $this->field($data, 'msisdn');
        $token         = $this->field($data, 'token');
        $user_agent    = $this->field($data, 'user_agent');
        $msg           = $this->field($data, 'msg');
        $category      = $this->field($data, 'category');
        $received_time = $this->field($data, 'received_time');

        $workers = new Models_Workers;
        $devices = $workers->getdevice_os($msisdn);
        // No device row means an empty OS string; push_routing treats that as unsupported.
        $device_os = isset($devices[0]['device_os']) ? trim($devices[0]['device_os']) : '';
        $this->logger->write('debug', 'QUEUE-WATCH DEVICE_OS_DATA: ' . $device_os);

        if ($this->push_routing($device_os, $msisdn, $token, $user_agent, $msg, $category)) {
            $notifications = new Models_Notification;
            $queue->delete($job);
            $this->logger->write('debug', 'QUEUE-DELETE JOB: ID: ' . $job_id . ' DATA: ' . print_r($data, 1));
            $notifications->insert_notif(
                $job_id,
                $received_time,
                $msisdn,
                1,      // status
                $category,
                'soa',  // source
                $msg,
                0       // status_pull
            );
            $this->logger->write('debug', 'QUEUE-INSERT DB: ' . print_r($data, 1));
            $this->logger->write('debug', 'QUEUE-PUSH JOB: ID: ' . $job_id . ' STATUS: SUCCESS');
        } else {
            $queue->release($job);
            $this->logger->write('debug', 'QUEUE-RELEASE JOB: ID: ' . $job_id . ' DATA: ' . print_r($data, 1));
            $this->logger->write('debug', 'QUEUE-PUSH JOB: ID: ' . $job_id . ' STATUS: FAILED');
        }
    }

    /**
     * Send one notification to the backend matching the device OS.
     *
     * @param string $do_users   Device OS string; matched case-insensitively against "android" / "ios".
     * @param string $msisdn     Subscriber number.
     * @param string $token      Device push token.
     * @param string $user_agent Accepted for interface compatibility; not used here.
     * @param string $msg        Notification text.
     * @param string $category   Notification category.
     * @return bool TRUE only when the backend's JSON response has code "200";
     *              FALSE otherwise, including for an unsupported device OS.
     */
    protected function push_routing($do_users, $msisdn, $token, $user_agent, $msg, $category)
    {
        if (preg_match("/android/i", $do_users)) {
            $url       = $this->config->app('android_bl');
            $data_pack = $this->android_payload($msg, $msisdn, $category, $token);
        } elseif (preg_match("/ios/i", $do_users)) {
            $url       = $this->config->app('ios_bl');
            $data_pack = $this->ios_payload($msg, $msisdn, $category, $token);
        } else {
            // Without a target there is no URL or payload to post.
            $this->logger->write('debug', 'QUEUE-WATCH PUSH_ROUTING: UNSUPPORTED DEVICE_OS: ' . $do_users);
            return FALSE;
        }

        $response = Libs_Curl::simplePost($url, $data_pack);
        $decoded  = json_decode($response, 1);
        $this->logger->write(
            'debug',
            'QUEUE-WATCH PUSH_ROUTING: URL: ' . $url . ' PARAMS: ' . json_encode($data_pack) . ' RESPONSE: ' . $response
        );

        // Loose comparison on purpose: accepts both "200" and 200.
        return is_array($decoded) && isset($decoded['code']) && $decoded['code'] == '200';
    }

    /**
     * Build the POST parameters for the Android backend.
     *
     * @param string $msg      Notification text.
     * @param string $msisdn   Subscriber number.
     * @param string $category Category; sent unchanged as `type`, lower-cased only for the page-id lookup.
     * @param string $token    Device registration id.
     * @return array array('payload' => <JSON string>)
     */
    protected function android_payload($msg, $msisdn, $category, $token)
    {
        $message = array(
            'message' => $msg,
            'msisdn'  => $msisdn,
            'page_id' => $this->resolve_page_id($category),
            'type'    => $category
        );
        $envelope = array(
            'registration_ids' => array($token),
            'data'             => array(
                'data' => $message
            )
        );

        $payload = json_encode($envelope);
        $this->logger->write('debug', 'QUEUE-WATCH ANDROID_PAYLOAD: ' . $payload);

        return array(
            'payload' => $payload
        );
    }

    /**
     * Build the POST parameters for the iOS backend.
     *
     * @param string $msg      Notification text, used as the `aps.alert`.
     * @param string $msisdn   Subscriber number; not part of the iOS payload.
     * @param string $category Category; sent lower-cased as `aps.type`.
     * @param string $token    Device token, sent alongside the payload.
     * @return array array('payload' => <JSON string>, 'token' => $token)
     */
    protected function ios_payload($msg, $msisdn, $category, $token)
    {
        $body = array(
            'aps' => array(
                'alert' => $msg,
                'badge' => 1,
                'sound' => 'default',
                'type'  => strtolower((string) $category)
            ),
            'page_id' => $this->resolve_page_id($category)
        );

        $payload = json_encode($body);
        $this->logger->write('debug', 'QUEUE-WATCH IOS_PAYLOAD: ' . $payload);

        return array(
            'payload' => $payload,
            'token'   => $token
        );
    }

    /**
     * Print how many worker threads are needed for "<tube>_<suffix>", then exit.
     *
     * The result is ceil(ready jobs / messages per thread), echoed to stdout.
     * 0 is printed when the tube is empty, the tube statistics cannot be read,
     * or the messages-per-thread value is not a positive number.
     *
     * @param string $suffix_tmp  Tube suffix; empty falls back to config `default_suffix`.
     * @param string $tube_tmp    Tube base name; empty falls back to config `default_tube`.
     * @param mixed  $msg_num_tmp Messages per thread; empty falls back to config `default_msg_num`.
     * @return void  Never returns normally; the process exits.
     */
    public function jobscheck($suffix_tmp, $tube_tmp, $msg_num_tmp)
    {
        require_once($this->config->app('phean_init'));

        $host    = $this->config->app('beanstalkd_ip');
        $port    = $this->config->app('beanstalkd_port');
        $timeout = $this->config->app('beanstalkd_timeout');

        $suffix  = $this->param_or_default($suffix_tmp, 'default_suffix');
        $tube    = $this->param_or_default($tube_tmp, 'default_tube');
        $msg_num = (int) $this->param_or_default($msg_num_tmp, 'default_msg_num');

        $tube_name = $tube . "_" . $suffix;
        $this->logger->write(
            'debug',
            'QUEUE-CHECK STARTING UP: IP: ' . $host . ' PORT: ' . $port . ' TIMEOUT: ' . $timeout
            . ' TUBE: ' . $tube_name . ' MSG_NUM: ' . $msg_num
        );

        $queue = new Pheanstalk_Pheanstalk($host, $port, $timeout);
        try {
            $queue->watch($tube_name);
        } catch (Exception $e) {
            $this->logger->write('debug', 'QUEUE-WATCH EXCEPTION: ' . $e->getTraceAsString());
        }

        // A failed stats call is reported as "no jobs" instead of killing the
        // process before anything has been echoed.
        $ready = 0;
        try {
            $stats = (array) $queue->statsTube($tube_name);
            if (isset($stats['current-jobs-ready'])) {
                $ready = (int) $stats['current-jobs-ready'];
            }
        } catch (Exception $e) {
            $this->logger->write('debug', 'QUEUE-CHECK EXCEPTION: ' . $e->getTraceAsString());
        }

        if ($msg_num > 0) {
            // Divide by the resolved value so the configured default applies
            // when no explicit count was passed.
            $threads = (int) ceil($ready / $msg_num);
        } else {
            $this->logger->write('debug', 'QUEUE-CHECK INVALID MSG_NUM: ' . $msg_num);
            $threads = 0;
        }

        echo $threads;
        $this->logger->write('debug', 'QUEUE-CHECK RESULT: NUMBER JOBS: ' . $ready . " NUMBER NEW THREAD: " . $threads);
        exit();
    }

    /**
     * Return the trimmed argument, or the named config value when the argument is empty().
     *
     * @param mixed  $value      Caller-supplied value.
     * @param string $config_key Key passed to the config's app() when $value is empty.
     * @return mixed
     */
    private function param_or_default($value, $config_key)
    {
        if (!empty($value)) {
            return trim($value);
        }
        return $this->config->app($config_key);
    }

    /**
     * Look up the page id for a category in the `page_id` config map.
     *
     * @param string $category Category name; matched lower-cased.
     * @return mixed The mapped page id, or DEFAULT_PAGE_ID when the category is
     *               not mapped or the config value is not an array.
     */
    private function resolve_page_id($category)
    {
        $key = strtolower((string) $category);
        $map = $this->config->app('page_id');

        if (is_array($map) && array_key_exists($key, $map)) {
            return $map[$key];
        }
        return self::DEFAULT_PAGE_ID;
    }

    /**
     * Read an optional key from a decoded job payload.
     *
     * @param array  $data Decoded payload.
     * @param string $key  Field name.
     * @return mixed The value, or null when the key is absent.
     */
    private function field(array $data, $key)
    {
        return isset($data[$key]) ? $data[$key] : null;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment