Skip to content

Instantly share code, notes, and snippets.

@DimShadoWWW
Last active December 31, 2015 03:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DimShadoWWW/7929379 to your computer and use it in GitHub Desktop.
Save DimShadoWWW/7929379 to your computer and use it in GitHub Desktop.
import json
import os
import re
import sys
import logging
import datetime
import time
import db_access
import subprocess
import traceback
import multiprocessing
import parse_php_assignments
class FilesSender(multiprocessing.Process):
    """Worker process that executes file-sending commands from a task queue.

    Each queue item is a dict with keys "cmd", "filename", "hostname",
    "siti_id" and "lockfile"; a ``None`` item is the poison pill that shuts
    the worker down.  Failures are reported on ``result_queue`` as
    ``(siti_id, filename, message)`` tuples.
    """

    def __init__(self, task_queue, result_queue, timeout, logger):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue      # JoinableQueue of command dicts
        self.result_queue = result_queue  # Queue receiving failure reports
        self.logger = logger
        self.timeout = timeout            # seconds to wait for the sender subprocess

    def run(self):
        proc_name = self.name
        while True:
            cmd_data = self.task_queue.get()
            if cmd_data is None:
                # Poison pill means shutdown.
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            # Defaults so the failure report below is well-formed even when
            # the exception fires before these fields are extracted.
            lockfile = None
            filename = None
            hostname = None
            plugin_name = None
            try:
                cmd = cmd_data["cmd"]
                filename = cmd_data["filename"]
                hostname = cmd_data["hostname"]
                plugin_name = "/".join(cmd[1].split("/")[-4:])
                lockfile = os.path.join(
                    os.path.dirname(os.path.realpath(__file__)),
                    "..", "run", cmd_data['lockfile'])
                # File to send doesn't exist.
                if not os.path.isfile(filename):
                    # Remove the lock file if it exists.
                    if os.path.isfile(lockfile):
                        os.remove(lockfile)
                    # BUG FIX: the original raised an undefined name
                    # 'LockFileNotFoundError' (NameError); IOError keeps the
                    # same "report failure" flow through the except below.
                    raise IOError("File not found error on: %s " % (filename))
                # If more than 2 minutes passed since the lock file was
                # created, treat it as stale and remove it.
                if os.path.isfile(lockfile) and \
                        datetime.datetime.fromtimestamp(os.path.getctime(lockfile)) < \
                        datetime.datetime.now() - datetime.timedelta(minutes=2):
                    self.logger.debug("[DEBUG] Deleting old lockfile: %s" % lockfile)
                    os.remove(lockfile)
                if not os.path.isfile(lockfile):
                    self.logger.debug("[DEBUG] Creating lockfile: %s" % lockfile)
                    self.touch(lockfile)
                startup = time.time()
                # Send the file.
                self.logger.debug("[DEBUG] %s Executing %s" % (proc_name, " ".join(cmd)))
                process = subprocess.Popen(cmd, shell=False,
                                           stdout=subprocess.PIPE,
                                           stdin=subprocess.PIPE,
                                           stderr=subprocess.PIPE)
                while time.time() - startup <= self.timeout:
                    if process.poll() is not None:
                        break
                    time.sleep(1)
                if time.time() - startup > self.timeout:
                    # NOTE(review): the child is only warned about, not killed;
                    # the stdout.read() below blocks until it exits — confirm
                    # whether a hung sender should be terminated here.
                    self.logger.warning("[WARN] File sending process took a long time..")
                print("_________")
                print(process.stdout.read())
                print("_________")
                process.stdout.close()
                process.stdin.close()
                process.stderr.close()
            except Exception:
                print(traceback.format_exc())
                # BUG FIX: the original template had the literal '(unknown)'
                # where the {filename} placeholder belongs, and called
                # task_done() via a nonexistent 'self.queue' attribute.
                self.result_queue.put((cmd_data["siti_id"], filename,
                                       """File failed to send:
filename='{filename}'
plugin='{plugin_name}'
hostname='{hostname}'
""".format(filename=filename, plugin_name=plugin_name, hostname=hostname)))
            finally:
                # Always release the lock file and acknowledge the task
                # exactly once, whether sending succeeded or failed.  The
                # original called task_done() up to three times per task and
                # then 'return'ed from inside finally, so each worker died
                # after a single task and the caller's q.join() hung.
                if lockfile and os.path.isfile(lockfile):
                    os.remove(lockfile)
                self.task_queue.task_done()

    def touch(self, fname):
        """Create *fname* (world-writable) or bump its mtime if it exists."""
        if os.path.exists(fname):
            os.utime(fname, None)
        else:
            open(fname, 'w').close()
            os.chmod(fname, 0o777)
def processFile(q, fileList, siti_id, db, nthreads, protocol, timeout, logger=None):
    """Consume the command queue *q* with *nthreads* FilesSender workers.

    After every queued command is processed, files that produced no error
    report are deleted from the database and from disk — but only when all
    site-configured protocols were used (``protocol == "all"``).

    :param q: multiprocessing.JoinableQueue of command dicts
    :param fileList: list of file paths queued for this site
    :param siti_id: site identifier passed through to db.removeFile
    :param db: database access object (project type, provides removeFile)
    :param nthreads: number of FilesSender worker processes to spawn
    :param protocol: protocol selector; cleanup only runs for "all"
    :param timeout: per-command timeout handed to each FilesSender
    """
    # BUG FIX: guard the None default before the debug calls below.
    logger = logger or logging.getLogger(__name__)
    print("processFile called")
    outqueue = multiprocessing.Queue()
    files_with_errors = []
    print('Creating %d file senders' % nthreads)
    consumers = [FilesSender(q, outqueue, timeout, logger)
                 for _ in range(nthreads)]
    for w in consumers:
        w.start()
    # BUG FIX: one poison pill per consumer so workers exit once the queue
    # drains; the original never enqueued them, leaving worker processes
    # blocked on get() forever.
    for _ in consumers:
        q.put(None)
    q.join()
    print("checking protocols")
    # Remove files that are not in the output (error) queue, but only if
    # all site-configured protocols were used.
    if protocol == "all":
        if outqueue.empty():
            for f in fileList:
                logger.debug("[DEBUG] Removing file %s" % f)
                db.removeFile(f, siti_id)
                os.remove(f)
        else:
            while not outqueue.empty():
                my_siti_id, f, output = outqueue.get()
                files_with_errors.append(f)
            for f in set(fileList).difference(set(files_with_errors)):
                logger.debug("[DEBUG] Removing file %s" % f)
                db.removeFile(f, siti_id)
                os.remove(f)
    print("processFile ended")
def addToQueue(siti_id, config, protocol, logger):
    """Poll the database forever and enqueue pending files for sending.

    For every site that has pending files, a JoinableQueue is filled with
    one command per (file, protocol, destination) combination and a
    processFile worker process is started to consume it.

    :param siti_id: site id filter passed to db.getFileList
    :param config: dict with "n_threads_per_queue", "time_to_sleep" and
        "time_to_wait_for_send" (re-read from config.php each iteration)
    :param protocol: "all" or a single configured protocol name
    :param logger: logger, or None to use the module logger
    """
    logger = logger or logging.getLogger(__name__)
    nthreads = config["n_threads_per_queue"]
    time_to_sleep = config["time_to_sleep"]
    timeout = config["time_to_wait_for_send"]
    plist = []
    loggers = {}  # per-site file loggers, created lazily below
    while True:
        senders = json.load(open(os.path.expanduser(os.path.join(
            os.path.dirname(os.path.realpath(__file__)), "..", "conf", "senders.json"))))
        # NOTE(review): parse_php_assignments is imported as a MODULE at the
        # top of the file but called here as a function — confirm the import
        # should be 'from parse_php_assignments import parse_php_assignments'.
        config = parse_php_assignments(
            os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "..", "..", "config.php"))
        db_access_config = {'hostname': config['HOST_DS'],
                            'database': config['NAME_DS'],
                            'username': config['USER_DS'],
                            'password': config['PASS_DS']}
        db = db_access.DFPGAccess(hostname=db_access_config['hostname'],
                                  database=db_access_config['database'],
                                  username=db_access_config['username'],
                                  password=db_access_config['password'],
                                  deliver_basedir=config['DIR_PUBLIC'], logger=logger)
        db.connect()
        fileList = db.getFileList(config["DF_SITI_SERVER"], siti_id)
        if len(fileList) > 0:
            files_with_errors = {}
            for siti_id_local in fileList.keys():
                if siti_id_local not in loggers:
                    LOG_FILENAME = os.path.join(
                        os.path.dirname(os.path.realpath(__file__)), "..", "log",
                        "%s_%s.log" % (siti_id_local,
                                       datetime.datetime.today().strftime("%Y%m%d")))
                    if not os.path.isdir(os.path.dirname(LOG_FILENAME)):
                        os.mkdir(os.path.dirname(LOG_FILENAME))
                    loggers[siti_id_local] = logging.getLogger(
                        'siti_%s_files_sender' % siti_id_local)
                    loggers[siti_id_local].setLevel(logging.DEBUG)
                    # Log to one file per site per day.
                    handler = logging.FileHandler(LOG_FILENAME)
                    handler.setLevel(logging.DEBUG)
                    formatter = logging.Formatter(
                        '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
                    handler.setFormatter(formatter)
                    loggers[siti_id_local].addHandler(handler)
                files_with_errors[siti_id_local] = []
                queue = multiprocessing.JoinableQueue()
                siteConfig = json.loads(db.getSiteConfig(siti_id_local))
                if protocol == "all":
                    protocols = siteConfig.keys()
                elif protocol in siteConfig:
                    protocols = [protocol]
                else:
                    # BUG FIX: the original did "%s ... %s" % protocol, x
                    # (missing tuple parentheses → TypeError) and then fell
                    # through with 'protocols' unbound (→ NameError); skip
                    # the misconfigured site instead.
                    logger.error("[ERROR] %s is not into configured senders for %s" %
                                 (protocol, siti_id_local))
                    loggers[siti_id_local].error(
                        "[ERROR] %s is not into configured senders for %s" %
                        (protocol, siti_id_local))
                    continue
                # List of files that exist locally and were queued.
                localFileList = []
                for filename in fileList[siti_id_local]:
                    # Only queue files that are on this server.
                    if os.path.isfile(filename):
                        localFileList.append(filename)
                        logger.info("[INFO] Adding %s to the sender pool" %
                                    os.path.basename(filename))
                        loggers[siti_id_local].info(
                            "[INFO] Adding %s to the sender pool" %
                            os.path.basename(filename))
                        for k in protocols:
                            exec_plugin_path = os.path.realpath(os.path.join(
                                os.path.dirname(os.path.realpath(__file__)),
                                "..", "..", senders[k]))
                            if os.path.isfile(exec_plugin_path):
                                cmd = [
                                    "php",
                                    exec_plugin_path,
                                    "--arch_url=%s" % filename,
                                    "--siti_id=%s" % siti_id_local,
                                    "--localdir=%s/" % os.path.join(
                                        config['DIR_PUBLIC'], "ftp_clientes")
                                ]
                                # BUG FIX: enumerate instead of
                                # siteConfig[k].index(l), which returns the
                                # FIRST equal dict and so produced duplicate
                                # lockfile names for repeated destinations.
                                for idx, l in enumerate(siteConfig[k]):
                                    lcmd = ["--%s=%s" % (m, l[m]) for m in l.keys()]
                                    queue.put({"cmd": cmd + lcmd,
                                               "siti_id": siti_id_local,
                                               "lockfile": "%s_%s_%s_%s_%d" % (
                                                   siti_id_local,
                                                   k,
                                                   os.path.basename(filename),
                                                   l["hostname"],
                                                   idx),
                                               "hostname": l["hostname"],
                                               "filename": filename
                                               })
                            else:
                                logger.error("Configured plugin not found error on: %s " % (exec_plugin_path))
                                loggers[siti_id_local].error("Configured plugin not found error on: %s " % (exec_plugin_path))
                    else:
                        files_with_errors[siti_id_local].append(filename)
                        logger.error("File not found error on: %s " % (filename))
                        loggers[siti_id_local].error("File not found error on: %s " % (filename))
                p = multiprocessing.Process(
                    target=processFile,
                    args=(queue, localFileList, siti_id_local, db, nthreads,
                          protocol, timeout, loggers[siti_id_local]))
                plist.append(p)
                p.start()
        else:
            logger.info("[INFO] Sin actualizaciones a procesar")
        time.sleep(time_to_sleep)
    # NOTE(review): unreachable — the while True loop above never breaks;
    # kept from the original for fidelity.
    for p in plist:
        p.join()
def processFileQueue(siti_id, config, protocol="all", logger=None):
    """Run addToQueue in a dedicated process and block until it finishes."""
    worker = multiprocessing.Process(
        target=addToQueue,
        args=(siti_id, config, protocol, logger),
    )
    worker.start()
    worker.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment