Skip to content

Instantly share code, notes, and snippets.

@anthonyTS
Created October 27, 2015 14:36
Show Gist options
  • Save anthonyTS/b6c46d54ee8bf2d714b8 to your computer and use it in GitHub Desktop.
Save anthonyTS/b6c46d54ee8bf2d714b8 to your computer and use it in GitHub Desktop.
Database Problems
<?php
include_once dirname(__FILE__) . DIRECTORY_SEPARATOR . 'config.php';
include_once dirname(__FILE__) . DIRECTORY_SEPARATOR . 'functions.php';
include_once dirname(__FILE__) . DIRECTORY_SEPARATOR . 'db.php';
require_once dirname(__FILE__) . DIRECTORY_SEPARATOR . '../../application/libraries/Stalk.php';
/*
 * Beanstalkd worker for the "check-monitoring-logs" tube.
 *
 * The callback handed to Stalk::background() spawns WORKERS_PER_CORE children
 * and then loops forever reserving jobs:
 *
 *  - A process whose parent PID is 1 acts as the dispatcher: it splits the
 *    incoming monitoring_log ids into chunks and re-queues them on a dedicated
 *    "<tube>-<HOSTNAME>-<pid>" tube per child.
 *    NOTE(review): this presumes Stalk::background() daemonises the process
 *    (so the dispatcher is re-parented to init) - confirm in Stalk.php.
 *  - Every other process watches its own dedicated tube and does the actual
 *    work: recalculating the eBay selling price / stock level for each item
 *    linked to the monitoring_log row and queueing rows in ebay_update.
 *
 * A job carrying {"terminate": true} makes the receiving process exit; the
 * dispatcher forwards it to every child first.
 */
$stalk = new Stalk(array(BEANSTALKD_SERVER, BEANSTALKD_PORT));
$stalk->background(function() {
    $tube = 'check-monitoring-logs';

    $stalk = new Stalk(array(BEANSTALKD_SERVER, BEANSTALKD_PORT));
    $children = $stalk->spawn_children(WORKERS_PER_CORE);

    $parent_pid = posix_getppid();
    $current_pid = posix_getpid();
    if ($parent_pid > 1) {
        // Not the dispatcher: listen on this process's dedicated tube only.
        $tube .= '-' . HOSTNAME . '-' . $current_pid;
    }

    // Fresh connection created after spawn_children(), as in the original script.
    $stalk = new Stalk(array(BEANSTALKD_SERVER, BEANSTALKD_PORT));

    while (true) {
        $job = $stalk->watch($tube)->ignore('default')->reserve();
        $raw_data = $job->getData();
        $job_id = $job->getId();
        $data = json_decode($raw_data, true);

        if (isset($data['terminate']) && $data['terminate'] == true) {
            // need to terminate the script and any running children
            $stalk->log('Received terminate command');
            if ($parent_pid == 1) {
                foreach ($children as $child) {
                    $stalk->log('terminating child ' . $child);
                    $stalk->useTube($tube . '-' . HOSTNAME . '-' . $child)->put(json_encode(array('terminate' => true)));
                }
            }
            $stalk->delete($job);
            exit(0);
        }

        if (empty($data['monitoring_log_ids']) || count($data['monitoring_log_ids']) == 0) {
            // Nothing usable in this job; drop it.
            $stalk->delete($job);
            continue;
        }

        if ($parent_pid == 1) {
            // ---------------------------------------------------------------
            // Dispatcher: fan the workload out to the children.
            // ---------------------------------------------------------------

            // check that the child processes are still watching their tube
            check_children($children, $tube, $stalk);

            // split the workload across the children
            // NOTE(review): count($children) == 0 would divide by zero here;
            // the visible code never guards against an empty child list.
            $total_ids = count($data['monitoring_log_ids']);
            $chunk_size = (int) min(max(floor($total_ids / count($children)), 1), MAX_WORKLOAD_SIZE);
            $chunks = array_chunk($data['monitoring_log_ids'], $chunk_size, true);
            $stalk->log('Split into ' . count($chunks) . ' chunks');

            // Round-robin the chunks over a shuffled child list.
            $distribution = array();
            shuffle($children);
            $child_count = count($children);
            $i = 0;
            foreach ($chunks as $chunk) {
                $child = $children[$i % $child_count];
                $distribution[$child][] = $chunk;
                $i++;
            }

            // write the split workload to the dedicated tube for each child
            $stalk->log($distribution);
            foreach ($distribution as $child => $child_chunks) {
                $child_tube = $tube . '-' . HOSTNAME . '-' . $child;
                foreach ($child_chunks as $ary) {
                    $stalk->log('Adding job into ' . $child_tube . ' tube');
                    $stalk->useTube($child_tube)->put(json_encode(array('monitoring_log_ids' => $ary, 'run_id' => $data['run_id'])));
                }
            }

            // there's nothing to do now in this job, so delete it and move on
            $stalk->log(__LINE__);
            $stalk->delete($job);
            continue;
        }

        // -------------------------------------------------------------------
        // Child: process every monitoring_log id in this job.
        // -------------------------------------------------------------------
        $stalk->log($data);
        $updates = array();

        // Two handles are opened per job, as in the original: the raw mysql_*
        // link (used for the stored procedure / session variable) and the DB helper.
        $con = mysql_connect(DB_HOST, DB_USER, DB_PASS);
        mysql_select_db(DB_DATABASE);
        new DB(DB_HOST, DB_USER, DB_PASS, DB_DATABASE);

        $run_id = (int) $data['run_id'];

        foreach ($data['monitoring_log_ids'] as $id) {
            // The id comes straight from the job payload: force it to an integer
            // before it is concatenated into SQL.
            $id = (int) $id;

            // Get the monitoring_log entry
            $SQL = 'SELECT * FROM monitoring_log WHERE id = ' . $id;
            $stalk->log($SQL);
            $ml = DB::single($SQL);
            $stalk->log($ml);
            if (empty($ml)) {
                // Without the row every following query would be malformed.
                $stalk->log('monitoring_log ' . $id . ' not found, skipping');
                continue;
            }

            // Get the last source price / source stock for this item
            $SQL = 'SELECT source_price, source_stock FROM monitoring_log WHERE id < ' . $ml['id'] . ' AND monitoring_id = ' . $ml['monitoring_id'] . ' ORDER BY id DESC LIMIT 1';
            $stalk->log($SQL);
            $previous = DB::select($SQL);
            if (count($previous) == 1) {
                $last_source_price = $previous[0]['source_price'];
                $last_source_stock = $previous[0]['source_stock'];
            } else {
                $last_source_price = '';
                $last_source_stock = '';
            }
            $stalk->log($last_source_price . ' ' . $last_source_stock);

            // Get all users that are monitoring this item.
            // The GROUP BY is required: COUNT() without it collapses the result
            // to a single row (with NULL ids when nothing matches), so only one
            // arbitrary item/user was ever processed.
            $SQL = 'SELECT e.id, e.user_id, COUNT(ru.run_id) as total_runs, e.selling_price, e.stock_level FROM ebay_items e INNER JOIN monitoring m ON m.sku = e.sku LEFT JOIN runs_users ru ON ru.user_id = e.user_id WHERE m.id = ' . $ml['monitoring_id'] . ' GROUP BY e.id, e.user_id, e.selling_price, e.stock_level';
            $stalk->log($SQL);
            $items = DB::select($SQL);

            foreach ($items as $row) {
                $stalk->log($row);

                $SQL = 'SELECT su.value FROM settings_user su INNER JOIN settings s ON s.id = su.settings_id WHERE s.code = "amazon/monitor_third_parties" AND su.user_id = ' . $row['user_id'];
                $setting = DB::single($SQL);
                // A user without a settings row is treated as "not monitoring"
                // (0); the raw value used to be concatenated into the INSERT
                // below and produced invalid SQL when it was missing.
                $third_party_value = (!empty($setting) && isset($setting['value'])) ? (int) $setting['value'] : 0;
                $monitor_third_parties = $third_party_value == 1;

                // Per-user view of the stock. The override must not be written
                // back into $ml, otherwise it leaks to every following user of
                // the same item, including those who do monitor third parties.
                $source_stock = $ml['source_stock'];
                if ($monitor_third_parties === false && $ml['isprime'] == 0) {
                    $source_stock = 'OOS';
                    $stalk->log('Temporarily changed ' . $ml['monitoring_id'] . ' to OOS as not fulfilled by Amazon and customer not monitoring third parties');
                }
                $is_oos = stripos($source_stock, 'OOS') !== false;

                $SQL = 'INSERT INTO runs_users (run_id, user_id, monitor_third_parties) VALUES (' . $ml['run_id'] . ', ' . $row['user_id'] . ', ' . $third_party_value . ') ON DUPLICATE KEY UPDATE monitor_third_parties = VALUES(monitor_third_parties)';
                $stalk->log($SQL);
                DB::insert($SQL);

                if (!($ml['source_price'] > 0)) {
                    continue;
                }

                // Ask the stored procedure for the new price. If the CALL fails
                // the session variable would still hold the previous item's
                // price (or NULL, which rounds to 0), so stop here.
                $SQL = 'CALL calculate_ebay_price(' . $row['user_id'] . ', ' . $ml['source_price'] . ', @selling_price);';
                $stalk->log($SQL);
                if (!mysql_query($SQL)) {
                    $stalk->log(mysql_error());
                    $stalk->log('Unable to get a selling price');
                    continue;
                }

                $SQL = 'SELECT @selling_price as selling_price';
                $price_res = mysql_query($SQL);
                $price_row = $price_res ? mysql_fetch_row($price_res) : false;
                if (!$price_row || $price_row[0] === null) {
                    $stalk->log('Unable to get a selling price');
                    continue;
                }
                $selling_price = round($price_row[0], 2);
                $stalk->log('new selling_price = ' . $selling_price);

                // Only queue an update when the price changed or the listed
                // stock disagrees with the source's in/out-of-stock state.
                $needs_update = $selling_price != $row['selling_price']
                    || (!$is_oos && $row['stock_level'] == 0)
                    || ($is_oos && $row['stock_level'] > 0);
                if (!$needs_update) {
                    continue;
                }

                // NOTE(review): filtering on su.user_id in WHERE turns this
                // LEFT JOIN into an inner join, so s.default_value is never
                // used for users without a settings_user row. Left unchanged
                // because fixing it changes which users get repriced.
                $SQL = 'SELECT IFNULL(su.value, s.default_value) FROM settings s LEFT JOIN settings_user su ON s.id=su.settings_id WHERE s.code = "ebay/repricing_enabled" AND su.user_id = ' . $row['user_id'];
                $stalk->log($SQL);
                $repricing_res = mysql_query($SQL);
                if (!$repricing_res || !mysql_num_rows($repricing_res)) {
                    continue;
                }
                list($repricing_enabled) = mysql_fetch_row($repricing_res);
                if ($repricing_enabled != 1) {
                    continue;
                }

                $SQL = 'SELECT IFNULL(su.value, s.default_value) FROM settings s LEFT JOIN settings_user su ON s.id=su.settings_id WHERE s.code = "ebay/default_stock_level" AND su.user_id = ' . $row['user_id'];
                $stalk->log($SQL);
                $stock_res = mysql_query($SQL);
                $stock_row = $stock_res ? mysql_fetch_row($stock_res) : false;

                if ($is_oos) {
                    $stock_level = 0;
                } elseif ($stock_row && $stock_row[0] !== null) {
                    $stock_level = $stock_row[0];
                } else {
                    // An empty value here used to generate "stock_level = ,"
                    // and the INSERT failed without any trace in the log.
                    $stalk->log('Unable to get a default stock level');
                    continue;
                }

                $SQL = 'INSERT INTO ebay_update SET ebay_items_id = ' . $row['id'] . ', selling_price = ' . $selling_price . ', stock_level = ' . $stock_level . ', complete = 0, run_id = ' . $run_id;
                $stalk->log($SQL);
                if (!mysql_query($SQL)) {
                    $stalk->log(mysql_error());
                } elseif (mysql_affected_rows() > 0) {
                    $updates[] = mysql_insert_id();
                }
            }
        }

        DB::close();

        $stalk->log('updates:');
        $stalk->log($updates);
        if (count($updates) > 0) {
            $stalk->log('adding ' . count($updates) . ' into the ebay-updates tube');
            $stalk->distributeWorkload('ebay-updates', $updates, 'updates');
        }

        mysql_close($con);
        $stalk->delete($job);
    }
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment