Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
simple python daemon worker
# -*- coding: utf-8 -*-
"""
sync device status data from hive
"""
import argparse
import json
import logging.handlers
import operator
import signal
import threading
import time
from functools import partial
from functools import wraps
import os
import pymysql
import requests
from concurrent.futures import ThreadPoolExecutor, ALL_COMPLETED, wait
from redis import StrictRedis
from config_manager import ConfigManager
from daemons.ifflow.cardflow import get_device_cardflow
from daemons.ifflow.presto import Presto
logger = logging.getLogger("root")
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"[%(asctime)s %(filename)s:%(lineno)s - %(funcName)15s()] %(message)s"
)
handler = logging.handlers.RotatingFileHandler("/tmp/status_sync.log")
handler.setFormatter(formatter)
logger.addHandler(handler)
DEVICE_STATUS_INFO_CACHE_PRIFIX = "nsfocus:cache:zelda:device:status:"
config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), os.path.pardir, "config.yaml")
cm = ConfigManager(config_file=config_file)
class Timeit(object):
def __init__(self, tag):
self.tag = tag
def __enter__(self):
self.start = time.time()
def __exit__(self, *unused):
self.cost = time.time() - self.start
logger.info("%s took about: %ss" % (self.tag, self.cost))
def tag(name):
def wrapper(func):
func.__tag__ = name
return func
return wrapper
def exec_profile(func):
@wraps(func)
def new(*args, **kwds):
with Timeit(func.__tag__):
return func(*args, **kwds)
return new
def exception_handler(expect_value=None):
def wrapper(func):
@wraps(func)
def handler(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.error("{0} raise exception {1}".format(func.__tag__, str(e)))
return expect_value
return handler
return wrapper
def parse_args():
parser = argparse.ArgumentParser(description="Sync Device Network Information To Redis")
parser.add_argument(
"-p", "--period",
type=int,
dest="PERIOD",
default=2 * 60,
help="Sync Time Period"
)
return parser.parse_args()
@exec_profile
@exception_handler(expect_value=[])
@tag("get_device_status_li")
def get_device_status_detail(dev_hashes):
"""
Get all device status
:param hashs:
:return:
"""
url = "http://nscloud.api.nsfocus.com/api/nuri/lan/devicestatus/"
resp = requests.get(url, params={"show_status": 2, "hashs": ",".join(dev_hashes)})
if resp.status_code // 100 not in (2, 3):
return {}
rjson = resp.json()
if rjson["status"] != 2000:
return {}
status_li = {}
for status in rjson:
status_li[status["device_hash"]] = status
return status_li
@exec_profile
@exception_handler(expect_value=[])
@tag("get_device_protocol_li")
def get_device_protocol(hashs):
url = "http://nscloud.api.nsfocus.com/api/nuri/etau/device"
resp = requests.get(url, params={"device_hash": ",".join(hashs)})
if resp.status_code // 100 not in (2, 3):
return {}
rjson = resp.json()
if rjson["status"] != 2000:
return {}
protocols = {}
for device in rjson:
protocols[device["device_hash"]] = device["protocol_type"]
return protocols
class Worker(object):
def __init__(self, period, redis_conf, mysql_conf, hv_conf):
self._pool = ThreadPoolExecutor(max_workers=20)
self.period = period
self.running = True
self.mysql_connect_param = {}
self.mysql_connect_param.update(
host=mysql_conf['host'],
port=mysql_conf['port'],
db="zelda",
user=mysql_conf['user'],
password=mysql_conf['password'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor,
read_timeout=30,
write_timeout=30,
defer_connect=True
)
self.mc = pymysql.Connection(**self.mysql_connect_param)
self.rc = StrictRedis(
socket_timeout=30,
socket_connect_timeout=10,
retry_on_timeout=True,
**redis_conf)
self.hv_conf = hv_conf
self.devices = []
def renew_mysql(self):
try:
if self.mc:
self.mc.close()
except Exception:
pass
finally:
self.mc = pymysql.connect(**self.mysql_connect_param)
def sync_device(self):
@exec_profile
@exception_handler(expect_value=None)
@tag("sync_device")
def syncer():
retry = 3
while retry:
try:
self.mc.ping(reconnect=True)
except:
retry -= 1
logger.warning("mysql connection retrying({0})...".format(retry))
else:
break
if retry <= 0:
return
with self.mc.cursor() as cursor:
cursor.execute("select hash from waf where hash IS NOT NULL")
self.devices = map(operator.itemgetter("hash"), cursor.fetchall())
logger.info("current device number: {0}".format(len(self.devices)))
counter = 0
while self.running:
if counter - 30 >= 0:
job = self._pool.submit(syncer)
wait([job], timeout=30)
counter = 0
time.sleep(1)
counter += 1
@exec_profile
@tag("hv_query")
def deal_devices(self, dev_hash, status_detail_li, protocol):
presto = Presto(**self.hv_conf)
try:
info = dict(netflow={})
base_status = status_detail_li.get(dev_hash, {})
if base_status:
info.update(
cpu=base_status.get("cpu_usage", 0),
mem=base_status.get("mem_usage", 0),
online=base_status.get("online_status", 0),
lasttimepoint=int(time.time()) \
if base_status.get("online_status", 0) \
else base_status.get("lasttimepoint")
)
status_data = get_device_cardflow([dev_hash], protocol=protocol, presto=presto)
if status_data:
info.update(
netflow=status_data[dev_hash]
)
logger.debug("device hash: <{0}>, cache data: {1}".format(dev_hash, status_data))
self.rc.set(
name="{0}{1}".format(DEVICE_STATUS_INFO_CACHE_PRIFIX, dev_hash),
value=json.dumps(info)
)
except Exception as e:
logger.error(str(e), exc_info=True)
@exec_profile
@exception_handler(expect_value=None)
@tag("task_running")
def task(self):
if not self.devices:
return
timeout = len(self.devices) * 3
protocol_li = get_device_protocol(self.devices)
status_detail_li = get_device_status_detail(self.devices)
protocol_old = [key for key, value in protocol_li.iteritems() if value == 0]
protocol_new = list(set(protocol_li.keys()) - set(protocol_old))
new_device_parser = partial(self.deal_devices, protocol=1, status_detail_li=status_detail_li)
old_device_parser = partial(self.deal_devices, protocol=0, status_detail_li=status_detail_li)
fs_new = [self._pool.submit(new_device_parser, dev) for dev in protocol_new]
wait(fs_new, timeout=timeout, return_when=ALL_COMPLETED)
fs_old = [self._pool.submit(old_device_parser, dev) for dev in protocol_old]
wait(fs_old, timeout=timeout, return_when=ALL_COMPLETED)
logger.info("Sync Status Done.")
def stop(self, signum, frame):
self.running = False
self._pool.shutdown()
def run_worker(self):
data_job = threading.Thread(target=self.sync_device)
data_job.start()
while self.running:
self.task()
time.sleep(self.period)
def main():
options = parse_args()
time_period = options.PERIOD
mysql_conf = cm.get("mysql_setting")
redis_conf = cm.get("cache_redis_setting")
hv_conf = cm.get("hv")
runner = Worker(
mysql_conf=mysql_conf,
redis_conf=redis_conf,
hv_conf=hv_conf,
period=time_period
)
signal.signal(signal.SIGTERM, runner.stop)
signal.signal(signal.SIGINT, runner.stop)
signal.signal(signal.SIGQUIT, runner.stop)
runner.run_worker()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.