Last active
December 31, 2015 03:39
-
-
Save DimShadoWWW/7929379 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
import re | |
import sys | |
import logging | |
import datetime | |
import time | |
import db_access | |
import subprocess | |
import traceback | |
import multiprocessing | |
import parse_php_assignments | |
class FilesSender(multiprocessing.Process): | |
def __init__(self, task_queue, result_queue, timeout, logger): | |
multiprocessing.Process.__init__(self) | |
self.task_queue = task_queue | |
self.result_queue = result_queue | |
self.logger = logger | |
self.timeout = timeout | |
def run(self): | |
proc_name = self.name | |
global logger | |
lockfile = None | |
filename = None | |
while True: | |
try: | |
try: | |
cmd_data = self.task_queue.get() | |
if cmd_data is None: | |
# Poison pill means shutdown | |
print '%s: Exiting' % proc_name | |
self.task_queue.task_done() | |
break | |
except Exception, e: | |
self.task_queue.task_done() | |
print traceback.format_exc() | |
cmd = cmd_data["cmd"] | |
filename = cmd_data["filename"] | |
hostname = cmd_data["hostname"] | |
plugin_name = "/".join(cmd[1].split("/")[-4:]) | |
lockfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "run", cmd_data['lockfile']) | |
# File to send doesn't exist | |
if not os.path.isfile(filename): | |
# remove lock file if exist | |
if os.path.isfile(lockfile): | |
os.remove(lockfile) | |
raise LockFileNotFoundError("File not found error on: %s " % (filename)) | |
# if its been more than 2 minutes from lock file creation and file to send is not there, remove the lock file | |
if os.path.isfile(lockfile) and datetime.datetime.fromtimestamp(os.path.getctime(lockfile)) < datetime.datetime.now()-datetime.timedelta(minutes=2): | |
self.logger.debug("[DEBUG] Deleting old lockfile: %s" % lockfile) | |
os.remove(lockfile) | |
if not os.path.isfile(lockfile): | |
self.logger.debug("[DEBUG] Creating lockfile: %s" % lockfile) | |
self.touch(lockfile) | |
startup = time.time() | |
# send file | |
self.logger.debug("[DEBUG] %s Executing %s" % (proc_name, " ".join(cmd))) | |
# print '%s: %s' % (proc_name, cmd_data) | |
process = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) | |
while time.time()-startup <= self.timeout: | |
if process.poll() != None: | |
break | |
time.sleep(1) | |
if time.time()-startup > self.timeout: | |
self.logger.warning("[WARN] File sending process took a long time..") | |
print "_________" | |
print(process.stdout.read()) | |
print "_________" | |
process.stdout.close() | |
process.stdin.close() | |
process.stderr.close() | |
except Exception, e: | |
print traceback.format_exc() | |
# self.logger.error(e) | |
self.queue.task_done() | |
self.result_queue.put((cmd_data["siti_id"], filename , | |
"""File failed to send: | |
filename='{filename}' | |
plugin='{plugin_name}' | |
hostname='{hostname}' | |
""".format( | |
filename = filename, plugin_name = plugin_name, | |
hostname = hostname | |
))) | |
finally: | |
if lockfile: | |
if os.path.isfile(lockfile): | |
os.remove(lockfile) | |
self.task_queue.task_done() | |
self.queue.task_done() | |
return | |
def touch(self, fname): | |
if os.path.exists(fname): | |
os.utime(fname, None) | |
else: | |
open(fname, 'w').close() | |
os.chmod(fname, 0777) | |
def processFile(q, fileList, siti_id, db, nthreads, protocol, timeout, logger=None): | |
print("processFile called") | |
outqueue = multiprocessing.Queue() | |
files_with_errors = [] | |
print 'Creating %d file senders' % nthreads | |
consumers = [ FilesSender(q, outqueue, timeout, logger) | |
for i in xrange(nthreads) ] | |
for w in consumers: | |
w.start() | |
q.join() # <---- aqui es donde se quedaba | |
print("checking protocols") | |
# remove files that are not in output queue, | |
# but only if all site configured protocols were used | |
if protocol == "all": | |
if outqueue.empty(): | |
for f in fileList: | |
logger.debug("[DEBUG] Removing file %s" % f) | |
db.removeFile(f, siti_id) | |
os.remove(f) | |
else: | |
while not outqueue.empty(): | |
my_siti_id, f, output = outqueue.get() | |
files_with_errors.append(f) | |
for f in set(fileList).difference(set(files_with_errors)): | |
logger.debug("[DEBUG] Removing file %s" % f) | |
db.removeFile(f, siti_id) | |
os.remove(f) | |
print("processFile ended") | |
def addToQueue(siti_id, config, protocol, logger): | |
logger = logger or logging.getLogger(__name__) | |
nthreads=config["n_threads_per_queue"] | |
time_to_sleep=config["time_to_sleep"] | |
timeout=config["time_to_wait_for_send"] | |
plist = [] | |
loggers = {} | |
while True: | |
logger = logger or logging.getLogger(__name__) | |
senders = json.load(open(os.path.expanduser(os.path.join( | |
os.path.dirname(os.path.realpath(__file__)), "..", "conf", "senders.json")))) | |
config = parse_php_assignments( | |
os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "..", "..", "config.php")) | |
db_access_config = {'hostname': config['HOST_DS'], | |
'database': config['NAME_DS'], | |
'username': config['USER_DS'], | |
'password': config['PASS_DS']} | |
db = db_access.DFPGAccess(hostname=db_access_config['hostname'], | |
database=db_access_config['database'], | |
username=db_access_config['username'], | |
password=db_access_config['password'], | |
deliver_basedir=config['DIR_PUBLIC'], logger=logger) | |
db.connect() | |
fileList = db.getFileList(config["DF_SITI_SERVER"], siti_id) | |
if len(fileList) > 0: | |
files_with_errors = {} | |
for siti_id_local in fileList.keys(): | |
if siti_id_local not in loggers.keys(): | |
LOG_FILENAME = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "log", "%s_%s.log" % | |
(siti_id_local, datetime.datetime.today().strftime("%Y%m%d"))) | |
if not os.path.isdir(os.path.dirname(LOG_FILENAME)): | |
os.mkdir(os.path.dirname(LOG_FILENAME)) | |
loggers[siti_id_local] = logging.getLogger('siti_%s_files_sender'% siti_id_local) | |
loggers[siti_id_local].setLevel(logging.DEBUG) | |
# create a file handler | |
handler = logging.FileHandler(LOG_FILENAME) | |
handler.setLevel(logging.DEBUG) | |
# create a logging format | |
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
handler.setFormatter(formatter) | |
# add the handlers to the logger | |
loggers[siti_id_local].addHandler(handler) | |
loggers[siti_id_local].setLevel(logging.DEBUG) | |
files_with_errors[siti_id_local] = [] | |
queue = multiprocessing.JoinableQueue() | |
siteConfig = json.loads(db.getSiteConfig(siti_id_local)) | |
if protocol == "all": | |
protocols = siteConfig.keys() | |
elif protocol in siteConfig.keys(): | |
protocols = [protocol] | |
else: | |
logger.error("[ERROR] %s is not into configured senders for %s" % | |
protocol, siti_id_local) | |
loggers[siti_id_local].error("[ERROR] %s is not into configured senders for %s" % | |
protocol, siti_id_local) | |
# list of inserted files | |
localFileList = [] | |
for filename in fileList[siti_id_local]: | |
# if file path is in this server and exist | |
if os.path.isfile(filename): | |
localFileList.append(filename) | |
logger.info("[INFO] Adding %s to the sender pool" % | |
os.path.basename(filename)) | |
loggers[siti_id_local].info("[INFO] Adding %s to the sender pool" % | |
os.path.basename(filename)) | |
for k in protocols: | |
exec_plugin_path = os.path.realpath(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "..", senders[k])) | |
if os.path.isfile(exec_plugin_path): | |
cmd = [ | |
"php", | |
exec_plugin_path, | |
"--arch_url=%s" % filename, | |
"--siti_id=%s" % siti_id_local, | |
"--localdir=%s/" % os.path.join(config['DIR_PUBLIC'], "ftp_clientes") | |
] | |
for l in siteConfig[k]: | |
lcmd = [] | |
for m in l.keys(): | |
lcmd.append("--%s=%s" % (m, l[m])) | |
queue.put({"cmd" : cmd+lcmd, "siti_id": siti_id_local, | |
"lockfile": "%s_%s_%s_%s_%d" % ( | |
siti_id_local, | |
k, | |
os.path.basename(filename), | |
l["hostname"], | |
siteConfig[k].index(l)), | |
"hostname": l["hostname"], | |
"filename": filename | |
}) | |
else: | |
logger.error("Configured plugin not found error on: %s " % (exec_plugin_path)) | |
loggers[siti_id_local].error("Configured plugin not found error on: %s " % (exec_plugin_path)) | |
else: | |
files_with_errors[siti_id_local].append(filename) | |
logger.error("File not found error on: %s " % (filename)) | |
loggers[siti_id_local].error("File not found error on: %s " % (filename)) | |
p = multiprocessing.Process(target=processFile, args=(queue, | |
localFileList, siti_id_local, db, nthreads, protocol, timeout, loggers[siti_id_local])) | |
plist.append(p) | |
p.start() | |
else: | |
logger.info("[INFO] Sin actualizaciones a procesar") | |
time.sleep(time_to_sleep) | |
for p in plist: | |
p.join() | |
def processFileQueue(siti_id, config, protocol="all", logger=None): | |
p = multiprocessing.Process(target=addToQueue, args=(siti_id, config, protocol, logger)) | |
p.start() | |
p.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment